From 77f3b83d687d261968a0d10ab9dde77183a81717 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Sat, 21 Feb 2009 14:56:49 -0700
Subject: [PATCH] crawler: fix performance problems: only save state once per
 timeslice (not after every bucket), don't start the crawler until 5 minutes
 after node startup

---
 src/allmydata/storage/crawler.py   | 89 ++++++++++++++++++++----------
 src/allmydata/test/test_crawler.py | 20 ++++++-
 src/allmydata/test/test_storage.py |  5 +-
 3 files changed, 82 insertions(+), 32 deletions(-)

diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
index df0f6e5d..9d950537 100644
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -13,8 +13,8 @@ class ShareCrawler(service.MultiService):
     """A ShareCrawler subclass is attached to a StorageServer, and
     periodically walks all of its shares, processing each one in some
     fashion. This crawl is rate-limited, to reduce the IO burden on the host,
-    since large servers will have several million shares, which can take
-    hours or days to read.
+    since large servers can easily have a terabyte of shares, in several
+    million files, which can take hours or days to read.
 
     Once the crawler starts a cycle, it will proceed at a rate limited by the
     allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
@@ -26,12 +26,12 @@ class ShareCrawler(service.MultiService):
     of two consecutive cycles.
 
     We assume that the normal upload/download/get_buckets traffic of a tahoe
-    grid will cause the prefixdir contents to be mostly cached, or that the
-    number of buckets in each prefixdir will be small enough to load quickly.
-    A 1TB allmydata.com server was measured to have 2.56M buckets, spread
-    into the 1024 prefixdirs, with about 2500 buckets per prefix. On this
-    server, each prefixdir took 130ms-200ms to list the first time, and 17ms
-    to list the second time.
+    grid will cause the prefixdir contents to be mostly cached in the kernel,
+    or that the number of buckets in each prefixdir will be small enough to
+    load quickly. A 1TB allmydata.com server was measured to have 2.56M
+    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
+    prefix. On this server, each prefixdir took 130ms-200ms to list the first
+    time, and 17ms to list the second time.
 
     To use a crawler, create a subclass which implements the process_bucket()
     method. It will be called with a prefixdir and a base32 storage index
@@ -48,13 +48,18 @@ class ShareCrawler(service.MultiService):
     filename where it can store persistent state. The statefile is used to
     keep track of how far around the ring the process has travelled, as well
     as timing history to allow the pace to be predicted and controlled. The
-    statefile will be updated and written to disk after every bucket is
-    processed.
+    statefile will be updated and written to disk after each time slice (just
+    before the crawler yields to the reactor), and also after each cycle is
+    finished, and also when stopService() is called. Note that this means
+    that a crawler which is interrupted with SIGKILL while it is in the
+    middle of a time slice will lose progress: the next time the node is
+    started, the crawler will repeat some unknown amount of work.
 
     The crawler instance must be started with startService() before it will
     do any work. To make it stop doing work, call stopService().
     """
 
+    slow_start = 300 # don't start crawling for 5 minutes after startup
     # all three of these can be changed at any time
     allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
     cpu_slice = 1.0 # use up to 1.0 seconds before yielding
@@ -186,29 +191,31 @@ class ShareCrawler(service.MultiService):
         # arrange things to look like we were just sleeping, so
         # status/progress values work correctly
         self.sleeping_between_cycles = True
-        self.current_sleep_time = 0
-        self.next_wake_time = time.time()
-        self.timer = reactor.callLater(0, self.start_slice)
+        self.current_sleep_time = self.slow_start
+        self.next_wake_time = time.time() + self.slow_start
+        self.timer = reactor.callLater(self.slow_start, self.start_slice)
         service.MultiService.startService(self)
 
     def stopService(self):
         if self.timer:
             self.timer.cancel()
             self.timer = None
+        self.save_state()
         return service.MultiService.stopService(self)
 
     def start_slice(self):
+        start_slice = time.time()
         self.timer = None
         self.sleeping_between_cycles = False
         self.current_sleep_time = None
         self.next_wake_time = None
