Changes to node classes (Node, Client and StorageServer).
authorDaira Hopwood <daira@jacaranda.org>
Fri, 17 Apr 2015 18:26:45 +0000 (19:26 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Fri, 17 Apr 2015 21:31:02 +0000 (22:31 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/client.py
src/allmydata/node.py
src/allmydata/storage/backends/disk/disk_backend.py
src/allmydata/storage/server.py
src/allmydata/test/no_network.py
src/allmydata/test/test_client.py
src/allmydata/test/test_web.py

index c2d1b3b5063b766877821e1b0424d7f47f68b4b3..9f56a7d11ee1bdba7ecffe216d2c7106496aaa91 100644 (file)
@@ -8,7 +8,12 @@ from twisted.application.internet import TimerService
 from pycryptopp.publickey import rsa
 
 import allmydata
+from allmydata.node import InvalidValueError
 from allmydata.storage.server import StorageServer
+from allmydata.storage.backends.null.null_backend import configure_null_backend
+from allmydata.storage.backends.disk.disk_backend import configure_disk_backend
+from allmydata.storage.backends.cloud.cloud_backend import configure_cloud_backend
+from allmydata.storage.backends.cloud.mock_cloud import configure_mock_cloud_backend
 from allmydata.storage.expiration import ExpirationPolicy
 from allmydata import storage_client
 from allmydata.immutable.upload import Uploader
@@ -16,7 +21,7 @@ from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
 from allmydata.util import hashutil, base32, pollmixin, log, keyutil, idlib
-from allmydata.util.encodingutil import get_filesystem_encoding, \
+from allmydata.util.encodingutil import get_filesystem_encoding, quote_output, \
      from_utf8_or_none
 from allmydata.util.fileutil import abspath_expanduser_unicode
 from allmydata.util.abbreviate import parse_abbreviated_size
@@ -171,7 +176,8 @@ class Client(node.Node, pollmixin.PollMixin):
             self.init_web(webport) # strports string
 
     def _sequencer(self):
-        seqnum_s = self.get_config_from_file("announcement-seqnum")
+        seqnum_path = os.path.join(self.basedir, "announcement-seqnum")
+        seqnum_s = self.get_optional_config_from_file(seqnum_path)
         if not seqnum_s:
             seqnum_s = "0"
         seqnum = int(seqnum_s.strip())
@@ -222,6 +228,7 @@ class Client(node.Node, pollmixin.PollMixin):
         def _make_key():
             sk_vs,vk_vs = keyutil.make_keypair()
             return sk_vs+"\n"
+
         sk_vs = self.get_or_create_private_config("node.privkey", _make_key)
         sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
         self.write_config("node.pubkey", vk_vs+"\n")
@@ -236,11 +243,10 @@ class Client(node.Node, pollmixin.PollMixin):
         return idlib.nodeid_b2a(self.nodeid)
 
     def _init_permutation_seed(self, ss):
-        seed = self.get_config_from_file("permutation-seed")
+        seed = self.get_optional_private_config("permutation-seed")
         if not seed:
-            have_shares = ss.have_shares()
-            if have_shares:
-                # if the server has shares but not a recorded
+            if ss.backend.must_use_tubid_as_permutation_seed():
+                # If a server using a disk backend has shares but not a recorded
                 # permutation-seed, then it has been around since pre-#466
                 # days, and the clients who uploaded those shares used our
                 # TubID as a permutation-seed. We should keep using that same
@@ -258,22 +264,31 @@ class Client(node.Node, pollmixin.PollMixin):
 
     def init_storage(self):
         self.accountant = None
-        # should we run a storage server (and publish it for others to use)?
+        # Should we run a storage server (and publish it for others to use)?
         if not self.get_config("storage", "enabled", True, boolean=True):
             return
-        readonly = self.get_config("storage", "readonly", False, boolean=True)
 
         storedir = os.path.join(self.basedir, self.STOREDIR)
 
-        data = self.get_config("storage", "reserved_space", None)
-        try:
-            reserved = parse_abbreviated_size(data)
-        except ValueError:
-            log.msg("[storage]reserved_space= contains unparseable value %s"
-                    % data)
-            raise
-        if reserved is None:
-            reserved = 0
+        # What sort of backend?
+        backendtype = self.get_config("storage", "backend", "disk")
+        if backendtype == "s3":
+            backendtype = "cloud.s3"
+        backendprefix = backendtype.partition('.')[0]
+
+        backend_configurators = {
+            'disk': configure_disk_backend,
+            'cloud': configure_cloud_backend,
+            'mock_cloud': configure_mock_cloud_backend,
+            'debug_discard': configure_null_backend,
+        }
+
+        if backendprefix not in backend_configurators:
+            raise InvalidValueError("%s is not supported; it must start with one of %s"
+                                    % (quote_output("[storage]backend = " + backendtype), backend_configurators.keys()) )
+
+        backend = backend_configurators[backendprefix](storedir, self)
+
         if self.get_config("storage", "debug_discard", False, boolean=True):
             raise OldConfigOptionError("[storage]debug_discard = True is no longer supported.")
 
@@ -300,15 +315,13 @@ class Client(node.Node, pollmixin.PollMixin):
         expiration_policy = ExpirationPolicy(enabled=expire, mode=mode, override_lease_duration=o_l_d,
                                              cutoff_date=cutoff_date)
 
-        ss = StorageServer(storedir, self.nodeid,
-                           reserved_space=reserved,
-                           readonly_storage=readonly,
+        statedir = storedir
+        ss = StorageServer(self.nodeid, backend, statedir,
                            stats_provider=self.stats_provider)
-        self.storage_server = ss
-        self.add_service(ss)
-
         self.accountant = ss.get_accountant()
         self.accountant.set_expiration_policy(expiration_policy)
+        self.storage_server = ss
+        self.add_service(ss)
 
         d = self.when_tub_ready()
         # we can't do registerReference until the Tub is ready
index 35ccb355e5c2feebf20b277775cbc24f1a82b5c1..9ebb3787c022d0d42e070fed535e5be53337ee24 100644 (file)
@@ -1,3 +1,4 @@
+
 import datetime, os.path, re, types, ConfigParser, tempfile
 from base64 import b32decode, b32encode
 
@@ -12,6 +13,8 @@ from allmydata.util import fileutil, iputil, observer
 from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.fileutil import abspath_expanduser_unicode
 from allmydata.util.encodingutil import get_filesystem_encoding, quote_output
+from allmydata.util.abbreviate import parse_abbreviated_size
+
 
 # Add our application versions to the data that Foolscap's LogPublisher
 # reports.
@@ -37,9 +40,12 @@ such as private keys.  On Unix-like systems, the permissions on this directory
 are set to disallow users other than its owner from reading the contents of
 the files.   See the 'configuration.rst' documentation file for details."""
 
-class _None: # used as a marker in get_config()
+class _None: # used as a marker in get_config() and get_or_create_private_config()
     pass
 
+class InvalidValueError(Exception):
+    """ The configured value was not valid. """
+
 class MissingConfigEntry(Exception):
     """ A required config entry was not found. """
 
@@ -75,7 +81,7 @@ class Node(service.MultiService):
         self._portnumfile = os.path.join(self.basedir, self.PORTNUMFILE)
         self._tub_ready_observerlist = observer.OneShotObserverList()
         fileutil.make_dirs(os.path.join(self.basedir, "private"), 0700)
-        open(os.path.join(self.basedir, "private", "README"), "w").write(PRIV_README)
+        fileutil.write(os.path.join(self.basedir, "private", "README"), PRIV_README, mode="")
 
         # creates self.config
         self.read_config()
@@ -133,6 +139,16 @@ class Node(service.MultiService):
                                          % (quote_output(fn), section, option))
             return default
 
+    def get_config_size(self, section, option, default=_None):
+        data = self.get_config(section, option, default)
+        if data is None:
+            return None
+        try:
+            return parse_abbreviated_size(data)
+        except ValueError:
+            raise InvalidValueError("[%s]%s= contains unparseable size value %s"
+                                    % (section, option, quote_output(data)) )
+
     def set_config(self, section, option, value):
         if not self.config.has_section(section):
             self.config.add_section(section)
@@ -230,43 +246,42 @@ class Node(service.MultiService):
         # TODO: merge this with allmydata.get_package_versions
         return dict(app_versions.versions)
 
-    def get_config_from_file(self, name, required=False):
-        """Get the (string) contents of a config file, or None if the file
-        did not exist. If required=True, raise an exception rather than
-        returning None. Any leading or trailing whitespace will be stripped
-        from the data."""
-        fn = os.path.join(self.basedir, name)
+    def get_optional_config_from_file(self, path):
+        """Read the (string) contents of a file. Any leading or trailing
+        whitespace will be stripped from the data. If the file does not exist,
+        return None."""
         try:
-            return fileutil.read(fn).strip()
+            value = fileutil.read(path)
         except EnvironmentError:
-            if not required:
-                return None
-            raise
+            if os.path.exists(path):
+                raise
+            return None
+        return value.strip()
+
+    def _get_private_config_path(self, name):
+        return os.path.join(self.basedir, "private", name)
 
     def write_private_config(self, name, value):
         """Write the (string) contents of a private config file (which is a
         config file that resides within the subdirectory named 'private'), and
         return it.
         """
-        privname = os.path.join(self.basedir, "private", name)
-        open(privname, "w").write(value)
+        fileutil.write(self._get_private_config_path(name), value, mode="")
+
+    def get_optional_private_config(self, name):
+        """Try to get the (string) contents of a private config file (which
+        is a config file that resides within the subdirectory named
+        'private'), and return it. Any leading or trailing whitespace will be
+        stripped from the data. If the file does not exist, return None.
+        """
+        return self.get_optional_config_from_file(self._get_private_config_path(name))
 
-    def get_private_config(self, name, default=_None):
+    def get_private_config(self, name):
         """Read the (string) contents of a private config file (which is a
         config file that resides within the subdirectory named 'private'),
-        and return it. Return a default, or raise an error if one was not
-        given.
+        and return it. Raise an error if the file was not found.
         """
-        privname = os.path.join(self.basedir, "private", name)
-        try:
-            return fileutil.read(privname)
-        except EnvironmentError:
-            if os.path.exists(privname):
-                raise
-            if default is _None:
-                raise MissingConfigEntry("The required configuration file %s is missing."
-                                         % (quote_output(privname),))
-            return default
+        return self.get_or_create_private_config(name)
 
     def get_or_create_private_config(self, name, default=_None):
         """Try to get the (string) contents of a private config file (which
@@ -280,21 +295,18 @@ class Node(service.MultiService):
         If 'default' is a string, use it as a default value. If not, treat it
         as a zero-argument callable that is expected to return a string.
         """
-        privname = os.path.join(self.basedir, "private", name)
-        try:
-            value = fileutil.read(privname)
-        except EnvironmentError:
-            if os.path.exists(privname):
-                raise
+        value = self.get_optional_private_config(name)
+        if value is None:
+            privpath = self._get_private_config_path(name)
             if default is _None:
                 raise MissingConfigEntry("The required configuration file %s is missing."
-                                         % (quote_output(privname),))
-            if isinstance(default, basestring):
-                value = default
+                                         % (quote_output(privpath),))
+            elif isinstance(default, basestring):
+                value = default.strip()
             else:
-                value = default()
-            fileutil.write(privname, value)
-        return value.strip()
+                value = default().strip()
+            fileutil.write(privpath, value, mode="")
+        return value
 
     def write_config(self, name, value, mode="w"):
         """Write a string to a config file."""
index 2d24f6941ca4da226eb0f253bc7798e8a2d45537..d507d1520d2cc9b02d4449a9cf21c776e1769e5f 100644 (file)
@@ -58,7 +58,7 @@ class DiskBackend(Backend):
         Backend.__init__(self)
         self._storedir = storedir
         self._readonly = readonly
-        self._reserved_space = int(reserved_space)
+        self._reserved_space = reserved_space
         self._sharedir = os.path.join(self._storedir, 'shares')
         fileutil.make_dirs(self._sharedir)
         self._incomingdir = os.path.join(self._sharedir, 'incoming')
index 91e5347c20f9493c893fff5a87b61e77a5e6d4c3..58aa4530ba2c8588f69052982990136e3e0755e5 100644 (file)
@@ -1,39 +1,21 @@
 
-import os, re, weakref, struct, time
+import os, weakref
 
 from twisted.application import service
+from twisted.internet import defer, reactor
 
 from zope.interface import implements
-from allmydata.interfaces import IStatsProducer
+from allmydata.interfaces import IStatsProducer, IStorageBackend
+from allmydata.util.assertutil import precondition
 from allmydata.util import fileutil, idlib, log, time_format
 import allmydata # for __full_version__
 
 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
 _pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
-from allmydata.storage.backends.disk.mutable import MutableShareFile, EmptyShare, \
-     create_mutable_sharefile
 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-from allmydata.storage.backends.disk.immutable import ShareFile
-from allmydata.storage.bucket import BucketWriter, BucketReader
 from allmydata.storage.crawler import BucketCountingCrawler
 from allmydata.storage.accountant import Accountant
 from allmydata.storage.expiration import ExpirationPolicy
-from allmydata.storage.leasedb import SHARETYPE_MUTABLE
-
-
-# storage/
-# storage/shares/incoming
-#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
-#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
-# storage/shares/$START/$STORAGEINDEX
-# storage/shares/$START/$STORAGEINDEX/$SHARENUM
-
-# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
-# base-32 chars).
-
-# $SHARENUM matches this regex:
-NUM_RE=re.compile("^[0-9]+$")
-
 
 
 class StorageServer(service.MultiService):
@@ -42,36 +24,33 @@ class StorageServer(service.MultiService):
     BucketCounterClass = BucketCountingCrawler
     DEFAULT_EXPIRATION_POLICY = ExpirationPolicy(enabled=False)
 
-    def __init__(self, storedir, nodeid, reserved_space=0,
-                 readonly_storage=False,
+    def __init__(self, serverid, backend, statedir,
                  stats_provider=None,
-                 expiration_policy=None):
+                 expiration_policy=None,
+                 clock=None):
         service.MultiService.__init__(self)
-        assert isinstance(nodeid, str)
-        assert len(nodeid) == 20
-        self.my_nodeid = nodeid
-        self.storedir = storedir
-        sharedir = os.path.join(storedir, "shares")
-        fileutil.make_dirs(sharedir)
-        self.sharedir = sharedir
-        # we don't actually create the corruption-advisory dir until necessary
-        self.corruption_advisory_dir = os.path.join(storedir,
-                                                    "corruption-advisories")
-        self.reserved_space = int(reserved_space)
-        self.readonly_storage = readonly_storage
+        precondition(IStorageBackend.providedBy(backend), backend)
+        precondition(isinstance(serverid, str), serverid)
+        precondition(len(serverid) == 20, serverid)
+
+        self._serverid = serverid
+        self.clock = clock or reactor
         self.stats_provider = stats_provider
         if self.stats_provider:
             self.stats_provider.register_producer(self)
-        self.incomingdir = os.path.join(sharedir, 'incoming')
-        self._clean_incomplete()
-        fileutil.make_dirs(self.incomingdir)
+
+        self.backend = backend
+        self.backend.setServiceParent(self)
+
         self._active_writers = weakref.WeakKeyDictionary()
-        log.msg("StorageServer created", facility="tahoe.storage")
+        self._statedir = statedir
+        fileutil.make_dirs(self._statedir)
+
+        # we don't actually create the corruption-advisory dir until necessary
+        self._corruption_advisory_dir = os.path.join(self._statedir,
+                                                     "corruption-advisories")
 
-        if reserved_space:
-            if self.get_available_space() is None:
-                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
-                        umin="0wZ27w", level=log.UNUSUAL)
+        log.msg("StorageServer created", facility="tahoe.storage")
 
         self.latencies = {"allocate": [], # immutable
                           "write": [],
@@ -84,13 +63,14 @@ class StorageServer(service.MultiService):
                           "renew": [],
                           "cancel": [],
                           }
-        self.add_bucket_counter()
+
+        self.init_bucket_counter()
         self.init_accountant(expiration_policy or self.DEFAULT_EXPIRATION_POLICY)
 
     def init_accountant(self, expiration_policy):
-        dbfile = os.path.join(self.storedir, "leasedb.sqlite")
-        statefile = os.path.join(self.storedir, "leasedb_crawler.state")
-        self.accountant = Accountant(self, dbfile, statefile)
+        dbfile = os.path.join(self._statedir, "leasedb.sqlite")
+        statefile = os.path.join(self._statedir, "accounting_crawler.state")
+        self.accountant = Accountant(self, dbfile, statefile, clock=self.clock)
         self.accountant.set_expiration_policy(expiration_policy)
         self.accountant.setServiceParent(self)
 
@@ -106,20 +86,16 @@ class StorageServer(service.MultiService):
     def get_bucket_counter(self):
         return self.bucket_counter
 
-    def get_nodeid(self):
-        return self.my_nodeid
+    def get_serverid(self):
+        return self._serverid
 
     def __repr__(self):
-        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
+        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.get_serverid()),)
 
-    def have_shares(self):
-        # quick test to decide if we need to commit to an implicit
-        # permutation-seed or if we should use a new one
-        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
-
-    def add_bucket_counter(self):
-        statefile = os.path.join(self.storedir, "bucket_counter.state")
-        self.bucket_counter = BucketCountingCrawler(self, statefile)
+    def init_bucket_counter(self):
+        statefile = os.path.join(self._statedir, "bucket_counter.state")
+        self.bucket_counter = self.BucketCounterClass(self.backend, statefile,
+                                                      clock=self.clock)
         self.bucket_counter.setServiceParent(self)
 
     def count(self, name, delta=1):
@@ -132,11 +108,15 @@ class StorageServer(service.MultiService):
         if len(a) > 1000:
             self.latencies[category] = a[-1000:]
 
+    def _add_latency(self, res, category, start):
+        self.add_latency(category, self.clock.seconds() - start)
+        return res
+
     def get_latencies(self):
         """Return a dict, indexed by category, that contains a dict of
         latency numbers for each category. If there are sufficient samples
         for unambiguous interpretation, each dict will contain the
-        following keys: mean, 01_0_percentile, 10_0_percentile,
+        following keys: samplesize, mean, 01_0_percentile, 10_0_percentile,
         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
         99_0_percentile, 99_9_percentile.  If there are insufficient
         samples for a given percentile to be interpreted unambiguously
@@ -177,52 +157,25 @@ class StorageServer(service.MultiService):
             kwargs["facility"] = "tahoe.storage"
         return log.msg(*args, **kwargs)
 
-    def _clean_incomplete(self):
-        fileutil.rm_dir(self.incomingdir)
-
     def get_stats(self):
         # remember: RIStatsProvider requires that our return dict
-        # contains numeric values.
+        # contains numeric, or None values.
         stats = { 'storage_server.allocated': self.allocated_size(), }
-        stats['storage_server.reserved_space'] = self.reserved_space
         for category,ld in self.get_latencies().items():
             for name,v in ld.items():
                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
 
-        try:
-            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
-            writeable = disk['avail'] > 0
-
-            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
-            stats['storage_server.disk_total'] = disk['total']
-            stats['storage_server.disk_used'] = disk['used']
-            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
-            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
-            stats['storage_server.disk_avail'] = disk['avail']
-        except AttributeError:
-            writeable = True
-        except EnvironmentError:
-            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
-            writeable = False
-
-        if self.readonly_storage:
-            stats['storage_server.disk_avail'] = 0
-            writeable = False
-
-        stats['storage_server.accepting_immutable_shares'] = int(writeable)
-        s = self.bucket_counter.get_state()
-        bucket_count = s.get("last-complete-bucket-count")
-        if bucket_count:
-            stats['storage_server.total_bucket_count'] = bucket_count
+        self.backend.fill_in_space_stats(stats)
+
+        if self.bucket_counter:
+            s = self.bucket_counter.get_state()
+            bucket_count = s.get("last-complete-bucket-count")
+            if bucket_count:
+                stats['storage_server.total_bucket_count'] = bucket_count
         return stats
 
     def get_available_space(self):
-        """Returns available space for share storage in bytes, or None if no
-        API to get this information is available."""
-
-        if self.readonly_storage:
-            return 0
-        return fileutil.get_available_space(self.sharedir, self.reserved_space)
+        return self.backend.get_available_space()
 
     def allocated_size(self):
         space = 0
@@ -233,7 +186,7 @@ class StorageServer(service.MultiService):
     # these methods can be invoked by our callers
 
     def client_get_version(self, account):
-        remaining_space = self.get_available_space()
+        remaining_space = self.backend.get_available_space()
         if remaining_space is None:
             # We're on a platform that has no API to get disk stats.
             remaining_space = 2**64
@@ -246,267 +199,154 @@ class StorageServer(service.MultiService):
                       "delete-mutable-shares-with-zero-length-writev": True,
                       "fills-holes-with-zero-bytes": True,
                       "prevents-read-past-end-of-share-data": True,
-                      "accounting-v1": {},
+                      "ignores-lease-renewal-and-cancel-secrets": True,
+                      "has-immutable-readv": True,
                       },
                     "application-version": str(allmydata.__full_version__),
                     }
         return version
 
     def client_allocate_buckets(self, storage_index,
-                                sharenums, allocated_size,
+                                sharenums, allocated_data_length,
                                 canary, account):
-        start = time.time()
+        start = self.clock.seconds()
         self.count("allocate")
-        alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
-        si_dir = storage_index_to_dir(storage_index)
         si_s = si_b2a(storage_index)
 
         log.msg("storage: allocate_buckets %s" % si_s)
 
-        # Note that the lease should not be added until the BucketWriter has
-        # been closed. This is handled in BucketWriter.close()
-
-        max_space_per_bucket = allocated_size
-
         remaining_space = self.get_available_space()
         limited = remaining_space is not None
         if limited:
-            # this is a bit conservative, since some of this allocated_size()
-            # has already been written to disk, where it will show up in
+            # This is a bit conservative, since some of this allocated_size()
+            # has already been written to the backend, where it will show up in
             # get_available_space.
             remaining_space -= self.allocated_size()
-        # self.readonly_storage causes remaining_space <= 0
-
-        # fill alreadygot with all shares that we have, not just the ones
-        # they asked about: this will save them a lot of work. Add or update
-        # leases for all of them: if they want us to hold shares for this
-        # file, they'll want us to hold leases for this file.
-        for (shnum, fn) in self._get_bucket_shares(storage_index):
-            alreadygot.add(shnum)
-
-        for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
-            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
-            if os.path.exists(finalhome):
-                # great! we already have it. easy.
-                pass
-            elif os.path.exists(incominghome):
-                # Note that we don't create BucketWriters for shnums that
-                # have a partial share (in incoming/), so if a second upload
-                # occurs while the first is still in progress, the second
-                # uploader will use different storage servers.
-                pass
-            elif (not limited) or (remaining_space >= max_space_per_bucket):
-                # ok! we need to create the new share file.
-                bw = BucketWriter(self, account, storage_index, shnum,
-                                  incominghome, finalhome,
-                                  max_space_per_bucket, canary)
-                bucketwriters[shnum] = bw
-                self._active_writers[bw] = 1
-                if limited:
-                    remaining_space -= max_space_per_bucket
-            else:
-                # bummer! not enough space to accept this bucket
-                pass
+            # If the backend is read-only, remaining_space will be <= 0.
 
-        if bucketwriters:
-            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
+        # Fill alreadygot with all shares that we have, not just the ones
+        # they asked about: this will save them a lot of work. Leases will
+        # be added or updated for all of them.
+        alreadygot = set()
+        shareset = self.backend.get_shareset(storage_index)
+        d = shareset.get_shares()
+        def _got_shares( (shares, corrupted) ):
+            remaining = remaining_space
+            for share in shares:
+                # XXX do we need to explicitly add a lease here?
+                alreadygot.add(share.get_shnum())
+
+            d2 = defer.succeed(None)
+
+            # We don't create BucketWriters for shnums where we have a share
+            # that is corrupted. Is that right, or should we allow the corrupted
+            # share to be clobbered? Note that currently the disk share classes
+            # have assertions that prevent them from clobbering existing files.
+            for shnum in set(sharenums) - alreadygot - corrupted:
+                if shareset.has_incoming(shnum):
+                    # Note that we don't create BucketWriters for shnums that
+                    # have an incoming share, so if a second upload occurs while
+                    # the first is still in progress, the second uploader will
+                    # use different storage servers.
+                    pass
+                elif (not limited) or remaining >= allocated_data_length:
+                    if limited:
+                        remaining -= allocated_data_length
+
+                    d2.addCallback(lambda ign, shnum=shnum:
+                                   shareset.make_bucket_writer(account, shnum, allocated_data_length,
+                                                               canary))
+                    def _record_writer(bw, shnum=shnum):
+                        bucketwriters[shnum] = bw
+                        self._active_writers[bw] = 1
+                    d2.addCallback(_record_writer)
+                else:
+                    # not enough space to accept this share
+                    pass
 
-        self.add_latency("allocate", time.time() - start)
-        return alreadygot, bucketwriters
-
-    def _iter_share_files(self, storage_index):
-        for shnum, filename in self._get_bucket_shares(storage_index):
-            f = open(filename, 'rb')
-            header = f.read(32)
-            f.close()
-            if header[:32] == MutableShareFile.MAGIC:
-                sf = MutableShareFile(filename, self)
-                # note: if the share has been migrated, the renew_lease()
-                # call will throw an exception, with information to help the
-                # client update the lease.
-            elif header[:4] == struct.pack(">L", 1):
-                sf = ShareFile(filename)
-            else:
-                continue # non-sharefile
-            yield sf
+            d2.addCallback(lambda ign: (alreadygot, bucketwriters))
+            return d2
+        d.addCallback(_got_shares)
+        d.addBoth(self._add_latency, "allocate", start)
+        return d
 
     def bucket_writer_closed(self, bw, consumed_size):
         if self.stats_provider:
             self.stats_provider.count('storage_server.bytes_added', consumed_size)
         del self._active_writers[bw]
 
-    def _get_bucket_shares(self, storage_index):
-        """Return a list of (shnum, pathname) tuples for files that hold
-        shares for this storage_index. In each tuple, 'shnum' will always be
-        the integer form of the last component of 'pathname'."""
-        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
-        try:
-            for f in os.listdir(storagedir):
-                if NUM_RE.match(f):
-                    filename = os.path.join(storagedir, f)
-                    yield (int(f), filename)
-        except OSError:
-            # Commonly caused by there being no buckets at all.
-            pass
-
-    def client_get_buckets(self, storage_index):
-        start = time.time()
+    def client_get_buckets(self, storage_index, account):
+        start = self.clock.seconds()
         self.count("get")
         si_s = si_b2a(storage_index)
         log.msg("storage: get_buckets %s" % si_s)
         bucketreaders = {} # k: sharenum, v: BucketReader
-        for shnum, filename in self._get_bucket_shares(storage_index):
-            bucketreaders[shnum] = BucketReader(self, filename,
-                                                storage_index, shnum)
-        self.add_latency("get", time.time() - start)
-        return bucketreaders
+
+        shareset = self.backend.get_shareset(storage_index)
+        d = shareset.get_shares()
+        def _make_readers( (shares, corrupted) ):
+            # We don't create BucketReaders for corrupted shares.
+            for share in shares:
+                assert not isinstance(share, defer.Deferred), share
+                bucketreaders[share.get_shnum()] = shareset.make_bucket_reader(account, share)
+            return bucketreaders
+        d.addCallback(_make_readers)
+        d.addBoth(self._add_latency, "get", start)
+        return d
 
     def client_slot_testv_and_readv_and_writev(self, storage_index,
                                                write_enabler,
                                                test_and_write_vectors,
                                                read_vector, account):
-        start = time.time()
+        start = self.clock.seconds()
         self.count("writev")
         si_s = si_b2a(storage_index)
-
         log.msg("storage: slot_writev %s" % si_s)
-        si_dir = storage_index_to_dir(storage_index)
-
-        # shares exist if there is a file for them
-        bucketdir = os.path.join(self.sharedir, si_dir)
-        shares = {}
-        if os.path.isdir(bucketdir):
-            for sharenum_s in os.listdir(bucketdir):
-                try:
-                    sharenum = int(sharenum_s)
-                except ValueError:
-                    continue
-                filename = os.path.join(bucketdir, sharenum_s)
-                msf = MutableShareFile(filename, self)
-                msf.check_write_enabler(write_enabler, si_s)
-                shares[sharenum] = msf
-        # write_enabler is good for all existing shares.
-
-        # Now evaluate test vectors.
-        testv_is_good = True
-        for sharenum in test_and_write_vectors:
-            (testv, datav, new_length) = test_and_write_vectors[sharenum]
-            if sharenum in shares:
-                if not shares[sharenum].check_testv(testv):
-                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
-                    testv_is_good = False
-                    break
-            else:
-                # compare the vectors against an empty share, in which all
-                # reads return empty strings.
-                if not EmptyShare().check_testv(testv):
-                    self.log("testv failed (empty): [%d] %r" % (sharenum,
-                                                                testv))
-                    testv_is_good = False
-                    break
-
-        # now gather the read vectors, before we do any writes
-        read_data = {}
-        for sharenum, share in shares.items():
-            read_data[sharenum] = share.readv(read_vector)
-
-        if testv_is_good:
-            # now apply the write vectors
-            for sharenum in test_and_write_vectors:
-                (testv, datav, new_length) = test_and_write_vectors[sharenum]
-                if new_length == 0:
-                    if sharenum in shares:
-                        shares[sharenum].unlink()
-                        account.remove_share_and_leases(storage_index, sharenum)
-                else:
-                    if sharenum not in shares:
-                        # allocate a new share
-                        allocated_size = 2000 # arbitrary, really # REMOVE
-                        share = self._allocate_slot_share(bucketdir,
-                                                          write_enabler,
-                                                          sharenum,
-                                                          allocated_size)
-                        shares[sharenum] = share
-                        shares[sharenum].writev(datav, new_length)
-                        account.add_share(storage_index, sharenum,
-                                          shares[sharenum].get_used_space(), SHARETYPE_MUTABLE)
-                    else:
-                        # apply the write vector and update the lease
-                        shares[sharenum].writev(datav, new_length)
-
-                    account.add_or_renew_default_lease(storage_index, sharenum)
-                    account.mark_share_as_stable(storage_index, sharenum,
-                                                 shares[sharenum].get_used_space())
-
-            if new_length == 0:
-                # delete empty bucket directories
-                if not os.listdir(bucketdir):
-                    os.rmdir(bucketdir)
-
-        # all done
-        self.add_latency("writev", time.time() - start)
-        return (testv_is_good, read_data)
-
-    def _allocate_slot_share(self, bucketdir, write_enabler, sharenum, allocated_size):
-        my_nodeid = self.my_nodeid
-        fileutil.make_dirs(bucketdir)
-        filename = os.path.join(bucketdir, "%d" % sharenum)
-        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
-                                         self)
-        return share
-
-    def delete_share(self, storage_index, shnum):
-        si_dir = storage_index_to_dir(storage_index)
-        filename = os.path.join(self.sharedir, si_dir, "%d" % (shnum,))
-        os.unlink(filename)
+
+        shareset = self.backend.get_shareset(storage_index)
+        expiration_time = start + 31*24*60*60   # one month from now
+
+        d = shareset.testv_and_readv_and_writev(write_enabler, test_and_write_vectors,
+                                                read_vector, expiration_time, account)
+        d.addBoth(self._add_latency, "writev", start)
+        return d
 
     def client_slot_readv(self, storage_index, shares, readv, account):
-        start = time.time()
+        start = self.clock.seconds()
         self.count("readv")
         si_s = si_b2a(storage_index)
-        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
-                     facility="tahoe.storage", level=log.OPERATIONAL)
-        si_dir = storage_index_to_dir(storage_index)
-        # shares exist if there is a file for them
-        bucketdir = os.path.join(self.sharedir, si_dir)
-        if not os.path.isdir(bucketdir):
-            self.add_latency("readv", time.time() - start)
-            return {}
-        datavs = {}
-        for sharenum_s in os.listdir(bucketdir):
-            try:
-                sharenum = int(sharenum_s)
-            except ValueError:
-                continue
-            if sharenum in shares or not shares:
-                filename = os.path.join(bucketdir, sharenum_s)
-                msf = MutableShareFile(filename, self)
-                datavs[sharenum] = msf.readv(readv)
-        log.msg("returning shares %s" % (datavs.keys(),),
-                facility="tahoe.storage", level=log.NOISY, parent=lp)
-        self.add_latency("readv", time.time() - start)
-        return datavs
-
-    def client_advise_corrupt_share(self, share_type, storage_index, shnum, reason):
-        fileutil.make_dirs(self.corruption_advisory_dir)
+        log.msg("storage: slot_readv %s %s" % (si_s, shares),
+                facility="tahoe.storage", level=log.OPERATIONAL)
+
+        shareset = self.backend.get_shareset(storage_index)
+        d = shareset.readv(shares, readv)
+        d.addBoth(self._add_latency, "readv", start)
+        return d
+
+    def client_advise_corrupt_share(self, share_type, storage_index, shnum, reason, account):
+        fileutil.make_dirs(self._corruption_advisory_dir)
         now = time_format.iso_utc(sep="T")
         si_s = si_b2a(storage_index)
