From: Brian Warner Date: Fri, 27 Feb 2009 02:42:48 +0000 (-0700) Subject: crawler: add ETA to get_progress() X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/%22doc.html/architecture.txt?a=commitdiff_plain;h=112dc355630c65a495f358fdcee6e692bed00bd7;p=tahoe-lafs%2Ftahoe-lafs.git crawler: add ETA to get_progress() --- diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py index 9d950537..d0af7807 100644 --- a/src/allmydata/storage/crawler.py +++ b/src/allmydata/storage/crawler.py @@ -79,39 +79,80 @@ class ShareCrawler(service.MultiService): self.bucket_cache = (None, []) self.current_sleep_time = None self.next_wake_time = None + self.last_prefix_finished_time = None + self.last_prefix_elapsed_time = None + self.last_cycle_started_time = None + self.last_cycle_elapsed_time = None self.load_state() + def minus_or_none(self, a, b): + if a is None: + return None + return a-b + def get_progress(self): """I return information about how much progress the crawler is making. My return value is a dictionary. The primary key is 'cycle-in-progress': True if the crawler is currently traversing the shares, False if it is idle between cycles. + Note that any of these 'time' keys could be None if I am called at + certain moments, so application code must be prepared to tolerate + this case. The estimates will also be None if insufficient data has + been gatherered to form an estimate. + If cycle-in-progress is True, the following keys will be present:: cycle-complete-percentage': float, from 0.0 to 100.0, indicating how far the crawler has progressed through the current cycle remaining-sleep-time: float, seconds from now when we do more work - + estimated-cycle-complete-time-left: + float, seconds remaining until the current cycle is finished. + TODO: this does not yet include the remaining time left in + the current prefixdir, and it will be very inaccurate on fast + crawlers (which can process a whole prefix in a single tick) + estimated-time-per-cycle: float, seconds required to do a complete + cycle If cycle-in-progress is False, the following keys are available:: - next-crawl-time: float, seconds-since-epoch when next crawl starts - - remaining-wait-time: float, seconds from now when next crawl starts + next-crawl-time: float, seconds-since-epoch when next crawl starts + remaining-wait-time: float, seconds from now when next crawl starts + estimated-time-per-cycle: float, seconds required to do a complete + cycle """ d = {} + if self.state["current-cycle"] is None: d["cycle-in-progress"] = False d["next-crawl-time"] = self.next_wake_time - d["remaining-wait-time"] = self.next_wake_time - time.time() + d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time, + time.time()) else: d["cycle-in-progress"] = True pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes) d["cycle-complete-percentage"] = pct - d["remaining-sleep-time"] = self.next_wake_time - time.time() + remaining = None + if self.last_prefix_elapsed_time is not None: + left = len(self.prefixes) - self.last_complete_prefix_index + remaining = left * self.last_prefix_elapsed_time + # TODO: remainder of this prefix: we need to estimate the + # per-bucket time, probably by measuring the time spent on + # this prefix so far, divided by the number of buckets we've + # processed. + d["estimated-cycle-complete-time-left"] = remaining + # it's possible to call get_progress() from inside a crawler's + # finished_prefix() function + d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time, + time.time()) + per_cycle = None + if self.last_cycle_elapsed_time is not None: + per_cycle = self.last_cycle_elapsed_time + elif self.last_prefix_elapsed_time is not None: + per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time + d["estimated-time-per-cycle"] = per_cycle return d def get_state(self): @@ -247,6 +288,7 @@ class ShareCrawler(service.MultiService): def start_current_prefix(self, start_slice): state = self.state if state["current-cycle"] is None: + self.last_cycle_started_time = time.time() if state["last-cycle-finished"] is None: state["current-cycle"] = 0 else: @@ -269,11 +311,23 @@ class ShareCrawler(service.MultiService): self.process_prefixdir(cycle, prefix, prefixdir, buckets, start_slice) self.last_complete_prefix_index = i + + now = time.time() + if self.last_prefix_finished_time is not None: + elapsed = now - self.last_prefix_finished_time + self.last_prefix_elapsed_time = elapsed + self.last_prefix_finished_time = now + self.finished_prefix(cycle, prefix) if time.time() >= start_slice + self.cpu_slice: raise TimeSliceExceeded() + # yay! we finished the whole cycle self.last_complete_prefix_index = -1 + self.last_prefix_finished_time = None # don't include the sleep + now = time.time() + if self.last_cycle_started_time is not None: + self.last_cycle_elapsed_time = now - self.last_cycle_started_time state["last-complete-bucket"] = None state["last-cycle-finished"] = cycle state["current-cycle"] = None diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 1604b10e..6ae5dad6 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -77,8 +77,10 @@ class StorageServer(service.MultiService, Referenceable): "renew": [], "cancel": [], } + self.add_bucket_counter() - statefile = os.path.join(storedir, "bucket_counter.state") + def add_bucket_counter(self): + statefile = os.path.join(self.storedir, "bucket_counter.state") self.bucket_counter = BucketCountingCrawler(self, statefile) self.bucket_counter.setServiceParent(self) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index e0c07d9a..770c6fd1 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -14,6 +14,7 @@ from allmydata.storage.mutable import MutableShareFile from allmydata.storage.immutable import BucketWriter, BucketReader from allmydata.storage.common import DataTooLargeError from allmydata.storage.lease import LeaseInfo +from allmydata.storage.crawler import BucketCountingCrawler from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \ ReadBucketProxy from allmydata.interfaces import BadWriteEnablerError @@ -1299,6 +1300,19 @@ def remove_tags(s): s = re.sub(r'\s+', ' ', s) return s +class MyBucketCountingCrawler(BucketCountingCrawler): + def finished_prefix(self, cycle, prefix): + BucketCountingCrawler.finished_prefix(self, cycle, prefix) + if self.hook_ds: + d = self.hook_ds.pop(0) + d.callback(None) + +class MyStorageServer(StorageServer): + def add_bucket_counter(self): + statefile = os.path.join(self.storedir, "bucket_counter.state") + self.bucket_counter = MyBucketCountingCrawler(self, statefile) + self.bucket_counter.setServiceParent(self) + class BucketCounter(unittest.TestCase, pollmixin.PollMixin): def setUp(self): @@ -1398,6 +1412,45 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin): d.addCallback(_check2) return d + def test_bucket_counter_eta(self): + basedir = "storage/BucketCounter/bucket_counter_eta" + fileutil.make_dirs(basedir) + ss = MyStorageServer(basedir, "\x00" * 20) + ss.bucket_counter.slow_start = 0 + # these will be fired inside finished_prefix() + hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)] + w = StorageStatus(ss) + + d = defer.Deferred() + + def _check_1(ignored): + # no ETA is available yet + html = w.renderSynchronously() + s = remove_tags(html) + self.failUnlessIn("complete (next work", s) + + def _check_2(ignored): + # one prefix has finished, so an ETA based upon that elapsed time + # should be available. + html = w.renderSynchronously() + s = remove_tags(html) + self.failUnlessIn("complete (ETA ", s) + + def _check_3(ignored): + # two prefixes have finished + html = w.renderSynchronously() + s = remove_tags(html) + self.failUnlessIn("complete (ETA ", s) + d.callback("done") + + hooks[0].addCallback(_check_1).addErrback(d.errback) + hooks[1].addCallback(_check_2).addErrback(d.errback) + hooks[2].addCallback(_check_3).addErrback(d.errback) + + ss.setServiceParent(self.s) + return d + + class NoStatvfsServer(StorageServer): def do_statvfs(self): raise AttributeError diff --git a/src/allmydata/web/storage.py b/src/allmydata/web/storage.py index 6cff994a..c0ad5cbd 100644 --- a/src/allmydata/web/storage.py +++ b/src/allmydata/web/storage.py @@ -72,11 +72,27 @@ class StorageStatus(rend.Page): def render_count_crawler_status(self, ctx, storage): s = self.storage.bucket_counter.get_progress() + + cycletime = s["estimated-time-per-cycle"] + cycletime_s = "" + if cycletime is not None: + cycletime_s = " (estimated cycle time %ds)" % cycletime + if s["cycle-in-progress"]: pct = s["cycle-complete-percentage"] soon = s["remaining-sleep-time"] + + eta = s["estimated-cycle-complete-time-left"] + eta_s = "" + if eta is not None: + eta_s = " (ETA %ds)" % eta + return ctx.tag["Current crawl %.1f%% complete" % pct, - " (next work in %s)" % abbreviate_time(soon)] + eta_s, + " (next work in %s)" % abbreviate_time(soon), + cycletime_s, + ] else: soon = s["remaining-wait-time"] - return ctx.tag["Next crawl in %s" % abbreviate_time(soon)] + return ctx.tag["Next crawl in %s" % abbreviate_time(soon), + cycletime_s]