since large servers will have several million shares, which can take
hours or days to read.
+ Once the crawler starts a cycle, it will proceed at a rate limited by the
+ allowed_cpu_percentage= and cpu_slice= parameters: it yields the reactor
+ after it has worked for 'cpu_slice' seconds, then waits long enough before
+ resuming to keep its overall CPU usage below 'allowed_cpu_percentage'.
+
+ Once the crawler finishes a cycle, it will put off starting the next one
+ long enough to ensure that 'minimum_cycle_time' elapses between the start
+ of two consecutive cycles.
+
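+ For example, a subclass that should crawl more aggressively than the
+ defaults could override these class attributes (the 'FastCrawler' name
+ and the values shown are only illustrative):
+
+   class FastCrawler(ShareCrawler):
+       allowed_cpu_percentage = .50  # use at most half the CPU
+       cpu_slice = 1.0               # yield the reactor every second
+       minimum_cycle_time = 60       # wait at least 60s between cycle starts
+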
We assume that the normal upload/download/get_buckets traffic of a tahoe
grid will cause the prefixdir contents to be mostly cached, or that the
number of buckets in each prefixdir will be small enough to load quickly.
A 1TB allmydata.com server was measured to have 2.56M buckets, spread
- into the 1040 prefixdirs, with about 2460 buckets per prefix. On this
+ into the 1024 prefixdirs, with about 2500 buckets per prefix. On this
server, each prefixdir took 130ms-200ms to list the first time, and 17ms
to list the second time.
- To use this, create a subclass which implements the process_bucket()
+ To use a crawler, create a subclass which implements the process_bucket()
method. It will be called with a prefixdir and a base32 storage index
- string. process_bucket() should run synchronously. Any keys added to
+ string. process_bucket() must run synchronously. Any keys added to
self.state will be preserved. Override add_initial_state() to set up
initial state keys. Override finished_cycle() to perform additional
- processing when the cycle is complete.
+ processing when the cycle is complete. Any status that the crawler
+ produces should be put in the self.state dictionary. Status renderers
+ (like a web page which describes the accomplishments of your crawler)
+ will use crawler.get_state() to retrieve this dictionary; they can
+ present the contents as they see fit.
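+
+ A minimal subclass might look like this sketch (the class name and the
+ 'bucket-count' state key are just examples):
+
+   class BucketCountingExample(ShareCrawler):
+       def add_initial_state(self):
+           self.state.setdefault("bucket-count", 0)
+       def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+           self.state["bucket-count"] += 1
+       def finished_cycle(self, cycle):
+           pass # a status renderer can read the count via get_state()
+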
Then create an instance, with a reference to a StorageServer and a
filename where it can store persistent state. The statefile is used to
processed.
The crawler instance must be started with startService() before it will
- do any work. To make it stop doing work, call stopService() and wait for
- the Deferred that it returns.
+ do any work. To make it stop doing work, call stopService().
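+
+ For example (the 'ss' StorageServer, the statefile path, and the parent
+ service are assumed to already exist):
+
+   c = BucketCountingExample(ss, statefile)  # any ShareCrawler subclass
+   c.setServiceParent(parent) # a running parent will call startService()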
"""
# all three of these can be changed at any time
finished_cycle = True
except TimeSliceExceeded:
finished_cycle = False
+ if not self.running:
+ # someone might have used stopService() to shut us down
+ return
# either we finished a whole cycle, or we ran out of time
now = time.time()
this_slice = now - start_slice
that just finished. This method should perform summary work and
update self.state to publish information to status displays.
+ One-shot crawlers, such as those used to upgrade shares to a new
+ format or to populate a database for the first time, can call
+ self.stopService() (or more likely self.disownServiceParent()) to
+ prevent themselves from running a second time. Don't forget to set
+ some persistent state so that the upgrader won't be run again the
+ next time the node is started.
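+
+ A sketch of that pattern (the 'upgrade-complete' state key is just an
+ example name):
+
+   def finished_cycle(self, cycle):
+       self.state["upgrade-complete"] = True # persisted in the statefile
+       self.disownServiceParent()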
+
This method is for subclasses to override. No upcall is necessary.
"""
pass
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
-from foolscap.eventual import eventually
+from foolscap import eventual
from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
self.all_buckets.append(storage_index_b32)
def finished_cycle(self, cycle):
- eventually(self.finished_d.callback, None)
+ eventual.eventually(self.finished_d.callback, None)
class PacedCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
def yielding(self, sleep_time):
self.cpu_slice = 500
def finished_cycle(self, cycle):
- eventually(self.finished_d.callback, None)
+ eventual.eventually(self.finished_d.callback, None)
class ConsumingCrawler(ShareCrawler):
cpu_slice = 0.5
def yielding(self, sleep_time):
self.last_yield = 0.0
+class OneShotCrawler(ShareCrawler):
+ cpu_slice = 500 # make sure it can complete in a single slice
+ def __init__(self, *args, **kwargs):
+ ShareCrawler.__init__(self, *args, **kwargs)
+ self.counter = 0
+ self.finished_d = defer.Deferred()
+ def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+ self.counter += 1
+ def finished_cycle(self, cycle):
+ self.finished_d.callback(None)
+ self.disownServiceParent()
+
class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
def setUp(self):
self.s = service.MultiService()
d.addCallback(_done)
return d
+
+ def test_oneshot(self):
+ self.basedir = "crawler/Basic/oneshot"
+ fileutil.make_dirs(self.basedir)
+ serverid = "\x00" * 20
+ ss = StorageServer(self.basedir, serverid)
+ ss.setServiceParent(self.s)
+
+ sis = [self.write(i, ss, serverid) for i in range(30)]
+
+ statefile = os.path.join(self.basedir, "statefile")
+ c = OneShotCrawler(ss, statefile)
+ c.setServiceParent(self.s)
+
+ d = c.finished_d
+ def _finished_first_cycle(ignored):
+ return eventual.fireEventually(c.counter)
+ d.addCallback(_finished_first_cycle)
+ def _check(old_counter):
+ # the crawler should not do any work after it's been stopped
+ self.failUnlessEqual(old_counter, c.counter)
+ self.failIf(c.running)
+ self.failIf(c.timer)
+ self.failIf(c.current_sleep_time)
+ d.addCallback(_check)
+ return d
+