git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/storage/server.py
storage: remove the storage server's "remote_cancel_lease" function
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / server.py
index b5f83f4ede933359a0c2072a1b2be212466e5961..8350e813c11c709af7dd6d97aab355c76e6d23c9 100644 (file)
@@ -1,11 +1,11 @@
 import os, re, weakref, struct, time
 
-from foolscap import Referenceable
+from foolscap.api import Referenceable
 from twisted.application import service
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, IStatsProducer
-from allmydata.util import base32, fileutil, log, time_format
+from allmydata.util import fileutil, idlib, log, time_format
 import allmydata # for __full_version__
 
 from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
@@ -15,6 +15,7 @@ from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
      create_mutable_sharefile
 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
 from allmydata.storage.crawler import BucketCountingCrawler
+from allmydata.storage.expirer import LeaseCheckingCrawler
 
 # storage/
 # storage/shares/incoming
@@ -34,10 +35,16 @@ NUM_RE=re.compile("^[0-9]+$")
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer, IStatsProducer)
     name = 'storage'
+    LeaseCheckerClass = LeaseCheckingCrawler
 
     def __init__(self, storedir, nodeid, reserved_space=0,
                  discard_storage=False, readonly_storage=False,
-                 stats_provider=None):
+                 stats_provider=None,
+                 expiration_enabled=False,
+                 expiration_mode="age",
+                 expiration_override_lease_duration=None,
+                 expiration_cutoff_date=None,
+                 expiration_sharetypes=("mutable", "immutable")):
         service.MultiService.__init__(self)
         assert isinstance(nodeid, str)
         assert len(nodeid) == 20
@@ -59,11 +66,11 @@ class StorageServer(service.MultiService, Referenceable):
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
         self._active_writers = weakref.WeakKeyDictionary()
-        lp = log.msg("StorageServer created", facility="tahoe.storage")
+        log.msg("StorageServer created", facility="tahoe.storage")
 
         if reserved_space:
             if self.get_available_space() is None:
-                log.msg("warning: [storage]reserved_space= is set, but this platform does not support statvfs(2), so this reservation cannot be honored",
+                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                         umin="0wZ27w", level=log.UNUSUAL)
 
         self.latencies = {"allocate": [], # immutable
@@ -77,8 +84,23 @@ class StorageServer(service.MultiService, Referenceable):
                           "renew": [],
                           "cancel": [],
                           }
-
-        statefile = os.path.join(storedir, "bucket_counter.state")
+        self.add_bucket_counter()
+
+        statefile = os.path.join(self.storedir, "lease_checker.state")
+        historyfile = os.path.join(self.storedir, "lease_checker.history")
+        klass = self.LeaseCheckerClass
+        self.lease_checker = klass(self, statefile, historyfile,
+                                   expiration_enabled, expiration_mode,
+                                   expiration_override_lease_duration,
+                                   expiration_cutoff_date,
+                                   expiration_sharetypes)
+        self.lease_checker.setServiceParent(self)
+
+    def __repr__(self):
+        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
+
+    def add_bucket_counter(self):
+        statefile = os.path.join(self.storedir, "bucket_counter.state")
         self.bucket_counter = BucketCountingCrawler(self, statefile)
         self.bucket_counter.setServiceParent(self)
 
