From ef4ff21ae7489cc57945434582a523fe3afa8e4c Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Thu, 19 Feb 2009 19:31:42 -0700
Subject: [PATCH] crawler: modify API to support upcoming bucket-counting
 crawler

---
 src/allmydata/storage/crawler.py   | 155 ++++++++++++++++++++++-------
 src/allmydata/test/test_crawler.py |  32 +++---
 2 files changed, 137 insertions(+), 50 deletions(-)

diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
index e4f8622c..9c80e867 100644
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -1,5 +1,6 @@
 
-import os, time, struct, pickle
+import os, time, struct
+import cPickle as pickle
 from twisted.internet import reactor
 from twisted.application import service
 from allmydata.storage.server import si_b2a
@@ -25,7 +26,10 @@ class ShareCrawler(service.MultiService):
 
     To use this, create a subclass which implements the process_bucket()
     method. It will be called with a prefixdir and a base32 storage index
-    string. process_bucket() should run synchronously.
+    string. process_bucket() should run synchronously. Any keys added to
+    self.state will be preserved. Override add_initial_state() to set up
+    initial state keys. Override finished_cycle() to perform additional
+    processing when the cycle is complete.
 
     Then create an instance, with a reference to a StorageServer and a
     filename where it can store persistent state. The statefile is used to