-        start_slice = time.time()
         try:
             s = self.last_complete_prefix_index
             self.start_current_prefix(start_slice)
             finished_cycle = True
         except TimeSliceExceeded:
             finished_cycle = False
+        self.save_state()
         if not self.running:
             # someone might have used stopService() to shut us down
             return
@@ -219,7 +226,10 @@ class ShareCrawler(service.MultiService):
         # this_slice/percentage = this_slice+sleep_time
         # sleep_time = (this_slice/percentage) - this_slice
         sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
-        # if the math gets weird, or a timequake happens, don't sleep forever
+        # if the math gets weird, or a timequake happens, don't sleep
+        # forever. Note that this means that, while a cycle is running, we
+        # will process at least one bucket every 5 minutes, no matter how
+        # long that bucket takes.
         sleep_time = max(0.0, min(sleep_time, 299))
         if finished_cycle:
             # how long should we sleep between cycles? Don't run faster than
@@ -259,7 +269,7 @@ class ShareCrawler(service.MultiService):
             self.process_prefixdir(cycle, prefix, prefixdir,
                                    buckets, start_slice)
             self.last_complete_prefix_index = i
-            self.save_state()
+            self.finished_prefix(cycle, prefix)
             if time.time() >= start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
         # yay! we finished the whole cycle
@@ -274,31 +284,54 @@ class ShareCrawler(service.MultiService):
         """This gets a list of bucket names (i.e. storage index strings,
         base32-encoded) in sorted order.
 
-        Override this if your crawler doesn't care about the actual shares,
-        for example a crawler which merely keeps track of how many buckets
-        are being managed by this server.
+        You can override this if your crawler doesn't care about the actual
+        shares, for example a crawler which merely keeps track of how many
+        buckets are being managed by this server.
+
+        Subclasses which *do* care about actual bucket should leave this
+        method along, and implement process_bucket() instead.
         """
+
         for bucket in buckets:
             if bucket <= self.state["last-complete-bucket"]:
                 continue
             self.process_bucket(cycle, prefix, prefixdir, bucket)
             self.state["last-complete-bucket"] = bucket
-            # note: saving the state after every bucket is somewhat
-            # time-consuming, but lets us avoid losing more than one bucket's
-            # worth of progress.
-            self.save_state()
             if time.time() >= start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
 
+    # the remaining methods are explictly for subclasses to implement.
+
     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         """Examine a single bucket. Subclasses should do whatever they want
         to do to the shares therein, then update self.state as necessary.
 
-        This method will be called exactly once per share (per cycle), unless
-        the crawler was interrupted (by node restart, for example), in which
-        case it might be called a second time on a bucket which was processed
-        during the previous node's incarnation. However, in that case, no
-        changes to self.state will have been recorded.
+        If the crawler is never interrupted by SIGKILL, this method will be
+        called exactly once per share (per cycle). If it *is* interrupted,
+        then the next time the node is started, some amount of work will be
+        duplicated, according to when self.save_state() was last called. By
+        default, save_state() is called at the end of each timeslice, and
+        after finished_cycle() returns, and when stopService() is called.
+
+        To reduce the chance of duplicate work (i.e. to avoid adding multiple
+        records to a database), you can call save_state() at the end of your
+        process_bucket() method. This will reduce the maximum duplicated work
+        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
+        per bucket (and some disk writes), which will count against your
+        allowed_cpu_percentage, and which may be considerable if
+        process_bucket() runs quickly.
+
+        This method for subclasses to override. No upcall is necessary.
+        """
+        pass
+
+    def finished_prefix(self, cycle, prefix):
+        """Notify a subclass that the crawler has just finished processing a
+        prefix directory (all buckets with the same two-character/10bit
+        prefix). To impose a limit on how much work might be duplicated by a
+        SIGKILL that occurs during a timeslice, you can call
+        self.save_state() here, but be aware that it may represent a
+        significant performance hit.
 
         This method for subclasses to override. No upcall is necessary.
         """
diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py
index bed132de..b4cf56e7 100644
--- a/src/allmydata/test/test_crawler.py
+++ b/src/allmydata/test/test_crawler.py
@@ -15,6 +15,7 @@ from common_util import StallMixin
 
 class BucketEnumeratingCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
+    slow_start = 0
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
         self.all_buckets = []
@@ -26,6 +27,7 @@ class BucketEnumeratingCrawler(ShareCrawler):
 
 class PacedCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
+    slow_start = 0
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
         self.countdown = 6
@@ -49,6 +51,7 @@ class ConsumingCrawler(ShareCrawler):
     cpu_slice = 0.5
     allowed_cpu_percentage = 0.5
     minimum_cycle_time = 0
+    slow_start = 0
 
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
@@ -68,6 +71,7 @@ class ConsumingCrawler(ShareCrawler):
 
 class OneShotCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
+    slow_start = 0
     def __init__(self, *args, **kwargs):
         ShareCrawler.__init__(self, *args, **kwargs)
         self.counter = 0
@@ -182,7 +186,10 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             c.start_current_prefix(time.time())
         except TimeSliceExceeded:
             pass
-        # that should stop in the middle of one of the buckets.
+        # that should stop in the middle of one of the buckets. Since we
+        # aren't using its normal scheduler, we have to save its state
+        # manually.
+        c.save_state()
         c.cpu_slice = PacedCrawler.cpu_slice
         self.failUnlessEqual(len(c.all_buckets), 6)
 
@@ -204,7 +211,10 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             c.start_current_prefix(time.time())
         except TimeSliceExceeded:
             pass
-        # that should stop in the middle of one of the buckets
+        # that should stop in the middle of one of the buckets. Since we
+        # aren't using its normal scheduler, we have to save its state
+        # manually.
+        c.save_state()
         c.cpu_slice = PacedCrawler.cpu_slice
 
         # a third crawler should pick up from where it left off
@@ -226,7 +236,9 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             c.start_current_prefix(time.time())
         except TimeSliceExceeded:
             pass
-        # that should stop at the end of one of the buckets.
+        # that should stop at the end of one of the buckets. Again we must
+        # save state manually.
+        c.save_state()
         c.cpu_slice = PacedCrawler.cpu_slice
         self.failUnlessEqual(len(c.all_buckets), 4)
         c.start_current_prefix(time.time()) # finish it
@@ -244,6 +256,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         except TimeSliceExceeded:
             pass
         # that should stop at the end of one of the buckets.
+        c.save_state()
 
         c2 = PacedCrawler(ss, statefile)
         c2.all_buckets = c.all_buckets[:]
@@ -376,6 +389,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
 
         statefile = os.path.join(self.basedir, "statefile")
         c = ShareCrawler(ss, statefile)
+        c.slow_start = 0
         c.setServiceParent(self.s)
 
         # we just let it run for a while, to get figleaf coverage of the
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index 0abab19a..e0c07d9a 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -1312,7 +1312,9 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
         fileutil.make_dirs(basedir)
         ss = StorageServer(basedir, "\x00" * 20)
         # to make sure we capture the bucket-counting-crawler in the middle
-        # of a cycle, we reach in and reduce its maximum slice time to 0.
+        # of a cycle, we reach in and reduce its maximum slice time to 0. We
+        # also make it start sooner than usual.
+        ss.bucket_counter.slow_start = 0
         orig_cpu_slice = ss.bucket_counter.cpu_slice
         ss.bucket_counter.cpu_slice = 0
         ss.setServiceParent(self.s)
@@ -1364,6 +1366,7 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
         ss = StorageServer(basedir, "\x00" * 20)
         # to make sure we capture the bucket-counting-crawler in the middle
         # of a cycle, we reach in and reduce its maximum slice time to 0.
+        ss.bucket_counter.slow_start = 0
         orig_cpu_slice = ss.bucket_counter.cpu_slice
         ss.bucket_counter.cpu_slice = 0
         ss.setServiceParent(self.s)
-- 
2.45.2