@@ -94,12 +116,15 @@ class StorageServer(service.MultiService, Referenceable):
 
     def get_latencies(self):
         """Return a dict, indexed by category, that contains a dict of
-        latency numbers for each category. Each dict will contain the
+        latency numbers for each category. If there are sufficient samples
+        for unambiguous interpretation, each dict will contain the
         following keys: mean, 01_0_percentile, 10_0_percentile,
         50_0_percentile (median), 90_0_percentile, 95_0_percentile,
-        99_0_percentile, 99_9_percentile. If no samples have been collected
-        for the given category, then that category name will not be present
-        in the return value."""
+        99_0_percentile, 99_9_percentile.  If there are insufficient
+        samples for a given percentile to be interpreted unambiguously
+        that percentile will be reported as None. If no samples have been
+        collected for the given category, then that category name will
+        not be present in the return value. """
         # note that Amazon's Dynamo paper says they use 99.9% percentile.
         output = {}
         for category in self.latencies:
@@ -107,16 +132,25 @@ class StorageServer(service.MultiService, Referenceable):
                 continue
             stats = {}
             samples = self.latencies[category][:]
-            samples.sort()
             count = len(samples)
-            stats["mean"] = sum(samples) / count
-            stats["01_0_percentile"] = samples[int(0.01 * count)]
-            stats["10_0_percentile"] = samples[int(0.1 * count)]
-            stats["50_0_percentile"] = samples[int(0.5 * count)]
-            stats["90_0_percentile"] = samples[int(0.9 * count)]
-            stats["95_0_percentile"] = samples[int(0.95 * count)]
-            stats["99_0_percentile"] = samples[int(0.99 * count)]
-            stats["99_9_percentile"] = samples[int(0.999 * count)]
+            stats["samplesize"] = count
+            samples.sort()
+            if count > 1:
+                stats["mean"] = sum(samples) / count
+            else:
+                stats["mean"] = None
+
+            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
+                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
+                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
+                             (0.999, "99_9_percentile", 1000)]
+
+            for percentile, percentilestring, minnumtoobserve in orderstatlist:
+                if count >= minnumtoobserve:
+                    stats[percentilestring] = samples[int(percentile*count)]
+                else:
+                    stats[percentilestring] = None
+
             output[category] = stats
         return output
 
@@ -128,67 +162,49 @@ class StorageServer(service.MultiService, Referenceable):
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
-    def do_statvfs(self):
-        return os.statvfs(self.storedir)
-
     def get_stats(self):
         # remember: RIStatsProvider requires that our return dict
         # contains numeric values.
         stats = { 'storage_server.allocated': self.allocated_size(), }
-        stats["storage_server.reserved_space"] = self.reserved_space
+        stats['storage_server.reserved_space'] = self.reserved_space
         for category,ld in self.get_latencies().items():
             for name,v in ld.items():
                 stats['storage_server.latencies.%s.%s' % (category, name)] = v
-        writeable = True
-        if self.readonly_storage:
-            writeable = False
+
         try:
-            s = self.do_statvfs()
-            disk_total = s.f_bsize * s.f_blocks
-            disk_used = s.f_bsize * (s.f_blocks - s.f_bfree)
-            # spacetime predictors should look at the slope of disk_used.
-            disk_free_for_root = s.f_bsize * s.f_bfree
-            disk_free_for_nonroot = s.f_bsize * s.f_bavail
-
-            # include our local policy here: if we stop accepting shares when
-            # the available space drops below 1GB, then include that fact in
-            # disk_avail.
-            disk_avail = disk_free_for_nonroot - self.reserved_space
-            disk_avail = max(disk_avail, 0)
-            if self.readonly_storage:
-                disk_avail = 0
-            if disk_avail == 0:
-                writeable = False
+            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
+            writeable = disk['avail'] > 0
 
             # spacetime predictors should use disk_avail / (d(disk_used)/dt)
-            stats["storage_server.disk_total"] = disk_total
-            stats["storage_server.disk_used"] = disk_used
-            stats["storage_server.disk_free_for_root"] = disk_free_for_root
-            stats["storage_server.disk_free_for_nonroot"] = disk_free_for_nonroot
-            stats["storage_server.disk_avail"] = disk_avail
+            stats['storage_server.disk_total'] = disk['total']
+            stats['storage_server.disk_used'] = disk['used']
+            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
+            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
+            stats['storage_server.disk_avail'] = disk['avail']
         except AttributeError:
-            # os.statvfs is available only on unix
-            pass
-        stats["storage_server.accepting_immutable_shares"] = int(writeable)
-        return stats
+            writeable = True
+        except EnvironmentError:
+            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
+            writeable = False
 
+        if self.readonly_storage:
+            stats['storage_server.disk_avail'] = 0
+            writeable = False
 
-    def stat_disk(self, d):
-        s = os.statvfs(d)
-        # s.f_bavail: available to non-root users
-        disk_avail = s.f_bsize * s.f_bavail
-        return disk_avail
+        stats['storage_server.accepting_immutable_shares'] = int(writeable)
+        s = self.bucket_counter.get_state()
+        bucket_count = s.get("last-complete-bucket-count")
+        if bucket_count:
+            stats['storage_server.total_bucket_count'] = bucket_count
+        return stats
 
     def get_available_space(self):
-        # returns None if it cannot be measured (windows)
-        try:
-            disk_avail = self.stat_disk(self.storedir)
-            disk_avail -= self.reserved_space
-        except AttributeError:
-            disk_avail = None
+        """Returns available space for share storage in bytes, or None if no
+        API to get this information is available."""
+
         if self.readonly_storage:
-            disk_avail = 0
-        return disk_avail
+            return 0
+        return fileutil.get_available_space(self.sharedir, self.reserved_space)
 
     def allocated_size(self):
         space = 0
@@ -199,9 +215,9 @@ class StorageServer(service.MultiService, Referenceable):
     def remote_get_version(self):
         remaining_space = self.get_available_space()
         if remaining_space is None:
-            # we're on a platform that doesn't have 'df', so make a vague
-            # guess.
+            # We're on a platform that has no API to get disk stats.
             remaining_space = 2**64
+
         version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
                     { "maximum-immutable-share-size": remaining_space,
                       "tolerates-immutable-read-overrun": True,
@@ -245,6 +261,7 @@ class StorageServer(service.MultiService, Referenceable):
             # has already been written to disk, where it will show up in
             # get_available_space.
             remaining_space -= self.allocated_size()
+        # self.readonly_storage causes remaining_space <= 0
 
         # fill alreadygot with all shares that we have, not just the ones
         # they asked about: this will save them a lot of work. Add or update
@@ -255,8 +272,6 @@ class StorageServer(service.MultiService, Referenceable):
             sf = ShareFile(fn)
             sf.add_or_renew_lease(lease_info)
 
-        # self.readonly_storage causes remaining_space=0
-
         for shnum in sharenums:
             incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
             finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
@@ -330,34 +345,6 @@ class StorageServer(service.MultiService, Referenceable):
         if not found_buckets:
             raise IndexError("no such lease to renew")
 
-    def remote_cancel_lease(self, storage_index, cancel_secret):
-        start = time.time()
-        self.count("cancel")
-
-        total_space_freed = 0
-        found_buckets = False
-        for sf in self._iter_share_files(storage_index):
-            # note: if we can't find a lease on one share, we won't bother
-            # looking in the others. Unless something broke internally
-            # (perhaps we ran out of disk space while adding a lease), the
-            # leases on all shares will be identical.
-            found_buckets = True
-            # this raises IndexError if the lease wasn't present XXXX
-            total_space_freed += sf.cancel_lease(cancel_secret)
-
-        if found_buckets:
-            storagedir = os.path.join(self.sharedir,
-                                      storage_index_to_dir(storage_index))
-            if not os.listdir(storagedir):
-                os.rmdir(storagedir)
-
-        if self.stats_provider:
-            self.stats_provider.count('storage_server.bytes_freed',
-                                      total_space_freed)
-        self.add_latency("cancel", time.time() - start)
-        if not found_buckets:
-            raise IndexError("no such storage index")
-
     def bucket_writer_closed(self, bw, consumed_size):
         if self.stats_provider:
             self.stats_provider.count('storage_server.bytes_added', consumed_size)
@@ -391,8 +378,7 @@ class StorageServer(service.MultiService, Referenceable):
 
     def get_leases(self, storage_index):
         """Provide an iterator that yields all of the leases attached to this
