"""A ShareCrawler subclass is attached to a StorageServer, and
periodically walks all of its shares, processing each one in some
fashion. This crawl is rate-limited, to reduce the IO burden on the host,
- since large servers will have several million shares, which can take
- hours or days to read.
+ since large servers can easily have a terabyte of shares, in several
+ million files, which can take hours or days to read.
Once the crawler starts a cycle, it will proceed at a rate limited by the
allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
of two consecutive cycles.
We assume that the normal upload/download/get_buckets traffic of a tahoe
- grid will cause the prefixdir contents to be mostly cached, or that the
- number of buckets in each prefixdir will be small enough to load quickly.
- A 1TB allmydata.com server was measured to have 2.56M buckets, spread
- into the 1024 prefixdirs, with about 2500 buckets per prefix. On this
- server, each prefixdir took 130ms-200ms to list the first time, and 17ms
- to list the second time.
+ grid will cause the prefixdir contents to be mostly cached in the kernel,
+ or that the number of buckets in each prefixdir will be small enough to
+ load quickly. A 1TB allmydata.com server was measured to have 2.56M
+ buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
+ prefix. On this server, each prefixdir took 130ms-200ms to list the first
+ time, and 17ms to list the second time.
To use a crawler, create a subclass which implements the process_bucket()
method. It will be called with a prefixdir and a base32 storage index
filename where it can store persistent state. The statefile is used to
keep track of how far around the ring the process has travelled, as well
as timing history to allow the pace to be predicted and controlled. The
- statefile will be updated and written to disk after every bucket is
- processed.
+ statefile will be updated and written to disk after each time slice (just
+ before the crawler yields to the reactor), and also after each cycle is
+ finished, and also when stopService() is called. Note that this means
+ that a crawler which is interrupted with SIGKILL while it is in the
+ middle of a time slice will lose progress: the next time the node is
+ started, the crawler will repeat some unknown amount of work.
The crawler instance must be started with startService() before it will
do any work. To make it stop doing work, call stopService().
"""
+ slow_start = 300 # don't start crawling for 5 minutes after startup
# all three of these can be changed at any time
allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
cpu_slice = 1.0 # use up to 1.0 seconds before yielding
# arrange things to look like we were just sleeping, so
# status/progress values work correctly
self.sleeping_between_cycles = True
- self.current_sleep_time = 0
- self.next_wake_time = time.time()
- self.timer = reactor.callLater(0, self.start_slice)
+ self.current_sleep_time = self.slow_start
+ self.next_wake_time = time.time() + self.slow_start
+ self.timer = reactor.callLater(self.slow_start, self.start_slice)
service.MultiService.startService(self)
def stopService(self):
if self.timer:
self.timer.cancel()
self.timer = None
+ self.save_state()
return service.MultiService.stopService(self)
def start_slice(self):
+ start_slice = time.time()
self.timer = None
self.sleeping_between_cycles = False
self.current_sleep_time = None
self.next_wake_time = None
- start_slice = time.time()
try:
s = self.last_complete_prefix_index
self.start_current_prefix(start_slice)
finished_cycle = True
except TimeSliceExceeded:
finished_cycle = False
+ self.save_state()
if not self.running:
# someone might have used stopService() to shut us down
return
# this_slice/percentage = this_slice+sleep_time
# sleep_time = (this_slice/percentage) - this_slice
sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
- # if the math gets weird, or a timequake happens, don't sleep forever
+ # if the math gets weird, or a timequake happens, don't sleep
+ # forever. Note that this means that, while a cycle is running, we
+ # will process at least one bucket every 5 minutes, no matter how
+ # long that bucket takes.
sleep_time = max(0.0, min(sleep_time, 299))
if finished_cycle:
# how long should we sleep between cycles? Don't run faster than
self.process_prefixdir(cycle, prefix, prefixdir,
buckets, start_slice)
self.last_complete_prefix_index = i
- self.save_state()
+ self.finished_prefix(cycle, prefix)
if time.time() >= start_slice + self.cpu_slice:
raise TimeSliceExceeded()
# yay! we finished the whole cycle
"""This gets a list of bucket names (i.e. storage index strings,
base32-encoded) in sorted order.
- Override this if your crawler doesn't care about the actual shares,
- for example a crawler which merely keeps track of how many buckets
- are being managed by this server.
+ You can override this if your crawler doesn't care about the actual
+ shares, for example a crawler which merely keeps track of how many
+ buckets are being managed by this server.
+
+ Subclasses which *do* care about actual bucket should leave this
+ method along, and implement process_bucket() instead.
"""
+
for bucket in buckets:
if bucket <= self.state["last-complete-bucket"]:
continue
self.process_bucket(cycle, prefix, prefixdir, bucket)
self.state["last-complete-bucket"] = bucket
- # note: saving the state after every bucket is somewhat
- # time-consuming, but lets us avoid losing more than one bucket's
- # worth of progress.
- self.save_state()
if time.time() >= start_slice + self.cpu_slice:
raise TimeSliceExceeded()
+ # the remaining methods are explictly for subclasses to implement.
+
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
"""Examine a single bucket. Subclasses should do whatever they want
to do to the shares therein, then update self.state as necessary.
- This method will be called exactly once per share (per cycle), unless
- the crawler was interrupted (by node restart, for example), in which
- case it might be called a second time on a bucket which was processed
- during the previous node's incarnation. However, in that case, no
- changes to self.state will have been recorded.
+ If the crawler is never interrupted by SIGKILL, this method will be
+ called exactly once per share (per cycle). If it *is* interrupted,
+ then the next time the node is started, some amount of work will be
+ duplicated, according to when self.save_state() was last called. By
+ default, save_state() is called at the end of each timeslice, and
+ after finished_cycle() returns, and when stopService() is called.
+
+ To reduce the chance of duplicate work (i.e. to avoid adding multiple
+ records to a database), you can call save_state() at the end of your
+ process_bucket() method. This will reduce the maximum duplicated work
+ to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
+ per bucket (and some disk writes), which will count against your
+ allowed_cpu_percentage, and which may be considerable if
+ process_bucket() runs quickly.
+
+ This method for subclasses to override. No upcall is necessary.
+ """
+ pass
+
+ def finished_prefix(self, cycle, prefix):
+ """Notify a subclass that the crawler has just finished processing a
+ prefix directory (all buckets with the same two-character/10bit
+ prefix). To impose a limit on how much work might be duplicated by a
+ SIGKILL that occurs during a timeslice, you can call
+ self.save_state() here, but be aware that it may represent a
+ significant performance hit.
This method for subclasses to override. No upcall is necessary.
"""
class BucketEnumeratingCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
+ slow_start = 0
def __init__(self, *args, **kwargs):
ShareCrawler.__init__(self, *args, **kwargs)
self.all_buckets = []
class PacedCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
+ slow_start = 0
def __init__(self, *args, **kwargs):
ShareCrawler.__init__(self, *args, **kwargs)
self.countdown = 6
cpu_slice = 0.5
allowed_cpu_percentage = 0.5
minimum_cycle_time = 0
+ slow_start = 0
def __init__(self, *args, **kwargs):
ShareCrawler.__init__(self, *args, **kwargs)
class OneShotCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
+ slow_start = 0
def __init__(self, *args, **kwargs):
ShareCrawler.__init__(self, *args, **kwargs)
self.counter = 0
c.start_current_prefix(time.time())
except TimeSliceExceeded:
pass
- # that should stop in the middle of one of the buckets.
+ # that should stop in the middle of one of the buckets. Since we
+ # aren't using its normal scheduler, we have to save its state
+ # manually.
+ c.save_state()
c.cpu_slice = PacedCrawler.cpu_slice
self.failUnlessEqual(len(c.all_buckets), 6)
c.start_current_prefix(time.time())
except TimeSliceExceeded:
pass
- # that should stop in the middle of one of the buckets
+ # that should stop in the middle of one of the buckets. Since we
+ # aren't using its normal scheduler, we have to save its state
+ # manually.
+ c.save_state()
c.cpu_slice = PacedCrawler.cpu_slice
# a third crawler should pick up from where it left off
c.start_current_prefix(time.time())
except TimeSliceExceeded:
pass
- # that should stop at the end of one of the buckets.
+ # that should stop at the end of one of the buckets. Again we must
+ # save state manually.
+ c.save_state()
c.cpu_slice = PacedCrawler.cpu_slice
self.failUnlessEqual(len(c.all_buckets), 4)
c.start_current_prefix(time.time()) # finish it
except TimeSliceExceeded:
pass
# that should stop at the end of one of the buckets.
+ c.save_state()
c2 = PacedCrawler(ss, statefile)
c2.all_buckets = c.all_buckets[:]
statefile = os.path.join(self.basedir, "statefile")
c = ShareCrawler(ss, statefile)
+ c.slow_start = 0
c.setServiceParent(self.s)
# we just let it run for a while, to get figleaf coverage of the