From 112dc355630c65a495f358fdcee6e692bed00bd7 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Thu, 26 Feb 2009 19:42:48 -0700
Subject: [PATCH] crawler: add ETA to get_progress()

---
 src/allmydata/storage/crawler.py   | 66 +++++++++++++++++++++++++++---
 src/allmydata/storage/server.py    |  4 +-
 src/allmydata/test/test_storage.py | 53 ++++++++++++++++++++++++
 src/allmydata/web/storage.py       | 20 ++++++++-
 4 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
index 9d950537..d0af7807 100644
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -79,39 +79,80 @@ class ShareCrawler(service.MultiService):
         self.bucket_cache = (None, [])
         self.current_sleep_time = None
         self.next_wake_time = None
+        self.last_prefix_finished_time = None
+        self.last_prefix_elapsed_time = None
+        self.last_cycle_started_time = None
+        self.last_cycle_elapsed_time = None
         self.load_state()
 
+    def minus_or_none(self, a, b):
+        if a is None:
+            return None
+        return a-b
+
     def get_progress(self):
         """I return information about how much progress the crawler is
         making. My return value is a dictionary. The primary key is
         'cycle-in-progress': True if the crawler is currently traversing the
         shares, False if it is idle between cycles.
 
+        Note that any of these 'time' keys could be None if I am called at
+        certain moments, so application code must be prepared to tolerate
+        this case. The estimates will also be None if insufficient data has
+        been gatherered to form an estimate.
+
         If cycle-in-progress is True, the following keys will be present::
 
          cycle-complete-percentage': float, from 0.0 to 100.0, indicating how
                                      far the crawler has progressed through
                                      the current cycle
          remaining-sleep-time: float, seconds from now when we do more work
-
+         estimated-cycle-complete-time-left:
+                float, seconds remaining until the current cycle is finished.
+                TODO: this does not yet include the remaining time left in
+                the current prefixdir, and it will be very inaccurate on fast
+                crawlers (which can process a whole prefix in a single tick)
+         estimated-time-per-cycle: float, seconds required to do a complete
+                                   cycle
 
         If cycle-in-progress is False, the following keys are available::
 
-           next-crawl-time: float, seconds-since-epoch when next crawl starts
-
-           remaining-wait-time: float, seconds from now when next crawl starts
+         next-crawl-time: float, seconds-since-epoch when next crawl starts
+         remaining-wait-time: float, seconds from now when next crawl starts
+         estimated-time-per-cycle: float, seconds required to do a complete
+                                   cycle
         """
 
         d = {}
+
         if self.state["current-cycle"] is None:
             d["cycle-in-progress"] = False
             d["next-crawl-time"] = self.next_wake_time
-            d["remaining-wait-time"] = self.next_wake_time - time.time()
+            d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
+                                                          time.time())
         else:
             d["cycle-in-progress"] = True
             pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
             d["cycle-complete-percentage"] = pct
-            d["remaining-sleep-time"] = self.next_wake_time - time.time()
+            remaining = None
+            if self.last_prefix_elapsed_time is not None:
+                left = len(self.prefixes) - self.last_complete_prefix_index
+                remaining = left * self.last_prefix_elapsed_time
+                # TODO: remainder of this prefix: we need to estimate the
+                # per-bucket time, probably by measuring the time spent on
+                # this prefix so far, divided by the number of buckets we've
+                # processed.
+            d["estimated-cycle-complete-time-left"] = remaining
+            # it's possible to call get_progress() from inside a crawler's
+            # finished_prefix() function
+            d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
+                                                           time.time())
+        per_cycle = None
+        if self.last_cycle_elapsed_time is not None:
+            per_cycle = self.last_cycle_elapsed_time
+        elif self.last_prefix_elapsed_time is not None:
+            per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
+        d["estimated-time-per-cycle"] = per_cycle
         return d
 
     def get_state(self):
@@ -247,6 +288,7 @@ class ShareCrawler(service.MultiService):
     def start_current_prefix(self, start_slice):
         state = self.state
         if state["current-cycle"] is None:
+            self.last_cycle_started_time = time.time()
             if state["last-cycle-finished"] is None:
                 state["current-cycle"] = 0
             else:
@@ -269,11 +311,23 @@ class ShareCrawler(service.MultiService):
             self.process_prefixdir(cycle, prefix, prefixdir,
                                    buckets, start_slice)
             self.last_complete_prefix_index = i
+
+            now = time.time()
+            if self.last_prefix_finished_time is not None:
+                elapsed = now - self.last_prefix_finished_time
+                self.last_prefix_elapsed_time = elapsed
+            self.last_prefix_finished_time = now
+
             self.finished_prefix(cycle, prefix)
             if time.time() >= start_slice + self.cpu_slice:
                 raise TimeSliceExceeded()
+
         # yay! we finished the whole cycle
         self.last_complete_prefix_index = -1
+        self.last_prefix_finished_time = None # don't include the sleep
+        now = time.time()
+        if self.last_cycle_started_time is not None:
+            self.last_cycle_elapsed_time = now - self.last_cycle_started_time
         state["last-complete-bucket"] = None
         state["last-cycle-finished"] = cycle
         state["current-cycle"] = None
diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py
index 1604b10e..6ae5dad6 100644
--- a/src/allmydata/storage/server.py
+++ b/src/allmydata/storage/server.py
@@ -77,8 +77,10 @@ class StorageServer(service.MultiService, Referenceable):
                           "renew": [],
                           "cancel": [],
                           }
+        self.add_bucket_counter()
 
-        statefile = os.path.join(storedir, "bucket_counter.state")
+    def add_bucket_counter(self):
+        statefile = os.path.join(self.storedir, "bucket_counter.state")
         self.bucket_counter = BucketCountingCrawler(self, statefile)
         self.bucket_counter.setServiceParent(self)
 
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index e0c07d9a..770c6fd1 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -14,6 +14,7 @@ from allmydata.storage.mutable import MutableShareFile
 from allmydata.storage.immutable import BucketWriter, BucketReader
 from allmydata.storage.common import DataTooLargeError
 from allmydata.storage.lease import LeaseInfo
+from allmydata.storage.crawler import BucketCountingCrawler
 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
      ReadBucketProxy
 from allmydata.interfaces import BadWriteEnablerError
@@ -1299,6 +1300,19 @@ def remove_tags(s):
     s = re.sub(r'\s+', ' ', s)
     return s
 
+class MyBucketCountingCrawler(BucketCountingCrawler):
+    def finished_prefix(self, cycle, prefix):
+        BucketCountingCrawler.finished_prefix(self, cycle, prefix)
+        if self.hook_ds:
+            d = self.hook_ds.pop(0)
+            d.callback(None)
+
+class MyStorageServer(StorageServer):
+    def add_bucket_counter(self):
+        statefile = os.path.join(self.storedir, "bucket_counter.state")
+        self.bucket_counter = MyBucketCountingCrawler(self, statefile)
+        self.bucket_counter.setServiceParent(self)
+
 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
 
     def setUp(self):
@@ -1398,6 +1412,45 @@ class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
         d.addCallback(_check2)
         return d
 
+    def test_bucket_counter_eta(self):
+        basedir = "storage/BucketCounter/bucket_counter_eta"
+        fileutil.make_dirs(basedir)
+        ss = MyStorageServer(basedir, "\x00" * 20)
+        ss.bucket_counter.slow_start = 0
+        # these will be fired inside finished_prefix()
+        hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
+        w = StorageStatus(ss)
+
+        d = defer.Deferred()
+
+        def _check_1(ignored):
+            # no ETA is available yet
+            html = w.renderSynchronously()
+            s = remove_tags(html)
+            self.failUnlessIn("complete (next work", s)
+
+        def _check_2(ignored):
+            # one prefix has finished, so an ETA based upon that elapsed time
+            # should be available.
+            html = w.renderSynchronously()
+            s = remove_tags(html)
+            self.failUnlessIn("complete (ETA ", s)
+
+        def _check_3(ignored):
+            # two prefixes have finished
+            html = w.renderSynchronously()
+            s = remove_tags(html)
+            self.failUnlessIn("complete (ETA ", s)
+            d.callback("done")
+
+        hooks[0].addCallback(_check_1).addErrback(d.errback)
+        hooks[1].addCallback(_check_2).addErrback(d.errback)
+        hooks[2].addCallback(_check_3).addErrback(d.errback)
+
+        ss.setServiceParent(self.s)
+        return d
+
+
 class NoStatvfsServer(StorageServer):
     def do_statvfs(self):
         raise AttributeError
diff --git a/src/allmydata/web/storage.py b/src/allmydata/web/storage.py
index 6cff994a..c0ad5cbd 100644
--- a/src/allmydata/web/storage.py
+++ b/src/allmydata/web/storage.py
@@ -72,11 +72,27 @@ class StorageStatus(rend.Page):
 
     def render_count_crawler_status(self, ctx, storage):
         s = self.storage.bucket_counter.get_progress()
+
+        cycletime = s["estimated-time-per-cycle"]
+        cycletime_s = ""
+        if cycletime is not None:
+            cycletime_s = " (estimated cycle time %ds)" % cycletime
+
         if s["cycle-in-progress"]:
             pct = s["cycle-complete-percentage"]
             soon = s["remaining-sleep-time"]
+
+            eta = s["estimated-cycle-complete-time-left"]
+            eta_s = ""
+            if eta is not None:
+                eta_s = " (ETA %ds)" % eta
+
             return ctx.tag["Current crawl %.1f%% complete" % pct,
-                           " (next work in %s)" % abbreviate_time(soon)]
+                           eta_s,
+                           " (next work in %s)" % abbreviate_time(soon),
+                           cycletime_s,
+                           ]
         else:
             soon = s["remaining-wait-time"]
-            return ctx.tag["Next crawl in %s" % abbreviate_time(soon)]
+            return ctx.tag["Next crawl in %s" % abbreviate_time(soon),
+                           cycletime_s]
-- 
2.45.2