From 63f181050d8b87addb718e07e4f6976fd7729404 Mon Sep 17 00:00:00 2001
From: Daira Hopwood <daira@jacaranda.org>
Date: Wed, 12 Dec 2012 07:01:59 +0000
Subject: [PATCH] Changes to crawler classes (ShareCrawler and
 AccountingCrawler). Pass in a Clock to allow (in theory) deterministic
 testing, although this isn't used yet by tests. Simplify the generic
 ShareCrawler code by not attempting to track state during processing of a
 single prefix.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
---
 src/allmydata/storage/accounting_crawler.py | 219 ++++++++-------
 src/allmydata/storage/crawler.py            | 282 ++++++++------------
 2 files changed, 231 insertions(+), 270 deletions(-)

diff --git a/src/allmydata/storage/accounting_crawler.py b/src/allmydata/storage/accounting_crawler.py
index 493dc54b..1adc9dba 100644
--- a/src/allmydata/storage/accounting_crawler.py
+++ b/src/allmydata/storage/accounting_crawler.py
@@ -1,14 +1,13 @@
 
-import os, time
+import time
 
 from twisted.internet import defer
 
 from allmydata.util.deferredutil import for_items
-from allmydata.util.fileutil import get_used_space
 from allmydata.util import log
 from allmydata.storage.crawler import ShareCrawler
 from allmydata.storage.common import si_a2b
-from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN
+from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED
 
 
 class AccountingCrawler(ShareCrawler):
@@ -22,116 +21,134 @@ class AccountingCrawler(ShareCrawler):
     - Recover from a situation where the leasedb is lost or detectably
       corrupted. This is handled in the same way as upgrading.
     - Detect shares that have unexpectedly disappeared from storage.
