#633: first version of a rate-limited interruptable share-crawler
authorBrian Warner <warner@lothar.com>
Thu, 19 Feb 2009 04:46:33 +0000 (21:46 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 19 Feb 2009 04:46:33 +0000 (21:46 -0700)
src/allmydata/storage/crawler.py [new file with mode: 0644]
src/allmydata/test/test_crawler.py [new file with mode: 0644]

diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
new file mode 100644 (file)
index 0000000..071f206
--- /dev/null
@@ -0,0 +1,183 @@
+
+import os, time, struct, pickle
+from twisted.internet import reactor
+from twisted.application import service
+from allmydata.storage.server import si_b2a
+
+class TimeSliceExceeded(Exception):
+    pass
+
+class ShareCrawler(service.MultiService):
+    """A ShareCrawler subclass is attached to a StorageServer, and
+    periodically walks all of its shares, processing each one in some
+    fashion. This crawl is rate-limited, to reduce the IO burden on the host,
+    since large servers will have several million shares, which can take
+    hours or days to read.
+
+    We assume that the normal upload/download/get_buckets traffic of a tahoe
+    grid will cause the prefixdir contents to be mostly cached, or that the
+    number of buckets in each prefixdir will be small enough to load quickly.
+    A 1TB allmydata.com server was measured to have 2.56M buckets, spread
+    into the 1040 prefixdirs, with about 2460 buckets per prefix. On this
+    server, each prefixdir took 130ms-200ms to list the first time, and 17ms
+    to list the second time.
+
+    To use this, create a subclass which implements the process_bucket()
+    method. It will be called with a prefixdir and a base32 storage index
+    string. process_bucket() should run synchronously.
+
+    Then create an instance, with a reference to a StorageServer and a
+    filename where it can store persistent state. The statefile is used to
+    keep track of how far around the ring the process has travelled, as well
+    as timing history to allow the pace to be predicted and controlled. The
+    statefile will be updated and written to disk after every bucket is
+    processed.
+
+    The crawler instance must be started with startService() before it will
+    do any work. To make it stop doing work, call stopService() and wait for
+    the Deferred that it returns.
+    """
+
+    # use up to 10% of the CPU, on average. This can be changed at any time.
+    allowed_cpu_percentage = .10
+    # use up to 1.0 seconds before yielding. This can be changed at any time.
+    cpu_slice = 1.0
+    # don't run a cycle faster than this
+    minimum_cycle_time = 300
+
+    def __init__(self, server, statefile):
+        service.MultiService.__init__(self)
+        self.server = server
+        self.sharedir = server.sharedir
+        self.statefile = statefile
+        self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
+                         for i in range(2**10)]
+        self.prefixes.sort()
+        self.timer = None
+        self.bucket_cache = (None, [])
+        self.first_cycle_finished = False
+
+    def load_state(self):
+        try:
+            f = open(self.statefile, "rb")
+            state = pickle.load(f)
+            lcp = state["last-complete-prefix"]
+            if lcp == None:
+                self.last_complete_prefix_index = -1
+            else:
+                self.last_complete_prefix_index = self.prefixes.index(lcp)
+            self.last_complete_bucket = state["last-complete-bucket"]
+            self.first_cycle_finished = state["first-cycle-finished"]
+            f.close()
+        except EnvironmentError:
+            self.last_complete_prefix_index = -1
+            self.last_complete_bucket = None
+            self.first_cycle_finished = False
+
+    def save_state(self):
+        lcpi = self.last_complete_prefix_index
+        if lcpi == -1:
+            last_complete_prefix = None
+        else:
+            last_complete_prefix = self.prefixes[lcpi]
+        state = {"version": 1,
+                 "last-complete-prefix": last_complete_prefix,
+                 "last-complete-bucket": self.last_complete_bucket,
+                 "first-cycle-finished": self.first_cycle_finished,
+                 }
+        tmpfile = self.statefile + ".tmp"
+        f = open(tmpfile, "wb")
+        pickle.dump(state, f)
+        f.close()
+        os.rename(tmpfile, self.statefile)
+
+    def startService(self):
+        self.load_state()
+        self.timer = reactor.callLater(0, self.start_slice)
+        service.MultiService.startService(self)
+
+    def stopService(self):
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        return service.MultiService.stopService(self)
+
+    def start_slice(self):
+        self.timer = None
+        start_slice = time.time()
+        try:
+            self.start_current_prefix(start_slice)
+            finished_cycle = True
+        except TimeSliceExceeded:
+            finished_cycle = False
+        # either we finished a whole cycle, or we ran out of time
+        this_slice = time.time() - start_slice
+        # this_slice/(this_slice+sleep_time) = percentage
+        # this_slice/percentage = this_slice+sleep_time
+        # sleep_time = (this_slice/percentage) - this_slice
+        sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
+        # if the math gets weird, or a timequake happens, don't sleep forever
+        sleep_time = max(0.0, min(sleep_time, 299))
+        if finished_cycle:
+            # how long should we sleep between cycles? Don't run faster than
+            # allowed_cpu_percentage says, but also run faster than
+            # minimum_cycle_time
+            self.sleeping_between_cycles = True
+            sleep_time = max(sleep_time, self.minimum_cycle_time)
+        else:
+            self.sleeping_between_cycles = False
+        self.current_sleep_time = sleep_time # for status page
+        self.yielding(sleep_time)
+        self.timer = reactor.callLater(sleep_time, self.start_slice)
+
+    def start_current_prefix(self, start_slice):
+        for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
+            if time.time() > start_slice + self.cpu_slice:
+                raise TimeSliceExceeded()
+            # if we want to yield earlier, just raise TimeSliceExceeded()
+            prefix = self.prefixes[i]
+            prefixdir = os.path.join(self.sharedir, prefix)
+            if i == self.bucket_cache[0]:
+                buckets = self.bucket_cache[1]
+            else:
+                try:
+                    buckets = os.listdir(prefixdir)
+                    buckets.sort()
+                except EnvironmentError:
+                    buckets = []
+                self.bucket_cache = (i, buckets)
+            self.process_prefixdir(prefixdir, buckets, start_slice)
+            self.last_complete_prefix_index = i
+            self.save_state()
+        # yay! we finished the whole cycle
+        self.last_complete_prefix_index = -1
+        self.last_complete_bucket = None
+        self.first_cycle_finished = True
+        self.save_state()
+        self.finished_cycle()
+
+    def process_prefixdir(self, prefixdir, buckets, start_slice):
+        """This gets a list of bucket names (i.e. storage index strings,
+        base32-encoded) in sorted order.
+
+        Override this if your crawler doesn't care about the actual shares,
+        for example a crawler which merely keeps track of how many buckets
+        are being managed by this server.
+        """
+        for bucket in buckets:
+            if bucket <= self.last_complete_bucket:
+                continue
+            if time.time() > start_slice + self.cpu_slice:
+                raise TimeSliceExceeded()
+            self.process_bucket(prefixdir, bucket)
+            self.last_complete_bucket = bucket
+            self.save_state()
+
+    def process_bucket(self, prefixdir, storage_index_b32):
+        pass
+
+    def finished_cycle(self):
+        pass
+
+    def yielding(self, sleep_time):
+        pass
diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py
new file mode 100644 (file)
index 0000000..dfee79a
--- /dev/null
@@ -0,0 +1,320 @@
+
+import time
+import os.path
+from twisted.trial import unittest
+from twisted.application import service
+from twisted.internet import defer
+from foolscap.eventual import eventually
+
+from allmydata.util import fileutil, hashutil, pollmixin
+from allmydata.storage.server import StorageServer, si_b2a
+from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded
+
+from test_storage import FakeCanary
+from common_util import StallMixin
+
+class BucketEnumeratingCrawler(ShareCrawler):
+    cpu_slice = 500 # make sure it can complete in a single slice
+    def __init__(self, server, statefile):
+        ShareCrawler.__init__(self, server, statefile)
+        self.all_buckets = []
+        self.finished_d = defer.Deferred()
+    def process_bucket(self, prefixdir, storage_index_b32):
+        self.all_buckets.append(storage_index_b32)
+    def finished_cycle(self):
+        eventually(self.finished_d.callback, None)
+
+class PacedCrawler(ShareCrawler):
+    cpu_slice = 500 # make sure it can complete in a single slice
+    def __init__(self, server, statefile):
+        ShareCrawler.__init__(self, server, statefile)
+        self.countdown = 6
+        self.all_buckets = []
+        self.finished_d = defer.Deferred()
+    def process_bucket(self, prefixdir, storage_index_b32):
+        self.all_buckets.append(storage_index_b32)
+        self.countdown -= 1
+        if self.countdown == 0:
+            # force a timeout. We restore it in yielding()
+            self.cpu_slice = -1.0
+    def yielding(self, sleep_time):
+        self.cpu_slice = 500
+    def finished_cycle(self):
+        eventually(self.finished_d.callback, None)
+
+class ConsumingCrawler(ShareCrawler):
+    cpu_slice = 0.5
+    allowed_cpu_percentage = 0.5
+    minimum_cycle_time = 0
+
+    def __init__(self, server, statefile):
+        ShareCrawler.__init__(self, server, statefile)
+        self.accumulated = 0.0
+        self.cycles = 0
+        self.last_yield = 0.0
+    def process_bucket(self, prefixdir, storage_index_b32):
+        start = time.time()
+        time.sleep(0.05)
+        elapsed = time.time() - start
+        self.accumulated += elapsed
+        self.last_yield += elapsed
+    def finished_cycle(self):
+        self.cycles += 1
+    def yielding(self, sleep_time):
+        self.last_yield = 0.0
+
+class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
+    def setUp(self):
+        self.s = service.MultiService()
+        self.s.startService()
+
+    def tearDown(self):
+        return self.s.stopService()
+
+    def si(self, i):
+        return hashutil.storage_index_hash(str(i))
+    def rs(self, i, serverid):
+        return hashutil.bucket_renewal_secret_hash(str(i), serverid)
+    def cs(self, i, serverid):
+        return hashutil.bucket_cancel_secret_hash(str(i), serverid)
+
+    def write(self, i, ss, serverid, tail=0):
+        si = self.si(i)
+        si = si[:-1] + chr(tail)
+        had,made = ss.remote_allocate_buckets(si,
+                                              self.rs(i, serverid),
+                                              self.cs(i, serverid),
+                                              set([0]), 99, FakeCanary())
+        made[0].remote_write(0, "data")
+        made[0].remote_close()
+        return si_b2a(si)
+
+    def test_immediate(self):
+        self.basedir = "crawler/Basic/immediate"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        sis = [self.write(i, ss, serverid) for i in range(10)]
+        statefile = os.path.join(self.basedir, "statefile")
+
+        c = BucketEnumeratingCrawler(ss, statefile)
+        c.load_state()
+
+        c.start_current_prefix(time.time())
+        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+
+        # make sure the statefile has been returned to the starting point
+        c.finished_d = defer.Deferred()
+        c.all_buckets = []
+        c.start_current_prefix(time.time())
+        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+
+        # check that a new crawler picks up on the state file properly
+        c2 = BucketEnumeratingCrawler(ss, statefile)
+        c2.load_state()
+
+        c2.start_current_prefix(time.time())
+        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
+
+    def test_service(self):
+        self.basedir = "crawler/Basic/service"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        sis = [self.write(i, ss, serverid) for i in range(10)]
+
+        statefile = os.path.join(self.basedir, "statefile")
+        c = BucketEnumeratingCrawler(ss, statefile)
+        c.setServiceParent(self.s)
+
+        d = c.finished_d
+        def _check(ignored):
+            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+        d.addCallback(_check)
+        return d
+
+    def test_paced(self):
+        self.basedir = "crawler/Basic/paced"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        # put four buckets in each prefixdir
+        sis = []
+        for i in range(10):
+            for tail in range(4):
+                sis.append(self.write(i, ss, serverid, tail))
+
+        statefile = os.path.join(self.basedir, "statefile")
+
+        c = PacedCrawler(ss, statefile)
+        c.load_state()
+        try:
+            c.start_current_prefix(time.time())
+        except TimeSliceExceeded:
+            pass
+        # that should stop in the middle of one of the buckets.
+        c.cpu_slice = PacedCrawler.cpu_slice
+        self.failUnlessEqual(len(c.all_buckets), 6)
+        c.start_current_prefix(time.time()) # finish it
+        self.failUnlessEqual(len(sis), len(c.all_buckets))
+        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+
+        # make sure the statefile has been returned to the starting point
+        c.finished_d = defer.Deferred()
+        c.all_buckets = []
+        c.start_current_prefix(time.time())
+        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+        del c
+
+        # start a new crawler, it should start from the beginning
+        c = PacedCrawler(ss, statefile)
+        c.load_state()
+        try:
+            c.start_current_prefix(time.time())
+        except TimeSliceExceeded:
+            pass
+        # that should stop in the middle of one of the buckets
+        c.cpu_slice = PacedCrawler.cpu_slice
+
+        # a third crawler should pick up from where it left off
+        c2 = PacedCrawler(ss, statefile)
+        c2.all_buckets = c.all_buckets[:]
+        c2.load_state()
+        c2.countdown = -1
+        c2.start_current_prefix(time.time())
+        self.failUnlessEqual(len(sis), len(c2.all_buckets))
+        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
+        del c, c2
+
+        # now stop it at the end of a bucket (countdown=4), to exercise a
+        # different place that checks the time
+        c = PacedCrawler(ss, statefile)
+        c.load_state()
+        c.countdown = 4
+        try:
+            c.start_current_prefix(time.time())
+        except TimeSliceExceeded:
+            pass
+        # that should stop at the end of one of the buckets.
+        c.cpu_slice = PacedCrawler.cpu_slice
+        self.failUnlessEqual(len(c.all_buckets), 4)
+        c.start_current_prefix(time.time()) # finish it
+        self.failUnlessEqual(len(sis), len(c.all_buckets))
+        self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+        del c
+
+        # stop it again at the end of the bucket, check that a new checker
+        # picks up correctly
+        c = PacedCrawler(ss, statefile)
+        c.load_state()
+        c.countdown = 4
+        try:
+            c.start_current_prefix(time.time())
+        except TimeSliceExceeded:
+            pass
+        # that should stop at the end of one of the buckets.
+
+        c2 = PacedCrawler(ss, statefile)
+        c2.all_buckets = c.all_buckets[:]
+        c2.load_state()
+        c2.countdown = -1
+        c2.start_current_prefix(time.time())
+        self.failUnlessEqual(len(sis), len(c2.all_buckets))
+        self.failUnlessEqual(sorted(sis), sorted(c2.all_buckets))
+        del c, c2
+
+    def test_paced_service(self):
+        self.basedir = "crawler/Basic/paced_service"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        sis = [self.write(i, ss, serverid) for i in range(10)]
+
+        statefile = os.path.join(self.basedir, "statefile")
+        c = PacedCrawler(ss, statefile)
+        c.setServiceParent(self.s)
+        # that should get through 6 buckets, pause for a little while, then
+        # resume
+
+        d = c.finished_d
+        def _check(ignored):
+            self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
+            # at this point, the crawler should be sitting in the inter-cycle
+            # timer, which should be pegged at the minumum cycle time
+            self.failUnless(c.timer)
+            self.failUnless(c.sleeping_between_cycles)
+            self.failUnlessEqual(c.current_sleep_time, c.minimum_cycle_time)
+        d.addCallback(_check)
+        return d
+
+    def test_cpu_usage(self):
+        self.basedir = "crawler/Basic/cpu_usage"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        sis = [self.write(i, ss, serverid) for i in range(10)]
+
+        statefile = os.path.join(self.basedir, "statefile")
+        c = ConsumingCrawler(ss, statefile)
+        c.setServiceParent(self.s)
+
+        # this will run as fast as it can, consuming about 50ms per call to
+        # process_bucket(), limited by the Crawler to about 50% cpu. We let
+        # it run for a few seconds, then compare how much time
+        # process_bucket() got vs wallclock time. It should get between 10%
+        # and 70% CPU. This is dicey, there's about 100ms of overhead per
+        # 300ms slice (saving the state file takes about 150-200us, but we do
+        # it 1024 times per cycle, one for each [empty] prefixdir), leaving
+        # 200ms for actual processing, which is enough to get through 4
+        # buckets each slice, then the crawler sleeps for 300ms/0.5 = 600ms,
+        # giving us 900ms wallclock per slice. In 4.0 seconds we can do 4.4
+        # slices, giving us about 17 shares, so we merely assert that we've
+        # finished at least one cycle in that time.
+
+        # with a short cpu_slice (so we can keep this test down to 4
+        # seconds), the overhead is enough to make a nominal 50% usage more
+        # like 30%. Forcing sleep_time to 0 only gets us 67% usage.
+
+        # who knows what will happen on our slower buildslaves. I'll ditch
+        # the cycles>1 test first.
+
+        start = time.time()
+        d = self.stall(delay=4.0)
+        def _done(res):
+            elapsed = time.time() - start
+            percent = 100.0 * c.accumulated / elapsed
+            self.failUnless(20 < percent < 70, "crawler got %d%%" % percent)
+            self.failUnless(c.cycles >= 1, c.cycles)
+        d.addCallback(_done)
+        return d
+
+    def test_empty_subclass(self):
+        self.basedir = "crawler/Basic/empty_subclass"
+        fileutil.make_dirs(self.basedir)
+        serverid = "\x00" * 20
+        ss = StorageServer(self.basedir, serverid)
+        ss.setServiceParent(self.s)
+
+        sis = [self.write(i, ss, serverid) for i in range(10)]
+
+        statefile = os.path.join(self.basedir, "statefile")
+        c = ShareCrawler(ss, statefile)
+        c.setServiceParent(self.s)
+
+        # we just let it run for a while, to get figleaf coverage of the
+        # empty methods in the base class
+
+        def _check():
+            return c.first_cycle_finished
+        d = self.poll(_check)
+        return d