From: Brian Warner Date: Thu, 19 Feb 2009 04:46:33 +0000 (-0700) Subject: #633: first version of a rate-limited interruptable share-crawler X-Git-Tag: allmydata-tahoe-1.4.0~190 X-Git-Url: https://git.rkrishnan.org/pf/something?a=commitdiff_plain;h=193889f793ce8bc2541b91395af1f32138ffe7c8;p=tahoe-lafs%2Ftahoe-lafs.git #633: first version of a rate-limited interruptable share-crawler --- diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py new file mode 100644 index 00000000..071f2060 --- /dev/null +++ b/src/allmydata/storage/crawler.py @@ -0,0 +1,183 @@ + +import os, time, struct, pickle +from twisted.internet import reactor +from twisted.application import service +from allmydata.storage.server import si_b2a + +class TimeSliceExceeded(Exception): + pass + +class ShareCrawler(service.MultiService): + """A ShareCrawler subclass is attached to a StorageServer, and + periodically walks all of its shares, processing each one in some + fashion. This crawl is rate-limited, to reduce the IO burden on the host, + since large servers will have several million shares, which can take + hours or days to read. + + We assume that the normal upload/download/get_buckets traffic of a tahoe + grid will cause the prefixdir contents to be mostly cached, or that the + number of buckets in each prefixdir will be small enough to load quickly. + A 1TB allmydata.com server was measured to have 2.56M buckets, spread + into the 1040 prefixdirs, with about 2460 buckets per prefix. On this + server, each prefixdir took 130ms-200ms to list the first time, and 17ms + to list the second time. + + To use this, create a subclass which implements the process_bucket() + method. It will be called with a prefixdir and a base32 storage index + string. process_bucket() should run synchronously. + + Then create an instance, with a reference to a StorageServer and a + filename where it can store persistent state. The statefile is used to + keep track of how far around the ring the process has travelled, as well + as timing history to allow the pace to be predicted and controlled. The + statefile will be updated and written to disk after every bucket is + processed. + + The crawler instance must be started with startService() before it will + do any work. To make it stop doing work, call stopService() and wait for + the Deferred that it returns. + """ + + # use up to 10% of the CPU, on average. This can be changed at any time. + allowed_cpu_percentage = .10 + # use up to 1.0 seconds before yielding. This can be changed at any time. + cpu_slice = 1.0 + # don't run a cycle faster than this + minimum_cycle_time = 300 + + def __init__(self, server, statefile): + service.MultiService.__init__(self) + self.server = server + self.sharedir = server.sharedir + self.statefile = statefile + self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2] + for i in range(2**10)] + self.prefixes.sort() + self.timer = None + self.bucket_cache = (None, []) + self.first_cycle_finished = False + + def load_state(self): + try: + f = open(self.statefile, "rb") + state = pickle.load(f) + lcp = state["last-complete-prefix"] + if lcp == None: + self.last_complete_prefix_index = -1 + else: + self.last_complete_prefix_index = self.prefixes.index(lcp) + self.last_complete_bucket = state["last-complete-bucket"] + self.first_cycle_finished = state["first-cycle-finished"] + f.close() + except EnvironmentError: + self.last_complete_prefix_index = -1 + self.last_complete_bucket = None + self.first_cycle_finished = False + + def save_state(self): + lcpi = self.last_complete_prefix_index + if lcpi == -1: + last_complete_prefix = None + else: + last_complete_prefix = self.prefixes[lcpi] + state = {"version": 1, + "last-complete-prefix": last_complete_prefix, + "last-complete-bucket": self.last_complete_bucket, + "first-cycle-finished": self.first_cycle_finished, + } + tmpfile = self.statefile + ".tmp" + f = open(tmpfile, "wb") + pickle.dump(state, f) + f.close() + os.rename(tmpfile, self.statefile) + + def startService(self): + self.load_state() + self.timer = reactor.callLater(0, self.start_slice) + service.MultiService.startService(self) + + def stopService(self): + if self.timer: + self.timer.cancel() + self.timer = None + return service.MultiService.stopService(self) + + def start_slice(self): + self.timer = None + start_slice = time.time() + try: + self.start_current_prefix(start_slice) + finished_cycle = True + except TimeSliceExceeded: + finished_cycle = False + # either we finished a whole cycle, or we ran out of time + this_slice = time.time() - start_slice + # this_slice/(this_slice+sleep_time) = percentage + # this_slice/percentage = this_slice+sleep_time + # sleep_time = (this_slice/percentage) - this_slice + sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice + # if the math gets weird, or a timequake happens, don't sleep forever + sleep_time = max(0.0, min(sleep_time, 299)) + if finished_cycle: + # how long should we sleep between cycles? Don't run faster than + # allowed_cpu_percentage says, but also run faster than + # minimum_cycle_time + self.sleeping_between_cycles = True + sleep_time = max(sleep_time, self.minimum_cycle_time) + else: + self.sleeping_between_cycles = False + self.current_sleep_time = sleep_time # for status page + self.yielding(sleep_time) + self.timer = reactor.callLater(sleep_time, self.start_slice) + + def start_current_prefix(self, start_slice): + for i in range(self.last_complete_prefix_index+1, len(self.prefixes)): + if time.time() > start_slice + self.cpu_slice: + raise TimeSliceExceeded() + # if we want to yield earlier, just raise TimeSliceExceeded() + prefix = self.prefixes[i] + prefixdir = os.path.join(self.sharedir, prefix) + if i == self.bucket_cache[0]: + buckets = self.bucket_cache[1] + else: + try: + buckets = os.listdir(prefixdir) + buckets.sort() + except EnvironmentError: + buckets = [] + self.bucket_cache = (i, buckets) + self.process_prefixdir(prefixdir, buckets, start_slice) + self.last_complete_prefix_index = i + self.save_state() + # yay! we finished the whole cycle + self.last_complete_prefix_index = -1 + self.last_complete_bucket = None + self.first_cycle_finished = True + self.save_state() + self.finished_cycle() + + def process_prefixdir(self, prefixdir, buckets, start_slice): + """This gets a list of bucket names (i.e. storage index strings, + base32-encoded) in sorted order. + + Override this if your crawler doesn't care about the actual shares, + for example a crawler which merely keeps track of how many buckets + are being managed by this server. + """ + for bucket in buckets: + if bucket <= self.last_complete_bucket: + continue + if time.time() > start_slice + self.cpu_slice: + raise TimeSliceExceeded() + self.process_bucket(prefixdir, bucket) + self.last_complete_bucket = bucket + self.save_state() + + def process_bucket(self, prefixdir, storage_index_b32): + pass + + def finished_cycle(self): + pass + + def yielding(self, sleep_time): + pass diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py new file mode 100644 index 00000000..dfee79a3 --- /dev/null +++ b/src/allmydata/test/test_crawler.py @@ -0,0 +1,320 @@ + +import time +import os.path +from twisted.trial import unittest +from twisted.application import service +from twisted.internet import defer +from foolscap.eventual import eventually + +from allmydata.util import fileutil, hashutil, pollmixin +from allmydata.storage.server import StorageServer, si_b2a +from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded + +from test_storage import FakeCanary +from common_util import StallMixin + +class BucketEnumeratingCrawler(ShareCrawler): + cpu_slice = 500 # make sure it can complete in a single slice + def __init__(self, server, statefile): + ShareCrawler.__init__(self, server, statefile) + self.all_buckets = [] + self.finished_d = defer.Deferred() + def process_bucket(self, prefixdir, storage_index_b32): + self.all_buckets.append(storage_index_b32) + def finished_cycle(self): + eventually(self.finished_d.callback, None) + +class PacedCrawler(ShareCrawler): + cpu_slice = 500 # make sure it can complete in a single slice + def __init__(self, server, statefile): + ShareCrawler.__init__(self, server, statefile) + self.countdown = 6 + self.all_buckets = [] + self.finished_d = defer.Deferred() + def process_bucket(self, prefixdir, storage_index_b32): + self.all_buckets.append(storage_index_b32) + self.countdown -= 1 + if self.countdown == 0: + # force a timeout. We restore it in yielding() + self.cpu_slice = -1.0 + def yielding(self, sleep_time): + self.cpu_slice = 500 + def finished_cycle(self): + eventually(self.finished_d.callback, None) + +class ConsumingCrawler(ShareCrawler): + cpu_slice = 0.5 + allowed_cpu_percentage = 0.5 + minimum_cycle_time = 0 + + def __init__(self, server, statefile): + ShareCrawler.__init__(self, server, statefile) + self.accumulated = 0.0 + self.cycles = 0 + self.last_yield = 0.0 + def process_bucket(self, prefixdir, storage_index_b32): + start = time.time() + time.sleep(0.05) + elapsed = time.time() - start + self.accumulated += elapsed + self.last_yield += elapsed + def finished_cycle(self): + self.cycles += 1 + def yielding(self, sleep_time): + self.last_yield = 0.0 + +class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): + def setUp(self): + self.s = service.MultiService() + self.s.startService() + + def tearDown(self): + return self.s.stopService() + + def si(self, i): + return hashutil.storage_index_hash(str(i)) + def rs(self, i, serverid): + return hashutil.bucket_renewal_secret_hash(str(i), serverid) + def cs(self, i, serverid): + return hashutil.bucket_cancel_secret_hash(str(i), serverid) + + def write(self, i, ss, serverid, tail=0): + si = self.si(i) + si = si[:-1] + chr(tail) + had,made = ss.remote_allocate_buckets(si, + self.rs(i, serverid), + self.cs(i, serverid), + set([0]), 99, FakeCanary()) + made[0].remote_write(0, "data") + made[0].remote_close() + return si_b2a(si) + + def test_immediate(self): + self.basedir = "crawler/Basic/immediate" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + sis = [self.write(i, ss, serverid) for i in range(10)] + statefile = os.path.join(self.basedir, "statefile") + + c = BucketEnumeratingCrawler(ss, statefile) + c.load_state() + + c.start_current_prefix(time.time()) + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + + # make sure the statefile has been returned to the starting point + c.finished_d = defer.Deferred() + c.all_buckets = [] + c.start_current_prefix(time.time()) + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + + # check that a new crawler picks up on the state file properly + c2 = BucketEnumeratingCrawler(ss, statefile) + c2.load_state() + + c2.start_current_prefix(time.time()) + self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets)) + + def test_service(self): + self.basedir = "crawler/Basic/service" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + sis = [self.write(i, ss, serverid) for i in range(10)] + + statefile = os.path.join(self.basedir, "statefile") + c = BucketEnumeratingCrawler(ss, statefile) + c.setServiceParent(self.s) + + d = c.finished_d + def _check(ignored): + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + d.addCallback(_check) + return d + + def test_paced(self): + self.basedir = "crawler/Basic/paced" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + # put four buckets in each prefixdir + sis = [] + for i in range(10): + for tail in range(4): + sis.append(self.write(i, ss, serverid, tail)) + + statefile = os.path.join(self.basedir, "statefile") + + c = PacedCrawler(ss, statefile) + c.load_state() + try: + c.start_current_prefix(time.time()) + except TimeSliceExceeded: + pass + # that should stop in the middle of one of the buckets. + c.cpu_slice = PacedCrawler.cpu_slice + self.failUnlessEqual(len(c.all_buckets), 6) + c.start_current_prefix(time.time()) # finish it + self.failUnlessEqual(len(sis), len(c.all_buckets)) + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + + # make sure the statefile has been returned to the starting point + c.finished_d = defer.Deferred() + c.all_buckets = [] + c.start_current_prefix(time.time()) + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + del c + + # start a new crawler, it should start from the beginning + c = PacedCrawler(ss, statefile) + c.load_state() + try: + c.start_current_prefix(time.time()) + except TimeSliceExceeded: + pass + # that should stop in the middle of one of the buckets + c.cpu_slice = PacedCrawler.cpu_slice + + # a third crawler should pick up from where it left off + c2 = PacedCrawler(ss, statefile) + c2.all_buckets = c.all_buckets[:] + c2.load_state() + c2.countdown = -1 + c2.start_current_prefix(time.time()) + self.failUnlessEqual(len(sis), len(c2.all_buckets)) + self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets)) + del c, c2 + + # now stop it at the end of a bucket (countdown=4), to exercise a + # different place that checks the time + c = PacedCrawler(ss, statefile) + c.load_state() + c.countdown = 4 + try: + c.start_current_prefix(time.time()) + except TimeSliceExceeded: + pass + # that should stop at the end of one of the buckets. + c.cpu_slice = PacedCrawler.cpu_slice + self.failUnlessEqual(len(c.all_buckets), 4) + c.start_current_prefix(time.time()) # finish it + self.failUnlessEqual(len(sis), len(c.all_buckets)) + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + del c + + # stop it again at the end of the bucket, check that a new checker + # picks up correctly + c = PacedCrawler(ss, statefile) + c.load_state() + c.countdown = 4 + try: + c.start_current_prefix(time.time()) + except TimeSliceExceeded: + pass + # that should stop at the end of one of the buckets. + + c2 = PacedCrawler(ss, statefile) + c2.all_buckets = c.all_buckets[:] + c2.load_state() + c2.countdown = -1 + c2.start_current_prefix(time.time()) + self.failUnlessEqual(len(sis), len(c2.all_buckets)) + self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets)) + del c, c2 + + def test_paced_service(self): + self.basedir = "crawler/Basic/paced_service" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + sis = [self.write(i, ss, serverid) for i in range(10)] + + statefile = os.path.join(self.basedir, "statefile") + c = PacedCrawler(ss, statefile) + c.setServiceParent(self.s) + # that should get through 6 buckets, pause for a little while, then + # resume + + d = c.finished_d + def _check(ignored): + self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) + # at this point, the crawler should be sitting in the inter-cycle + # timer, which should be pegged at the minumum cycle time + self.failUnless(c.timer) + self.failUnless(c.sleeping_between_cycles) + self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time) + d.addCallback(_check) + return d + + def test_cpu_usage(self): + self.basedir = "crawler/Basic/cpu_usage" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + sis = [self.write(i, ss, serverid) for i in range(10)] + + statefile = os.path.join(self.basedir, "statefile") + c = ConsumingCrawler(ss, statefile) + c.setServiceParent(self.s) + + # this will run as fast as it can, consuming about 50ms per call to + # process_bucket(), limited by the Crawler to about 50% cpu. We let + # it run for a few seconds, then compare how much time + # process_bucket() got vs wallclock time. It should get between 10% + # and 70% CPU. This is dicey, there's about 100ms of overhead per + # 300ms slice (saving the state file takes about 150-200us, but we do + # it 1024 times per cycle, one for each [empty] prefixdir), leaving + # 200ms for actual processing, which is enough to get through 4 + # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms, + # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4 + # slices, giving us about 17 shares, so we merely assert that we've + # finished at least one cycle in that time. + + # with a short cpu_slice (so we can keep this test down to 4 + # seconds), the overhead is enough to make a nominal 50% usage more + # like 30%. Forcing sleep_time to 0 only gets us 67% usage. + + # who knows what will happen on our slower buildslaves. I'll ditch + # the cycles>1 test first. + + start = time.time() + d = self.stall(delay=4.0) + def _done(res): + elapsed = time.time() - start + percent = 100.0 * c.accumulated / elapsed + self.failUnless(20 < percent < 70, "crawler got %d%%" % percent) + self.failUnless(c.cycles >= 1, c.cycles) + d.addCallback(_done) + return d + + def test_empty_subclass(self): + self.basedir = "crawler/Basic/empty_subclass" + fileutil.make_dirs(self.basedir) + serverid = "\x00" * 20 + ss = StorageServer(self.basedir, serverid) + ss.setServiceParent(self.s) + + sis = [self.write(i, ss, serverid) for i in range(10)] + + statefile = os.path.join(self.basedir, "statefile") + c = ShareCrawler(ss, statefile) + c.setServiceParent(self.s) + + # we just let it run for a while, to get figleaf coverage of the + # empty methods in the base class + + def _check(): + return c.first_cycle_finished + d = self.poll(_check) + return d