+
+    See ticket #1834 for a proposal to greatly reduce the scope of what I am
+    responsible for, and the times when I might do work.
     """
 
     slow_start = 600 # don't start crawling for 10 minutes after startup
     minimum_cycle_time = 12*60*60 # not more than twice per day
 
-    def __init__(self, server, statefile, leasedb):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, backend, statefile, leasedb, clock=None):
+        ShareCrawler.__init__(self, backend, statefile, clock=clock)
         self._leasedb = leasedb
 
-    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
-        # assume that we can list every prefixdir in this prefix quickly.
-        # Otherwise we have to retain more state between timeslices.
-
-        # we define "shareid" as (SI string, shnum)
-        disk_shares = set() # shareid
-        for si_s in buckets:
-            bucketdir = os.path.join(prefixdir, si_s)
-            for sharefile in os.listdir(bucketdir):
-                try:
-                    shnum = int(sharefile)
-                except ValueError:
-                    continue # non-numeric means not a sharefile
-                shareid = (si_s, shnum)
-                disk_shares.add(shareid)
-
-        # now check the database for everything in this prefix
-        db_sharemap = self._leasedb.get_shares_for_prefix(prefix)
-        db_shares = set(db_sharemap)
-
-        rec = self.state["cycle-to-date"]["space-recovered"]
-        examined_sharesets = [set() for st in xrange(len(SHARETYPES))]
-
-        # The lease crawler used to calculate the lease age histogram while
-        # crawling shares, and tests currently rely on that, but it would be
-        # more efficient to maintain the histogram as leases are added,
-        # updated, and removed.
-        for key, value in db_sharemap.iteritems():
-            (si_s, shnum) = key
-            (used_space, sharetype) = value
-
-            examined_sharesets[sharetype].add(si_s)
-
-            for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice):
-                self.add_lease_age_to_histogram(age)
-
-            self.increment(rec, "examined-shares", 1)
-            self.increment(rec, "examined-sharebytes", used_space)
-            self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1)
-            self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space)
-
-        self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets]))
-        for st in SHARETYPES:
-            self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st]))
-
-        # add new shares to the DB
-        new_shares = disk_shares - db_shares
-        for (si_s, shnum) in new_shares:
-            sharefile = os.path.join(prefixdir, si_s, str(shnum))
-            used_space = get_used_space(sharefile)
-            # FIXME
-            sharetype = SHARETYPE_UNKNOWN
-            self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
-            self._leasedb.add_starter_lease(si_s, shnum)
-
-        # remove disappeared shares from DB
-        disappeared_shares = db_shares - disk_shares
-        for (si_s, shnum) in disappeared_shares:
-            log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
-                    si_s=si_s, shnum=shnum, level=log.WEIRD)
-            self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
-
-        recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
-
-        def _delete_share(ign, key, value):
-            (si_s, shnum) = key
-            (used_space, sharetype) = value
-            storage_index = si_a2b(si_s)
+    def process_prefix(self, cycle, prefix, start_slice):
+        # Assume that we can list every prefixdir in this prefix quickly.
+        # Otherwise we would have to retain more state between timeslices.
+
+        d = self.backend.get_sharesets_for_prefix(prefix)
+        def _got_sharesets(sharesets):
+            stored_sharemap = {}  # (SI string, shnum) -> (used_space, sharetype)
             d2 = defer.succeed(None)
-            def _mark_and_delete(ign):
-                self._leasedb.mark_share_as_going(storage_index, shnum)
-                return self.server.delete_share(storage_index, shnum)
-            d2.addCallback(_mark_and_delete)
-            def _deleted(ign):
-                self._leasedb.remove_deleted_share(storage_index, shnum)
-
-                recovered_sharesets[sharetype].add(si_s)
-
-                self.increment(rec, "actual-shares", 1)
-                self.increment(rec, "actual-sharebytes", used_space)
-                self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1)
-                self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space)
-            def _not_deleted(f):
-                log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s",
-                        si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD)
-                try:
-                    self._leasedb.mark_share_as_stable(storage_index, shnum)
-                except Exception, e:
-                    log.err(e)
-                # discard the failure
-            d2.addCallbacks(_deleted, _not_deleted)
+            for shareset in sharesets:
+                d2.addCallback(lambda ign, shareset=shareset: shareset.get_shares())
+                def _got_some_shares( (valid, corrupted) ):
+                    for share in valid:
+                        shareid = (share.get_storage_index_string(), share.get_shnum())
+                        sharetype = SHARETYPE_UNKNOWN  # FIXME
+                        stored_sharemap[shareid] = (share.get_used_space(), sharetype)
+
+                    for share in corrupted:
+                        shareid = (share.get_storage_index_string(), share.get_shnum())
+                        sharetype = SHARETYPE_CORRUPTED
+                        stored_sharemap[shareid] = (share.get_used_space(), sharetype)
+
+                d2.addCallback(_got_some_shares)
+
+            d2.addCallback(lambda ign: stored_sharemap)
             return d2
+        d.addCallback(_got_sharesets)
+
+        def _got_stored_sharemap(stored_sharemap):
+            # now check the database for everything in this prefix
+            db_sharemap = self._leasedb.get_shares_for_prefix(prefix)
+
+            rec = self.state["cycle-to-date"]["space-recovered"]
+            examined_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+            # The lease crawler used to calculate the lease age histogram while
+            # crawling shares, and tests currently rely on that, but it would be
+            # more efficient to maintain the histogram as leases are added,
+            # updated, and removed.
+            for key, value in db_sharemap.iteritems():
+                (si_s, shnum) = key
+                (used_space, sharetype) = value
 
-        unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
-        d = for_items(_delete_share, unleased_sharemap)
+                examined_sharesets[sharetype].add(si_s)
 
-        def _inc_recovered_sharesets(ign):
-            self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets]))
+                for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice):
+                    self.add_lease_age_to_histogram(age)
+
+                self.increment(rec, "examined-shares", 1)
+                self.increment(rec, "examined-sharebytes", used_space)
+                self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1)
+                self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space)
+
+            self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets]))
             for st in SHARETYPES:
-                self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st]))
-        d.addCallback(_inc_recovered_sharesets)
+                self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st]))
+
+            stored_shares = set(stored_sharemap)
+            db_shares = set(db_sharemap)
+
+            # add new shares to the DB
+            new_shares = stored_shares - db_shares
+            for shareid in new_shares:
+                (si_s, shnum) = shareid
+                (used_space, sharetype) = stored_sharemap[shareid]
+
+                self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
+                self._leasedb.add_starter_lease(si_s, shnum)
+
+            # remove disappeared shares from DB
+            disappeared_shares = db_shares - stored_shares
+            for (si_s, shnum) in disappeared_shares:
+                log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
+                        si_s=si_s, shnum=shnum, level=log.WEIRD)
+                self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
+
+            recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+            def _delete_share(ign, key, value):
+                (si_s, shnum) = key
+                (used_space, sharetype) = value
+                storage_index = si_a2b(si_s)
+                d3 = defer.succeed(None)
+                def _mark_and_delete(ign):
+                    self._leasedb.mark_share_as_going(storage_index, shnum)
+                    return self.backend.get_shareset(storage_index).delete_share(shnum)
+                d3.addCallback(_mark_and_delete)
+                def _deleted(ign):
+                    self._leasedb.remove_deleted_share(storage_index, shnum)
+
+                    recovered_sharesets[sharetype].add(si_s)
+
+                    self.increment(rec, "actual-shares", 1)
+                    self.increment(rec, "actual-sharebytes", used_space)
+                    self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1)
+                    self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space)
+                def _not_deleted(f):
+                    log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s",
+                            si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD)
+                    try:
+                        self._leasedb.mark_share_as_stable(storage_index, shnum)
+                    except Exception, e:
+                        log.err(e)
+                    # discard the failure
+                d3.addCallbacks(_deleted, _not_deleted)
+                return d3
+
+            unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
+            d2 = for_items(_delete_share, unleased_sharemap)
+
+            def _inc_recovered_sharesets(ign):
+                self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets]))
+                for st in SHARETYPES:
+                    self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st]))
+            d2.addCallback(_inc_recovered_sharesets)
+            return d2
+        d.addCallback(_got_stored_sharemap)
         return d
 
     # these methods are for outside callers to use
diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
index 571734cf..7659edbe 100644
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -1,12 +1,15 @@
 
-import os, time, struct
+import time, struct
 import cPickle as pickle
 
 from twisted.internet import defer, reactor
 from twisted.application import service
 
+from allmydata.interfaces import IStorageBackend
+
 from allmydata.storage.common import si_b2a
 from allmydata.util import fileutil
+from allmydata.util.assertutil import precondition
 from allmydata.util.deferredutil import HookMixin, async_iterate
 
 
@@ -15,11 +18,12 @@ class TimeSliceExceeded(Exception):
 
 
 class ShareCrawler(HookMixin, service.MultiService):
-    """A ShareCrawler subclass is attached to a StorageServer, and
-    periodically walks all of its shares, processing each one in some
-    fashion. This crawl is rate-limited, to reduce the IO burden on the host,
-    since large servers can easily have a terabyte of shares, in several
-    million files, which can take hours or days to read.
+    """
+    An instance of a subclass of ShareCrawler is attached to a storage
+    backend, and periodically walks the backend's shares, processing them
+    in some fashion. This crawl is rate-limited to reduce the I/O burden on
+    the host, since large servers can easily have a terabyte of shares in
+    several million files, which can take hours or days to read.
 
     Once the crawler starts a cycle, it will proceed at a rate limited by the
     allowed_cpu_proportion= and cpu_slice= parameters: yielding the reactor
@@ -30,38 +34,47 @@ class ShareCrawler(HookMixin, service.MultiService):
     long enough to ensure that 'minimum_cycle_time' elapses between the start
     of two consecutive cycles.
 
-    We assume that the normal upload/download/get_buckets traffic of a tahoe
+    We assume that the normal upload/download/DYHB traffic of a Tahoe-LAFS
     grid will cause the prefixdir contents to be mostly cached in the kernel,
-    or that the number of buckets in each prefixdir will be small enough to
-    load quickly. A 1TB allmydata.com server was measured to have 2.56M
-    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
-    prefix. On this server, each prefixdir took 130ms-200ms to list the first
+    or that the number of sharesets in each prefixdir will be small enough to
+    load quickly. A 1TB allmydata.com server was measured to have 2.56 million
+    sharesets, spread into the 1024 prefixes, with about 2500 sharesets per
+    prefix. On this server, each prefix took 130ms-200ms to list the first
     time, and 17ms to list the second time.
 
-    To use a crawler, create a subclass which implements the process_bucket()
-    method. It will be called with a prefixdir and a base32 storage index
-    string. process_bucket() must run synchronously. Any keys added to
-    self.state will be preserved. Override add_initial_state() to set up
-    initial state keys. Override finished_cycle() to perform additional
-    processing when the cycle is complete. Any status that the crawler
-    produces should be put in the self.state dictionary. Status renderers
-    (like a web page which describes the accomplishments of your crawler)
-    will use crawler.get_state() to retrieve this dictionary; they can
-    present the contents as they see fit.
-
-    Then create an instance, with a reference to a StorageServer and a
-    filename where it can store persistent state. The statefile is used to
-    keep track of how far around the ring the process has travelled, as well
-    as timing history to allow the pace to be predicted and controlled. The
-    statefile will be updated and written to disk after each time slice (just
-    before the crawler yields to the reactor), and also after each cycle is
-    finished, and also when stopService() is called. Note that this means
-    that a crawler which is interrupted with SIGKILL while it is in the
-    middle of a time slice will lose progress: the next time the node is
-    started, the crawler will repeat some unknown amount of work.
+    To implement a crawler, create a subclass that implements the
+    process_prefix() method. This method may be asynchronous. It will be
+    called with a string prefix. Any keys that it adds to self.state will be
+    preserved. Override add_initial_state() to set up initial state keys.
+    Override finished_cycle() to perform additional processing when the cycle
+    is complete. Any status that the crawler produces should be put in the
+    self.state dictionary. Status renderers (like a web page describing the
+    accomplishments of your crawler) will use crawler.get_state() to retrieve
+    this dictionary; they can present the contents as they see fit.
+
+    Then create an instance, with a reference to a backend object providing
+    the IStorageBackend interface, and a filename where it can store
+    persistent state. The statefile is used to keep track of how far around
+    the ring the process has travelled, as well as timing history to allow
+    the pace to be predicted and controlled. The statefile will be updated
+    and written to disk after each time slice (just before the crawler yields
+    to the reactor), and also after each cycle is finished, and also when
+    stopService() is called. Note that this means that a crawler that is
+    interrupted with SIGKILL while it is in the middle of a time slice will
+    lose progress: the next time the node is started, the crawler will repeat
+    some unknown amount of work.
 
     The crawler instance must be started with startService() before it will
-    do any work. To make it stop doing work, call stopService().
+    do any work. To make it stop doing work, call stopService(). A crawler
+    is usually a child service of a StorageServer, although it should not
+    depend on that.
+
+    For historical reasons, some dictionary key names use the term "bucket"
+    for what is now preferably called a "shareset" (the set of shares that a
+    server holds under a given storage index).
+
+    Subclasses should measure time using self.clock.seconds(), rather than
+    time.time(), in order to make themselves deterministically testable.
     """
 
     slow_start = 300 # don't start crawling for 5 minutes after startup
