From c6a061e600e3a3ca10927aaff4a551e110b1b49b Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Fri, 20 Feb 2009 15:19:11 -0700
Subject: [PATCH] crawler: provide for one-shot crawlers, which stop after
 their first full cycle, for share-upgraders and database-populaters

---
 src/allmydata/storage/crawler.py   | 34 ++++++++++++++++++----
 src/allmydata/test/test_crawler.py | 45 ++++++++++++++++++++++++++++--
 2 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
index 9c80e867..4ead36ce 100644
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -16,20 +16,33 @@ class ShareCrawler(service.MultiService):
     since large servers will have several million shares, which can take
     hours or days to read.
 
+    Once the crawler starts a cycle, it will proceed at a rate limited by the
+    allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
+    after it has worked for 'cpu_slice' seconds, and not resuming right away,
+    always trying to use less than 'allowed_cpu_percentage'.
+
+    Once the crawler finishes a cycle, it will put off starting the next one
+    long enough to ensure that 'minimum_cycle_time' elapses between the start
+    of two consecutive cycles.
+
     We assume that the normal upload/download/get_buckets traffic of a tahoe
     grid will cause the prefixdir contents to be mostly cached, or that the
     number of buckets in each prefixdir will be small enough to load quickly.
     A 1TB allmydata.com server was measured to have 2.56M buckets, spread
-    into the 1040 prefixdirs, with about 2460 buckets per prefix. On this
+    into the 1024 prefixdirs, with about 2500 buckets per prefix. On this
     server, each prefixdir took 130ms-200ms to list the first time, and 17ms
     to list the second time.
 
-    To use this, create a subclass which implements the process_bucket()
+    To use a crawler, create a subclass which implements the process_bucket()
     method. It will be called with a prefixdir and a base32 storage index
-    string. process_bucket() should run synchronously. Any keys added to
+    string. process_bucket() must run synchronously. Any keys added to
     self.state will be preserved. Override add_initial_state() to set up
     initial state keys. Override finished_cycle() to perform additional
-    processing when the cycle is complete.
+    processing when the cycle is complete. Any status that the crawler
+    produces should be put in the self.state dictionary. Status renderers
+    (like a web page which describes the accomplishments of your crawler)
+    will use crawler.get_state() to retrieve this dictionary; they can
+    present the contents as they see fit.
 
     Then create an instance, with a reference to a StorageServer and a
     filename where it can store persistent state. The statefile is used to
@@ -39,8 +52,7 @@ class ShareCrawler(service.MultiService):
     processed.
 
     The crawler instance must be started with startService() before it will
-    do any work. To make it stop doing work, call stopService() and wait for
-    the Deferred that it returns.
+    do any work. To make it stop doing work, call stopService().
     """
 
     # all three of these can be changed at any time
@@ -162,6 +174,9 @@ class ShareCrawler(service.MultiService):
             finished_cycle = True
         except TimeSliceExceeded:
             finished_cycle = False
+        if not self.running:
+            # someone might have used stopService() to shut us down
+            return
         # either we finished a whole cycle, or we ran out of time
         now = time.time()
         this_slice = now - start_slice
@@ -254,6 +269,13 @@ class ShareCrawler(service.MultiService):
         that just finished. This method should perform summary work and
         update self.state to publish information to status displays.
 
+        One-shot crawlers, such as those used to upgrade shares to a new
+        format or populate a database for the first time, can call
+        self.stopService() (or more likely self.disownServiceParent()) to
+        prevent it from running a second time. Don't forget to set some
+        persistent state so that the upgrader won't be run again the next
+        time the node is started.
+
         This method for subclasses to override. No upcall is necessary.
         """
         pass
diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py
index a5a0f17a..49b60f7d 100644
--- a/src/allmydata/test/test_crawler.py
+++ b/src/allmydata/test/test_crawler.py
@@ -4,7 +4,7 @@ import os.path
 from twisted.trial import unittest
 from twisted.application import service
 from twisted.internet import defer
-from foolscap.eventual import eventually
+from foolscap import eventual
 
 from allmydata.util import fileutil, hashutil, pollmixin
 from allmydata.storage.server import StorageServer, si_b2a
@@ -22,7 +22,7 @@ class BucketEnumeratingCrawler(ShareCrawler):
     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         self.all_buckets.append(storage_index_b32)
     def finished_cycle(self, cycle):
-        eventually(self.finished_d.callback, None)
+        eventual.eventually(self.finished_d.callback, None)
 
 class PacedCrawler(ShareCrawler):
     cpu_slice = 500 # make sure it can complete in a single slice
@@ -40,7 +40,7 @@ class PacedCrawler(ShareCrawler):
     def yielding(self, sleep_time):
         self.cpu_slice = 500
     def finished_cycle(self, cycle):
-        eventually(self.finished_d.callback, None)
+        eventual.eventually(self.finished_d.callback, None)
 
 class ConsumingCrawler(ShareCrawler):
     cpu_slice = 0.5
@@ -63,6 +63,18 @@ class ConsumingCrawler(ShareCrawler):
     def yielding(self, sleep_time):
         self.last_yield = 0.0
 
+class OneShotCrawler(ShareCrawler):
+    cpu_slice = 500 # make sure it can complete in a single slice
+    def __init__(self, *args, **kwargs):
+        ShareCrawler.__init__(self, *args, **kwargs)
+        self.counter = 0
+        self.finished_d = defer.Deferred()
+    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+        self.counter += 1
+    def finished_cycle(self, cycle):
+        self.finished_d.callback(None)
+        self.disownServiceParent()
+
 class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
     def setUp(self):
         self.s = service.MultiService()
@@ -330,3 +342,30 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         d.addCallback(_done)
         return d
 
+
+    def test_oneshot(self):
+        self.basedir = "crawler/Basic/oneshot"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        sis = [self.write(i, ss, serverid) for i in range(30)]
+
+        statefile = os.path.join(self.basedir, "statefile")
+        c = OneShotCrawler(ss, statefile)
+        c.setServiceParent(self.s)
+
+        d = c.finished_d
+        def _finished_first_cycle(ignored):
+            return eventual.fireEventually(c.counter)
+        d.addCallback(_finished_first_cycle)
+        def _check(old_counter):
+            # the crawler should do any work after it's been stopped
+            self.failUnlessEqual(old_counter, c.counter)
+            self.failIf(c.running)
+            self.failIf(c.timer)
+            self.failIf(c.current_sleep_time)
+        d.addCallback(_check)
+        return d
+
-- 
2.45.2