-import os, time
+import time
from twisted.internet import defer
from allmydata.util.deferredutil import for_items
-from allmydata.util.fileutil import get_used_space
from allmydata.util import log
from allmydata.storage.crawler import ShareCrawler
from allmydata.storage.common import si_a2b
-from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN
+from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED
class AccountingCrawler(ShareCrawler):
- Recover from a situation where the leasedb is lost or detectably
corrupted. This is handled in the same way as upgrading.
- Detect shares that have unexpectedly disappeared from storage.
+
+ See ticket #1834 for a proposal to greatly reduce the scope of what I am
+ responsible for, and the times when I might do work.
"""
slow_start = 600 # don't start crawling for 10 minutes after startup
minimum_cycle_time = 12*60*60 # not more than twice per day
- def __init__(self, server, statefile, leasedb):
- ShareCrawler.__init__(self, server, statefile)
+ def __init__(self, backend, statefile, leasedb, clock=None):
+ ShareCrawler.__init__(self, backend, statefile, clock=clock)
self._leasedb = leasedb
- def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
- # assume that we can list every prefixdir in this prefix quickly.
- # Otherwise we have to retain more state between timeslices.
-
- # we define "shareid" as (SI string, shnum)
- disk_shares = set() # shareid
- for si_s in buckets:
- bucketdir = os.path.join(prefixdir, si_s)
- for sharefile in os.listdir(bucketdir):
- try:
- shnum = int(sharefile)
- except ValueError:
- continue # non-numeric means not a sharefile
- shareid = (si_s, shnum)
- disk_shares.add(shareid)
-
- # now check the database for everything in this prefix
- db_sharemap = self._leasedb.get_shares_for_prefix(prefix)
- db_shares = set(db_sharemap)
-
- rec = self.state["cycle-to-date"]["space-recovered"]
- examined_sharesets = [set() for st in xrange(len(SHARETYPES))]
-
- # The lease crawler used to calculate the lease age histogram while
- # crawling shares, and tests currently rely on that, but it would be
- # more efficient to maintain the histogram as leases are added,
- # updated, and removed.
- for key, value in db_sharemap.iteritems():
- (si_s, shnum) = key
- (used_space, sharetype) = value
-
- examined_sharesets[sharetype].add(si_s)
-
- for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice):
- self.add_lease_age_to_histogram(age)
-
- self.increment(rec, "examined-shares", 1)
- self.increment(rec, "examined-sharebytes", used_space)
- self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1)
- self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space)
-
- self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets]))
- for st in SHARETYPES:
- self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st]))
-
- # add new shares to the DB
- new_shares = disk_shares - db_shares
- for (si_s, shnum) in new_shares:
- sharefile = os.path.join(prefixdir, si_s, str(shnum))
- used_space = get_used_space(sharefile)
- # FIXME
- sharetype = SHARETYPE_UNKNOWN
- self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
- self._leasedb.add_starter_lease(si_s, shnum)
-
- # remove disappeared shares from DB
- disappeared_shares = db_shares - disk_shares
- for (si_s, shnum) in disappeared_shares:
- log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
- si_s=si_s, shnum=shnum, level=log.WEIRD)
- self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
-
- recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
-
- def _delete_share(ign, key, value):
- (si_s, shnum) = key
- (used_space, sharetype) = value
- storage_index = si_a2b(si_s)
+ def process_prefix(self, cycle, prefix, start_slice):
+        # Assume that we can list every shareset in this prefix quickly.
+ # Otherwise we would have to retain more state between timeslices.
+
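+        # get_sharesets_for_prefix() returns a Deferred that fires with the
+        # sharesets stored under this two-character prefix.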
+ d = self.backend.get_sharesets_for_prefix(prefix)
+ def _got_sharesets(sharesets):
+ stored_sharemap = {} # (SI string, shnum) -> (used_space, sharetype)
d2 = defer.succeed(None)
- def _mark_and_delete(ign):
- self._leasedb.mark_share_as_going(storage_index, shnum)
- return self.server.delete_share(storage_index, shnum)
- d2.addCallback(_mark_and_delete)
- def _deleted(ign):
- self._leasedb.remove_deleted_share(storage_index, shnum)
-
- recovered_sharesets[sharetype].add(si_s)
-
- self.increment(rec, "actual-shares", 1)
- self.increment(rec, "actual-sharebytes", used_space)
- self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1)
- self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space)
- def _not_deleted(f):
- log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s",
- si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD)
- try:
- self._leasedb.mark_share_as_stable(storage_index, shnum)
- except Exception, e:
- log.err(e)
- # discard the failure
- d2.addCallbacks(_deleted, _not_deleted)
+ for shareset in sharesets:
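+                # Queue one get_shares() call per shareset on the callback
+                # chain; binding shareset=shareset as a default argument
+                # captures the current loop value for each call.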
+ d2.addCallback(lambda ign, shareset=shareset: shareset.get_shares())
+ def _got_some_shares( (valid, corrupted) ):
+ for share in valid:
+ shareid = (share.get_storage_index_string(), share.get_shnum())
+ sharetype = SHARETYPE_UNKNOWN # FIXME
+ stored_sharemap[shareid] = (share.get_used_space(), sharetype)
+
+ for share in corrupted:
+ shareid = (share.get_storage_index_string(), share.get_shnum())
+ sharetype = SHARETYPE_CORRUPTED
+ stored_sharemap[shareid] = (share.get_used_space(), sharetype)
+
+ d2.addCallback(_got_some_shares)
+
+ d2.addCallback(lambda ign: stored_sharemap)
return d2
+ d.addCallback(_got_sharesets)
+
+ def _got_stored_sharemap(stored_sharemap):
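+            # Reconcile the backend's view (stored_sharemap) with the
+            # leasedb's view (db_sharemap): gather statistics, add shares
+            # the leasedb does not know about, drop entries for shares that
+            # have disappeared, and finally delete unleased shares.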
+ # now check the database for everything in this prefix
+ db_sharemap = self._leasedb.get_shares_for_prefix(prefix)
+
+ rec = self.state["cycle-to-date"]["space-recovered"]
+ examined_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+ # The lease crawler used to calculate the lease age histogram while
+ # crawling shares, and tests currently rely on that, but it would be
+ # more efficient to maintain the histogram as leases are added,
+ # updated, and removed.
+ for key, value in db_sharemap.iteritems():
+ (si_s, shnum) = key
+ (used_space, sharetype) = value
- unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
- d = for_items(_delete_share, unleased_sharemap)
+ examined_sharesets[sharetype].add(si_s)
- def _inc_recovered_sharesets(ign):
- self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets]))
+ for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice):
+ self.add_lease_age_to_histogram(age)
+
+ self.increment(rec, "examined-shares", 1)
+ self.increment(rec, "examined-sharebytes", used_space)
+ self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1)
+ self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space)
+
+ self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets]))
for st in SHARETYPES:
- self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st]))
- d.addCallback(_inc_recovered_sharesets)
+ self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st]))
+
+ stored_shares = set(stored_sharemap)
+ db_shares = set(db_sharemap)
+
+ # add new shares to the DB
+ new_shares = stored_shares - db_shares
+ for shareid in new_shares:
+ (si_s, shnum) = shareid
+ (used_space, sharetype) = stored_sharemap[shareid]
+
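+                # Register the share in the leasedb and give it a starter
+                # lease, so that it is not immediately treated as unleased
+                # (and deleted) below.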
+ self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
+ self._leasedb.add_starter_lease(si_s, shnum)
+
+ # remove disappeared shares from DB
+ disappeared_shares = db_shares - stored_shares
+ for (si_s, shnum) in disappeared_shares:
+ log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
+ si_s=si_s, shnum=shnum, level=log.WEIRD)
+ self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
+
+ recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
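+            # Shares that the leasedb reports as having no leases are
+            # deleted from the backend: each is marked as 'going' first,
+            # and removed from the leasedb only after the backend confirms
+            # the deletion.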
+ def _delete_share(ign, key, value):
+ (si_s, shnum) = key
+ (used_space, sharetype) = value
+ storage_index = si_a2b(si_s)
+ d3 = defer.succeed(None)
+ def _mark_and_delete(ign):
+ self._leasedb.mark_share_as_going(storage_index, shnum)
+ return self.backend.get_shareset(storage_index).delete_share(shnum)
+ d3.addCallback(_mark_and_delete)
+ def _deleted(ign):
+ self._leasedb.remove_deleted_share(storage_index, shnum)
+
+ recovered_sharesets[sharetype].add(si_s)
+
+ self.increment(rec, "actual-shares", 1)
+ self.increment(rec, "actual-sharebytes", used_space)
+ self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1)
+ self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space)
+ def _not_deleted(f):
+ log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s",
+ si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD)
+ try:
+ self._leasedb.mark_share_as_stable(storage_index, shnum)
+ except Exception, e:
+ log.err(e)
+ # discard the failure
+ d3.addCallbacks(_deleted, _not_deleted)
+ return d3
+
+ unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
+ d2 = for_items(_delete_share, unleased_sharemap)
+
+ def _inc_recovered_sharesets(ign):
+ self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets]))
+ for st in SHARETYPES:
+ self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st]))
+ d2.addCallback(_inc_recovered_sharesets)
+ return d2
+ d.addCallback(_got_stored_sharemap)
return d
# these methods are for outside callers to use
-import os, time, struct
+import time, struct
import cPickle as pickle
from twisted.internet import defer, reactor
from twisted.application import service
+from allmydata.interfaces import IStorageBackend
+
from allmydata.storage.common import si_b2a
from allmydata.util import fileutil
+from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin, async_iterate
class ShareCrawler(HookMixin, service.MultiService):
- """A ShareCrawler subclass is attached to a StorageServer, and
- periodically walks all of its shares, processing each one in some
- fashion. This crawl is rate-limited, to reduce the IO burden on the host,
- since large servers can easily have a terabyte of shares, in several
- million files, which can take hours or days to read.
+ """
+ An instance of a subclass of ShareCrawler is attached to a storage
+ backend, and periodically walks the backend's shares, processing them
+ in some fashion. This crawl is rate-limited to reduce the I/O burden on
+ the host, since large servers can easily have a terabyte of shares in
+ several million files, which can take hours or days to read.
    Once the crawler starts a cycle, it will proceed at a rate limited by the
    allowed_cpu_proportion= and cpu_slice= parameters: it yields the reactor
    after each cpu_slice of work, then sleeps long enough to keep its overall
    CPU usage below allowed_cpu_proportion. Once a cycle finishes, the crawler
    also sleeps long enough to ensure that 'minimum_cycle_time' elapses
    between the start of two consecutive cycles.
- We assume that the normal upload/download/get_buckets traffic of a tahoe
+ We assume that the normal upload/download/DYHB traffic of a Tahoe-LAFS
grid will cause the prefixdir contents to be mostly cached in the kernel,
- or that the number of buckets in each prefixdir will be small enough to
- load quickly. A 1TB allmydata.com server was measured to have 2.56M
- buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
- prefix. On this server, each prefixdir took 130ms-200ms to list the first
+ or that the number of sharesets in each prefixdir will be small enough to
+ load quickly. A 1TB allmydata.com server was measured to have 2.56 million
+ sharesets, spread into the 1024 prefixes, with about 2500 sharesets per
+ prefix. On this server, each prefix took 130ms-200ms to list the first
time, and 17ms to list the second time.
- To use a crawler, create a subclass which implements the process_bucket()
- method. It will be called with a prefixdir and a base32 storage index
- string. process_bucket() must run synchronously. Any keys added to
- self.state will be preserved. Override add_initial_state() to set up
- initial state keys. Override finished_cycle() to perform additional
- processing when the cycle is complete. Any status that the crawler
- produces should be put in the self.state dictionary. Status renderers
- (like a web page which describes the accomplishments of your crawler)
- will use crawler.get_state() to retrieve this dictionary; they can
- present the contents as they see fit.
-
- Then create an instance, with a reference to a StorageServer and a
- filename where it can store persistent state. The statefile is used to
- keep track of how far around the ring the process has travelled, as well
- as timing history to allow the pace to be predicted and controlled. The
- statefile will be updated and written to disk after each time slice (just
- before the crawler yields to the reactor), and also after each cycle is
- finished, and also when stopService() is called. Note that this means
- that a crawler which is interrupted with SIGKILL while it is in the
- middle of a time slice will lose progress: the next time the node is
- started, the crawler will repeat some unknown amount of work.
+ To implement a crawler, create a subclass that implements the
+ process_prefix() method. This method may be asynchronous. It will be
+ called with a string prefix. Any keys that it adds to self.state will be
+ preserved. Override add_initial_state() to set up initial state keys.
+ Override finished_cycle() to perform additional processing when the cycle
+ is complete. Any status that the crawler produces should be put in the
+ self.state dictionary. Status renderers (like a web page describing the
+ accomplishments of your crawler) will use crawler.get_state() to retrieve
+ this dictionary; they can present the contents as they see fit.
+
+ Then create an instance, with a reference to a backend object providing
+ the IStorageBackend interface, and a filename where it can store
+ persistent state. The statefile is used to keep track of how far around
+ the ring the process has travelled, as well as timing history to allow
+ the pace to be predicted and controlled. The statefile will be updated
+ and written to disk after each time slice (just before the crawler yields
+ to the reactor), and also after each cycle is finished, and also when
+ stopService() is called. Note that this means that a crawler that is
+ interrupted with SIGKILL while it is in the middle of a time slice will
+ lose progress: the next time the node is started, the crawler will repeat
+ some unknown amount of work.
The crawler instance must be started with startService() before it will
- do any work. To make it stop doing work, call stopService().
+ do any work. To make it stop doing work, call stopService(). A crawler
+ is usually a child service of a StorageServer, although it should not
+ depend on that.
+
+ For historical reasons, some dictionary key names use the term "bucket"
+ for what is now preferably called a "shareset" (the set of shares that a
+ server holds under a given storage index).
+
+ Subclasses should measure time using self.clock.seconds(), rather than
+ time.time(), in order to make themselves deterministically testable.
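+
+    As an illustrative sketch (using only the hooks described above), a
+    minimal subclass that counts the sharesets in each prefix might look
+    like:
+
+        class ExampleCounter(ShareCrawler):
+            def add_initial_state(self):
+                self.state.setdefault("counts", {})
+
+            def process_prefix(self, cycle, prefix, start_slice):
+                d = self.backend.get_sharesets_for_prefix(prefix)
+                def _count(sharesets):
+                    self.state["counts"][prefix] = len(sharesets)
+                d.addCallback(_count)
+                return d
+
+    BucketCountingCrawler, later in this module, is a real crawler along
+    these lines.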
"""
slow_start = 300 # don't start crawling for 5 minutes after startup
cpu_slice = 1.0 # use up to 1.0 seconds before yielding
minimum_cycle_time = 300 # don't run a cycle faster than this
- def __init__(self, server, statefile, allowed_cpu_proportion=None):
+ def __init__(self, backend, statefile, allowed_cpu_proportion=None, clock=None):
+ precondition(IStorageBackend.providedBy(backend), backend)
service.MultiService.__init__(self)
+ self.backend = backend
+ self.statefile = statefile
if allowed_cpu_proportion is not None:
self.allowed_cpu_proportion = allowed_cpu_proportion
- self.server = server
- self.sharedir = server.sharedir
- self.statefile = statefile
+ self.clock = clock or reactor
self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
for i in range(2**10)]
self.prefixes.sort()
self.timer = None
- self.bucket_cache = (None, [])
self.current_sleep_time = None
self.next_wake_time = None
self.last_prefix_finished_time = None
remaining-sleep-time: float, seconds from now when we do more work
estimated-cycle-complete-time-left:
float, seconds remaining until the current cycle is finished.
- TODO: this does not yet include the remaining time left in
- the current prefixdir, and it will be very inaccurate on fast
- crawlers (which can process a whole prefix in a single tick)
+ This does not include the remaining time left in the current
+ prefix, and it will be very inaccurate on fast crawlers
+ (which can process a whole prefix in a single tick)
estimated-time-per-cycle: float, seconds required to do a complete
cycle
if self.last_prefix_elapsed_time is not None:
left = len(self.prefixes) - self.last_complete_prefix_index
remaining = left * self.last_prefix_elapsed_time
- # TODO: remainder of this prefix: we need to estimate the
- # per-bucket time, probably by measuring the time spent on
- # this prefix so far, divided by the number of buckets we've
- # processed.
+
p["estimated-cycle-complete-time-left"] = remaining
# it's possible to call get_progress() from inside a crawler's
# finished_prefix() function
p["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
- time.time())
+ self.clock.seconds())
+
per_cycle = None
if self.last_cycle_elapsed_time is not None:
per_cycle = self.last_cycle_elapsed_time
"""I return the current state of the crawler. This is a copy of my
state dictionary.
- If we are not currently sleeping (i.e. get_state() was called from
- inside the process_prefixdir, process_bucket, or finished_cycle()
- methods, or if startService has not yet been called on this crawler),
- these two keys will be None.
-
Subclasses can override this to add computed keys to the return value,
but don't forget to start with the upcall.
"""
return state
def load_state(self):
- # we use this to store state for both the crawler's internals and
+ # We use this to store state for both the crawler's internals and
# anything the subclass-specific code needs. The state is stored
- # after each bucket is processed, after each prefixdir is processed,
- # and after a cycle is complete. The internal keys we use are:
+ # after each prefix is processed, and after a cycle is complete.
+ # The internal keys we use are:
# ["version"]: int, always 1
# ["last-cycle-finished"]: int, or None if we have not yet finished
# any cycle
# are sleeping between cycles, or if we
# have not yet finished any prefixdir since
# a cycle was started
- # ["last-complete-bucket"]: str, base32 storage index bucket name
- # of the last bucket to be processed, or
- # None if we are sleeping between cycles
try:
- f = open(self.statefile, "rb")
- state = pickle.load(f)
- f.close()
+ pickled = fileutil.read(self.statefile)
except Exception:
state = {"version": 1,
"last-cycle-finished": None,
"current-cycle": None,
"last-complete-prefix": None,
- "last-complete-bucket": None,
}
- state.setdefault("current-cycle-start-time", time.time()) # approximate
+ else:
+ state = pickle.loads(pickled)
+
+ state.setdefault("current-cycle-start-time", self.clock.seconds()) # approximate
self.state = state
lcp = state["last-complete-prefix"]
if lcp == None:
else:
last_complete_prefix = self.prefixes[lcpi]
self.state["last-complete-prefix"] = last_complete_prefix
- tmpfile = self.statefile + ".tmp"
- f = open(tmpfile, "wb")
- pickle.dump(self.state, f)
- f.close()
- fileutil.move_into_place(tmpfile, self.statefile)
+ pickled = pickle.dumps(self.state)
+ fileutil.write(self.statefile, pickled)
def startService(self):
# arrange things to look like we were just sleeping, so
# status/progress values work correctly
self.sleeping_between_cycles = True
self.current_sleep_time = self.slow_start
- self.next_wake_time = time.time() + self.slow_start
- self.timer = reactor.callLater(self.slow_start, self.start_slice)
+ self.next_wake_time = self.clock.seconds() + self.slow_start
+ self.timer = self.clock.callLater(self.slow_start, self.start_slice)
service.MultiService.startService(self)
def stopService(self):
return service.MultiService.stopService(self)
def start_slice(self):
- start_slice = time.time()
+ start_slice = self.clock.seconds()
self.timer = None
self.sleeping_between_cycles = False
self.current_sleep_time = None
if not self.running:
# someone might have used stopService() to shut us down
return
- # either we finished a whole cycle, or we ran out of time
- now = time.time()
+
+ # Either we finished a whole cycle, or we ran out of time.
+ now = self.clock.seconds()
this_slice = now - start_slice
+
# this_slice/(this_slice+sleep_time) = percentage
# this_slice/percentage = this_slice+sleep_time
# sleep_time = (this_slice/percentage) - this_slice
sleep_time = (this_slice / self.allowed_cpu_proportion) - this_slice
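+        # For example, if allowed_cpu_proportion were 0.10 and this slice
+        # took 1.0s of work, we would sleep 1.0/0.10 - 1.0 = 9.0 seconds,
+        # giving roughly a 10% duty cycle.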
- # if the math gets weird, or a timequake happens, don't sleep
+
+ # If the math gets weird, or a timequake happens, don't sleep
# forever. Note that this means that, while a cycle is running, we
- # will process at least one bucket every 5 minutes, no matter how
- # long that bucket takes.
+ # will process at least one prefix every 5 minutes, provided prefixes
+ # do not take more than 5 minutes to process.
sleep_time = max(0.0, min(sleep_time, 299))
+
if finished_cycle:
# how long should we sleep between cycles? Don't run faster than
# allowed_cpu_proportion says, but also run faster than
sleep_time = max(sleep_time, self.minimum_cycle_time)
else:
self.sleeping_between_cycles = False
+
self.current_sleep_time = sleep_time # for status page
self.next_wake_time = now + sleep_time
self.yielding(sleep_time)
- self.timer = reactor.callLater(sleep_time, self.start_slice)
+ self.timer = self.clock.callLater(sleep_time, self.start_slice)
d.addCallback(_done)
d.addBoth(self._call_hook, 'yield')
return d
def _do_prefix(self, cycle, i, start_slice):
prefix = self.prefixes[i]
- prefixdir = os.path.join(self.sharedir, prefix)
- if i == self.bucket_cache[0]:
- buckets = self.bucket_cache[1]
- else:
- try:
- buckets = os.listdir(prefixdir)
- buckets.sort()
- except EnvironmentError:
- buckets = []
- self.bucket_cache = (i, buckets)
-
- d = defer.maybeDeferred(self.process_prefixdir,
- cycle, prefix, prefixdir, buckets, start_slice)
+ d = defer.maybeDeferred(self.process_prefix, cycle, prefix, start_slice)
def _done(ign):
self.last_complete_prefix_index = i
d.addCallback(_done)
return d
- def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
- """This gets a list of bucket names (i.e. storage index strings,
- base32-encoded) in sorted order.
- FIXME: it would probably make more sense for the storage indices
- to be binary.
-
- You can override this if your crawler doesn't care about the actual
- shares, for example a crawler which merely keeps track of how many
- buckets are being managed by this server.
-
- Subclasses which *do* care about actual bucket should leave this
- method along, and implement process_bucket() instead.
+ def process_prefix(self, cycle, prefix, start_slice):
"""
-
- for bucket in buckets:
- if bucket <= self.state["last-complete-bucket"]:
- continue
- self.process_bucket(cycle, prefix, prefixdir, bucket)
- self.state["last-complete-bucket"] = bucket
- if time.time() >= start_slice + self.cpu_slice:
- raise TimeSliceExceeded()
+        Called once for each prefix during a cycle. Subclasses should
+        override this to process the sharesets in the given prefix; it may
+        return a Deferred. This default implementation does nothing.
+ """
+ return defer.succeed(None)
    # the remaining methods are explicitly for subclasses to implement.
"""
pass
- def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
- """Examine a single bucket. Subclasses should do whatever they want
- to do to the shares therein, then update self.state as necessary.
-
- If the crawler is never interrupted by SIGKILL, this method will be
- called exactly once per share (per cycle). If it *is* interrupted,
- then the next time the node is started, some amount of work will be
- duplicated, according to when self.save_state() was last called. By
- default, save_state() is called at the end of each timeslice, and
- after finished_cycle() returns, and when stopService() is called.
-
- To reduce the chance of duplicate work (i.e. to avoid adding multiple
- records to a database), you can call save_state() at the end of your
- process_bucket() method. This will reduce the maximum duplicated work
- to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
- per bucket (and some disk writes), which will count against your
- allowed_cpu_proportion, and which may be considerable if
- process_bucket() runs quickly.
-
- This method is for subclasses to override. No upcall is necessary.
- """
- pass
-
def finished_prefix(self, cycle, prefix):
"""Notify a subclass that the crawler has just finished processing a
prefix directory (all buckets with the same two-character/10bit
pass
def finished_cycle(self, cycle):
- """Notify subclass that a cycle (one complete traversal of all
- prefixdirs) has just finished. 'cycle' is the number of the cycle
+ """
+ Notify subclass that a cycle (one complete traversal of all
+ prefixes) has just finished. 'cycle' is the number of the cycle
that just finished. This method should perform summary work and
update self.state to publish information to status displays.
pass
def yielding(self, sleep_time):
- """The crawler is about to sleep for 'sleep_time' seconds. This
+ """
+ The crawler is about to sleep for 'sleep_time' seconds. This
method is mostly for the convenience of unit tests.
This method is for subclasses to override. No upcall is necessary.
class BucketCountingCrawler(ShareCrawler):
- """I keep track of how many buckets are being managed by this server.
- This is equivalent to the number of distributed files and directories for
- which I am providing storage. The actual number of files+directories in
- the full grid is probably higher (especially when there are more servers
- than 'N', the number of generated shares), because some files+directories
- will have shares on other servers instead of me. Also note that the
- number of buckets will differ from the number of shares in small grids,
- when more than one share is placed on a single server.
+ """
+ I keep track of how many sharesets, each corresponding to a storage index,
+ are being managed by this server. This is equivalent to the number of
+ distributed files and directories for which I am providing storage. The
+ actual number of files and directories in the full grid is probably higher
+ (especially when there are more servers than 'N', the number of generated
+ shares), because some files and directories will have shares on other
+ servers instead of me. Also note that the number of sharesets will differ
+ from the number of shares in small grids, when more than one share is
+ placed on a single server.
"""
minimum_cycle_time = 60*60 # we don't need this more than once an hour
- def __init__(self, server, statefile, num_sample_prefixes=1):
- ShareCrawler.__init__(self, server, statefile)
- self.num_sample_prefixes = num_sample_prefixes
-
def add_initial_state(self):
# ["bucket-counts"][cyclenum][prefix] = number
# ["last-complete-cycle"] = cyclenum # maintained by base class
# ["last-complete-bucket-count"] = number
- # ["storage-index-samples"][prefix] = (cyclenum,
- # list of SI strings (base32))
self.state.setdefault("bucket-counts", {})
self.state.setdefault("last-complete-bucket-count", None)
- self.state.setdefault("storage-index-samples", {})
-
- def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
- # we override process_prefixdir() because we don't want to look at
- # the individual buckets. We'll save state after each one. On my
- # laptop, a mostly-empty storage server can process about 70
- # prefixdirs in a 1.0s slice.
- if cycle not in self.state["bucket-counts"]:
- self.state["bucket-counts"][cycle] = {}
- self.state["bucket-counts"][cycle][prefix] = len(buckets)
- if prefix in self.prefixes[:self.num_sample_prefixes]:
- self.state["storage-index-samples"][prefix] = (cycle, buckets)
+
+ def process_prefix(self, cycle, prefix, start_slice):
+        # We only need to count the sharesets, not look inside them.
+ d = self.backend.get_sharesets_for_prefix(prefix)
+ def _got_sharesets(sharesets):
+ if cycle not in self.state["bucket-counts"]:
+ self.state["bucket-counts"][cycle] = {}
+ self.state["bucket-counts"][cycle][prefix] = len(sharesets)
+ d.addCallback(_got_sharesets)
+ return d
def finished_cycle(self, cycle):
last_counts = self.state["bucket-counts"].get(cycle, [])
if len(last_counts) == len(self.prefixes):
# great, we have a whole cycle.
- num_buckets = sum(last_counts.values())
- self.state["last-complete-bucket-count"] = num_buckets
+ num_sharesets = sum(last_counts.values())
+ self.state["last-complete-bucket-count"] = num_sharesets
# get rid of old counts
for old_cycle in list(self.state["bucket-counts"].keys()):
if old_cycle != cycle:
del self.state["bucket-counts"][old_cycle]
- # get rid of old samples too
- for prefix in list(self.state["storage-index-samples"].keys()):
- old_cycle,buckets = self.state["storage-index-samples"][prefix]
- if old_cycle != cycle:
- del self.state["storage-index-samples"][prefix]
-