@@ -70,18 +83,18 @@ class ShareCrawler(HookMixin, service.MultiService):
     cpu_slice = 1.0 # use up to 1.0 seconds before yielding
     minimum_cycle_time = 300 # don't run a cycle faster than this
 
-    def __init__(self, server, statefile, allowed_cpu_proportion=None):
+    def __init__(self, backend, statefile, allowed_cpu_proportion=None, clock=None):
+        precondition(IStorageBackend.providedBy(backend), backend)
         service.MultiService.__init__(self)
+        self.backend = backend
+        self.statefile = statefile
         if allowed_cpu_proportion is not None:
             self.allowed_cpu_proportion = allowed_cpu_proportion
-        self.server = server
-        self.sharedir = server.sharedir
-        self.statefile = statefile
+        self.clock = clock or reactor
         self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
                          for i in range(2**10)]
         self.prefixes.sort()
         self.timer = None
-        self.bucket_cache = (None, [])
         self.current_sleep_time = None
         self.next_wake_time = None
         self.last_prefix_finished_time = None
@@ -117,9 +130,9 @@ class ShareCrawler(HookMixin, service.MultiService):
          remaining-sleep-time: float, seconds from now when we do more work
          estimated-cycle-complete-time-left:
                 float, seconds remaining until the current cycle is finished.