+        owner_num = account.get_owner_num()
+
         # windows can't handle colons in the filename
-        fn = os.path.join(self.corruption_advisory_dir,
+        fn = os.path.join(self._corruption_advisory_dir,
                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
         f = open(fn, "w")
-        f.write("report: Share Corruption\n")
-        f.write("type: %s\n" % share_type)
-        f.write("storage_index: %s\n" % si_s)
-        f.write("share_number: %d\n" % shnum)
-        f.write("\n")
-        f.write(reason)
-        f.write("\n")
-        f.close()
-        log.msg(format=("client claims corruption in (%(share_type)s) " +
+        try:
+            f.write("report: Share Corruption\n")
+            f.write("type: %s\n" % (share_type,))
+            f.write("storage_index: %s\n" % (si_s,))
+            f.write("share_number: %d\n" % (shnum,))
+            f.write("owner_num: %s\n" % (owner_num,))
+            f.write("\n")
+            f.write(reason)
+            f.write("\n")
+        finally:
+            f.close()
+
+        log.msg(format=("client #%(owner_num)d claims corruption in (%(share_type)s) " +
                         "%(si)s-%(shnum)d: %(reason)s"),
-                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
+                owner_num=owner_num, share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                 level=log.SCARY, umid="SGx2fA")
-        return None
index 35398fa1eb725fa7584a81d5c5f394178768d258..fd25d8d69a71907c35a794fb35f2ef0aaeb1a886 100644 (file)
@@ -328,11 +328,13 @@ class NoNetworkGrid(service.MultiService):
         ss.hung_until = None
 
     def nuke_from_orbit(self):
-        """ Empty all share directories in this grid. It's the only way to be sure ;-) """
+        """Empty all share directories in this grid. It's the only way to be sure ;-)
+        This works only for a disk backend."""
         for server in self.servers_by_number.values():
-            for prefixdir in os.listdir(server.sharedir):
+            sharedir = server.backend._sharedir
+            for prefixdir in os.listdir(sharedir):
                 if prefixdir != 'incoming':
-                    fileutil.rm_dir(os.path.join(server.sharedir, prefixdir))
+                    fileutil.rm_dir(os.path.join(sharedir, prefixdir))
 
 
 class GridTestMixin:
index a17c68a15714f6efc768763bea0f88f823437a64..53a2a660e7b44d74d99f68d08d5494da3a4cc024 100644 (file)
@@ -3,7 +3,8 @@ from twisted.trial import unittest
 from twisted.application import service
 
 import allmydata
-from allmydata.node import Node, OldConfigError, OldConfigOptionError, MissingConfigEntry, UnescapedHashError
+from allmydata.node import Node, InvalidValueError, OldConfigError, \
+     OldConfigOptionError, MissingConfigEntry, UnescapedHashError
 from allmydata.frontends.auth import NeedRootcapLookupScheme
 from allmydata import client
 from allmydata.storage_client import StorageFarmBroker
@@ -174,7 +175,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
                            "[storage]\n" + \
                            "enabled = true\n" + \
                            "reserved_space = bogus\n")
-        self.failUnlessRaises(ValueError, client.Client, basedir)
+        self.failUnlessRaises(InvalidValueError, client.Client, basedir)
 
     def test_web_staticdir(self):
         basedir = u"client.Basic.test_web_staticdir"
index 0e53e86bf8541741488a05d63d6ec0da9cae34a9..858115629b6eae2fd92938a400e978807ea1f944 100644 (file)
@@ -220,7 +220,7 @@ class FakeStorageServer(service.MultiService):
     name = 'storage'
     def __init__(self, nodeid, nickname):
         service.MultiService.__init__(self)
-        self.my_nodeid = nodeid
+        self.serverid = nodeid
         self.nickname = nickname
         self.bucket_counter = FakeBucketCounter()
         self.accounting_crawler = FakeAccountingCrawler()
@@ -228,8 +228,8 @@ class FakeStorageServer(service.MultiService):
         self.expiration_policy = ExpirationPolicy(enabled=False)
     def get_stats(self):
         return {"storage_server.accepting_immutable_shares": False}
-    def get_nodeid(self):
-        return self.my_nodeid
+    def get_serverid(self):
+        return self.serverid
     def get_bucket_counter(self):
         return self.bucket_counter
     def get_accounting_crawler(self):