(which is going away, to be replaced by the AccountingCrawler).
Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
import os, time, struct
import cPickle as pickle
-from twisted.internet import reactor
+
+from twisted.internet import defer, reactor
from twisted.application import service
+
from allmydata.storage.common import si_b2a
from allmydata.util import fileutil
+from allmydata.util.deferredutil import HookMixin, async_iterate
+
class TimeSliceExceeded(Exception):
pass
-class ShareCrawler(service.MultiService):
+
+class ShareCrawler(HookMixin, service.MultiService):
"""A ShareCrawler subclass is attached to a StorageServer, and
periodically walks all of its shares, processing each one in some
fashion. This crawl is rate-limited, to reduce the IO burden on the host,
million files, which can take hours or days to read.
Once the crawler starts a cycle, it will proceed at a rate limited by the
- allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
+ allowed_cpu_proportion= and cpu_slice= parameters: yielding the reactor
after it has worked for 'cpu_slice' seconds, and not resuming right away,
- always trying to use less than 'allowed_cpu_percentage'.
+ always trying to use less than 'allowed_cpu_proportion'.
Once the crawler finishes a cycle, it will put off starting the next one
long enough to ensure that 'minimum_cycle_time' elapses between the start
slow_start = 300 # don't start crawling for 5 minutes after startup
# all three of these can be changed at any time
- allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
+ allowed_cpu_proportion = .10 # use up to 10% of the CPU, on average
cpu_slice = 1.0 # use up to 1.0 seconds before yielding
minimum_cycle_time = 300 # don't run a cycle faster than this
- def __init__(self, server, statefile, allowed_cpu_percentage=None):
+ def __init__(self, server, statefile, allowed_cpu_proportion=None):
service.MultiService.__init__(self)
- if allowed_cpu_percentage is not None:
- self.allowed_cpu_percentage = allowed_cpu_percentage
+ if allowed_cpu_proportion is not None:
+ self.allowed_cpu_proportion = allowed_cpu_proportion
self.server = server
self.sharedir = server.sharedir
self.statefile = statefile
self.last_cycle_elapsed_time = None
self.load_state()
+ # used by tests
+ self._hooks = {'after_prefix': None, 'after_cycle': None, 'yield': None}
+
def minus_or_none(self, a, b):
if a is None:
return None
self.sleeping_between_cycles = False
self.current_sleep_time = None
self.next_wake_time = None
- try:
- self.start_current_prefix(start_slice)
- finished_cycle = True
- except TimeSliceExceeded:
- finished_cycle = False
- self.save_state()
- if not self.running:
- # someone might have used stopService() to shut us down
- return
- # either we finished a whole cycle, or we ran out of time
- now = time.time()
- this_slice = now - start_slice
- # this_slice/(this_slice+sleep_time) = percentage
- # this_slice/percentage = this_slice+sleep_time
- # sleep_time = (this_slice/percentage) - this_slice
- sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
- # if the math gets weird, or a timequake happens, don't sleep
- # forever. Note that this means that, while a cycle is running, we
- # will process at least one bucket every 5 minutes, no matter how
- # long that bucket takes.
- sleep_time = max(0.0, min(sleep_time, 299))
- if finished_cycle:
- # how long should we sleep between cycles? Don't run faster than
- # allowed_cpu_percentage says, but also run faster than
- # minimum_cycle_time
- self.sleeping_between_cycles = True
- sleep_time = max(sleep_time, self.minimum_cycle_time)
- else:
- self.sleeping_between_cycles = False
- self.current_sleep_time = sleep_time # for status page
- self.next_wake_time = now + sleep_time
- self.yielding(sleep_time)
- self.timer = reactor.callLater(sleep_time, self.start_slice)
+
+ d = self.start_current_prefix(start_slice)
+ def _err(f):
+ f.trap(TimeSliceExceeded)
+ return False
+ def _ok(ign):
+ return True
+ d.addCallbacks(_ok, _err)
+ def _done(finished_cycle):
+ self.save_state()
+ if not self.running:
+ # someone might have used stopService() to shut us down
+ return
+ # either we finished a whole cycle, or we ran out of time
+ now = time.time()
+ this_slice = now - start_slice
+ # this_slice/(this_slice+sleep_time) = percentage
+ # this_slice/percentage = this_slice+sleep_time
+ # sleep_time = (this_slice/percentage) - this_slice
+ sleep_time = (this_slice / self.allowed_cpu_proportion) - this_slice
+ # if the math gets weird, or a timequake happens, don't sleep
+ # forever. Note that this means that, while a cycle is running, we
+ # will process at least one bucket every 5 minutes, no matter how
+ # long that bucket takes.
+ sleep_time = max(0.0, min(sleep_time, 299))
+ if finished_cycle:
+ # how long should we sleep between cycles? Don't run faster than
+ # allowed_cpu_proportion says, but also run faster than
+ # minimum_cycle_time
+ self.sleeping_between_cycles = True
+ sleep_time = max(sleep_time, self.minimum_cycle_time)
+ else:
+ self.sleeping_between_cycles = False
+ self.current_sleep_time = sleep_time # for status page
+ self.next_wake_time = now + sleep_time
+ self.yielding(sleep_time)
+ self.timer = reactor.callLater(sleep_time, self.start_slice)
+ d.addCallback(_done)
+ d.addBoth(self._call_hook, 'yield')
+ return d
def start_current_prefix(self, start_slice):
state = self.state
self.started_cycle(state["current-cycle"])
cycle = state["current-cycle"]
- for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
- # if we want to yield earlier, just raise TimeSliceExceeded()
- prefix = self.prefixes[i]
- prefixdir = os.path.join(self.sharedir, prefix)
- if i == self.bucket_cache[0]:
- buckets = self.bucket_cache[1]
- else:
- try:
- buckets = os.listdir(prefixdir)
- buckets.sort()
- except EnvironmentError:
- buckets = []
- self.bucket_cache = (i, buckets)
- self.process_prefixdir(cycle, prefix, prefixdir,
- buckets, start_slice)
+ def _prefix_loop(i):
+ d2 = self._do_prefix(cycle, i, start_slice)
+ d2.addBoth(self._call_hook, 'after_prefix')
+ d2.addCallback(lambda ign: True)
+ return d2
+ d = async_iterate(_prefix_loop, xrange(self.last_complete_prefix_index + 1, len(self.prefixes)))
+
+ def _cycle_done(ign):
+ # yay! we finished the whole cycle
+ self.last_complete_prefix_index = -1
+ self.last_prefix_finished_time = None # don't include the sleep
+ now = time.time()
+ if self.last_cycle_started_time is not None:
+ self.last_cycle_elapsed_time = now - self.last_cycle_started_time
+ state["last-complete-bucket"] = None
+ state["last-cycle-finished"] = cycle
+ state["current-cycle"] = None
+
+ self.finished_cycle(cycle)
+ self.save_state()
+ return cycle
+ d.addCallback(_cycle_done)
+ d.addBoth(self._call_hook, 'after_cycle')
+ return d
+
+ def _do_prefix(self, cycle, i, start_slice):
+ prefix = self.prefixes[i]
+ prefixdir = os.path.join(self.sharedir, prefix)
+ if i == self.bucket_cache[0]:
+ buckets = self.bucket_cache[1]
+ else:
+ try:
+ buckets = os.listdir(prefixdir)
+ buckets.sort()
+ except EnvironmentError:
+ buckets = []
+ self.bucket_cache = (i, buckets)
+
+ d = defer.maybeDeferred(self.process_prefixdir,
+ cycle, prefix, prefixdir, buckets, start_slice)
+ def _done(ign):
self.last_complete_prefix_index = i
now = time.time()
self.last_prefix_finished_time = now
self.finished_prefix(cycle, prefix)
+
if time.time() >= start_slice + self.cpu_slice:
raise TimeSliceExceeded()
- # yay! we finished the whole cycle
- self.last_complete_prefix_index = -1
- self.last_prefix_finished_time = None # don't include the sleep
- now = time.time()
- if self.last_cycle_started_time is not None:
- self.last_cycle_elapsed_time = now - self.last_cycle_started_time
- state["last-complete-bucket"] = None
- state["last-cycle-finished"] = cycle
- state["current-cycle"] = None
- self.finished_cycle(cycle)
- self.save_state()
+ return prefix
+ d.addCallback(_done)
+ return d
def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
"""This gets a list of bucket names (i.e. storage index strings,
base32-encoded) in sorted order.
+ FIXME: it would probably make more sense for the storage indices
+ to be binary.
You can override this if your crawler doesn't care about the actual
shares, for example a crawler which merely keeps track of how many
process_bucket() method. This will reduce the maximum duplicated work
to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
per bucket (and some disk writes), which will count against your
- allowed_cpu_percentage, and which may be considerable if
+ allowed_cpu_proportion, and which may be considerable if
process_bucket() runs quickly.
This method is for subclasses to override. No upcall is necessary.
return filenode
+class CrawlerTestMixin:
+ def _wait_for_yield(self, res, crawler):
+ """
+ Wait for the crawler to yield. This should be called at the end of a test
+ so that we leave a clean reactor.
+ """
+ if isinstance(res, failure.Failure):
+ print res
+ d = crawler.set_hook('yield')
+ d.addCallback(lambda ign: res)
+ return d
+
+ def _after_prefix(self, prefix, target_prefix, crawler):
+ """
+ Wait for the crawler to reach a given target_prefix. Return a deferred
+ for the crawler state at that point.
+ """
+ if prefix != target_prefix:
+ d = crawler.set_hook('after_prefix')
+ d.addCallback(self._after_prefix, target_prefix, crawler)
+ return d
+
+ crawler.save_state()
+ state = crawler.get_state()
+ self.failUnlessEqual(prefix, state["last-complete-prefix"])
+ return defer.succeed(state)
+
+
class LoggingServiceParent(service.MultiService):
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
-from foolscap.api import eventually, fireEventually
+from foolscap.api import fireEventually
-from allmydata.util import fileutil, hashutil, pollmixin
+from allmydata.util import fileutil, hashutil
from allmydata.storage.server import StorageServer, si_b2a
-from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded
+from allmydata.storage.crawler import ShareCrawler
from allmydata.test.test_storage import FakeCanary
+from allmydata.test.common import CrawlerTestMixin
from allmydata.test.common_util import StallMixin
-class BucketEnumeratingCrawler(ShareCrawler):
- cpu_slice = 500 # make sure it can complete in a single slice
- slow_start = 0
- def __init__(self, *args, **kwargs):
- ShareCrawler.__init__(self, *args, **kwargs)
- self.all_buckets = []
- self.finished_d = defer.Deferred()
- def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
- self.all_buckets.append(storage_index_b32)
- def finished_cycle(self, cycle):
- eventually(self.finished_d.callback, None)
-class PacedCrawler(ShareCrawler):
+class EnumeratingCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
slow_start = 0
+
def __init__(self, *args, **kwargs):
ShareCrawler.__init__(self, *args, **kwargs)
- self.countdown = 6
- self.all_buckets = []
- self.finished_d = defer.Deferred()
- self.yield_cb = None
+ self.sharesets = []
+
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
- self.all_buckets.append(storage_index_b32)
- self.countdown -= 1
- if self.countdown == 0:
- # force a timeout. We restore it in yielding()
- self.cpu_slice = -1.0
- def yielding(self, sleep_time):
- self.cpu_slice = 500
- if self.yield_cb:
- self.yield_cb()
- def finished_cycle(self, cycle):
- eventually(self.finished_d.callback, None)
+ self.sharesets.append(storage_index_b32)
+
class ConsumingCrawler(ShareCrawler):
cpu_slice = 0.5
- allowed_cpu_percentage = 0.5
+ allowed_cpu_proportion = 0.5
minimum_cycle_time = 0
slow_start = 0
self.accumulated = 0.0
self.cycles = 0
self.last_yield = 0.0
+
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
start = time.time()
time.sleep(0.05)
elapsed = time.time() - start
self.accumulated += elapsed
self.last_yield += elapsed
+
def finished_cycle(self, cycle):
self.cycles += 1
+
def yielding(self, sleep_time):
self.last_yield = 0.0
-class OneShotCrawler(ShareCrawler):
- cpu_slice = 500 # make sure it can complete in a single slice
- slow_start = 0
- def __init__(self, *args, **kwargs):
- ShareCrawler.__init__(self, *args, **kwargs)
- self.counter = 0
- self.finished_d = defer.Deferred()
- def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
- self.counter += 1
- def finished_cycle(self, cycle):
- self.finished_d.callback(None)
- self.disownServiceParent()
-class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
+class Basic(unittest.TestCase, StallMixin, CrawlerTestMixin):
def setUp(self):
self.s = service.MultiService()
self.s.startService()
def cs(self, i, serverid):
return hashutil.bucket_cancel_secret_hash(str(i), serverid)
+ def create(self, basedir):
+ self.basedir = basedir
+ fileutil.make_dirs(basedir)
+ self.serverid = "\x00" * 20
+ server = StorageServer(basedir, self.serverid)
+ server.setServiceParent(self.s)
+ return server
+
def write(self, i, ss, serverid, tail=0):
si = self.si(i)
si = si[:-1] + chr(tail)
made[0].remote_close()
return si_b2a(si)
- def test_immediate(self):
- self.basedir = "crawler/Basic/immediate"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
-
- sis = [self.write(i, ss, serverid) for i in range(10)]
- statefile = os.path.join(self.basedir, "statefile")
-
- c = BucketEnumeratingCrawler(ss, statefile, allowed_cpu_percentage=.1)
- c.load_state()
-
- c.start_current_prefix(time.time())
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
-
- # make sure the statefile has been returned to the starting point
- c.finished_d = defer.Deferred()
- c.all_buckets = []
- c.start_current_prefix(time.time())
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
-
- # check that a new crawler picks up on the state file properly
- c2 = BucketEnumeratingCrawler(ss, statefile)
- c2.load_state()
-
- c2.start_current_prefix(time.time())
- self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
-
def test_service(self):
- self.basedir = "crawler/Basic/service"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
+ ss = self.create("crawler/Basic/service")
- sis = [self.write(i, ss, serverid) for i in range(10)]
+ sis = [self.write(i, ss, self.serverid) for i in range(10)]
statefile = os.path.join(self.basedir, "statefile")
- c = BucketEnumeratingCrawler(ss, statefile)
+ c = EnumeratingCrawler(ss, statefile)
c.setServiceParent(self.s)
# it should be legal to call get_state() and get_progress() right
self.failUnlessEqual(s["current-cycle"], None)
self.failUnlessEqual(p["cycle-in-progress"], False)
- d = c.finished_d
- def _check(ignored):
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
- d.addCallback(_check)
- return d
-
- def test_paced(self):
- self.basedir = "crawler/Basic/paced"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
-
- # put four buckets in each prefixdir
- sis = []
- for i in range(10):
- for tail in range(4):
- sis.append(self.write(i, ss, serverid, tail))
-
- statefile = os.path.join(self.basedir, "statefile")
-
- c = PacedCrawler(ss, statefile)
- c.load_state()
- try:
- c.start_current_prefix(time.time())
- except TimeSliceExceeded:
- pass
- # that should stop in the middle of one of the buckets. Since we
- # aren't using its normal scheduler, we have to save its state
- # manually.
- c.save_state()
- c.cpu_slice = PacedCrawler.cpu_slice
- self.failUnlessEqual(len(c.all_buckets), 6)
-
- c.start_current_prefix(time.time()) # finish it
- self.failUnlessEqual(len(sis), len(c.all_buckets))
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
-
- # make sure the statefile has been returned to the starting point
- c.finished_d = defer.Deferred()
- c.all_buckets = []
- c.start_current_prefix(time.time())
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
- del c
-
- # start a new crawler, it should start from the beginning
- c = PacedCrawler(ss, statefile)
- c.load_state()
- try:
- c.start_current_prefix(time.time())
- except TimeSliceExceeded:
- pass
- # that should stop in the middle of one of the buckets. Since we
- # aren't using its normal scheduler, we have to save its state
- # manually.
- c.save_state()
- c.cpu_slice = PacedCrawler.cpu_slice
-
- # a third crawler should pick up from where it left off
- c2 = PacedCrawler(ss, statefile)
- c2.all_buckets = c.all_buckets[:]
- c2.load_state()
- c2.countdown = -1
- c2.start_current_prefix(time.time())
- self.failUnlessEqual(len(sis), len(c2.all_buckets))
- self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
- del c, c2
-
- # now stop it at the end of a bucket (countdown=4), to exercise a
- # different place that checks the time
- c = PacedCrawler(ss, statefile)
- c.load_state()
- c.countdown = 4
- try:
- c.start_current_prefix(time.time())
- except TimeSliceExceeded:
- pass
- # that should stop at the end of one of the buckets. Again we must
- # save state manually.
- c.save_state()
- c.cpu_slice = PacedCrawler.cpu_slice
- self.failUnlessEqual(len(c.all_buckets), 4)
- c.start_current_prefix(time.time()) # finish it
- self.failUnlessEqual(len(sis), len(c.all_buckets))
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
- del c
-
- # stop it again at the end of the bucket, check that a new checker
- # picks up correctly
- c = PacedCrawler(ss, statefile)
- c.load_state()
- c.countdown = 4
- try:
- c.start_current_prefix(time.time())
- except TimeSliceExceeded:
- pass
- # that should stop at the end of one of the buckets.
- c.save_state()
-
- c2 = PacedCrawler(ss, statefile)
- c2.all_buckets = c.all_buckets[:]
- c2.load_state()
- c2.countdown = -1
- c2.start_current_prefix(time.time())
- self.failUnlessEqual(len(sis), len(c2.all_buckets))
- self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
- del c, c2
-
- def test_paced_service(self):
- self.basedir = "crawler/Basic/paced_service"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
-
- sis = [self.write(i, ss, serverid) for i in range(10)]
-
- statefile = os.path.join(self.basedir, "statefile")
- c = PacedCrawler(ss, statefile)
-
- did_check_progress = [False]
- def check_progress():
- c.yield_cb = None
- try:
- p = c.get_progress()
- self.failUnlessEqual(p["cycle-in-progress"], True)
- pct = p["cycle-complete-percentage"]
- # after 6 buckets, we happen to be at 76.17% complete. As
- # long as we create shares in deterministic order, this will
- # continue to be true.
- self.failUnlessEqual(int(pct), 76)
- left = p["remaining-sleep-time"]
- self.failUnless(isinstance(left, float), left)
- self.failUnless(left > 0.0, left)
- except Exception, e:
- did_check_progress[0] = e
- else:
- did_check_progress[0] = True
- c.yield_cb = check_progress
-
- c.setServiceParent(self.s)
- # that should get through 6 buckets, pause for a little while (and
- # run check_progress()), then resume
-
- d = c.finished_d
- def _check(ignored):
- if did_check_progress[0] is not True:
- raise did_check_progress[0]
- self.failUnless(did_check_progress[0])
- self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
- # at this point, the crawler should be sitting in the inter-cycle
- # timer, which should be pegged at the minumum cycle time
- self.failUnless(c.timer)
- self.failUnless(c.sleeping_between_cycles)
- self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)
-
+ d = self._after_prefix(None, 'sg', c)
+ def _after_sg_prefix(state):
p = c.get_progress()
- self.failUnlessEqual(p["cycle-in-progress"], False)
- naptime = p["remaining-wait-time"]
- self.failUnless(isinstance(naptime, float), naptime)
- # min-cycle-time is 300, so this is basically testing that it took
- # less than 290s to crawl
- self.failUnless(naptime > 10.0, naptime)
- soon = p["next-crawl-time"] - time.time()
- self.failUnless(soon > 10.0, soon)
-
- d.addCallback(_check)
+ self.failUnlessEqual(p["cycle-in-progress"], True)
+ pct = p["cycle-complete-percentage"]
+ # After the 'sg' prefix, we happen to be 76.17% complete and to
+ # have processed 6 sharesets. As long as we create shares in
+ # deterministic order, this will continue to be true.
+ self.failUnlessEqual(int(pct), 76)
+ self.failUnlessEqual(len(c.sharesets), 6)
+
+ return c.set_hook('after_cycle')
+ d.addCallback(_after_sg_prefix)
+
+ def _after_first_cycle(ignored):
+ self.failUnlessEqual(sorted(sis), sorted(c.sharesets))
+ d.addCallback(_after_first_cycle)
+ d.addBoth(self._wait_for_yield, c)
+
+ # Check that a new crawler picks up on the state file correctly.
+ def _new_crawler(ign):
+ c2 = EnumeratingCrawler(ss, statefile)
+ c2.setServiceParent(self.s)
+
+ d2 = c2.set_hook('after_cycle')
+ def _after_first_cycle2(ignored):
+ self.failUnlessEqual(sorted(sis), sorted(c2.sharesets))
+ d2.addCallback(_after_first_cycle2)
+ d2.addBoth(self._wait_for_yield, c2)
+ return d2
+ d.addCallback(_new_crawler)
return d
def OFF_test_cpu_usage(self):
- # this test can't actually assert anything, because too many
+ # This test can't actually assert anything, because too many
# buildslave machines are slow. But on a fast developer machine, it
# can produce interesting results. So if you care about how well the
# Crawler is accomplishing it's run-slowly goals, re-enable this test
# and read the stdout when it runs.
- self.basedir = "crawler/Basic/cpu_usage"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
+ ss = self.create("crawler/Basic/cpu_usage")
for i in range(10):
- self.write(i, ss, serverid)
+ self.write(i, ss, self.serverid)
statefile = os.path.join(self.basedir, "statefile")
c = ConsumingCrawler(ss, statefile)
c.setServiceParent(self.s)
- # this will run as fast as it can, consuming about 50ms per call to
+ # This will run as fast as it can, consuming about 50ms per call to
# process_bucket(), limited by the Crawler to about 50% cpu. We let
# it run for a few seconds, then compare how much time
# process_bucket() got vs wallclock time. It should get between 10%
print "crawler: got %d%% percent when trying for 50%%" % percent
print "crawler: got %d full cycles" % c.cycles
d.addCallback(_done)
+ d.addBoth(self._wait_for_yield, c)
return d
def test_empty_subclass(self):
- self.basedir = "crawler/Basic/empty_subclass"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
+ ss = self.create("crawler/Basic/empty_subclass")
for i in range(10):
- self.write(i, ss, serverid)
+ self.write(i, ss, self.serverid)
statefile = os.path.join(self.basedir, "statefile")
c = ShareCrawler(ss, statefile)
c.slow_start = 0
c.setServiceParent(self.s)
- # we just let it run for a while, to get figleaf coverage of the
- # empty methods in the base class
+ # We just let it run for a while, to get coverage of the
+ # empty methods in the base class.
- def _check():
- return bool(c.state["last-cycle-finished"] is not None)
- d = self.poll(_check)
- def _done(ignored):
- state = c.get_state()
- self.failUnless(state["last-cycle-finished"] is not None)
- d.addCallback(_done)
+ d = defer.succeed(None)
+ d.addBoth(self._wait_for_yield, c)
return d
-
def test_oneshot(self):
- self.basedir = "crawler/Basic/oneshot"
- fileutil.make_dirs(self.basedir)
- serverid = "\x00" * 20
- ss = StorageServer(self.basedir, serverid)
- ss.setServiceParent(self.s)
+ ss = self.create("crawler/Basic/oneshot")
for i in range(30):
- self.write(i, ss, serverid)
+ self.write(i, ss, self.serverid)
statefile = os.path.join(self.basedir, "statefile")
- c = OneShotCrawler(ss, statefile)
+ c = EnumeratingCrawler(ss, statefile)
c.setServiceParent(self.s)
- d = c.finished_d
- def _finished_first_cycle(ignored):
- return fireEventually(c.counter)
- d.addCallback(_finished_first_cycle)
+ d = c.set_hook('after_cycle')
+ def _after_first_cycle(ignored):
+ c.disownServiceParent()
+ return fireEventually(len(c.sharesets))
+ d.addCallback(_after_first_cycle)
def _check(old_counter):
- # the crawler should do any work after it's been stopped
- self.failUnlessEqual(old_counter, c.counter)
+ # The crawler shouldn't do any work after it has been stopped.
+ self.failUnlessEqual(old_counter, len(c.sharesets))
self.failIf(c.running)
self.failIf(c.timer)
self.failIf(c.current_sleep_time)
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
from allmydata.storage.lease import LeaseInfo
-from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
ReadBucketProxy
VERIFICATION_KEY_SIZE, \
SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError
-from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
+from allmydata.test.common import LoggingServiceParent, ShouldFailMixin, CrawlerTestMixin
+from allmydata.test.common_util import ReallyEqualMixin
from allmydata.test.common_web import WebRenderingMixin
from allmydata.test.no_network import NoNetworkServer
from allmydata.web.storage import StorageStatus, remove_prefix
s = re.sub(r'\s+', ' ', s)
return s
-class MyBucketCountingCrawler(BucketCountingCrawler):
- def finished_prefix(self, cycle, prefix):
- BucketCountingCrawler.finished_prefix(self, cycle, prefix)
- if self.hook_ds:
- d = self.hook_ds.pop(0)
- d.callback(None)
-class MyStorageServer(StorageServer):
- def add_bucket_counter(self):
- statefile = os.path.join(self.storedir, "bucket_counter.state")
- self.bucket_counter = MyBucketCountingCrawler(self, statefile)
- self.bucket_counter.setServiceParent(self)
-
-class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
+class BucketCounterTest(unittest.TestCase, CrawlerTestMixin, ReallyEqualMixin):
def setUp(self):
self.s = service.MultiService()
basedir = "storage/BucketCounter/bucket_counter"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20)
- # to make sure we capture the bucket-counting-crawler in the middle
- # of a cycle, we reach in and reduce its maximum slice time to 0. We
- # also make it start sooner than usual.
+
+ # finish as fast as possible
ss.bucket_counter.slow_start = 0
- orig_cpu_slice = ss.bucket_counter.cpu_slice
- ss.bucket_counter.cpu_slice = 0
+ ss.bucket_counter.cpu_slice = 100.0
+
+ d = ss.bucket_counter.set_hook('after_prefix')
+
ss.setServiceParent(self.s)
w = StorageStatus(ss)
self.failUnlessIn("Total buckets: Not computed yet", s)
self.failUnlessIn("Next crawl in", s)
- # give the bucket-counting-crawler one tick to get started. The
- # cpu_slice=0 will force it to yield right after it processes the
- # first prefix
-
- d = fireEventually()
- def _check(ignored):
- # are we really right after the first prefix?
+ def _after_first_prefix(prefix):
+ ss.bucket_counter.save_state()
state = ss.bucket_counter.get_state()
- if state["last-complete-prefix"] is None:
- d2 = fireEventually()
- d2.addCallback(_check)
- return d2
- self.failUnlessEqual(state["last-complete-prefix"],
- ss.bucket_counter.prefixes[0])
- ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
+ self.failUnlessEqual(prefix, state["last-complete-prefix"])
+ self.failUnlessEqual(prefix, ss.bucket_counter.prefixes[0])
+
html = w.renderSynchronously()
s = remove_tags(html)
self.failUnlessIn(" Current crawl ", s)
self.failUnlessIn(" (next work in ", s)
- d.addCallback(_check)
- # now give it enough time to complete a full cycle
- def _watch():
- return not ss.bucket_counter.get_progress()["cycle-in-progress"]
- d.addCallback(lambda ignored: self.poll(_watch))
- def _check2(ignored):
- ss.bucket_counter.cpu_slice = orig_cpu_slice
+ return ss.bucket_counter.set_hook('after_cycle')
+ d.addCallback(_after_first_prefix)
+
+ def _after_first_cycle(cycle):
+ self.failUnlessEqual(cycle, 0)
+ progress = ss.bucket_counter.get_progress()
+ self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
+ d.addCallback(_after_first_cycle)
+ d.addBoth(self._wait_for_yield, ss.bucket_counter)
+
+ def _after_yield(ign):
html = w.renderSynchronously()
s = remove_tags(html)
self.failUnlessIn("Total buckets: 0 (the number of", s)
self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
- d.addCallback(_check2)
+ d.addCallback(_after_yield)
return d
def test_bucket_counter_cleanup(self):
basedir = "storage/BucketCounter/bucket_counter_cleanup"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20)
- # to make sure we capture the bucket-counting-crawler in the middle
- # of a cycle, we reach in and reduce its maximum slice time to 0.
+
+ # finish as fast as possible
ss.bucket_counter.slow_start = 0
- orig_cpu_slice = ss.bucket_counter.cpu_slice
- ss.bucket_counter.cpu_slice = 0
- ss.setServiceParent(self.s)
+ ss.bucket_counter.cpu_slice = 100.0
- d = fireEventually()
+ d = ss.bucket_counter.set_hook('after_prefix')
+
+ ss.setServiceParent(self.s)
- def _after_first_prefix(ignored):
+ def _after_first_prefix(prefix):
+ ss.bucket_counter.save_state()
state = ss.bucket_counter.state
- if state["last-complete-prefix"] is None:
- d2 = fireEventually()
- d2.addCallback(_after_first_prefix)
- return d2
- ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
+ self.failUnlessEqual(prefix, state["last-complete-prefix"])
+ self.failUnlessEqual(prefix, ss.bucket_counter.prefixes[0])
+
# now sneak in and mess with its state, to make sure it cleans up
# properly at the end of the cycle
- self.failUnlessEqual(state["last-complete-prefix"],
- ss.bucket_counter.prefixes[0])
state["bucket-counts"][-12] = {}
state["storage-index-samples"]["bogusprefix!"] = (-12, [])
ss.bucket_counter.save_state()
+
+ return ss.bucket_counter.set_hook('after_cycle')
d.addCallback(_after_first_prefix)
- # now give it enough time to complete a cycle
- def _watch():
- return not ss.bucket_counter.get_progress()["cycle-in-progress"]
- d.addCallback(lambda ignored: self.poll(_watch))
- def _check2(ignored):
- ss.bucket_counter.cpu_slice = orig_cpu_slice
+ def _after_first_cycle(cycle):
+ self.failUnlessEqual(cycle, 0)
+ progress = ss.bucket_counter.get_progress()
+ self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
+
s = ss.bucket_counter.get_state()
self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
self.failIf("bogusprefix!" in s["storage-index-samples"],
s["storage-index-samples"].keys())
- d.addCallback(_check2)
+ d.addCallback(_after_first_cycle)
+ d.addBoth(self._wait_for_yield, ss.bucket_counter)
return d
def test_bucket_counter_eta(self):
basedir = "storage/BucketCounter/bucket_counter_eta"
fileutil.make_dirs(basedir)
- ss = MyStorageServer(basedir, "\x00" * 20)
+ ss = StorageServer(basedir, "\x00" * 20)
+
+ # finish as fast as possible
ss.bucket_counter.slow_start = 0
- # these will be fired inside finished_prefix()
- hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
- w = StorageStatus(ss)
+ ss.bucket_counter.cpu_slice = 100.0
- d = defer.Deferred()
+ d = ss.bucket_counter.set_hook('after_prefix')
- def _check_1(ignored):
+ ss.setServiceParent(self.s)
+
+ w = StorageStatus(ss)
+
+ def _check_1(prefix1):
# no ETA is available yet
html = w.renderSynchronously()
s = remove_tags(html)
self.failUnlessIn("complete (next work", s)
- def _check_2(ignored):
- # one prefix has finished, so an ETA based upon that elapsed time
- # should be available.
- html = w.renderSynchronously()
- s = remove_tags(html)
- self.failUnlessIn("complete (ETA ", s)
+ return ss.bucket_counter.set_hook('after_prefix')
+ d.addCallback(_check_1)
- def _check_3(ignored):
- # two prefixes have finished
+ def _check_2(prefix2):
+ # an ETA based upon elapsed time should be available.
html = w.renderSynchronously()
s = remove_tags(html)
self.failUnlessIn("complete (ETA ", s)
- d.callback("done")
-
- hooks[0].addCallback(_check_1).addErrback(d.errback)
- hooks[1].addCallback(_check_2).addErrback(d.errback)
- hooks[2].addCallback(_check_3).addErrback(d.errback)
-
- ss.setServiceParent(self.s)
+ d.addCallback(_check_2)
+ d.addBoth(self._wait_for_yield, ss.bucket_counter)
return d
class InstrumentedLeaseCheckingCrawler(LeaseCheckingCrawler):
self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]
self.cancel_secrets = [cs0, cs1, cs1a, cs2, cs3, cs3a]
- def test_basic(self):
+ def BROKEN_test_basic(self):
basedir = "storage/LeaseCrawler/basic"
fileutil.make_dirs(basedir)
ss = InstrumentedStorageServer(basedir, "\x00" * 20)
return
raise IndexError("unable to renew non-existent lease")
- def test_expire_age(self):
+ def BROKEN_test_expire_age(self):
basedir = "storage/LeaseCrawler/expire_age"
fileutil.make_dirs(basedir)
# setting expiration_time to 2000 means that any lease which is more
d.addCallback(_check_html)
return d
- def test_expire_cutoff_date(self):
+ def BROKEN_test_expire_cutoff_date(self):
basedir = "storage/LeaseCrawler/expire_cutoff_date"
fileutil.make_dirs(basedir)
# setting cutoff-date to 2000 seconds ago means that any lease which
self.failUnless(isinstance(p("2009-03-18"), int), p("2009-03-18"))
self.failUnlessEqual(p("2009-03-18"), 1237334400)
- def test_limited_history(self):
+ def BROKEN_test_limited_history(self):
basedir = "storage/LeaseCrawler/limited_history"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20)
d.addCallback(_check)
return d
- def test_unpredictable_future(self):
+ def BROKEN_test_unpredictable_future(self):
basedir = "storage/LeaseCrawler/unpredictable_future"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20)
d.addCallback(_check)
return d
- def test_no_st_blocks(self):
+ def BROKEN_test_no_st_blocks(self):
basedir = "storage/LeaseCrawler/no_st_blocks"
fileutil.make_dirs(basedir)
ss = No_ST_BLOCKS_StorageServer(basedir, "\x00" * 20,
d.addCallback(_check)
return d
- def test_share_corruption(self):
+ def BROKEN_test_share_corruption(self):
self._poll_should_ignore_these_errors = [
UnknownMutableContainerVersionError,
UnknownImmutableContainerVersionError,