From: Daira Hopwood Date: Wed, 12 Dec 2012 07:01:59 +0000 (+0000) Subject: Changes to crawler classes (ShareCrawler and AccountingCrawler). X-Git-Url: https://git.rkrishnan.org/%5B/frontends/%22file:/somewhere?a=commitdiff_plain;h=63f181050d8b87addb718e07e4f6976fd7729404;p=tahoe-lafs%2Ftahoe-lafs.git Changes to crawler classes (ShareCrawler and AccountingCrawler). Pass in a Clock to allow (in theory) deterministic testing, although this isn't used yet by tests. Simplify the generic ShareCrawler code by not attempting to track state during processing of a single prefix. Signed-off-by: David-Sarah Hopwood --- diff --git a/src/allmydata/storage/accounting_crawler.py b/src/allmydata/storage/accounting_crawler.py index 493dc54b..1adc9dba 100644 --- a/src/allmydata/storage/accounting_crawler.py +++ b/src/allmydata/storage/accounting_crawler.py @@ -1,14 +1,13 @@ -import os, time +import time from twisted.internet import defer from allmydata.util.deferredutil import for_items -from allmydata.util.fileutil import get_used_space from allmydata.util import log from allmydata.storage.crawler import ShareCrawler from allmydata.storage.common import si_a2b -from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN +from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED class AccountingCrawler(ShareCrawler): @@ -22,116 +21,134 @@ class AccountingCrawler(ShareCrawler): - Recover from a situation where the leasedb is lost or detectably corrupted. This is handled in the same way as upgrading. - Detect shares that have unexpectedly disappeared from storage. + + See ticket #1834 for a proposal to greatly reduce the scope of what I am + responsible for, and the times when I might do work. """ slow_start = 600 # don't start crawling for 10 minutes after startup minimum_cycle_time = 12*60*60 # not more than twice per day - def __init__(self, server, statefile, leasedb): - ShareCrawler.__init__(self, server, statefile) + def __init__(self, backend, statefile, leasedb, clock=None): + ShareCrawler.__init__(self, backend, statefile, clock=clock) self._leasedb = leasedb - def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice): - # assume that we can list every prefixdir in this prefix quickly. - # Otherwise we have to retain more state between timeslices. - - # we define "shareid" as (SI string, shnum) - disk_shares = set() # shareid - for si_s in buckets: - bucketdir = os.path.join(prefixdir, si_s) - for sharefile in os.listdir(bucketdir): - try: - shnum = int(sharefile) - except ValueError: - continue # non-numeric means not a sharefile - shareid = (si_s, shnum) - disk_shares.add(shareid) - - # now check the database for everything in this prefix - db_sharemap = self._leasedb.get_shares_for_prefix(prefix) - db_shares = set(db_sharemap) - - rec = self.state["cycle-to-date"]["space-recovered"] - examined_sharesets = [set() for st in xrange(len(SHARETYPES))] - - # The lease crawler used to calculate the lease age histogram while - # crawling shares, and tests currently rely on that, but it would be - # more efficient to maintain the histogram as leases are added, - # updated, and removed. - for key, value in db_sharemap.iteritems(): - (si_s, shnum) = key - (used_space, sharetype) = value - - examined_sharesets[sharetype].add(si_s) - - for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice): - self.add_lease_age_to_histogram(age) - - self.increment(rec, "examined-shares", 1) - self.increment(rec, "examined-sharebytes", used_space) - self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1) - self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space) - - self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets])) - for st in SHARETYPES: - self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st])) - - # add new shares to the DB - new_shares = disk_shares - db_shares - for (si_s, shnum) in new_shares: - sharefile = os.path.join(prefixdir, si_s, str(shnum)) - used_space = get_used_space(sharefile) - # FIXME - sharetype = SHARETYPE_UNKNOWN - self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype) - self._leasedb.add_starter_lease(si_s, shnum) - - # remove disappeared shares from DB - disappeared_shares = db_shares - disk_shares - for (si_s, shnum) in disappeared_shares: - log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared", - si_s=si_s, shnum=shnum, level=log.WEIRD) - self._leasedb.remove_deleted_share(si_a2b(si_s), shnum) - - recovered_sharesets = [set() for st in xrange(len(SHARETYPES))] - - def _delete_share(ign, key, value): - (si_s, shnum) = key - (used_space, sharetype) = value - storage_index = si_a2b(si_s) + def process_prefix(self, cycle, prefix, start_slice): + # Assume that we can list every prefixdir in this prefix quickly. + # Otherwise we would have to retain more state between timeslices. + + d = self.backend.get_sharesets_for_prefix(prefix) + def _got_sharesets(sharesets): + stored_sharemap = {} # (SI string, shnum) -> (used_space, sharetype) d2 = defer.succeed(None) - def _mark_and_delete(ign): - self._leasedb.mark_share_as_going(storage_index, shnum) - return self.server.delete_share(storage_index, shnum) - d2.addCallback(_mark_and_delete) - def _deleted(ign): - self._leasedb.remove_deleted_share(storage_index, shnum) - - recovered_sharesets[sharetype].add(si_s) - - self.increment(rec, "actual-shares", 1) - self.increment(rec, "actual-sharebytes", used_space) - self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1) - self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space) - def _not_deleted(f): - log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s", - si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD) - try: - self._leasedb.mark_share_as_stable(storage_index, shnum) - except Exception, e: - log.err(e) - # discard the failure - d2.addCallbacks(_deleted, _not_deleted) + for shareset in sharesets: + d2.addCallback(lambda ign, shareset=shareset: shareset.get_shares()) + def _got_some_shares( (valid, corrupted) ): + for share in valid: + shareid = (share.get_storage_index_string(), share.get_shnum()) + sharetype = SHARETYPE_UNKNOWN # FIXME + stored_sharemap[shareid] = (share.get_used_space(), sharetype) + + for share in corrupted: + shareid = (share.get_storage_index_string(), share.get_shnum()) + sharetype = SHARETYPE_CORRUPTED + stored_sharemap[shareid] = (share.get_used_space(), sharetype) + + d2.addCallback(_got_some_shares) + + d2.addCallback(lambda ign: stored_sharemap) return d2 + d.addCallback(_got_sharesets) + + def _got_stored_sharemap(stored_sharemap): + # now check the database for everything in this prefix + db_sharemap = self._leasedb.get_shares_for_prefix(prefix) + + rec = self.state["cycle-to-date"]["space-recovered"] + examined_sharesets = [set() for st in xrange(len(SHARETYPES))] + + # The lease crawler used to calculate the lease age histogram while + # crawling shares, and tests currently rely on that, but it would be + # more efficient to maintain the histogram as leases are added, + # updated, and removed. + for key, value in db_sharemap.iteritems(): + (si_s, shnum) = key + (used_space, sharetype) = value - unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix) - d = for_items(_delete_share, unleased_sharemap) + examined_sharesets[sharetype].add(si_s) - def _inc_recovered_sharesets(ign): - self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets])) + for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice): + self.add_lease_age_to_histogram(age) + + self.increment(rec, "examined-shares", 1) + self.increment(rec, "examined-sharebytes", used_space) + self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1) + self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space) + + self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets])) for st in SHARETYPES: - self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st])) - d.addCallback(_inc_recovered_sharesets) + self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st])) + + stored_shares = set(stored_sharemap) + db_shares = set(db_sharemap) + + # add new shares to the DB + new_shares = stored_shares - db_shares + for shareid in new_shares: + (si_s, shnum) = shareid + (used_space, sharetype) = stored_sharemap[shareid] + + self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype) + self._leasedb.add_starter_lease(si_s, shnum) + + # remove disappeared shares from DB + disappeared_shares = db_shares - stored_shares + for (si_s, shnum) in disappeared_shares: + log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared", + si_s=si_s, shnum=shnum, level=log.WEIRD) + self._leasedb.remove_deleted_share(si_a2b(si_s), shnum) + + recovered_sharesets = [set() for st in xrange(len(SHARETYPES))] + + def _delete_share(ign, key, value): + (si_s, shnum) = key + (used_space, sharetype) = value + storage_index = si_a2b(si_s) + d3 = defer.succeed(None) + def _mark_and_delete(ign): + self._leasedb.mark_share_as_going(storage_index, shnum) + return self.backend.get_shareset(storage_index).delete_share(shnum) + d3.addCallback(_mark_and_delete) + def _deleted(ign): + self._leasedb.remove_deleted_share(storage_index, shnum) + + recovered_sharesets[sharetype].add(si_s) + + self.increment(rec, "actual-shares", 1) + self.increment(rec, "actual-sharebytes", used_space) + self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1) + self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space) + def _not_deleted(f): + log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s", + si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD) + try: + self._leasedb.mark_share_as_stable(storage_index, shnum) + except Exception, e: + log.err(e) + # discard the failure + d3.addCallbacks(_deleted, _not_deleted) + return d3 + + unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix) + d2 = for_items(_delete_share, unleased_sharemap) + + def _inc_recovered_sharesets(ign): + self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets])) + for st in SHARETYPES: + self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st])) + d2.addCallback(_inc_recovered_sharesets) + return d2 + d.addCallback(_got_stored_sharemap) return d # these methods are for outside callers to use diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py index 571734cf..7659edbe 100644 --- a/src/allmydata/storage/crawler.py +++ b/src/allmydata/storage/crawler.py @@ -1,12 +1,15 @@ -import os, time, struct +import time, struct import cPickle as pickle from twisted.internet import defer, reactor from twisted.application import service +from allmydata.interfaces import IStorageBackend + from allmydata.storage.common import si_b2a from allmydata.util import fileutil +from allmydata.util.assertutil import precondition from allmydata.util.deferredutil import HookMixin, async_iterate @@ -15,11 +18,12 @@ class TimeSliceExceeded(Exception): class ShareCrawler(HookMixin, service.MultiService): - """A ShareCrawler subclass is attached to a StorageServer, and - periodically walks all of its shares, processing each one in some - fashion. This crawl is rate-limited, to reduce the IO burden on the host, - since large servers can easily have a terabyte of shares, in several - million files, which can take hours or days to read. + """ + An instance of a subclass of ShareCrawler is attached to a storage + backend, and periodically walks the backend's shares, processing them + in some fashion. This crawl is rate-limited to reduce the I/O burden on + the host, since large servers can easily have a terabyte of shares in + several million files, which can take hours or days to read. Once the crawler starts a cycle, it will proceed at a rate limited by the allowed_cpu_proportion= and cpu_slice= parameters: yielding the reactor @@ -30,38 +34,47 @@ class ShareCrawler(HookMixin, service.MultiService): long enough to ensure that 'minimum_cycle_time' elapses between the start of two consecutive cycles. - We assume that the normal upload/download/get_buckets traffic of a tahoe + We assume that the normal upload/download/DYHB traffic of a Tahoe-LAFS grid will cause the prefixdir contents to be mostly cached in the kernel, - or that the number of buckets in each prefixdir will be small enough to - load quickly. A 1TB allmydata.com server was measured to have 2.56M - buckets, spread into the 1024 prefixdirs, with about 2500 buckets per - prefix. On this server, each prefixdir took 130ms-200ms to list the first + or that the number of sharesets in each prefixdir will be small enough to + load quickly. A 1TB allmydata.com server was measured to have 2.56 million + sharesets, spread into the 1024 prefixes, with about 2500 sharesets per + prefix. On this server, each prefix took 130ms-200ms to list the first time, and 17ms to list the second time. - To use a crawler, create a subclass which implements the process_bucket() - method. It will be called with a prefixdir and a base32 storage index - string. process_bucket() must run synchronously. Any keys added to - self.state will be preserved. Override add_initial_state() to set up - initial state keys. Override finished_cycle() to perform additional - processing when the cycle is complete. Any status that the crawler - produces should be put in the self.state dictionary. Status renderers - (like a web page which describes the accomplishments of your crawler) - will use crawler.get_state() to retrieve this dictionary; they can - present the contents as they see fit. - - Then create an instance, with a reference to a StorageServer and a - filename where it can store persistent state. The statefile is used to - keep track of how far around the ring the process has travelled, as well - as timing history to allow the pace to be predicted and controlled. The - statefile will be updated and written to disk after each time slice (just - before the crawler yields to the reactor), and also after each cycle is - finished, and also when stopService() is called. Note that this means - that a crawler which is interrupted with SIGKILL while it is in the - middle of a time slice will lose progress: the next time the node is - started, the crawler will repeat some unknown amount of work. + To implement a crawler, create a subclass that implements the + process_prefix() method. This method may be asynchronous. It will be + called with a string prefix. Any keys that it adds to self.state will be + preserved. Override add_initial_state() to set up initial state keys. + Override finished_cycle() to perform additional processing when the cycle + is complete. Any status that the crawler produces should be put in the + self.state dictionary. Status renderers (like a web page describing the + accomplishments of your crawler) will use crawler.get_state() to retrieve + this dictionary; they can present the contents as they see fit. + + Then create an instance, with a reference to a backend object providing + the IStorageBackend interface, and a filename where it can store + persistent state. The statefile is used to keep track of how far around + the ring the process has travelled, as well as timing history to allow + the pace to be predicted and controlled. The statefile will be updated + and written to disk after each time slice (just before the crawler yields + to the reactor), and also after each cycle is finished, and also when + stopService() is called. Note that this means that a crawler that is + interrupted with SIGKILL while it is in the middle of a time slice will + lose progress: the next time the node is started, the crawler will repeat + some unknown amount of work. The crawler instance must be started with startService() before it will - do any work. To make it stop doing work, call stopService(). + do any work. To make it stop doing work, call stopService(). A crawler + is usually a child service of a StorageServer, although it should not + depend on that. + + For historical reasons, some dictionary key names use the term "bucket" + for what is now preferably called a "shareset" (the set of shares that a + server holds under a given storage index). + + Subclasses should measure time using self.clock.seconds(), rather than + time.time(), in order to make themselves deterministically testable. """ slow_start = 300 # don't start crawling for 5 minutes after startup @@ -70,18 +83,18 @@ class ShareCrawler(HookMixin, service.MultiService): cpu_slice = 1.0 # use up to 1.0 seconds before yielding minimum_cycle_time = 300 # don't run a cycle faster than this - def __init__(self, server, statefile, allowed_cpu_proportion=None): + def __init__(self, backend, statefile, allowed_cpu_proportion=None, clock=None): + precondition(IStorageBackend.providedBy(backend), backend) service.MultiService.__init__(self) + self.backend = backend + self.statefile = statefile if allowed_cpu_proportion is not None: self.allowed_cpu_proportion = allowed_cpu_proportion - self.server = server - self.sharedir = server.sharedir - self.statefile = statefile + self.clock = clock or reactor self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2] for i in range(2**10)] self.prefixes.sort() self.timer = None - self.bucket_cache = (None, []) self.current_sleep_time = None self.next_wake_time = None self.last_prefix_finished_time = None @@ -117,9 +130,9 @@ class ShareCrawler(HookMixin, service.MultiService): remaining-sleep-time: float, seconds from now when we do more work estimated-cycle-complete-time-left: float, seconds remaining until the current cycle is finished. - TODO: this does not yet include the remaining time left in - the current prefixdir, and it will be very inaccurate on fast - crawlers (which can process a whole prefix in a single tick) + This does not include the remaining time left in the current + prefix, and it will be very inaccurate on fast crawlers + (which can process a whole prefix in a single tick) estimated-time-per-cycle: float, seconds required to do a complete cycle @@ -146,15 +159,13 @@ class ShareCrawler(HookMixin, service.MultiService): if self.last_prefix_elapsed_time is not None: left = len(self.prefixes) - self.last_complete_prefix_index remaining = left * self.last_prefix_elapsed_time - # TODO: remainder of this prefix: we need to estimate the - # per-bucket time, probably by measuring the time spent on - # this prefix so far, divided by the number of buckets we've - # processed. + p["estimated-cycle-complete-time-left"] = remaining # it's possible to call get_progress() from inside a crawler's # finished_prefix() function p["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time, - time.time()) + self.clock.seconds()) + per_cycle = None if self.last_cycle_elapsed_time is not None: per_cycle = self.last_cycle_elapsed_time @@ -167,11 +178,6 @@ class ShareCrawler(HookMixin, service.MultiService): """I return the current state of the crawler. This is a copy of my state dictionary. - If we are not currently sleeping (i.e. get_state() was called from - inside the process_prefixdir, process_bucket, or finished_cycle() - methods, or if startService has not yet been called on this crawler), - these two keys will be None. - Subclasses can override this to add computed keys to the return value, but don't forget to start with the upcall. """ @@ -179,10 +185,10 @@ class ShareCrawler(HookMixin, service.MultiService): return state def load_state(self): - # we use this to store state for both the crawler's internals and + # We use this to store state for both the crawler's internals and # anything the subclass-specific code needs. The state is stored - # after each bucket is processed, after each prefixdir is processed, - # and after a cycle is complete. The internal keys we use are: + # after each prefix is processed, and after a cycle is complete. + # The internal keys we use are: # ["version"]: int, always 1 # ["last-cycle-finished"]: int, or None if we have not yet finished # any cycle @@ -195,21 +201,18 @@ class ShareCrawler(HookMixin, service.MultiService): # are sleeping between cycles, or if we # have not yet finished any prefixdir since # a cycle was started - # ["last-complete-bucket"]: str, base32 storage index bucket name - # of the last bucket to be processed, or - # None if we are sleeping between cycles try: - f = open(self.statefile, "rb") - state = pickle.load(f) - f.close() + pickled = fileutil.read(self.statefile) except Exception: state = {"version": 1, "last-cycle-finished": None, "current-cycle": None, "last-complete-prefix": None, - "last-complete-bucket": None, } - state.setdefault("current-cycle-start-time", time.time()) # approximate + else: + state = pickle.loads(pickled) + + state.setdefault("current-cycle-start-time", self.clock.seconds()) # approximate self.state = state lcp = state["last-complete-prefix"] if lcp == None: @@ -237,19 +240,16 @@ class ShareCrawler(HookMixin, service.MultiService): else: last_complete_prefix = self.prefixes[lcpi] self.state["last-complete-prefix"] = last_complete_prefix - tmpfile = self.statefile + ".tmp" - f = open(tmpfile, "wb") - pickle.dump(self.state, f) - f.close() - fileutil.move_into_place(tmpfile, self.statefile) + pickled = pickle.dumps(self.state) + fileutil.write(self.statefile, pickled) def startService(self): # arrange things to look like we were just sleeping, so # status/progress values work correctly self.sleeping_between_cycles = True self.current_sleep_time = self.slow_start - self.next_wake_time = time.time() + self.slow_start - self.timer = reactor.callLater(self.slow_start, self.start_slice) + self.next_wake_time = self.clock.seconds() + self.slow_start + self.timer = self.clock.callLater(self.slow_start, self.start_slice) service.MultiService.startService(self) def stopService(self): @@ -260,7 +260,7 @@ class ShareCrawler(HookMixin, service.MultiService): return service.MultiService.stopService(self) def start_slice(self): - start_slice = time.time() + start_slice = self.clock.seconds() self.timer = None self.sleeping_between_cycles = False self.current_sleep_time = None @@ -278,18 +278,22 @@ class ShareCrawler(HookMixin, service.MultiService): if not self.running: # someone might have used stopService() to shut us down return - # either we finished a whole cycle, or we ran out of time - now = time.time() + + # Either we finished a whole cycle, or we ran out of time. + now = self.clock.seconds() this_slice = now - start_slice + # this_slice/(this_slice+sleep_time) = percentage # this_slice/percentage = this_slice+sleep_time # sleep_time = (this_slice/percentage) - this_slice sleep_time = (this_slice / self.allowed_cpu_proportion) - this_slice - # if the math gets weird, or a timequake happens, don't sleep + + # If the math gets weird, or a timequake happens, don't sleep # forever. Note that this means that, while a cycle is running, we - # will process at least one bucket every 5 minutes, no matter how - # long that bucket takes. + # will process at least one prefix every 5 minutes, provided prefixes + # do not take more than 5 minutes to process. sleep_time = max(0.0, min(sleep_time, 299)) + if finished_cycle: # how long should we sleep between cycles? Don't run faster than # allowed_cpu_proportion says, but also run faster than @@ -298,10 +302,11 @@ class ShareCrawler(HookMixin, service.MultiService): sleep_time = max(sleep_time, self.minimum_cycle_time) else: self.sleeping_between_cycles = False + self.current_sleep_time = sleep_time # for status page self.next_wake_time = now + sleep_time self.yielding(sleep_time) - self.timer = reactor.callLater(sleep_time, self.start_slice) + self.timer = self.clock.callLater(sleep_time, self.start_slice) d.addCallback(_done) d.addBoth(self._call_hook, 'yield') return d @@ -345,19 +350,7 @@ class ShareCrawler(HookMixin, service.MultiService): def _do_prefix(self, cycle, i, start_slice): prefix = self.prefixes[i] - prefixdir = os.path.join(self.sharedir, prefix) - if i == self.bucket_cache[0]: - buckets = self.bucket_cache[1] - else: - try: - buckets = os.listdir(prefixdir) - buckets.sort() - except EnvironmentError: - buckets = [] - self.bucket_cache = (i, buckets) - - d = defer.maybeDeferred(self.process_prefixdir, - cycle, prefix, prefixdir, buckets, start_slice) + d = defer.maybeDeferred(self.process_prefix, cycle, prefix, start_slice) def _done(ign): self.last_complete_prefix_index = i @@ -376,27 +369,11 @@ class ShareCrawler(HookMixin, service.MultiService): d.addCallback(_done) return d - def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice): - """This gets a list of bucket names (i.e. storage index strings, - base32-encoded) in sorted order. - FIXME: it would probably make more sense for the storage indices - to be binary. - - You can override this if your crawler doesn't care about the actual - shares, for example a crawler which merely keeps track of how many - buckets are being managed by this server. - - Subclasses which *do* care about actual bucket should leave this - method along, and implement process_bucket() instead. + def process_prefix(self, cycle, prefix, start_slice): """ - - for bucket in buckets: - if bucket <= self.state["last-complete-bucket"]: - continue - self.process_bucket(cycle, prefix, prefixdir, bucket) - self.state["last-complete-bucket"] = bucket - if time.time() >= start_slice + self.cpu_slice: - raise TimeSliceExceeded() + Called for each prefix. + """ + return defer.succeed(None) # the remaining methods are explictly for subclasses to implement. @@ -407,29 +384,6 @@ class ShareCrawler(HookMixin, service.MultiService): """ pass - def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): - """Examine a single bucket. Subclasses should do whatever they want - to do to the shares therein, then update self.state as necessary. - - If the crawler is never interrupted by SIGKILL, this method will be - called exactly once per share (per cycle). If it *is* interrupted, - then the next time the node is started, some amount of work will be - duplicated, according to when self.save_state() was last called. By - default, save_state() is called at the end of each timeslice, and - after finished_cycle() returns, and when stopService() is called. - - To reduce the chance of duplicate work (i.e. to avoid adding multiple - records to a database), you can call save_state() at the end of your - process_bucket() method. This will reduce the maximum duplicated work - to one bucket per SIGKILL. It will also add overhead, probably 1-20ms - per bucket (and some disk writes), which will count against your - allowed_cpu_proportion, and which may be considerable if - process_bucket() runs quickly. - - This method is for subclasses to override. No upcall is necessary. - """ - pass - def finished_prefix(self, cycle, prefix): """Notify a subclass that the crawler has just finished processing a prefix directory (all buckets with the same two-character/10bit @@ -443,8 +397,9 @@ class ShareCrawler(HookMixin, service.MultiService): pass def finished_cycle(self, cycle): - """Notify subclass that a cycle (one complete traversal of all - prefixdirs) has just finished. 'cycle' is the number of the cycle + """ + Notify subclass that a cycle (one complete traversal of all + prefixes) has just finished. 'cycle' is the number of the cycle that just finished. This method should perform summary work and update self.state to publish information to status displays. @@ -460,7 +415,8 @@ class ShareCrawler(HookMixin, service.MultiService): pass def yielding(self, sleep_time): - """The crawler is about to sleep for 'sleep_time' seconds. This + """ + The crawler is about to sleep for 'sleep_time' seconds. This method is mostly for the convenience of unit tests. This method is for subclasses to override. No upcall is necessary. @@ -469,56 +425,44 @@ class ShareCrawler(HookMixin, service.MultiService): class BucketCountingCrawler(ShareCrawler): - """I keep track of how many buckets are being managed by this server. - This is equivalent to the number of distributed files and directories for - which I am providing storage. The actual number of files+directories in - the full grid is probably higher (especially when there are more servers - than 'N', the number of generated shares), because some files+directories - will have shares on other servers instead of me. Also note that the - number of buckets will differ from the number of shares in small grids, - when more than one share is placed on a single server. + """ + I keep track of how many sharesets, each corresponding to a storage index, + are being managed by this server. This is equivalent to the number of + distributed files and directories for which I am providing storage. The + actual number of files and directories in the full grid is probably higher + (especially when there are more servers than 'N', the number of generated + shares), because some files and directories will have shares on other + servers instead of me. Also note that the number of sharesets will differ + from the number of shares in small grids, when more than one share is + placed on a single server. """ minimum_cycle_time = 60*60 # we don't need this more than once an hour - def __init__(self, server, statefile, num_sample_prefixes=1): - ShareCrawler.__init__(self, server, statefile) - self.num_sample_prefixes = num_sample_prefixes - def add_initial_state(self): # ["bucket-counts"][cyclenum][prefix] = number # ["last-complete-cycle"] = cyclenum # maintained by base class # ["last-complete-bucket-count"] = number - # ["storage-index-samples"][prefix] = (cyclenum, - # list of SI strings (base32)) self.state.setdefault("bucket-counts", {}) self.state.setdefault("last-complete-bucket-count", None) - self.state.setdefault("storage-index-samples", {}) - - def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice): - # we override process_prefixdir() because we don't want to look at - # the individual buckets. We'll save state after each one. On my - # laptop, a mostly-empty storage server can process about 70 - # prefixdirs in a 1.0s slice. - if cycle not in self.state["bucket-counts"]: - self.state["bucket-counts"][cycle] = {} - self.state["bucket-counts"][cycle][prefix] = len(buckets) - if prefix in self.prefixes[:self.num_sample_prefixes]: - self.state["storage-index-samples"][prefix] = (cycle, buckets) + + def process_prefix(self, cycle, prefix, start_slice): + # We don't need to look at the individual sharesets. + d = self.backend.get_sharesets_for_prefix(prefix) + def _got_sharesets(sharesets): + if cycle not in self.state["bucket-counts"]: + self.state["bucket-counts"][cycle] = {} + self.state["bucket-counts"][cycle][prefix] = len(sharesets) + d.addCallback(_got_sharesets) + return d def finished_cycle(self, cycle): last_counts = self.state["bucket-counts"].get(cycle, []) if len(last_counts) == len(self.prefixes): # great, we have a whole cycle. - num_buckets = sum(last_counts.values()) - self.state["last-complete-bucket-count"] = num_buckets + num_sharesets = sum(last_counts.values()) + self.state["last-complete-bucket-count"] = num_sharesets # get rid of old counts for old_cycle in list(self.state["bucket-counts"].keys()): if old_cycle != cycle: del self.state["bucket-counts"][old_cycle] - # get rid of old samples too - for prefix in list(self.state["storage-index-samples"].keys()): - old_cycle,buckets = self.state["storage-index-samples"][prefix] - if old_cycle != cycle: - del self.state["storage-index-samples"][prefix] -