@@ -39,15 +43,15 @@ class ShareCrawler(service.MultiService):
     the Deferred that it returns.
     """
 
-    # use up to 10% of the CPU, on average. This can be changed at any time.
-    allowed_cpu_percentage = .10
-    # use up to 1.0 seconds before yielding. This can be changed at any time.
-    cpu_slice = 1.0
-    # don't run a cycle faster than this
-    minimum_cycle_time = 300
+    # all three of these can be changed at any time
+    allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
+    cpu_slice = 1.0 # use up to 1.0 seconds before yielding
+    minimum_cycle_time = 300 # don't run a cycle faster than this
 
-    def __init__(self, server, statefile):
+    def __init__(self, server, statefile, allowed_cpu_percentage=None):
         service.MultiService.__init__(self)
+        if allowed_cpu_percentage is not None:
+            self.allowed_cpu_percentage = allowed_cpu_percentage
         self.server = server
         self.sharedir = server.sharedir
         self.statefile = statefile
@@ -56,24 +60,73 @@ class ShareCrawler(service.MultiService):
         self.prefixes.sort()
         self.timer = None
         self.bucket_cache = (None, [])
-        self.first_cycle_finished = False
+        self.current_sleep_time = None
+        self.next_wake_time = None
+
+    def get_state(self):
+        """I return the current state of the crawler. This is a copy of my
+        state dictionary, plus the following keys::
+
+         current-sleep-time: float, duration of our current sleep
+         next-wake-time: float, seconds-since-epoch of when we will next wake
+
+        If we are not currently sleeping (i.e. get_state() was called from
+        inside the process_prefixdir, process_bucket, or finished_cycle()
+        methods, or if startService has not yet been called on this crawler),
+        these two keys will be None.
+        """
+        state = self.state.copy() # it isn't a deepcopy, so don't go crazy
+        state["current-sleep-time"] = self.current_sleep_time
+        state["next-wake-time"] = self.next_wake_time
+        return state
 
     def load_state(self):
+        # we use this to store state for both the crawler's internals and
+        # anything the subclass-specific code needs. The state is stored
+        # after each bucket is processed, after each prefixdir is processed,
+        # and after a cycle is complete. The internal keys we use are:
+        #  ["version"]: int, always 1
+        #  ["last-cycle-finished"]: int, or None if we have not yet finished
+        #                           any cycle
+        #  ["current-cycle"]: int, or None if we are sleeping between cycles
+        #  ["last-complete-prefix"]: str, two-letter name of the last prefixdir
+        #                            that was fully processed, or None if we
+        #                            are sleeping between cycles, or if we
+        #                            have not yet finished any prefixdir since
+        #                            a cycle was started
+        #  ["last-complete-bucket"]: str, base32 storage index bucket name
+        #                            of the last bucket to be processed, or
+        #                            None if we are sleeping between cycles
         try:
             f = open(self.statefile, "rb")
             state = pickle.load(f)
-            lcp = state["last-complete-prefix"]
-            if lcp == None:
-                self.last_complete_prefix_index = -1
-            else:
-                self.last_complete_prefix_index = self.prefixes.index(lcp)
-            self.last_complete_bucket = state["last-complete-bucket"]
-            self.first_cycle_finished = state["first-cycle-finished"]
             f.close()
         except EnvironmentError:
+            state = {"version": 1,
+                     "last-cycle-finished": None,
+                     "current-cycle": 0,
+                     "last-complete-prefix": None,
+                     "last-complete-bucket": None,
+                     }
+        self.state = state
+        lcp = state["last-complete-prefix"]
+        if lcp == None:
             self.last_complete_prefix_index = -1
-            self.last_complete_bucket = None
-            self.first_cycle_finished = False
+        else:
+            self.last_complete_prefix_index = self.prefixes.index(lcp)
+        self.add_initial_state()
+
+    def add_initial_state(self):
+        """Hook method to add extra keys to self.state when first loaded.
+
+        The first time this Crawler is used, or when the code has been
+        upgraded, the saved state file may not contain all the keys you
+        expect. Use this method to add any missing keys. Simply modify
+        self.state as needed.
+
+        This method for subclasses to override. No upcall is necessary.
+        """
+        pass
 
     def save_state(self):
         lcpi = self.last_complete_prefix_index
@@ -81,14 +134,10 @@ class ShareCrawler(service.MultiService):
             last_complete_prefix = None
         else:
             last_complete_prefix = self.prefixes[lcpi]
-        state = {"version": 1,
-                 "last-complete-prefix": last_complete_prefix,
-                 "last-complete-bucket": self.last_complete_bucket,
-                 "first-cycle-finished": self.first_cycle_finished,
-                 }
+        self.state["last-complete-prefix"] = last_complete_prefix
         tmpfile = self.statefile + ".tmp"
         f = open(tmpfile, "wb")
-        pickle.dump(state, f)
+        pickle.dump(self.state, f)
         f.close()
         fileutil.move_into_place(tmpfile, self.statefile)
 
@@ -105,6 +154,8 @@ class ShareCrawler(service.MultiService):
 
     def start_slice(self):
         self.timer = None
+        self.current_sleep_time = None
+        self.next_wake_time = None
         start_slice = time.time()
         try:
             self.start_current_prefix(start_slice)
@@ -112,7 +163,8 @@ class ShareCrawler(service.MultiService):
         except TimeSliceExceeded:
             finished_cycle = False
         # either we finished a whole cycle, or we ran out of time
-        this_slice = time.time() - start_slice
+        now = time.time()
+        this_slice = now - start_slice
         # this_slice/(this_slice+sleep_time) = percentage
         # this_slice/percentage = this_slice+sleep_time
         # sleep_time = (this_slice/percentage) - this_slice
@@ -128,10 +180,16 @@ class ShareCrawler(service.MultiService):
         else:
             self.sleeping_between_cycles = False
         self.current_sleep_time = sleep_time # for status page
+        self.next_wake_time = now + sleep_time
         self.yielding(sleep_time)
         self.timer = reactor.callLater(sleep_time, self.start_slice)
 
     def start_current_prefix(self, start_slice):
+        if self.state["current-cycle"] is None:
+            assert self.state["last-cycle-finished"] is not None
+            self.state["current-cycle"] = self.state["last-cycle-finished"] + 1
+        cycle = self.state["current-cycle"]
+
         for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
             if time.time() > start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
@@ -147,17 +205,19 @@ class ShareCrawler(service.MultiService):
                 except EnvironmentError:
                     buckets = []
                 self.bucket_cache = (i, buckets)
-            self.process_prefixdir(prefixdir, buckets, start_slice)
+            self.process_prefixdir(cycle, prefix, prefixdir,
+                                   buckets, start_slice)
             self.last_complete_prefix_index = i
             self.save_state()
         # yay! we finished the whole cycle
         self.last_complete_prefix_index = -1
-        self.last_complete_bucket = None
-        self.first_cycle_finished = True
+        self.state["last-complete-bucket"] = None
+        self.state["last-cycle-finished"] = cycle
+        self.state["current-cycle"] = None
+        self.finished_cycle(cycle)
         self.save_state()
-        self.finished_cycle()
 
-    def process_prefixdir(self, prefixdir, buckets, start_slice):
+    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
         """This gets a list of bucket names (i.e. storage index strings,
         base32-encoded) in sorted order.
 