-                TODO: this does not yet include the remaining time left in
-                the current prefixdir, and it will be very inaccurate on fast
-                crawlers (which can process a whole prefix in a single tick)
+                This does not include the remaining time left in the current
+                prefix, and it will be very inaccurate on fast crawlers
+                (which can process a whole prefix in a single tick)
          estimated-time-per-cycle: float, seconds required to do a complete
                                    cycle
 
@@ -146,15 +159,13 @@ class ShareCrawler(HookMixin, service.MultiService):
             if self.last_prefix_elapsed_time is not None:
                 left = len(self.prefixes) - self.last_complete_prefix_index
                 remaining = left * self.last_prefix_elapsed_time
-                # TODO: remainder of this prefix: we need to estimate the
-                # per-bucket time, probably by measuring the time spent on
-                # this prefix so far, divided by the number of buckets we've
-                # processed.
+
             p["estimated-cycle-complete-time-left"] = remaining
             # it's possible to call get_progress() from inside a crawler's
             # finished_prefix() function
             p["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
-                                                           time.time())
+                                                           self.clock.seconds())
+
         per_cycle = None
         if self.last_cycle_elapsed_time is not None:
             per_cycle = self.last_cycle_elapsed_time
@@ -167,11 +178,6 @@ class ShareCrawler(HookMixin, service.MultiService):
         """I return the current state of the crawler. This is a copy of my
         state dictionary.
 
-        If we are not currently sleeping (i.e. get_state() was called from
-        inside the process_prefixdir, process_bucket, or finished_cycle()
-        methods, or if startService has not yet been called on this crawler),
-        these two keys will be None.
-
         Subclasses can override this to add computed keys to the return value,
         but don't forget to start with the upcall.
         """