-        bucket. Each lease is returned as a tuple of (owner_num,
-        renew_secret, cancel_secret, expiration_time).
+        bucket. Each lease is returned as a LeaseInfo instance.
 
         This method is not for client use.
         """
@@ -402,7 +388,7 @@ class StorageServer(service.MultiService, Referenceable):
         try:
             shnum, filename = self._get_bucket_shares(storage_index).next()
             sf = ShareFile(filename)
-            return sf.iter_leases()
+            return sf.get_leases()
         except StopIteration:
             return iter([])
 
@@ -413,7 +399,7 @@ class StorageServer(service.MultiService, Referenceable):
         start = time.time()
         self.count("writev")
         si_s = si_b2a(storage_index)
-        lp = log.msg("storage: slot_writev %s" % si_s)
+        log.msg("storage: slot_writev %s" % si_s)
         si_dir = storage_index_to_dir(storage_index)
         (write_enabler, renew_secret, cancel_secret) = secrets
         # shares exist if there is a file for them
@@ -531,7 +517,7 @@ class StorageServer(service.MultiService, Referenceable):
                                     reason):
         fileutil.make_dirs(self.corruption_advisory_dir)
         now = time_format.iso_utc(sep="T")
-        si_s = base32.b2a(storage_index)
+        si_s = si_b2a(storage_index)
         # windows can't handle colons in the filename
         fn = os.path.join(self.corruption_advisory_dir,
                           "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
@@ -549,4 +535,3 @@ class StorageServer(service.MultiService, Referenceable):
                 share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                 level=log.SCARY, umid="SGx2fA")
         return None
-