@@ -166,20 +226,43 @@ class ShareCrawler(service.MultiService):
         are being managed by this server.
         """
         for bucket in buckets:
-            if bucket <= self.last_complete_bucket:
+            if bucket <= self.state["last-complete-bucket"]:
                 continue
             if time.time() > start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
-            self.process_bucket(prefixdir, bucket)
-            self.last_complete_bucket = bucket
+            self.process_bucket(cycle, prefix, prefixdir, bucket)
+            self.state["last-complete-bucket"] = bucket
             self.save_state()
 
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+        """Examine a single bucket. Subclasses should do whatever they want
+        to do to the shares therein, then update self.state as necessary.
+
+        This method will be called exactly once per share (per cycle), unless
+        the crawler was interrupted (by node restart, for example), in which
+        case it might be called a second time on a bucket which was processed
+        during the previous node's incarnation. However, in that case, no
+        changes to self.state will have been recorded.
+
+        This method for subclasses to override. No upcall is necessary.
+        """
         pass
 
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
+        """Notify subclass that a cycle (one complete traversal of all
+        prefixdirs) has just finished. 'cycle' is the number of the cycle
+        that just finished. This method should perform summary work and
+        update self.state to publish information to status displays.
+
+        This method for subclasses to override. No upcall is necessary.
+        """
         pass
 
     def yielding(self, sleep_time):
+        """The crawler is about to sleep for 'sleep_time' seconds. This
+        method is mostly for the convenience of unit tests.
+
+        This method for subclasses to override. No upcall is necessary.
+        """
         pass
 
diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py
index 99456d1e..a5a0f17a 100644
--- a/src/allmydata/test/test_crawler.py
+++ b/src/allmydata/test/test_crawler.py
@@ -15,23 +15,23 @@ from common_util import StallMixin
 
 class BucketEnumeratingCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
-    def __init__(self, server, statefile):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
         self.all_buckets = []
         self.finished_d = defer.Deferred()
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         self.all_buckets.append(storage_index_b32)
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
         eventually(self.finished_d.callback, None)
 
 class PacedCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
-    def __init__(self, server, statefile):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
         self.countdown = 6
         self.all_buckets = []
         self.finished_d = defer.Deferred()
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         self.all_buckets.append(storage_index_b32)
         self.countdown -= 1
         if self.countdown == 0:
@@ -39,7 +39,7 @@ class PacedCrawler(ShareCrawler):
             self.cpu_slice = -1.0
     def yielding(self, sleep_time):
         self.cpu_slice = 500
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
         eventually(self.finished_d.callback, None)
 
 class ConsumingCrawler(ShareCrawler):
@@ -47,18 +47,18 @@ class ConsumingCrawler(ShareCrawler):
     allowed_cpu_percentage = 0.5
     minimum_cycle_time = 0
 
-    def __init__(self, server, statefile):
-        ShareCrawler.__init__(self, server, statefile)
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
         self.accumulated = 0.0
         self.cycles = 0
         self.last_yield = 0.0
-    def process_bucket(self, prefixdir, storage_index_b32):
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         start = time.time()
         time.sleep(0.05)
         elapsed = time.time() - start
         self.accumulated += elapsed
         self.last_yield += elapsed
-    def finished_cycle(self):
+    def finished_cycle(self, cycle):
         self.cycles += 1
     def yielding(self, sleep_time):
         self.last_yield = 0.0
@@ -99,7 +99,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         sis = [self.write(i, ss, serverid) for i in range(10)]
         statefile = os.path.join(self.basedir, "statefile")
 
-        c = BucketEnumeratingCrawler(ss, statefile)
+        c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
         c.load_state()
 
         c.start_current_prefix(time.time())
@@ -322,7 +322,11 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         # empty methods in the base class
 
         def _check():
-            return c.first_cycle_finished
+            return bool(c.state["last-cycle-finished"] is not None)
         d = self.poll(_check)
+        def _done(ignored):
+            state = c.get_state()
+            self.failUnless(state["last-cycle-finished"] is not None)
+        d.addCallback(_done)
         return d
 
-- 
2.45.2