@@ -179,10 +185,10 @@ class ShareCrawler(HookMixin, service.MultiService):
         return state
 
     def load_state(self):
-        # we use this to store state for both the crawler's internals and
+        # We use this to store state for both the crawler's internals and
         # anything the subclass-specific code needs. The state is stored
-        # after each bucket is processed, after each prefixdir is processed,
-        # and after a cycle is complete. The internal keys we use are:
+        # after each prefix is processed, and after a cycle is complete.
+        # The internal keys we use are:
         #  ["version"]: int, always 1
         #  ["last-cycle-finished"]: int, or None if we have not yet finished
         #                           any cycle
@@ -195,21 +201,18 @@ class ShareCrawler(HookMixin, service.MultiService):
         #                            are sleeping between cycles, or if we
         #                            have not yet finished any prefixdir since
         #                            a cycle was started
-        #  ["last-complete-bucket"]: str, base32 storage index bucket name
-        #                            of the last bucket to be processed, or
-        #                            None if we are sleeping between cycles
         try:
-            f = open(self.statefile, "rb")
-            state = pickle.load(f)
-            f.close()
+            pickled = fileutil.read(self.statefile)
         except Exception:
             state = {"version": 1,
                      "last-cycle-finished": None,
                      "current-cycle": None,
                      "last-complete-prefix": None,
-                     "last-complete-bucket": None,
                      }
