-import os, time, struct, pickle
+import os, time, struct
+import cPickle as pickle
from twisted.internet import reactor
from twisted.application import service
from allmydata.storage.server import si_b2a
To use this, create a subclass which implements the process_bucket()
method. It will be called with a prefixdir and a base32 storage index
- string. process_bucket() should run synchronously.
+ string. process_bucket() should run synchronously. Any keys added to
+ self.state will be preserved. Override add_initial_state() to set up
+ initial state keys. Override finished_cycle() to perform additional
+ processing when the cycle is complete.
Then create an instance, with a reference to a StorageServer and a
filename where it can store persistent state. The statefile is used to
keep track of how far around the ring the crawler has travelled, so
that a restart can pick up where the previous run left off.

The crawler instance must be started with startService() before it
will do any work. To make it stop doing work, call stopService() and
wait for the Deferred that it returns.
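
A hypothetical minimal subclass (class and key names are illustrative
only) might look like::

    class BucketCounter(ShareCrawler):
        def add_initial_state(self):
            self.state.setdefault("bucket-count", 0)
        def process_bucket(self, cycle, prefix, prefixdir,
                           storage_index_b32):
            self.state["bucket-count"] += 1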
"""
- # use up to 10% of the CPU, on average. This can be changed at any time.
- allowed_cpu_percentage = .10
- # use up to 1.0 seconds before yielding. This can be changed at any time.
- cpu_slice = 1.0
- # don't run a cycle faster than this
- minimum_cycle_time = 300
+ # all three of these can be changed at any time
+ allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
+ cpu_slice = 1.0 # use up to 1.0 seconds before yielding
+ minimum_cycle_time = 300 # don't run a cycle faster than this
- def __init__(self, server, statefile):
+ def __init__(self, server, statefile, allowed_cpu_percentage=None):
service.MultiService.__init__(self)
+ if allowed_cpu_percentage is not None:
+ self.allowed_cpu_percentage = allowed_cpu_percentage
self.server = server
self.sharedir = server.sharedir
self.statefile = statefile
self.prefixes = [si_b2a(struct.pack(">Q", i << (72-10)))[:2]
                 for i in range(2**10)]
self.prefixes.sort()
self.timer = None
self.bucket_cache = (None, [])
- self.first_cycle_finished = False
+ self.current_sleep_time = None
+ self.next_wake_time = None
+
+ def get_state(self):
+ """I return the current state of the crawler. This is a copy of my
+ state dictionary, plus the following keys::
+
+ current-sleep-time: float, duration of our current sleep
+ next-wake-time: float, seconds-since-epoch of when we will next wake
+
+ If we are not currently sleeping (i.e. if get_state() was called from
+ inside the process_prefixdir, process_bucket, or finished_cycle()
+ methods, or if startService has not yet been called on this crawler),
+ these two keys will be None.
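+
+ For example, a status display might do (illustrative only)::
+
+     s = crawler.get_state()
+     if s["next-wake-time"] is not None:
+         remaining = s["next-wake-time"] - time.time()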
+ """
+ state = self.state.copy() # it isn't a deepcopy, so don't go crazy
+ state["current-sleep-time"] = self.current_sleep_time
+ state["next-wake-time"] = self.next_wake_time
+ return state
def load_state(self):
+ # we use this to store state for both the crawler's internals and
+ # anything the subclass-specific code needs. The state is stored
+ # after each bucket is processed, after each prefixdir is processed,
+ # and after a cycle is complete. The internal keys we use are:
+ # ["version"]: int, always 1
+ # ["last-cycle-finished"]: int, or None if we have not yet finished
+ # any cycle
+ # ["current-cycle"]: int, or None if we are sleeping between cycles
+ # ["last-complete-prefix"]: str, two-letter name of the last prefixdir
+ # that was fully processed, or None if we
+ # are sleeping between cycles, or if we
+ # have not yet finished any prefixdir since
+ # a cycle was started
+ # ["last-complete-bucket"]: str, base32 storage index bucket name
+ # of the last bucket to be processed, or
+ # None if we are sleeping between cycles
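+ #
+ # For example, a statefile captured partway through cycle 4 might
+ # unpickle to (values illustrative; subclass keys omitted):
+ #   {"version": 1, "last-cycle-finished": 3, "current-cycle": 4,
+ #    "last-complete-prefix": "aa",
+ #    "last-complete-bucket": <a base32 storage index>}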
try:
f = open(self.statefile, "rb")
state = pickle.load(f)
- lcp = state["last-complete-prefix"]
- if lcp == None:
- self.last_complete_prefix_index = -1
- else:
- self.last_complete_prefix_index = self.prefixes.index(lcp)
- self.last_complete_bucket = state["last-complete-bucket"]
- self.first_cycle_finished = state["first-cycle-finished"]
f.close()
except EnvironmentError:
+ state = {"version": 1,
+ "last-cycle-finished": None,
+ "current-cycle": 0,
+ "last-complete-prefix": None,
+ "last-complete-bucket": None,
+ }
+ self.state = state
+ lcp = state["last-complete-prefix"]
+ if lcp is None:
self.last_complete_prefix_index = -1
- self.last_complete_bucket = None
- self.first_cycle_finished = False
+ else:
+ self.last_complete_prefix_index = self.prefixes.index(lcp)
+ self.add_initial_state()
+
+ def add_initial_state(self):
+ """Hook method to add extra keys to self.state when first loaded.
+
+ The first time this Crawler is used, or when the code has been
+ upgraded, the saved state file may not contain all the keys you
+ expect. Use this method to add any missing keys. Simply modify
+ self.state as needed.
+
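+ A subclass might do, for example (the key name is hypothetical)::
+
+     def add_initial_state(self):
+         self.state.setdefault("share-count", 0)
+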
+ This method is for subclasses to override. No upcall is necessary.
+ """
+ pass
def save_state(self):
lcpi = self.last_complete_prefix_index
if lcpi == -1:
    last_complete_prefix = None
else:
last_complete_prefix = self.prefixes[lcpi]
- state = {"version": 1,
- "last-complete-prefix": last_complete_prefix,
- "last-complete-bucket": self.last_complete_bucket,
- "first-cycle-finished": self.first_cycle_finished,
- }
+ self.state["last-complete-prefix"] = last_complete_prefix
tmpfile = self.statefile + ".tmp"
f = open(tmpfile, "wb")
- pickle.dump(state, f)
+ pickle.dump(self.state, f)
f.close()
fileutil.move_into_place(tmpfile, self.statefile)
def start_slice(self):
self.timer = None
+ self.current_sleep_time = None
+ self.next_wake_time = None
start_slice = time.time()
try:
self.start_current_prefix(start_slice)
finished_cycle = True
except TimeSliceExceeded:
finished_cycle = False
# either we finished a whole cycle, or we ran out of time
- this_slice = time.time() - start_slice
+ now = time.time()
+ this_slice = now - start_slice
# this_slice/(this_slice+sleep_time) = percentage
# this_slice/percentage = this_slice+sleep_time
# sleep_time = (this_slice/percentage) - this_slice
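# e.g. a 1.0 second slice at allowed_cpu_percentage=.10 gives
#   sleep_time = (1.0 / .10) - 1.0 = 9.0 seconds of sleep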
sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
if finished_cycle:
    # don't start a new cycle faster than minimum_cycle_time
    sleep_time = max(sleep_time, self.minimum_cycle_time)
    self.sleeping_between_cycles = True
else:
self.sleeping_between_cycles = False
self.current_sleep_time = sleep_time # for status page
+ self.next_wake_time = now + sleep_time
self.yielding(sleep_time)
self.timer = reactor.callLater(sleep_time, self.start_slice)
def start_current_prefix(self, start_slice):
+ if self.state["current-cycle"] is None:
+ assert self.state["last-cycle-finished"] is not None
+ self.state["current-cycle"] = self.state["last-cycle-finished"] + 1
+ cycle = self.state["current-cycle"]
+
for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
if time.time() > start_slice + self.cpu_slice:
raise TimeSliceExceeded()
prefix = self.prefixes[i]
prefixdir = os.path.join(self.sharedir, prefix)
try:
    buckets = os.listdir(prefixdir)
    buckets.sort()
except EnvironmentError:
buckets = []
self.bucket_cache = (i, buckets)
- self.process_prefixdir(prefixdir, buckets, start_slice)
+ self.process_prefixdir(cycle, prefix, prefixdir,
+ buckets, start_slice)
self.last_complete_prefix_index = i
self.save_state()
# yay! we finished the whole cycle
self.last_complete_prefix_index = -1
- self.last_complete_bucket = None
- self.first_cycle_finished = True
+ self.state["last-complete-bucket"] = None
+ self.state["last-cycle-finished"] = cycle
+ self.state["current-cycle"] = None
+ self.finished_cycle(cycle)
self.save_state()
- self.finished_cycle()
- def process_prefixdir(self, prefixdir, buckets, start_slice):
+ def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
"""This gets a list of bucket names (i.e. storage index strings,
base32-encoded) in sorted order. Override this if your crawler
doesn't care about the actual shares, for example a crawler which
merely keeps track of how many buckets are being managed by this
server.
"""
for bucket in buckets:
- if bucket <= self.last_complete_bucket:
+ if bucket <= self.state["last-complete-bucket"]:
continue
if time.time() > start_slice + self.cpu_slice:
raise TimeSliceExceeded()
- self.process_bucket(prefixdir, bucket)
- self.last_complete_bucket = bucket
+ self.process_bucket(cycle, prefix, prefixdir, bucket)
+ self.state["last-complete-bucket"] = bucket
self.save_state()
- def process_bucket(self, prefixdir, storage_index_b32):
+ def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+ """Examine a single bucket. Subclasses should do whatever they want
+ to do to the shares therein, then update self.state as necessary.
+
+ This method will be called exactly once per bucket (per cycle), unless
+ the crawler was interrupted (by node restart, for example), in which
+ case it might be called a second time on a bucket which was processed
+ during the node's previous incarnation. However, in that case, no
+ changes to self.state will have been recorded.
+
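+ For example, a subclass might accumulate a share count (hypothetical
+ key, assuming each file in a bucket directory holds one share)::
+
+     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+         bucketdir = os.path.join(prefixdir, storage_index_b32)
+         nshares = len(os.listdir(bucketdir))
+         self.state["share-count"] = self.state.get("share-count", 0) + nshares
+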
+ This method is for subclasses to override. No upcall is necessary.
+ """
pass
- def finished_cycle(self):
+ def finished_cycle(self, cycle):
+ """Notify subclass that a cycle (one complete traversal of all
+ prefixdirs) has just finished. 'cycle' is the number of the cycle
+ that just finished. This method should perform summary work and
+ update self.state to publish information to status displays.
+
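+ For example, pairing with the hypothetical share-count sketch in
+ process_bucket above::
+
+     def finished_cycle(self, cycle):
+         self.state["cycle-share-count"] = self.state.pop("share-count", 0)
+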
+ This method is for subclasses to override. No upcall is necessary.
+ """
pass
def yielding(self, sleep_time):
+ """The crawler is about to sleep for 'sleep_time' seconds. This
+ method is mostly for the convenience of unit tests.
+
+ This method is for subclasses to override. No upcall is necessary.
+ """
pass
class BucketEnumeratingCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
- def __init__(self, server, statefile):
- ShareCrawler.__init__(self, server, statefile)
+ def __init__(self, *args, **kwargs):
+ ShareCrawler.__init__(self, *args, **kwargs)
self.all_buckets = []
self.finished_d = defer.Deferred()
- def process_bucket(self, prefixdir, storage_index_b32):
+ def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
self.all_buckets.append(storage_index_b32)
- def finished_cycle(self):
+ def finished_cycle(self, cycle):
eventually(self.finished_d.callback, None)
class PacedCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
- def __init__(self, server, statefile):
- ShareCrawler.__init__(self, server, statefile)
+ def __init__(self, *args, **kwargs):
+ ShareCrawler.__init__(self, *args, **kwargs)
self.countdown = 6
self.all_buckets = []
self.finished_d = defer.Deferred()
- def process_bucket(self, prefixdir, storage_index_b32):
+ def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
self.all_buckets.append(storage_index_b32)
self.countdown -= 1
if self.countdown == 0:
self.cpu_slice = -1.0
def yielding(self, sleep_time):
self.cpu_slice = 500
- def finished_cycle(self):
+ def finished_cycle(self, cycle):
eventually(self.finished_d.callback, None)
class ConsumingCrawler(ShareCrawler):
allowed_cpu_percentage = 0.5
minimum_cycle_time = 0
- def __init__(self, server, statefile):
- ShareCrawler.__init__(self, server, statefile)
+ def __init__(self, *args, **kwargs):
+ ShareCrawler.__init__(self, *args, **kwargs)
self.accumulated = 0.0
self.cycles = 0
self.last_yield = 0.0
- def process_bucket(self, prefixdir, storage_index_b32):
+ def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
start = time.time()
time.sleep(0.05)
elapsed = time.time() - start
self.accumulated += elapsed
self.last_yield += elapsed
- def finished_cycle(self):
+ def finished_cycle(self, cycle):
self.cycles += 1
def yielding(self, sleep_time):
self.last_yield = 0.0
sis = [self.write(i, ss, serverid) for i in range(10)]
statefile = os.path.join(self.basedir, "statefile")
- c = BucketEnumeratingCrawler(ss, statefile)
+ c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
c.load_state()
c.start_current_prefix(time.time())
# empty methods in the base class
def _check():
- return c.first_cycle_finished
+ return c.state["last-cycle-finished"] is not None
d = self.poll(_check)
+ def _done(ignored):
+ state = c.get_state()
+ self.failUnless(state["last-cycle-finished"] is not None)
+ d.addCallback(_done)
return d