From: Brian Warner Date: Fri, 20 Feb 2009 22:19:11 +0000 (-0700) Subject: crawler: provide for one-shot crawlers, which stop after their first full cycle,... X-Git-Tag: allmydata-tahoe-1.4.0~169 X-Git-Url: https://git.rkrishnan.org/pf/content/en/footer/module-simplejson.encoder.html?a=commitdiff_plain;h=c6a061e600e3a3ca10927aaff4a551e110b1b49b;p=tahoe-lafs%2Ftahoe-lafs.git crawler: provide for one-shot crawlers, which stop after their first full cycle, for share-upgraders and database-populaters --- diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py index 9c80e867..4ead36ce 100644 --- a/src/allmydata/storage/crawler.py +++ b/src/allmydata/storage/crawler.py @@ -16,20 +16,33 @@ class ShareCrawler(service.MultiService): since large servers will have several million shares, which can take hours or days to read. + Once the crawler starts a cycle, it will proceed at a rate limited by the + allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor + after it has worked for 'cpu_slice' seconds, and not resuming right away, + always trying to use less than 'allowed_cpu_percentage'. + + Once the crawler finishes a cycle, it will put off starting the next one + long enough to ensure that 'minimum_cycle_time' elapses between the start + of two consecutive cycles. + We assume that the normal upload/download/get_buckets traffic of a tahoe grid will cause the prefixdir contents to be mostly cached, or that the number of buckets in each prefixdir will be small enough to load quickly. A 1TB allmydata.com server was measured to have 2.56M buckets, spread - into the 1040 prefixdirs, with about 2460 buckets per prefix. On this + into the 1024 prefixdirs, with about 2500 buckets per prefix. On this server, each prefixdir took 130ms-200ms to list the first time, and 17ms to list the second time. - To use this, create a subclass which implements the process_bucket() + To use a crawler, create a subclass which implements the process_bucket() method. It will be called with a prefixdir and a base32 storage index - string. process_bucket() should run synchronously. Any keys added to + string. process_bucket() must run synchronously. Any keys added to self.state will be preserved. Override add_initial_state() to set up initial state keys. Override finished_cycle() to perform additional - processing when the cycle is complete. + processing when the cycle is complete. Any status that the crawler + produces should be put in the self.state dictionary. Status renderers + (like a web page which describes the accomplishments of your crawler) + will use crawler.get_state() to retrieve this dictionary; they can + present the contents as they see fit. Then create an instance, with a reference to a StorageServer and a filename where it can store persistent state. The statefile is used to @@ -39,8 +52,7 @@ class ShareCrawler(service.MultiService): processed. The crawler instance must be started with startService() before it will - do any work. To make it stop doing work, call stopService() and wait for - the Deferred that it returns. + do any work. To make it stop doing work, call stopService(). """ # all three of these can be changed at any time @@ -162,6 +174,9 @@ class ShareCrawler(service.MultiService): finished_cycle = True except TimeSliceExceeded: finished_cycle = False + if not self.running: + # someone might have used stopService() to shut us down + return # either we finished a whole cycle, or we ran out of time now = time.time() this_slice = now - start_slice @@ -254,6 +269,13 @@ class ShareCrawler(service.MultiService): that just finished. This method should perform summary work and update self.state to publish information to status displays. + One-shot crawlers, such as those used to upgrade shares to a new + format or populate a database for the first time, can call + self.stopService() (or more likely self.disownServiceParent()) to + prevent it from running a second time. Don't forget to set some + persistent state so that the upgrader won't be run again the next + time the node is started. + This method for subclasses to override. No upcall is necessary. """ pass diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py index a5a0f17a..49b60f7d 100644 --- a/src/allmydata/test/test_crawler.py +++ b/src/allmydata/test/test_crawler.py @@ -4,7 +4,7 @@ import os.path from twisted.trial import unittest from twisted.application import service from twisted.internet import defer -from foolscap.eventual import eventually +from foolscap import eventual from allmydata.util import fileutil, hashutil, pollmixin from allmydata.storage.server import StorageServer, si_b2a @@ -22,7 +22,7 @@ class BucketEnumeratingCrawler(ShareCrawler): def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): self.all_buckets.append(storage_index_b32) def finished_cycle(self, cycle): - eventually(self.finished_d.callback, None) + eventual.eventually(self.finished_d.callback, None) class PacedCrawler(ShareCrawler): cpu_slice = 500 # make sure it can complete in a single slice @@ -40,7 +40,7 @@ class PacedCrawler(ShareCrawler): def yielding(self, sleep_time): self.cpu_slice = 500 def finished_cycle(self, cycle): - eventually(self.finished_d.callback, None) + eventual.eventually(self.finished_d.callback, None) class ConsumingCrawler(ShareCrawler): cpu_slice = 0.5 @@ -63,6 +63,18 @@ class ConsumingCrawler(ShareCrawler): def yielding(self, sleep_time): self.last_yield = 0.0 +class OneShotCrawler(ShareCrawler): + cpu_slice = 500 # make sure it can complete in a single slice + def __init__(self, *args, **kwargs): + ShareCrawler.__init__(self, *args, **kwargs) + self.counter = 0 + self.finished_d = defer.Deferred() + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): + self.counter += 1 + def finished_cycle(self, cycle): + self.finished_d.callback(None) + self.disownServiceParent() + class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): def setUp(self): self.s = service.MultiService() @@ -330,3 +342,30 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): d.addCallback(_done) return d + + def test_oneshot(self): + self.basedir = "crawler/Basic/oneshot" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + sis = [self.write(i, ss, serverid) for i in range(30)] + + statefile = os.path.join(self.basedir, "statefile") + c = OneShotCrawler(ss, statefile) + c.setServiceParent(self.s) + + d = c.finished_d + def _finished_first_cycle(ignored): + return eventual.fireEventually(c.counter) + d.addCallback(_finished_first_cycle) + def _check(old_counter): + # the crawler should do any work after it's been stopped + self.failUnlessEqual(old_counter, c.counter) + self.failIf(c.running) + self.failIf(c.timer) + self.failIf(c.current_sleep_time) + d.addCallback(_check) + return d +