-        state.setdefault("current-cycle-start-time", time.time()) # approximate
+        else:
+            state = pickle.loads(pickled)
+
+        state.setdefault("current-cycle-start-time", self.clock.seconds()) # approximate
         self.state = state
         lcp = state["last-complete-prefix"]
         if lcp == None:
@@ -237,19 +240,16 @@ class ShareCrawler(HookMixin, service.MultiService):
         else:
             last_complete_prefix = self.prefixes[lcpi]
         self.state["last-complete-prefix"] = last_complete_prefix
-        tmpfile = self.statefile + ".tmp"
-        f = open(tmpfile, "wb")
-        pickle.dump(self.state, f)
-        f.close()
-        fileutil.move_into_place(tmpfile, self.statefile)
+        pickled = pickle.dumps(self.state)
+        fileutil.write(self.statefile, pickled)
 
     def startService(self):
         # arrange things to look like we were just sleeping, so
         # status/progress values work correctly
         self.sleeping_between_cycles = True
         self.current_sleep_time = self.slow_start
-        self.next_wake_time = time.time() + self.slow_start
-        self.timer = reactor.callLater(self.slow_start, self.start_slice)
+        self.next_wake_time = self.clock.seconds() + self.slow_start
+        self.timer = self.clock.callLater(self.slow_start, self.start_slice)
         service.MultiService.startService(self)
 
     def stopService(self):
@@ -260,7 +260,7 @@ class ShareCrawler(HookMixin, service.MultiService):
         return service.MultiService.stopService(self)
 
     def start_slice(self):
-        start_slice = time.time()
+        start_slice = self.clock.seconds()
         self.timer = None
         self.sleeping_between_cycles = False
         self.current_sleep_time = None
@@ -278,18 +278,22 @@ class ShareCrawler(HookMixin, service.MultiService):
             if not self.running:
                 # someone might have used stopService() to shut us down
                 return
-            # either we finished a whole cycle, or we ran out of time
-            now = time.time()
+
+            # Either we finished a whole cycle, or we ran out of time.
+            now = self.clock.seconds()
             this_slice = now - start_slice
+
             # this_slice/(this_slice+sleep_time) = percentage
             # this_slice/percentage = this_slice+sleep_time
             # sleep_time = (this_slice/percentage) - this_slice
             sleep_time = (this_slice / self.allowed_cpu_proportion) - this_slice
-            # if the math gets weird, or a timequake happens, don't sleep
+
+            # If the math gets weird, or a timequake happens, don't sleep
             # forever. Note that this means that, while a cycle is running, we
-            # will process at least one bucket every 5 minutes, no matter how
-            # long that bucket takes.
+            # will process at least one prefix every 5 minutes, provided prefixes
+            # do not take more than 5 minutes to process.
             sleep_time = max(0.0, min(sleep_time, 299))
+
             if finished_cycle:
                 # how long should we sleep between cycles? Don't run faster than
                 # allowed_cpu_proportion says, but also run faster than
@@ -298,10 +302,11 @@ class ShareCrawler(HookMixin, service.MultiService):
                 sleep_time = max(sleep_time, self.minimum_cycle_time)
             else:
                 self.sleeping_between_cycles = False
+
             self.current_sleep_time = sleep_time # for status page
             self.next_wake_time = now + sleep_time
             self.yielding(sleep_time)
-            self.timer = reactor.callLater(sleep_time, self.start_slice)
+            self.timer = self.clock.callLater(sleep_time, self.start_slice)
         d.addCallback(_done)
         d.addBoth(self._call_hook, 'yield')
         return d
