From: Brian Warner Date: Sat, 21 Feb 2009 04:04:08 +0000 (-0700) Subject: storage: add bucket-counting share crawler, add its output (number of files+directori... X-Git-Tag: allmydata-tahoe-1.4.0~165 X-Git-Url: https://git.rkrishnan.org/vdrive/%3C?a=commitdiff_plain;h=b3cd4952bd2cbaccd4b806f27365527c12b72aa5;p=tahoe-lafs%2Ftahoe-lafs.git storage: add bucket-counting share crawler, add its output (number of files+directories maintained by a storage server) and status to the webapi /storage page --- diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py index 435bd2fa..2daeeba3 100644 --- a/src/allmydata/storage/crawler.py +++ b/src/allmydata/storage/crawler.py @@ -98,7 +98,6 @@ class ShareCrawler(service.MultiService): d = {} if self.state["current-cycle"] is None: - assert self.sleeping_between_cycles d["cycle-in-progress"] = False d["next-crawl-time"] = self.next_wake_time d["remaining-wait-time"] = self.next_wake_time - time.time() @@ -145,7 +144,7 @@ class ShareCrawler(service.MultiService): except EnvironmentError: state = {"version": 1, "last-cycle-finished": None, - "current-cycle": 0, + "current-cycle": None, "last-complete-prefix": None, "last-complete-bucket": None, } @@ -184,6 +183,11 @@ class ShareCrawler(service.MultiService): def startService(self): self.load_state() + # arrange things to look like we were just sleeping, so + # status/progress values work correctly + self.sleeping_between_cycles = True + self.current_sleep_time = 0 + self.next_wake_time = time.time() self.timer = reactor.callLater(0, self.start_slice) service.MultiService.startService(self) @@ -195,10 +199,12 @@ class ShareCrawler(service.MultiService): def start_slice(self): self.timer = None + self.sleeping_between_cycles = False self.current_sleep_time = None self.next_wake_time = None start_slice = time.time() try: + s = self.last_complete_prefix_index self.start_current_prefix(start_slice) finished_cycle = True except TimeSliceExceeded: @@ -229,14 +235,15 @@ class ShareCrawler(service.MultiService): self.timer = reactor.callLater(sleep_time, self.start_slice) def start_current_prefix(self, start_slice): - if self.state["current-cycle"] is None: - assert self.state["last-cycle-finished"] is not None - self.state["current-cycle"] = self.state["last-cycle-finished"] + 1 - cycle = self.state["current-cycle"] + state = self.state + if state["current-cycle"] is None: + if state["last-cycle-finished"] is None: + state["current-cycle"] = 0 + else: + state["current-cycle"] = state["last-cycle-finished"] + 1 + cycle = state["current-cycle"] for i in range(self.last_complete_prefix_index+1, len(self.prefixes)): - if time.time() > start_slice + self.cpu_slice: - raise TimeSliceExceeded() # if we want to yield earlier, just raise TimeSliceExceeded() prefix = self.prefixes[i] prefixdir = os.path.join(self.sharedir, prefix) @@ -253,11 +260,13 @@ class ShareCrawler(service.MultiService): buckets, start_slice) self.last_complete_prefix_index = i self.save_state() + if time.time() > start_slice + self.cpu_slice: + raise TimeSliceExceeded() # yay! we finished the whole cycle self.last_complete_prefix_index = -1 - self.state["last-complete-bucket"] = None - self.state["last-cycle-finished"] = cycle - self.state["current-cycle"] = None + state["last-complete-bucket"] = None + state["last-cycle-finished"] = cycle + state["current-cycle"] = None self.finished_cycle(cycle) self.save_state() @@ -272,11 +281,14 @@ class ShareCrawler(service.MultiService): for bucket in buckets: if bucket <= self.state["last-complete-bucket"]: continue - if time.time() > start_slice + self.cpu_slice: - raise TimeSliceExceeded() self.process_bucket(cycle, prefix, prefixdir, bucket) self.state["last-complete-bucket"] = bucket + # note: saving the state after every bucket is somewhat + # time-consuming, but lets us avoid losing more than one bucket's + # worth of progress. self.save_state() + if time.time() > start_slice + self.cpu_slice: + raise TimeSliceExceeded() def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32): """Examine a single bucket. Subclasses should do whatever they want @@ -317,3 +329,56 @@ class ShareCrawler(service.MultiService): """ pass + +class BucketCountingCrawler(ShareCrawler): + """I keep track of how many buckets are being managed by this server. + This is equivalent to the number of distributed files and directories for + which I am providing storage. The actual number of files+directories in + the full grid is probably higher (especially when there are more servers + than 'N', the number of generated shares), because some files+directories + will have shares on other servers instead of me. + """ + + minimum_cycle_time = 60*60 # we don't need this more than once an hour + + def __init__(self, server, statefile, num_sample_prefixes=1): + ShareCrawler.__init__(self, server, statefile) + self.num_sample_prefixes = num_sample_prefixes + + def add_initial_state(self): + # ["share-counts"][cyclenum][prefix] = number + # ["last-complete-cycle"] = cyclenum # maintained by base class + # ["last-complete-share-count"] = number + # ["storage-index-samples"][prefix] = (cyclenum, + # list of SI strings (base32)) + self.state.setdefault("share-counts", {}) + self.state.setdefault("last-complete-share-count", None) + self.state.setdefault("storage-index-samples", {}) + + def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice): + # we override process_prefixdir() because we don't want to look at + # the individual buckets. We'll save state after each one. On my + # laptop, a mostly-empty storage server can process about 70 + # prefixdirs in a 1.0s slice. + if cycle not in self.state["share-counts"]: + self.state["share-counts"][cycle] = {} + self.state["share-counts"][cycle][prefix] = len(buckets) + if prefix in self.prefixes[:self.num_sample_prefixes]: + self.state["storage-index-samples"][prefix] = (cycle, buckets) + + def finished_cycle(self, cycle): + last_counts = self.state["share-counts"].get(cycle, []) + if len(last_counts) == len(self.prefixes): + # great, we have a whole cycle. + num_buckets = sum(last_counts.values()) + self.state["last-complete-share-count"] = (cycle, num_buckets) + # get rid of old counts + for old_cycle in list(self.state["share-counts"].keys()): + if old_cycle != cycle: + del self.state["share-counts"][old_cycle] + # get rid of old samples too + for prefix in list(self.state["storage-index-samples"].keys()): + old_cycle,buckets = self.state["storage-index-samples"][prefix] + if old_cycle != cycle: + del self.state["storage-index-samples"][prefix] + diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 96e5b6ab..3b30fe96 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -14,6 +14,7 @@ from allmydata.storage.lease import LeaseInfo from allmydata.storage.mutable import MutableShareFile, EmptyShare, \ create_mutable_sharefile from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader +from allmydata.storage.crawler import BucketCountingCrawler # storage/ # storage/shares/incoming @@ -77,6 +78,10 @@ class StorageServer(service.MultiService, Referenceable): "cancel": [], } + statefile = os.path.join(storedir, "bucket_counter.state") + self.bucket_counter = BucketCountingCrawler(self, statefile) + self.bucket_counter.setServiceParent(self) + def count(self, name, delta=1): if self.stats_provider: self.stats_provider.count("storage_server." + name, delta) diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py index 113a94e3..bed132de 100644 --- a/src/allmydata/test/test_crawler.py +++ b/src/allmydata/test/test_crawler.py @@ -146,6 +146,15 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): c = BucketEnumeratingCrawler(ss, statefile) c.setServiceParent(self.s) + # it should be legal to call get_state() and get_progress() right + # away, even before the first tick is performed. No work should have + # been done yet. + s = c.get_state() + p = c.get_progress() + self.failUnlessEqual(s["last-complete-prefix"], None) + self.failUnlessEqual(s["current-cycle"], None) + self.failUnlessEqual(p["cycle-in-progress"], False) + d = c.finished_d def _check(ignored): self.failUnlessEqual(sorted(sis), sorted(c.all_buckets)) @@ -405,6 +414,9 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin): self.failIf(c.running) self.failIf(c.timer) self.failIf(c.current_sleep_time) + s = c.get_state() + self.failUnlessEqual(s["last-cycle-finished"], 0) + self.failUnlessEqual(s["current-cycle"], None) d.addCallback(_check) return d diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index d8788098..435aac20 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -1,10 +1,14 @@ + +import time, os.path, stat, re + from twisted.trial import unittest from twisted.internet import defer -import time, os.path, stat, re +from twisted.application import service +from foolscap import eventual import itertools from allmydata import interfaces -from allmydata.util import fileutil, hashutil, base32 +from allmydata.util import fileutil, hashutil, base32, pollmixin from allmydata.storage.server import StorageServer, storage_index_to_dir from allmydata.storage.mutable import MutableShareFile from allmydata.storage.immutable import BucketWriter, BucketReader @@ -14,8 +18,7 @@ from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ ReadBucketProxy from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent -from allmydata.web.storage import StorageStatus, abbreviate_if_known, \ - remove_prefix +from allmydata.web.storage import StorageStatus, remove_prefix class Marker: pass @@ -1290,33 +1293,135 @@ class Stats(unittest.TestCase): self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1) self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1) +def remove_tags(s): + s = re.sub(r'<[^>]*>', ' ', s) + s = re.sub(r'\s+', ' ', s) + return s + +class BucketCounter(unittest.TestCase, pollmixin.PollMixin): + + def setUp(self): + self.s = service.MultiService() + self.s.startService() + def tearDown(self): + return self.s.stopService() + + def test_bucket_counter(self): + basedir = "storage/BucketCounter/bucket_counter" + fileutil.make_dirs(basedir) + ss = StorageServer(basedir, "\x00" * 20) + # to make sure we capture the bucket-counting-crawler in the middle + # of a cycle, we reach in and reduce its maximum slice time to 0. + orig_cpu_slice = ss.bucket_counter.cpu_slice + ss.bucket_counter.cpu_slice = 0 + ss.setServiceParent(self.s) + + w = StorageStatus(ss) + + # this sample is before the crawler has started doing anything + html = w.renderSynchronously() + self.failUnless("

Storage Server Status

" in html, html) + s = remove_tags(html) + self.failUnless("Accepting new shares: Yes" in s, s) + self.failUnless("Reserved space: - 0 B (0)" in s, s) + self.failUnless("Total buckets: Not computed yet" in s, s) + self.failUnless("Next crawl in" in s, s) + + # give the bucket-counting-crawler one tick to get started. The + # cpu_slice=0 will force it to yield right after it processes the + # first prefix + + d = eventual.fireEventually() + def _check(ignored): + # are we really right after the first prefix? + state = ss.bucket_counter.get_state() + self.failUnlessEqual(state["last-complete-prefix"], + ss.bucket_counter.prefixes[0]) + ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible + html = w.renderSynchronously() + s = remove_tags(html) + self.failUnless(" Current crawl " in s, s) + self.failUnless(" (next work in " in s, s) + d.addCallback(_check) + + # now give it enough time to complete a full cycle + def _watch(): + return not ss.bucket_counter.get_progress()["cycle-in-progress"] + d.addCallback(lambda ignored: self.poll(_watch)) + def _check2(ignored): + ss.bucket_counter.cpu_slice = orig_cpu_slice + html = w.renderSynchronously() + s = remove_tags(html) + self.failUnless("Total buckets: 0 (the number of" in s, s) + self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds + d.addCallback(_check2) + return d + + def test_bucket_counter_cleanup(self): + basedir = "storage/BucketCounter/bucket_counter_cleanup" + fileutil.make_dirs(basedir) + ss = StorageServer(basedir, "\x00" * 20) + # to make sure we capture the bucket-counting-crawler in the middle + # of a cycle, we reach in and reduce its maximum slice time to 0. + orig_cpu_slice = ss.bucket_counter.cpu_slice + ss.bucket_counter.cpu_slice = 0 + ss.setServiceParent(self.s) + + d = eventual.fireEventually() + + def _after_first_prefix(ignored): + ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible + # now sneak in and mess with its state, to make sure it cleans up + # properly at the end of the cycle + state = ss.bucket_counter.state + self.failUnlessEqual(state["last-complete-prefix"], + ss.bucket_counter.prefixes[0]) + state["share-counts"][-12] = {} + state["storage-index-samples"]["bogusprefix!"] = (-12, []) + ss.bucket_counter.save_state() + d.addCallback(_after_first_prefix) + + # now give it enough time to complete a cycle + def _watch(): + return not ss.bucket_counter.get_progress()["cycle-in-progress"] + d.addCallback(lambda ignored: self.poll(_watch)) + def _check2(ignored): + ss.bucket_counter.cpu_slice = orig_cpu_slice + s = ss.bucket_counter.get_state() + self.failIf(-12 in s["share-counts"], s["share-counts"].keys()) + self.failIf("bogusprefix!" in s["storage-index-samples"], + s["storage-index-samples"].keys()) + d.addCallback(_check2) + return d + class NoStatvfsServer(StorageServer): def do_statvfs(self): raise AttributeError -class WebStatus(unittest.TestCase): +class WebStatus(unittest.TestCase, pollmixin.PollMixin): + + def setUp(self): + self.s = service.MultiService() + self.s.startService() + def tearDown(self): + return self.s.stopService() def test_no_server(self): w = StorageStatus(None) html = w.renderSynchronously() self.failUnless("

No Storage Server Running

" in html, html) - - def remove_tags(self, s): - s = re.sub(r'<[^>]*>', ' ', s) - s = re.sub(r'\s+', ' ', s) - return s - def test_status(self): basedir = "storage/WebStatus/status" fileutil.make_dirs(basedir) ss = StorageServer(basedir, "\x00" * 20) + ss.setServiceParent(self.s) w = StorageStatus(ss) html = w.renderSynchronously() self.failUnless("

Storage Server Status

" in html, html) - s = self.remove_tags(html) + s = remove_tags(html) self.failUnless("Accepting new shares: Yes" in s, s) - self.failUnless("Reserved space: - 0B" in s, s) + self.failUnless("Reserved space: - 0 B (0)" in s, s) def test_status_no_statvfs(self): # windows has no os.statvfs . Make sure the code handles that even on @@ -1324,10 +1429,11 @@ class WebStatus(unittest.TestCase): basedir = "storage/WebStatus/status_no_statvfs" fileutil.make_dirs(basedir) ss = NoStatvfsServer(basedir, "\x00" * 20) + ss.setServiceParent(self.s) w = StorageStatus(ss) html = w.renderSynchronously() self.failUnless("

Storage Server Status

" in html, html) - s = self.remove_tags(html) + s = remove_tags(html) self.failUnless("Accepting new shares: Yes" in s, s) self.failUnless("Total disk space: ?" in s, s) @@ -1335,25 +1441,39 @@ class WebStatus(unittest.TestCase): basedir = "storage/WebStatus/readonly" fileutil.make_dirs(basedir) ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True) + ss.setServiceParent(self.s) w = StorageStatus(ss) html = w.renderSynchronously() self.failUnless("

Storage Server Status

" in html, html) - s = self.remove_tags(html) + s = remove_tags(html) self.failUnless("Accepting new shares: No" in s, s) def test_reserved(self): basedir = "storage/WebStatus/reserved" fileutil.make_dirs(basedir) ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6) + ss.setServiceParent(self.s) w = StorageStatus(ss) html = w.renderSynchronously() self.failUnless("

Storage Server Status

" in html, html) - s = self.remove_tags(html) - self.failUnless("Reserved space: - 10.00MB" in s, s) + s = remove_tags(html) + self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s) + + def test_huge_reserved(self): + basedir = "storage/WebStatus/reserved" + fileutil.make_dirs(basedir) + ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6) + ss.setServiceParent(self.s) + w = StorageStatus(ss) + html = w.renderSynchronously() + self.failUnless("

Storage Server Status

" in html, html) + s = remove_tags(html) + self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s) def test_util(self): - self.failUnlessEqual(abbreviate_if_known(None), "?") - self.failUnlessEqual(abbreviate_if_known(10e6), "10.00MB") + w = StorageStatus(None) + self.failUnlessEqual(w.render_space(None, None), "?") + self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)") self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar") self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None) diff --git a/src/allmydata/web/storage.py b/src/allmydata/web/storage.py index a8698a47..fcdcef02 100644 --- a/src/allmydata/web/storage.py +++ b/src/allmydata/web/storage.py @@ -1,11 +1,7 @@ from nevow import rend, tags as T -from allmydata.web.common import getxmlfile, abbreviate_size - -def abbreviate_if_known(size): - if size is None: - return "?" - return abbreviate_size(size) +from allmydata.web.common import getxmlfile, abbreviate_time +from allmydata.util.abbreviate import abbreviate_space def remove_prefix(s, prefix): if not s.startswith(prefix): @@ -29,8 +25,10 @@ class StorageStatus(rend.Page): def render_bool(self, ctx, data): return {True: "Yes", False: "No"}[bool(data)] - def render_space(self, ctx, data): - return abbreviate_if_known(data) + def render_space(self, ctx, size): + if size is None: + return "?" + return "%s (%d)" % (abbreviate_space(size), size) def data_stats(self, ctx, data): # FYI: 'data' appears to be self, rather than the StorageServer @@ -51,8 +49,9 @@ class StorageStatus(rend.Page): # missing keys will cause an error, even if the renderer can tolerate # None values. To overcome this, we either need a dict-like object # that always returns None for unknown keys, or we must pre-populate - # our dict with those missing keys (or find some way to override - # Nevow's handling of dictionaries). + # our dict with those missing keys, or we should get rid of data_ + # methods that return dicts (or find some way to override Nevow's + # handling of dictionaries). d = dict([ (remove_prefix(k, "storage_server."), v) for k,v in self.storage.get_stats().items() ]) @@ -61,3 +60,22 @@ class StorageStatus(rend.Page): d.setdefault("reserved_space", None) d.setdefault("disk_avail", None) return d + + def data_last_complete_share_count(self, ctx, data): + s = self.storage.bucket_counter.get_state() + lcsc = s.get("last-complete-share-count") + if lcsc is None: + return "Not computed yet" + cycle, count = lcsc + return count + + def render_count_crawler_status(self, ctx, storage): + s = self.storage.bucket_counter.get_progress() + if s["cycle-in-progress"]: + pct = s["cycle-complete-percentage"] + soon = s["remaining-sleep-time"] + return ctx.tag["Current crawl %.1f%% complete" % pct, + " (next work in %s)" % abbreviate_time(soon)] + else: + soon = s["remaining-wait-time"] + return ctx.tag["Next crawl in %s" % abbreviate_time(soon)] diff --git a/src/allmydata/web/storage_status.xhtml b/src/allmydata/web/storage_status.xhtml index 49139446..c98e6c6e 100644 --- a/src/allmydata/web/storage_status.xhtml +++ b/src/allmydata/web/storage_status.xhtml @@ -10,11 +10,6 @@

Storage Server Status

- - @@ -24,10 +19,26 @@ - - + +
Total disk space:
-
======
Space Available:<
Space Available to Tahoe:
+ + + +