From: Daira Hopwood Date: Mon, 15 Apr 2013 19:31:16 +0000 (+0100) Subject: Changes to node classes (Node, Client and StorageServer). X-Git-Url: https://git.rkrishnan.org/%5B/frontends/flags/quickstart.html?a=commitdiff_plain;h=cdbc1bcf36d9f95ea63891b02fa5886e34886b40;p=tahoe-lafs%2Ftahoe-lafs.git Changes to node classes (Node, Client and StorageServer). Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index ff75f507..c2242c2f 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -8,7 +8,12 @@ from twisted.application.internet import TimerService from pycryptopp.publickey import rsa import allmydata +from allmydata.node import InvalidValueError from allmydata.storage.server import StorageServer +from allmydata.storage.backends.null.null_backend import configure_null_backend +from allmydata.storage.backends.disk.disk_backend import configure_disk_backend +from allmydata.storage.backends.cloud.cloud_backend import configure_cloud_backend +from allmydata.storage.backends.cloud.mock_cloud import configure_mock_cloud_backend from allmydata.storage.expiration import ExpirationPolicy from allmydata import storage_client from allmydata.immutable.upload import Uploader @@ -16,8 +21,7 @@ from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib -from allmydata.util.encodingutil import get_filesystem_encoding -from allmydata.util.abbreviate import parse_abbreviated_size +from allmydata.util.encodingutil import get_filesystem_encoding, quote_output from allmydata.util.time_format import parse_duration, parse_date from allmydata.stats import StatsProvider from allmydata.history import History @@ -166,7 +170,8 @@ class Client(node.Node, pollmixin.PollMixin): self.init_web(webport) # strports string def _sequencer(self): - seqnum_s = self.get_config_from_file("announcement-seqnum") + seqnum_path = os.path.join(self.basedir, "announcement-seqnum") + seqnum_s = self.get_optional_config_from_file(seqnum_path) if not seqnum_s: seqnum_s = "0" seqnum = int(seqnum_s.strip()) @@ -217,6 +222,7 @@ class Client(node.Node, pollmixin.PollMixin): def _make_key(): sk_vs,vk_vs = keyutil.make_keypair() return sk_vs+"\n" + sk_vs = self.get_or_create_private_config("node.privkey", _make_key) sk,vk_vs = keyutil.parse_privkey(sk_vs.strip()) self.write_config("node.pubkey", vk_vs+"\n") @@ -231,11 +237,10 @@ class Client(node.Node, pollmixin.PollMixin): return idlib.nodeid_b2a(self.nodeid) def _init_permutation_seed(self, ss): - seed = self.get_config_from_file("permutation-seed") + seed = self.get_optional_private_config("permutation-seed") if not seed: - have_shares = ss.have_shares() - if have_shares: - # if the server has shares but not a recorded + if ss.backend.must_use_tubid_as_permutation_seed(): + # If a server using a disk backend has shares but not a recorded # permutation-seed, then it has been around since pre-#466 # days, and the clients who uploaded those shares used our # TubID as a permutation-seed. We should keep using that same @@ -253,22 +258,31 @@ class Client(node.Node, pollmixin.PollMixin): def init_storage(self): self.accountant = None - # should we run a storage server (and publish it for others to use)? + # Should we run a storage server (and publish it for others to use)? if not self.get_config("storage", "enabled", True, boolean=True): return - readonly = self.get_config("storage", "readonly", False, boolean=True) storedir = os.path.join(self.basedir, self.STOREDIR) - data = self.get_config("storage", "reserved_space", None) - try: - reserved = parse_abbreviated_size(data) - except ValueError: - log.msg("[storage]reserved_space= contains unparseable value %s" - % data) - raise - if reserved is None: - reserved = 0 + # What sort of backend? + backendtype = self.get_config("storage", "backend", "disk") + if backendtype == "s3": + backendtype = "cloud.s3" + backendprefix = backendtype.partition('.')[0] + + backend_configurators = { + 'disk': configure_disk_backend, + 'cloud': configure_cloud_backend, + 'mock_cloud': configure_mock_cloud_backend, + 'debug_discard': configure_null_backend, + } + + if backendprefix not in backend_configurators: + raise InvalidValueError("%s is not supported; it must start with one of %s" + % (quote_output("[storage]backend = " + backendtype), backend_configurators.keys()) ) + + backend = backend_configurators[backendprefix](storedir, self) + if self.get_config("storage", "debug_discard", False, boolean=True): raise OldConfigOptionError("[storage]debug_discard = True is no longer supported.") @@ -295,15 +309,13 @@ class Client(node.Node, pollmixin.PollMixin): expiration_policy = ExpirationPolicy(enabled=expire, mode=mode, override_lease_duration=o_l_d, cutoff_date=cutoff_date) - ss = StorageServer(storedir, self.nodeid, - reserved_space=reserved, - readonly_storage=readonly, + statedir = storedir + ss = StorageServer(self.nodeid, backend, statedir, stats_provider=self.stats_provider) - self.storage_server = ss - self.add_service(ss) - self.accountant = ss.get_accountant() self.accountant.set_expiration_policy(expiration_policy) + self.storage_server = ss + self.add_service(ss) d = self.when_tub_ready() # we can't do registerReference until the Tub is ready diff --git a/src/allmydata/node.py b/src/allmydata/node.py index 8873e5c7..4dccf965 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -1,3 +1,4 @@ + import datetime, os.path, re, types, ConfigParser, tempfile from base64 import b32decode, b32encode @@ -12,6 +13,8 @@ from allmydata.util import fileutil, iputil, observer from allmydata.util.assertutil import precondition, _assert from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.encodingutil import get_filesystem_encoding, quote_output +from allmydata.util.abbreviate import parse_abbreviated_size + # Add our application versions to the data that Foolscap's LogPublisher # reports. @@ -37,9 +40,12 @@ such as private keys. On Unix-like systems, the permissions on this directory are set to disallow users other than its owner from reading the contents of the files. See the 'configuration.rst' documentation file for details.""" -class _None: # used as a marker in get_config() +class _None: # used as a marker in get_config() and get_or_create_private_config() pass +class InvalidValueError(Exception): + """ The configured value was not valid. """ + class MissingConfigEntry(Exception): """ A required config entry was not found. """ @@ -70,7 +76,7 @@ class Node(service.MultiService): self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE) self._tub_ready_observerlist = observer.OneShotObserverList() fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700) - open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README) + fileutil.write(os.path.join(self.basedir, "private", "README"), PRIV_README, mode="") # creates self.config self.read_config() @@ -113,6 +119,16 @@ class Node(service.MultiService): % (quote_output(fn), section, option)) return default + def get_config_size(self, section, option, default=_None): + data = self.get_config(section, option, default) + if data is None: + return None + try: + return parse_abbreviated_size(data) + except ValueError: + raise InvalidValueError("[%s]%s= contains unparseable size value %s" + % (section, option, quote_output(data)) ) + def set_config(self, section, option, value): if not self.config.has_section(section): self.config.add_section(section) @@ -209,43 +225,42 @@ class Node(service.MultiService): # TODO: merge this with allmydata.get_package_versions return dict(app_versions.versions) - def get_config_from_file(self, name, required=False): - """Get the (string) contents of a config file, or None if the file - did not exist. If required=True, raise an exception rather than - returning None. Any leading or trailing whitespace will be stripped - from the data.""" - fn = os.path.join(self.basedir, name) + def get_optional_config_from_file(self, path): + """Read the (string) contents of a file. Any leading or trailing + whitespace will be stripped from the data. If the file does not exist, + return None.""" try: - return fileutil.read(fn).strip() + value = fileutil.read(path) except EnvironmentError: - if not required: - return None - raise + if os.path.exists(path): + raise + return None + return value.strip() + + def _get_private_config_path(self, name): + return os.path.join(self.basedir, "private", name) def write_private_config(self, name, value): """Write the (string) contents of a private config file (which is a config file that resides within the subdirectory named 'private'), and return it. """ - privname = os.path.join(self.basedir, "private", name) - open(privname, "w").write(value) + fileutil.write(self._get_private_config_path(name), value, mode="") + + def get_optional_private_config(self, name): + """Try to get the (string) contents of a private config file (which + is a config file that resides within the subdirectory named + 'private'), and return it. Any leading or trailing whitespace will be + stripped from the data. If the file does not exist, return None. + """ + return self.get_optional_config_from_file(self._get_private_config_path(name)) - def get_private_config(self, name, default=_None): + def get_private_config(self, name): """Read the (string) contents of a private config file (which is a config file that resides within the subdirectory named 'private'), - and return it. Return a default, or raise an error if one was not - given. + and return it. Raise an error if the file was not found. """ - privname = os.path.join(self.basedir, "private", name) - try: - return fileutil.read(privname) - except EnvironmentError: - if os.path.exists(privname): - raise - if default is _None: - raise MissingConfigEntry("The required configuration file %s is missing." - % (quote_output(privname),)) - return default + return self.get_or_create_private_config(name) def get_or_create_private_config(self, name, default=_None): """Try to get the (string) contents of a private config file (which @@ -259,21 +274,18 @@ class Node(service.MultiService): If 'default' is a string, use it as a default value. If not, treat it as a zero-argument callable that is expected to return a string. """ - privname = os.path.join(self.basedir, "private", name) - try: - value = fileutil.read(privname) - except EnvironmentError: - if os.path.exists(privname): - raise + value = self.get_optional_private_config(name) + if value is None: + privpath = self._get_private_config_path(name) if default is _None: raise MissingConfigEntry("The required configuration file %s is missing." - % (quote_output(privname),)) - if isinstance(default, basestring): - value = default + % (quote_output(privpath),)) + elif isinstance(default, basestring): + value = default.strip() else: - value = default() - fileutil.write(privname, value) - return value.strip() + value = default().strip() + fileutil.write(privpath, value, mode="") + return value def write_config(self, name, value, mode="w"): """Write a string to a config file.""" diff --git a/src/allmydata/storage/backends/disk/disk_backend.py b/src/allmydata/storage/backends/disk/disk_backend.py index 2d24f694..d507d152 100644 --- a/src/allmydata/storage/backends/disk/disk_backend.py +++ b/src/allmydata/storage/backends/disk/disk_backend.py @@ -58,7 +58,7 @@ class DiskBackend(Backend): Backend.__init__(self) self._storedir = storedir self._readonly = readonly - self._reserved_space = int(reserved_space) + self._reserved_space = reserved_space self._sharedir = os.path.join(self._storedir, 'shares') fileutil.make_dirs(self._sharedir) self._incomingdir = os.path.join(self._sharedir, 'incoming') diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 89c8fe9d..f74d67bf 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -1,39 +1,21 @@ -import os, re, weakref, struct, time +import os, weakref from twisted.application import service +from twisted.internet import defer, reactor from zope.interface import implements -from allmydata.interfaces import IStatsProducer +from allmydata.interfaces import IStatsProducer, IStorageBackend +from allmydata.util.assertutil import precondition from allmydata.util import fileutil, idlib, log, time_format import allmydata # for __full_version__ from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported -from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \ - create_mutable_sharefile from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE -from allmydata.storage.backends.disk.immutable import ShareFile -from allmydata.storage.bucket import BucketWriter, BucketReader from allmydata.storage.crawler import BucketCountingCrawler from allmydata.storage.accountant import Accountant from allmydata.storage.expiration import ExpirationPolicy -from allmydata.storage.leasedb import SHARETYPE_MUTABLE - - -# storage/ -# storage/shares/incoming -# incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will -# be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success -# storage/shares/$START/$STORAGEINDEX -# storage/shares/$START/$STORAGEINDEX/$SHARENUM - -# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2 -# base-32 chars). - -# $SHARENUM matches this regex: -NUM_RE=re.compile("^[0-9]+$") - class StorageServer(service.MultiService): @@ -42,36 +24,33 @@ class StorageServer(service.MultiService): BucketCounterClass = BucketCountingCrawler DEFAULT_EXPIRATION_POLICY = ExpirationPolicy(enabled=False) - def __init__(self, storedir, nodeid, reserved_space=0, - readonly_storage=False, + def __init__(self, serverid, backend, statedir, stats_provider=None, - expiration_policy=None): + expiration_policy=None, + clock=None): service.MultiService.__init__(self) - assert isinstance(nodeid, str) - assert len(nodeid) == 20 - self.my_nodeid = nodeid - self.storedir = storedir - sharedir = os.path.join(storedir, "shares") - fileutil.make_dirs(sharedir) - self.sharedir = sharedir - # we don't actually create the corruption-advisory dir until necessary - self.corruption_advisory_dir = os.path.join(storedir, - "corruption-advisories") - self.reserved_space = int(reserved_space) - self.readonly_storage = readonly_storage + precondition(IStorageBackend.providedBy(backend), backend) + precondition(isinstance(serverid, str), serverid) + precondition(len(serverid) == 20, serverid) + + self._serverid = serverid + self.clock = clock or reactor self.stats_provider = stats_provider if self.stats_provider: self.stats_provider.register_producer(self) - self.incomingdir = os.path.join(sharedir, 'incoming') - self._clean_incomplete() - fileutil.make_dirs(self.incomingdir) + + self.backend = backend + self.backend.setServiceParent(self) + self._active_writers = weakref.WeakKeyDictionary() - log.msg("StorageServer created", facility="tahoe.storage") + self._statedir = statedir + fileutil.make_dirs(self._statedir) + + # we don't actually create the corruption-advisory dir until necessary + self._corruption_advisory_dir = os.path.join(self._statedir, + "corruption-advisories") - if reserved_space: - if self.get_available_space() is None: - log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored", - umin="0wZ27w", level=log.UNUSUAL) + log.msg("StorageServer created", facility="tahoe.storage") self.latencies = {"allocate": [], # immutable "write": [], @@ -84,13 +63,14 @@ class StorageServer(service.MultiService): "renew": [], "cancel": [], } - self.add_bucket_counter() + + self.init_bucket_counter() self.init_accountant(expiration_policy or self.DEFAULT_EXPIRATION_POLICY) def init_accountant(self, expiration_policy): - dbfile = os.path.join(self.storedir, "leasedb.sqlite") - statefile = os.path.join(self.storedir, "leasedb_crawler.state") - self.accountant = Accountant(self, dbfile, statefile) + dbfile = os.path.join(self._statedir, "leasedb.sqlite") + statefile = os.path.join(self._statedir, "accounting_crawler.state") + self.accountant = Accountant(self, dbfile, statefile, clock=self.clock) self.accountant.set_expiration_policy(expiration_policy) self.accountant.setServiceParent(self) @@ -106,20 +86,16 @@ class StorageServer(service.MultiService): def get_bucket_counter(self): return self.bucket_counter - def get_nodeid(self): - return self.my_nodeid + def get_serverid(self): + return self._serverid def __repr__(self): - return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) + return "" % (idlib.shortnodeid_b2a(self.get_serverid()),) - def have_shares(self): - # quick test to decide if we need to commit to an implicit - # permutation-seed or if we should use a new one - return bool(set(os.listdir(self.sharedir)) - set(["incoming"])) - - def add_bucket_counter(self): - statefile = os.path.join(self.storedir, "bucket_counter.state") - self.bucket_counter = BucketCountingCrawler(self, statefile) + def init_bucket_counter(self): + statefile = os.path.join(self._statedir, "bucket_counter.state") + self.bucket_counter = self.BucketCounterClass(self.backend, statefile, + clock=self.clock) self.bucket_counter.setServiceParent(self) def count(self, name, delta=1): @@ -132,11 +108,15 @@ class StorageServer(service.MultiService): if len(a) > 1000: self.latencies[category] = a[-1000:] + def _add_latency(self, res, category, start): + self.add_latency(category, self.clock.seconds() - start) + return res + def get_latencies(self): """Return a dict, indexed by category, that contains a dict of latency numbers for each category. If there are sufficient samples for unambiguous interpretation, each dict will contain the - following keys: mean, 01_0_percentile, 10_0_percentile, + following keys: samplesize, mean, 01_0_percentile, 10_0_percentile, 50_0_percentile (median), 90_0_percentile, 95_0_percentile, 99_0_percentile, 99_9_percentile. If there are insufficient samples for a given percentile to be interpreted unambiguously @@ -177,52 +157,25 @@ class StorageServer(service.MultiService): kwargs["facility"] = "tahoe.storage" return log.msg(*args, **kwargs) - def _clean_incomplete(self): - fileutil.rm_dir(self.incomingdir) - def get_stats(self): # remember: RIStatsProvider requires that our return dict - # contains numeric values. + # contains numeric, or None values. stats = { 'storage_server.allocated': self.allocated_size(), } - stats['storage_server.reserved_space'] = self.reserved_space for category,ld in self.get_latencies().items(): for name,v in ld.items(): stats['storage_server.latencies.%s.%s' % (category, name)] = v - try: - disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space) - writeable = disk['avail'] > 0 - - # spacetime predictors should use disk_avail / (d(disk_used)/dt) - stats['storage_server.disk_total'] = disk['total'] - stats['storage_server.disk_used'] = disk['used'] - stats['storage_server.disk_free_for_root'] = disk['free_for_root'] - stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot'] - stats['storage_server.disk_avail'] = disk['avail'] - except AttributeError: - writeable = True - except EnvironmentError: - log.msg("OS call to get disk statistics failed", level=log.UNUSUAL) - writeable = False - - if self.readonly_storage: - stats['storage_server.disk_avail'] = 0 - writeable = False - - stats['storage_server.accepting_immutable_shares'] = int(writeable) - s = self.bucket_counter.get_state() - bucket_count = s.get("last-complete-bucket-count") - if bucket_count: - stats['storage_server.total_bucket_count'] = bucket_count + self.backend.fill_in_space_stats(stats) + + if self.bucket_counter: + s = self.bucket_counter.get_state() + bucket_count = s.get("last-complete-bucket-count") + if bucket_count: + stats['storage_server.total_bucket_count'] = bucket_count return stats def get_available_space(self): - """Returns available space for share storage in bytes, or None if no - API to get this information is available.""" - - if self.readonly_storage: - return 0 - return fileutil.get_available_space(self.sharedir, self.reserved_space) + return self.backend.get_available_space() def allocated_size(self): space = 0 @@ -233,7 +186,7 @@ class StorageServer(service.MultiService): # these methods can be invoked by our callers def client_get_version(self, account): - remaining_space = self.get_available_space() + remaining_space = self.backend.get_available_space() if remaining_space is None: # We're on a platform that has no API to get disk stats. remaining_space = 2**64 @@ -245,267 +198,154 @@ class StorageServer(service.MultiService): "delete-mutable-shares-with-zero-length-writev": True, "fills-holes-with-zero-bytes": True, "prevents-read-past-end-of-share-data": True, - "accounting-v1": {}, + "ignores-lease-renewal-and-cancel-secrets": True, + "has-immutable-readv": True, }, "application-version": str(allmydata.__full_version__), } return version def client_allocate_buckets(self, storage_index, - sharenums, allocated_size, + sharenums, allocated_data_length, canary, account): - start = time.time() + start = self.clock.seconds() self.count("allocate") - alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter - si_dir = storage_index_to_dir(storage_index) si_s = si_b2a(storage_index) log.msg("storage: allocate_buckets %s" % si_s) - # Note that the lease should not be added until the BucketWriter has - # been closed. This is handled in BucketWriter.close() - - max_space_per_bucket = allocated_size - remaining_space = self.get_available_space() limited = remaining_space is not None if limited: - # this is a bit conservative, since some of this allocated_size() - # has already been written to disk, where it will show up in + # This is a bit conservative, since some of this allocated_size() + # has already been written to the backend, where it will show up in # get_available_space. remaining_space -= self.allocated_size() - # self.readonly_storage causes remaining_space <= 0 - - # fill alreadygot with all shares that we have, not just the ones - # they asked about: this will save them a lot of work. Add or update - # leases for all of them: if they want us to hold shares for this - # file, they'll want us to hold leases for this file. - for (shnum, fn) in self._get_bucket_shares(storage_index): - alreadygot.add(shnum) - - for shnum in sharenums: - incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum) - finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum) - if os.path.exists(finalhome): - # great! we already have it. easy. - pass - elif os.path.exists(incominghome): - # Note that we don't create BucketWriters for shnums that - # have a partial share (in incoming/), so if a second upload - # occurs while the first is still in progress, the second - # uploader will use different storage servers. - pass - elif (not limited) or (remaining_space >= max_space_per_bucket): - # ok! we need to create the new share file. - bw = BucketWriter(self, account, storage_index, shnum, - incominghome, finalhome, - max_space_per_bucket, canary) - bucketwriters[shnum] = bw - self._active_writers[bw] = 1 - if limited: - remaining_space -= max_space_per_bucket - else: - # bummer! not enough space to accept this bucket - pass + # If the backend is read-only, remaining_space will be <= 0. - if bucketwriters: - fileutil.make_dirs(os.path.join(self.sharedir, si_dir)) + # Fill alreadygot with all shares that we have, not just the ones + # they asked about: this will save them a lot of work. Leases will + # be added or updated for all of them. + alreadygot = set() + shareset = self.backend.get_shareset(storage_index) + d = shareset.get_shares() + def _got_shares( (shares, corrupted) ): + remaining = remaining_space + for share in shares: + # XXX do we need to explicitly add a lease here? + alreadygot.add(share.get_shnum()) + + d2 = defer.succeed(None) + + # We don't create BucketWriters for shnums where we have a share + # that is corrupted. Is that right, or should we allow the corrupted + # share to be clobbered? Note that currently the disk share classes + # have assertions that prevent them from clobbering existing files. + for shnum in set(sharenums) - alreadygot - corrupted: + if shareset.has_incoming(shnum): + # Note that we don't create BucketWriters for shnums that + # have an incoming share, so if a second upload occurs while + # the first is still in progress, the second uploader will + # use different storage servers. + pass + elif (not limited) or remaining >= allocated_data_length: + if limited: + remaining -= allocated_data_length + + d2.addCallback(lambda ign, shnum=shnum: + shareset.make_bucket_writer(account, shnum, allocated_data_length, + canary)) + def _record_writer(bw, shnum=shnum): + bucketwriters[shnum] = bw + self._active_writers[bw] = 1 + d2.addCallback(_record_writer) + else: + # not enough space to accept this share + pass - self.add_latency("allocate", time.time() - start) - return alreadygot, bucketwriters - - def _iter_share_files(self, storage_index): - for shnum, filename in self._get_bucket_shares(storage_index): - f = open(filename, 'rb') - header = f.read(32) - f.close() - if header[:32] == MutableShareFile.MAGIC: - sf = MutableShareFile(filename, self) - # note: if the share has been migrated, the renew_lease() - # call will throw an exception, with information to help the - # client update the lease. - elif header[:4] == struct.pack(">L", 1): - sf = ShareFile(filename) - else: - continue # non-sharefile - yield sf + d2.addCallback(lambda ign: (alreadygot, bucketwriters)) + return d2 + d.addCallback(_got_shares) + d.addBoth(self._add_latency, "allocate", start) + return d def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) del self._active_writers[bw] - def _get_bucket_shares(self, storage_index): - """Return a list of (shnum, pathname) tuples for files that hold - shares for this storage_index. In each tuple, 'shnum' will always be - the integer form of the last component of 'pathname'.""" - storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) - try: - for f in os.listdir(storagedir): - if NUM_RE.match(f): - filename = os.path.join(storagedir, f) - yield (int(f), filename) - except OSError: - # Commonly caused by there being no buckets at all. - pass - - def client_get_buckets(self, storage_index): - start = time.time() + def client_get_buckets(self, storage_index, account): + start = self.clock.seconds() self.count("get") si_s = si_b2a(storage_index) log.msg("storage: get_buckets %s" % si_s) bucketreaders = {} # k: sharenum, v: BucketReader - for shnum, filename in self._get_bucket_shares(storage_index): - bucketreaders[shnum] = BucketReader(self, filename, - storage_index, shnum) - self.add_latency("get", time.time() - start) - return bucketreaders + + shareset = self.backend.get_shareset(storage_index) + d = shareset.get_shares() + def _make_readers( (shares, corrupted) ): + # We don't create BucketReaders for corrupted shares. + for share in shares: + assert not isinstance(share, defer.Deferred), share + bucketreaders[share.get_shnum()] = shareset.make_bucket_reader(account, share) + return bucketreaders + d.addCallback(_make_readers) + d.addBoth(self._add_latency, "get", start) + return d def client_slot_testv_and_readv_and_writev(self, storage_index, write_enabler, test_and_write_vectors, read_vector, account): - start = time.time() + start = self.clock.seconds() self.count("writev") si_s = si_b2a(storage_index) - log.msg("storage: slot_writev %s" % si_s) - si_dir = storage_index_to_dir(storage_index) - - # shares exist if there is a file for them - bucketdir = os.path.join(self.sharedir, si_dir) - shares = {} - if os.path.isdir(bucketdir): - for sharenum_s in os.listdir(bucketdir): - try: - sharenum = int(sharenum_s) - except ValueError: - continue - filename = os.path.join(bucketdir, sharenum_s) - msf = MutableShareFile(filename, self) - msf.check_write_enabler(write_enabler, si_s) - shares[sharenum] = msf - # write_enabler is good for all existing shares. - - # Now evaluate test vectors. - testv_is_good = True - for sharenum in test_and_write_vectors: - (testv, datav, new_length) = test_and_write_vectors[sharenum] - if sharenum in shares: - if not shares[sharenum].check_testv(testv): - self.log("testv failed: [%d]: %r" % (sharenum, testv)) - testv_is_good = False - break - else: - # compare the vectors against an empty share, in which all - # reads return empty strings. - if not EmptyShare().check_testv(testv): - self.log("testv failed (empty): [%d] %r" % (sharenum, - testv)) - testv_is_good = False - break - - # now gather the read vectors, before we do any writes - read_data = {} - for sharenum, share in shares.items(): - read_data[sharenum] = share.readv(read_vector) - - if testv_is_good: - # now apply the write vectors - for sharenum in test_and_write_vectors: - (testv, datav, new_length) = test_and_write_vectors[sharenum] - if new_length == 0: - if sharenum in shares: - shares[sharenum].unlink() - account.remove_share_and_leases(storage_index, sharenum) - else: - if sharenum not in shares: - # allocate a new share - allocated_size = 2000 # arbitrary, really # REMOVE - share = self._allocate_slot_share(bucketdir, - write_enabler, - sharenum, - allocated_size) - shares[sharenum] = share - shares[sharenum].writev(datav, new_length) - account.add_share(storage_index, sharenum, - shares[sharenum].get_used_space(), SHARETYPE_MUTABLE) - else: - # apply the write vector and update the lease - shares[sharenum].writev(datav, new_length) - - account.add_or_renew_default_lease(storage_index, sharenum) - account.mark_share_as_stable(storage_index, sharenum, - shares[sharenum].get_used_space()) - - if new_length == 0: - # delete empty bucket directories - if not os.listdir(bucketdir): - os.rmdir(bucketdir) - - # all done - self.add_latency("writev", time.time() - start) - return (testv_is_good, read_data) - - def _allocate_slot_share(self, bucketdir, write_enabler, sharenum, allocated_size): - my_nodeid = self.my_nodeid - fileutil.make_dirs(bucketdir) - filename = os.path.join(bucketdir, "%d" % sharenum) - share = create_mutable_sharefile(filename, my_nodeid, write_enabler, - self) - return share - - def delete_share(self, storage_index, shnum): - si_dir = storage_index_to_dir(storage_index) - filename = os.path.join(self.sharedir, si_dir, "%d" % (shnum,)) - os.unlink(filename) + + shareset = self.backend.get_shareset(storage_index) + expiration_time = start + 31*24*60*60 # one month from now + + d = shareset.testv_and_readv_and_writev(write_enabler, test_and_write_vectors, + read_vector, expiration_time, account) + d.addBoth(self._add_latency, "writev", start) + return d def client_slot_readv(self, storage_index, shares, readv, account): - start = time.time() + start = self.clock.seconds() self.count("readv") si_s = si_b2a(storage_index) - lp = log.msg("storage: slot_readv %s %s" % (si_s, shares), - facility="tahoe.storage", level=log.OPERATIONAL) - si_dir = storage_index_to_dir(storage_index) - # shares exist if there is a file for them - bucketdir = os.path.join(self.sharedir, si_dir) - if not os.path.isdir(bucketdir): - self.add_latency("readv", time.time() - start) - return {} - datavs = {} - for sharenum_s in os.listdir(bucketdir): - try: - sharenum = int(sharenum_s) - except ValueError: - continue - if sharenum in shares or not shares: - filename = os.path.join(bucketdir, sharenum_s) - msf = MutableShareFile(filename, self) - datavs[sharenum] = msf.readv(readv) - log.msg("returning shares %s" % (datavs.keys(),), - facility="tahoe.storage", level=log.NOISY, parent=lp) - self.add_latency("readv", time.time() - start) - return datavs - - def client_advise_corrupt_share(self, share_type, storage_index, shnum, reason): - fileutil.make_dirs(self.corruption_advisory_dir) + log.msg("storage: slot_readv %s %s" % (si_s, shares), + facility="tahoe.storage", level=log.OPERATIONAL) + + shareset = self.backend.get_shareset(storage_index) + d = shareset.readv(shares, readv) + d.addBoth(self._add_latency, "readv", start) + return d + + def client_advise_corrupt_share(self, share_type, storage_index, shnum, reason, account): + fileutil.make_dirs(self._corruption_advisory_dir) now = time_format.iso_utc(sep="T") si_s = si_b2a(storage_index) + owner_num = account.get_owner_num() + # windows can't handle colons in the filename - fn = os.path.join(self.corruption_advisory_dir, + fn = os.path.join(self._corruption_advisory_dir, "%s--%s-%d" % (now, si_s, shnum)).replace(":","") f = open(fn, "w") - f.write("report: Share Corruption\n") - f.write("type: %s\n" % share_type) - f.write("storage_index: %s\n" % si_s) - f.write("share_number: %d\n" % shnum) - f.write("\n") - f.write(reason) - f.write("\n") - f.close() - log.msg(format=("client claims corruption in (%(share_type)s) " + + try: + f.write("report: Share Corruption\n") + f.write("type: %s\n" % (share_type,)) + f.write("storage_index: %s\n" % (si_s,)) + f.write("share_number: %d\n" % (shnum,)) + f.write("owner_num: %s\n" % (owner_num,)) + f.write("\n") + f.write(reason) + f.write("\n") + finally: + f.close() + + log.msg(format=("client #%(owner_num)d claims corruption in (%(share_type)s) " + "%(si)s-%(shnum)d: %(reason)s"), - share_type=share_type, si=si_s, shnum=shnum, reason=reason, + owner_num=owner_num, share_type=share_type, si=si_s, shnum=shnum, reason=reason, level=log.SCARY, umid="SGx2fA") - return None diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 45823181..2a2a9b40 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -328,11 +328,13 @@ class NoNetworkGrid(service.MultiService): ss.hung_until = None def nuke_from_orbit(self): - """ Empty all share directories in this grid. It's the only way to be sure ;-) """ + """Empty all share directories in this grid. It's the only way to be sure ;-) + This works only for a disk backend.""" for server in self.servers_by_number.values(): - for prefixdir in os.listdir(server.sharedir): + sharedir = server.backend._sharedir + for prefixdir in os.listdir(sharedir): if prefixdir != 'incoming': - fileutil.rm_dir(os.path.join(server.sharedir, prefixdir)) + fileutil.rm_dir(os.path.join(sharedir, prefixdir)) class GridTestMixin: diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index a77d51ca..d5ee894c 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -3,7 +3,7 @@ from twisted.trial import unittest from twisted.application import service import allmydata -from allmydata.node import OldConfigError, OldConfigOptionError, MissingConfigEntry +from allmydata.node import InvalidValueError, OldConfigError, OldConfigOptionError, MissingConfigEntry from allmydata import client from allmydata.storage_client import StorageFarmBroker from allmydata.util import base32, fileutil @@ -148,7 +148,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): "[storage]\n" + \ "enabled = true\n" + \ "reserved_space = bogus\n") - self.failUnlessRaises(ValueError, client.Client, basedir) + self.failUnlessRaises(InvalidValueError, client.Client, basedir) def test_expire_mutable_false_unsupported(self): basedir = "client.Basic.test_expire_mutable_false_unsupported" diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 51478347..ebbf15ea 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -218,7 +218,7 @@ class FakeStorageServer(service.MultiService): name = 'storage' def __init__(self, nodeid, nickname): service.MultiService.__init__(self) - self.my_nodeid = nodeid + self.serverid = nodeid self.nickname = nickname self.bucket_counter = FakeBucketCounter() self.accounting_crawler = FakeAccountingCrawler() @@ -226,8 +226,8 @@ class FakeStorageServer(service.MultiService): self.expiration_policy = ExpirationPolicy(enabled=False) def get_stats(self): return {"storage_server.accepting_immutable_shares": False} - def get_nodeid(self): - return self.my_nodeid + def get_serverid(self): + return self.serverid def get_bucket_counter(self): return self.bucket_counter def get_accounting_crawler(self):