@@ -345,19 +350,7 @@ class ShareCrawler(HookMixin, service.MultiService):
 
     def _do_prefix(self, cycle, i, start_slice):
         prefix = self.prefixes[i]
-        prefixdir = os.path.join(self.sharedir, prefix)
-        if i == self.bucket_cache[0]:
-            buckets = self.bucket_cache[1]
-        else:
-            try:
-                buckets = os.listdir(prefixdir)
-                buckets.sort()
-            except EnvironmentError:
-                buckets = []
-            self.bucket_cache = (i, buckets)
-
-        d = defer.maybeDeferred(self.process_prefixdir,
-                                cycle, prefix, prefixdir, buckets, start_slice)
+        d = defer.maybeDeferred(self.process_prefix, cycle, prefix, start_slice)
         def _done(ign):
             self.last_complete_prefix_index = i
 
@@ -376,27 +369,11 @@ class ShareCrawler(HookMixin, service.MultiService):
         d.addCallback(_done)
         return d
 
-    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
-        """This gets a list of bucket names (i.e. storage index strings,
-        base32-encoded) in sorted order.
-        FIXME: it would probably make more sense for the storage indices
-        to be binary.
-
-        You can override this if your crawler doesn't care about the actual
-        shares, for example a crawler which merely keeps track of how many
-        buckets are being managed by this server.
-
-        Subclasses which *do* care about actual bucket should leave this
-        method along, and implement process_bucket() instead.
+    def process_prefix(self, cycle, prefix, start_slice):
         """
-
-        for bucket in buckets:
-            if bucket <= self.state["last-complete-bucket"]:
-                continue
-            self.process_bucket(cycle, prefix, prefixdir, bucket)
-            self.state["last-complete-bucket"] = bucket
-            if time.time() >= start_slice + self.cpu_slice:
-                raise TimeSliceExceeded()
+        Called for each prefix.
+        """
+        return defer.succeed(None)
 
     # the remaining methods are explictly for subclasses to implement.
 
@@ -407,29 +384,6 @@ class ShareCrawler(HookMixin, service.MultiService):
         """
         pass
 
-    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
-        """Examine a single bucket. Subclasses should do whatever they want
-        to do to the shares therein, then update self.state as necessary.
-
-        If the crawler is never interrupted by SIGKILL, this method will be
-        called exactly once per share (per cycle). If it *is* interrupted,
-        then the next time the node is started, some amount of work will be
-        duplicated, according to when self.save_state() was last called. By
-        default, save_state() is called at the end of each timeslice, and
-        after finished_cycle() returns, and when stopService() is called.
-
-        To reduce the chance of duplicate work (i.e. to avoid adding multiple
-        records to a database), you can call save_state() at the end of your
-        process_bucket() method. This will reduce the maximum duplicated work
-        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
-        per bucket (and some disk writes), which will count against your
-        allowed_cpu_proportion, and which may be considerable if
-        process_bucket() runs quickly.
-
-        This method is for subclasses to override. No upcall is necessary.
-        """
-        pass
-
     def finished_prefix(self, cycle, prefix):
         """Notify a subclass that the crawler has just finished processing a
         prefix directory (all buckets with the same two-character/10bit
@@ -443,8 +397,9 @@ class ShareCrawler(HookMixin, service.MultiService):
         pass
 
     def finished_cycle(self, cycle):
-        """Notify subclass that a cycle (one complete traversal of all
-        prefixdirs) has just finished. 'cycle' is the number of the cycle
+        """
+        Notify subclass that a cycle (one complete traversal of all
+        prefixes) has just finished. 'cycle' is the number of the cycle
         that just finished. This method should perform summary work and
         update self.state to publish information to status displays.
 
@@ -460,7 +415,8 @@ class ShareCrawler(HookMixin, service.MultiService):
         pass
 
     def yielding(self, sleep_time):
-        """The crawler is about to sleep for 'sleep_time' seconds. This
+        """
+        The crawler is about to sleep for 'sleep_time' seconds. This
         method is mostly for the convenience of unit tests.
 
         This method is for subclasses to override. No upcall is necessary.
@@ -469,56 +425,44 @@ class ShareCrawler(HookMixin, service.MultiService):
 
 
 class BucketCountingCrawler(ShareCrawler):
-    """I keep track of how many buckets are being managed by this server.
-    This is equivalent to the number of distributed files and directories for
-    which I am providing storage. The actual number of files+directories in
-    the full grid is probably higher (especially when there are more servers
-    than 'N', the number of generated shares), because some files+directories
-    will have shares on other servers instead of me. Also note that the
-    number of buckets will differ from the number of shares in small grids,
-    when more than one share is placed on a single server.
+    """
+    I keep track of how many sharesets, each corresponding to a storage index,
+    are being managed by this server. This is equivalent to the number of
+    distributed files and directories for which I am providing storage. The
+    actual number of files and directories in the full grid is probably higher
+    (especially when there are more servers than 'N', the number of generated
+    shares), because some files and directories will have shares on other
+    servers instead of me. Also note that the number of sharesets will differ
+    from the number of shares in small grids, when more than one share is
+    placed on a single server.
     """
 
     minimum_cycle_time = 60*60 # we don't need this more than once an hour
 
