From: Brian Warner Date: Fri, 20 Feb 2009 02:31:42 +0000 (-0700) Subject: crawler: modify API to support upcoming bucket-counting crawler X-Git-Tag: allmydata-tahoe-1.4.0~175 X-Git-Url: https://git.rkrishnan.org/simplejson/components//%22file:/%22?a=commitdiff_plain;h=ef4ff21ae7489cc57945434582a523fe3afa8e4c;p=tahoe-lafs%2Ftahoe-lafs.git crawler: modify API to support upcoming bucket-counting crawler --- diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py index e4f8622c..9c80e867 100644 --- a/src/allmydata/storage/crawler.py +++ b/src/allmydata/storage/crawler.py @@ -1,5 +1,6 @@ -import os, time, struct, pickle +import os, time, struct +import cPickle as pickle from twisted.internet import reactor from twisted.application import service from allmydata.storage.server import si_b2a @@ -25,7 +26,10 @@ class ShareCrawler(service.MultiService): To use this, create a subclass which implements the process_bucket() method. It will be called with a prefixdir and a base32 storage index - string. process_bucket() should run synchronously. + string. process_bucket() should run synchronously. Any keys added to + self.state will be preserved. Override add_initial_state() to set up + initial state keys. Override finished_cycle() to perform additional + processing when the cycle is complete. Then create an instance, with a reference to a StorageServer and a filename where it can store persistent state. The statefile is used to @@ -39,15 +43,15 @@ class ShareCrawler(service.MultiService): the Deferred that it returns. """ - # use up to 10% of the CPU, on average. This can be changed at any time. - allowed_cpu_percentage = .10 - # use up to 1.0 seconds before yielding. This can be changed at any time. - cpu_slice = 1.0 - # don't run a cycle faster than this - minimum_cycle_time = 300 + # all three of these can be changed at any time + allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average + cpu_slice = 1.0 # use up to 1.0 seconds before yielding + minimum_cycle_time = 300 # don't run a cycle faster than this - def __init__(self, server, statefile): + def __init__(self, server, statefile, allowed_cpu_percentage=None): service.MultiService.__init__(self) + if allowed_cpu_percentage is not None: + self.allowed_cpu_percentage = allowed_cpu_percentage self.server = server self.sharedir = server.sharedir self.statefile = statefile @@ -56,24 +60,73 @@ class ShareCrawler(service.MultiService): self.prefixes.sort() self.timer = None self.bucket_cache = (None, []) - self.first_cycle_finished = False + self.current_sleep_time = None + self.next_wake_time = None + + def get_state(self): + """I return the current state of the crawler. This is a copy of my + state dictionary, plus the following keys:: + + current-sleep-time: float, duration of our current sleep + next-wake-time: float, seconds-since-epoch of when we will next wake + + If we are not currently sleeping (i.e. get_state() was called from + inside the process_prefixdir, process_bucket, or finished_cycle() + methods, or if startService has not yet been called on this crawler), + these two keys will be None. + """ + state = self.state.copy() # it isn't a deepcopy, so don't go crazy + state["current-sleep-time"] = self.current_sleep_time + state["next-wake-time"] = self.next_wake_time + return state def load_state(self): + # we use this to store state for both the crawler's internals and + # anything the subclass-specific code needs. The state is stored + # after each bucket is processed, after each prefixdir is processed, + # and after a cycle is complete. The internal keys we use are: + # ["version"]: int, always 1 + # ["last-cycle-finished"]: int, or None if we have not yet finished + # any cycle + # ["current-cycle"]: int, or None if we are sleeping between cycles + # ["last-complete-prefix"]: str, two-letter name of the last prefixdir + # that was fully processed, or None if we + # are sleeping between cycles, or if we + # have not yet finished any prefixdir since + # a cycle was started + # ["last-complete-bucket"]: str, base32 storage index bucket name + # of the last bucket to be processed, or + # None if we are sleeping between cycles try: f = open(self.statefile, "rb") state = pickle.load(f) - lcp = state["last-complete-prefix"] - if lcp == None: - self.last_complete_prefix_index = -1 - else: - self.last_complete_prefix_index = self.prefixes.index(lcp) - self.last_complete_bucket = state["last-complete-bucket"] - self.first_cycle_finished = state["first-cycle-finished"] f.close() except EnvironmentError: + state = {"version": 1, + "last-cycle-finished": None, + "current-cycle": 0, + "last-complete-prefix": None, + "last-complete-bucket": None, + } + self.state = state + lcp = state["last-complete-prefix"] + if lcp == None: self.last_complete_prefix_index = -1 - self.last_complete_bucket = None - self.first_cycle_finished = False + else: + self.last_complete_prefix_index = self.prefixes.index(lcp) + self.add_initial_state() + + def add_initial_state(self): + """Hook method to add extra keys to self.state when first loaded. + + The first time this Crawler is used, or when the code has been + upgraded, the saved state file may not contain all the keys you + expect. Use this method to add any missing keys. Simply modify + self.state as needed. + + This method for subclasses to override. No upcall is necessary. + """ + pass def save_state(self): lcpi = self.last_complete_prefix_index @@ -81,14 +134,10 @@ class ShareCrawler(service.MultiService): last_complete_prefix = None else: last_complete_prefix = self.prefixes[lcpi] - state = {"version": 1, - "last-complete-prefix": last_complete_prefix, - "last-complete-bucket": self.last_complete_bucket, - "first-cycle-finished": self.first_cycle_finished, - } + self.state["last-complete-prefix"] = last_complete_prefix tmpfile = self.statefile + ".tmp" f = open(tmpfile, "wb") - pickle.dump(state, f) + pickle.dump(self.state, f) f.close() fileutil.move_into_place(tmpfile, self.statefile) @@ -105,6 +154,8 @@ class ShareCrawler(service.MultiService): def start_slice(self): self.timer = None + self.current_sleep_time = None + self.next_wake_time = None start_slice = time.time() try: self.start_current_prefix(start_slice) @@ -112,7 +163,8 @@ class ShareCrawler(service.MultiService): except TimeSliceExceeded: finished_cycle = False # either we finished a whole cycle, or we ran out of time - this_slice = time.time() - start_slice + now = time.time() + this_slice = now - start_slice # this_slice/(this_slice+sleep_time) = percentage # this_slice/percentage = this_slice+sleep_time # sleep_time = (this_slice/percentage) - this_slice @@ -128,10 +180,16 @@ class ShareCrawler(service.MultiService): else: self.sleeping_between_cycles = False self.current_sleep_time = sleep_time # for status page + self.next_wake_time = now + sleep_time self.yielding(sleep_time) self.timer = reactor.callLater(sleep_time, self.start_slice) def start_current_prefix(self, start_slice): + if self.state["current-cycle"] is None: + assert self.state["last-cycle-finished"] is not None + self.state["current-cycle"] = self.state["last-cycle-finished"] + 1 + cycle = self.state["current-cycle"] + for i in range(self.last_complete_prefix_index+1, len(self.prefixes)): if time.time() > start_slice + self.cpu_slice: raise TimeSliceExceeded() @@ -147,17 +205,19 @@ class ShareCrawler(service.MultiService): except EnvironmentError: buckets = [] self.bucket_cache = (i, buckets) - self.process_prefixdir(prefixdir, buckets, start_slice) + self.process_prefixdir(cycle, prefix, prefixdir, + buckets, start_slice) self.last_complete_prefix_index = i self.save_state() # yay! we finished the whole cycle self.last_complete_prefix_index = -1 - self.last_complete_bucket = None - self.first_cycle_finished = True + self.state["last-complete-bucket"] = None + self.state["last-cycle-finished"] = cycle + self.state["current-cycle"] = None + self.finished_cycle(cycle) self.save_state() - self.finished_cycle() - def process_prefixdir(self, prefixdir, buckets, start_slice): + def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice): """This gets a list of bucket names (i.e. storage index strings, base32-encoded) in sorted order. @@ -166,20 +226,43 @@ class ShareCrawler(service.MultiService): are being managed by this server. """ for bucket in buckets: - if bucket <= self.last_complete_bucket: + if bucket <= self.state["last-complete-bucket"]: continue if time.time() > start_slice + self.cpu_slice: raise TimeSliceExceeded() - self.process_bucket(prefixdir, bucket) - self.last_complete_bucket = bucket + self.process_bucket(cycle, prefix, prefixdir, bucket) + self.state["last-complete-bucket"] = bucket self.save_state() - def process_bucket(self, prefixdir, storage_index_b32): + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): + """Examine a single bucket. Subclasses should do whatever they want + to do to the shares therein, then update self.state as necessary. + + This method will be called exactly once per share (per cycle), unless + the crawler was interrupted (by node restart, for example), in which + case it might be called a second time on a bucket which was processed + during the previous node's incarnation. However, in that case, no + changes to self.state will have been recorded. + + This method for subclasses to override. No upcall is necessary. + """ pass - def finished_cycle(self): + def finished_cycle(self, cycle): + """Notify subclass that a cycle (one complete traversal of all + prefixdirs) has just finished. 'cycle' is the number of the cycle + that just finished. This method should perform summary work and + update self.state to publish information to status displays. + + This method for subclasses to override. No upcall is necessary. + """ pass def yielding(self, sleep_time): + """The crawler is about to sleep for 'sleep_time' seconds. This + method is mostly for the convenience of unit tests. + + This method for subclasses to override. No upcall is necessary. + """ pass diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py index 99456d1e..a5a0f17a 100644 --- a/src/allmydata/test/test_crawler.py +++ b/src/allmydata/test/test_crawler.py @@ -15,23 +15,23 @@ from common_util import StallMixin class BucketEnumeratingCrawler(ShareCrawler): cpu_slice = 500 # make sure it can complete in a single slice - def __init__(self, server, statefile): - ShareCrawler.__init__(self, server, statefile) + def __init__(self, *args, **kwargs): + ShareCrawler.__init__(self, *args, **kwargs) self.all_buckets = [] self.finished_d = defer.Deferred() - def process_bucket(self, prefixdir, storage_index_b32): + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): self.all_buckets.append(storage_index_b32) - def finished_cycle(self): + def finished_cycle(self, cycle): eventually(self.finished_d.callback, None) class PacedCrawler(ShareCrawler): cpu_slice = 500 # make sure it can complete in a single slice - def __init__(self, server, statefile): - ShareCrawler.__init__(self, server, statefile) + def __init__(self, *args, **kwargs): + ShareCrawler.__init__(self, *args, **kwargs) self.countdown = 6 self.all_buckets = [] self.finished_d = defer.Deferred() - def process_bucket(self, prefixdir, storage_index_b32): + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): self.all_buckets.append(storage_index_b32) self.countdown -= 1 if self.countdown == 0: @@ -39,7 +39,7 @@ class PacedCrawler(ShareCrawler): self.cpu_slice = -1.0 def yielding(self, sleep_time): self.cpu_slice = 500 - def finished_cycle(self): + def finished_cycle(self, cycle): eventually(self.finished_d.callback, None) class ConsumingCrawler(ShareCrawler): @@ -47,18 +47,18 @@ class ConsumingCrawler(ShareCrawler): allowed_cpu_percentage = 0.5 minimum_cycle_time = 0 - def __init__(self, server, statefile): - ShareCrawler.__init__(self, server, statefile) + def __init__(self, *args, **kwargs): + ShareCrawler.__init__(self, *args, **kwargs) self.accumulated = 0.0 self.cycles = 0 self.last_yield = 0.0 - def process_bucket(self, prefixdir, storage_index_b32): + def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): start = time.time() time.sleep(0.05) elapsed = time.time() - start self.accumulated += elapsed self.last_yield += elapsed - def finished_cycle(self): + def finished_cycle(self, cycle): self.cycles += 1 def yielding(self, sleep_time): self.last_yield = 0.0 @@ -99,7 +99,7 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): sis = [self.write(i, ss, serverid) for i in range(10)] statefile = os.path.join(self.basedir, "statefile") - c = BucketEnumeratingCrawler(ss, statefile) + c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1) c.load_state() c.start_current_prefix(time.time()) @@ -322,7 +322,11 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): # empty methods in the base class def _check(): - return c.first_cycle_finished + return bool(c.state["last-cycle-finished"] is not None) d = self.poll(_check) + def _done(ignored): + state = c.get_state() + self.failUnless(state["last-cycle-finished"] is not None) + d.addCallback(_done) return d