-    def __init__(self, server, statefile, num_sample_prefixes=1):
-        ShareCrawler.__init__(self, server, statefile)
-        self.num_sample_prefixes = num_sample_prefixes
-
     def add_initial_state(self):
         # ["bucket-counts"][cyclenum][prefix] = number
         # ["last-complete-cycle"] = cyclenum # maintained by base class
         # ["last-complete-bucket-count"] = number
-        # ["storage-index-samples"][prefix] = (cyclenum,
-        #                                      list of SI strings (base32))
         self.state.setdefault("bucket-counts", {})
         self.state.setdefault("last-complete-bucket-count", None)
-        self.state.setdefault("storage-index-samples", {})
-
-    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
-        # we override process_prefixdir() because we don't want to look at
-        # the individual buckets. We'll save state after each one. On my
-        # laptop, a mostly-empty storage server can process about 70
-        # prefixdirs in a 1.0s slice.
-        if cycle not in self.state["bucket-counts"]:
-            self.state["bucket-counts"][cycle] = {}
-        self.state["bucket-counts"][cycle][prefix] = len(buckets)
-        if prefix in self.prefixes[:self.num_sample_prefixes]:
-            self.state["storage-index-samples"][prefix] = (cycle, buckets)
+
+    def process_prefix(self, cycle, prefix, start_slice):
+        # We don't need to look at the individual sharesets.
+        d = self.backend.get_sharesets_for_prefix(prefix)
+        def _got_sharesets(sharesets):
+            if cycle not in self.state["bucket-counts"]:
+                self.state["bucket-counts"][cycle] = {}
+            self.state["bucket-counts"][cycle][prefix] = len(sharesets)
+        d.addCallback(_got_sharesets)
+        return d
 
     def finished_cycle(self, cycle):
         last_counts = self.state["bucket-counts"].get(cycle, [])
         if len(last_counts) == len(self.prefixes):
             # great, we have a whole cycle.
-            num_buckets = sum(last_counts.values())
-            self.state["last-complete-bucket-count"] = num_buckets
+            num_sharesets = sum(last_counts.values())
+            self.state["last-complete-bucket-count"] = num_sharesets
             # get rid of old counts
             for old_cycle in list(self.state["bucket-counts"].keys()):
                 if old_cycle != cycle:
                     del self.state["bucket-counts"][old_cycle]
-        # get rid of old samples too
-        for prefix in list(self.state["storage-index-samples"].keys()):
-            old_cycle,buckets = self.state["storage-index-samples"][prefix]
-            if old_cycle != cycle:
-                del self.state["storage-index-samples"][prefix]
-
-- 
2.45.2