self.bucket_cache = (None, [])
self.current_sleep_time = None
self.next_wake_time = None
+ self.last_prefix_finished_time = None
+ self.last_prefix_elapsed_time = None
+ self.last_cycle_started_time = None
+ self.last_cycle_elapsed_time = None
+ def minus_or_none(self, a, b):
+ if a is None:
+ return None
+ return a-b
def get_progress(self):
"""I return information about how much progress the crawler is
making. My return value is a dictionary. The primary key is
'cycle-in-progress': True if the crawler is currently traversing the
shares, False if it is idle between cycles.
+ Note that any of these 'time' keys could be None if I am called at
+ certain moments, so application code must be prepared to tolerate
+ this case. The estimates will also be None if insufficient data has
+ been gatherered to form an estimate.
If cycle-in-progress is True, the following keys will be present::
cycle-complete-percentage': float, from 0.0 to 100.0, indicating how
far the crawler has progressed through
the current cycle
remaining-sleep-time: float, seconds from now when we do more work
+ estimated-cycle-complete-time-left:
+ float, seconds remaining until the current cycle is finished.
+ TODO: this does not yet include the remaining time left in
+ the current prefixdir, and it will be very inaccurate on fast
+ crawlers (which can process a whole prefix in a single tick)
+ estimated-time-per-cycle: float, seconds required to do a complete
+ cycle
If cycle-in-progress is False, the following keys are available::
- next-crawl-time: float, seconds-since-epoch when next crawl starts
- remaining-wait-time: float, seconds from now when next crawl starts
+ next-crawl-time: float, seconds-since-epoch when next crawl starts
+ remaining-wait-time: float, seconds from now when next crawl starts
+ estimated-time-per-cycle: float, seconds required to do a complete
+ cycle
d = {}
if self.state["current-cycle"] is None:
d["cycle-in-progress"] = False
d["next-crawl-time"] = self.next_wake_time
- d["remaining-wait-time"] = self.next_wake_time - time.time()
+ d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
+ time.time())
d["cycle-in-progress"] = True
pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
d["cycle-complete-percentage"] = pct
- d["remaining-sleep-time"] = self.next_wake_time - time.time()
+ remaining = None
+ if self.last_prefix_elapsed_time is not None:
+ left = len(self.prefixes) - self.last_complete_prefix_index
+ remaining = left * self.last_prefix_elapsed_time
+ # TODO: remainder of this prefix: we need to estimate the
+ # per-bucket time, probably by measuring the time spent on
+ # this prefix so far, divided by the number of buckets we've
+ # processed.
+ d["estimated-cycle-complete-time-left"] = remaining
+ # it's possible to call get_progress() from inside a crawler's
+ # finished_prefix() function
+ d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
+ time.time())
+ per_cycle = None
+ if self.last_cycle_elapsed_time is not None:
+ per_cycle = self.last_cycle_elapsed_time
+ elif self.last_prefix_elapsed_time is not None:
+ per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
+ d["estimated-time-per-cycle"] = per_cycle
return d
def get_state(self):
def start_current_prefix(self, start_slice):
state = self.state
if state["current-cycle"] is None:
+ self.last_cycle_started_time = time.time()
if state["last-cycle-finished"] is None:
state["current-cycle"] = 0
self.process_prefixdir(cycle, prefix, prefixdir,
buckets, start_slice)
self.last_complete_prefix_index = i
+ now = time.time()
+ if self.last_prefix_finished_time is not None:
+ elapsed = now - self.last_prefix_finished_time
+ self.last_prefix_elapsed_time = elapsed
+ self.last_prefix_finished_time = now
self.finished_prefix(cycle, prefix)
if time.time() >= start_slice + self.cpu_slice:
raise TimeSliceExceeded()
# yay! we finished the whole cycle
self.last_complete_prefix_index = -1
+ self.last_prefix_finished_time = None # don't include the sleep
+ now = time.time()
+ if self.last_cycle_started_time is not None:
+ self.last_cycle_elapsed_time = now - self.last_cycle_started_time
state["last-complete-bucket"] = None
state["last-cycle-finished"] = cycle
state["current-cycle"] = None
from import BucketWriter, BucketReader
from import DataTooLargeError
from import LeaseInfo
+from import BucketCountingCrawler
from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
from allmydata.interfaces import BadWriteEnablerError
s = re.sub(r'\s+', ' ', s)
return s
+class MyBucketCountingCrawler(BucketCountingCrawler):
+ def finished_prefix(self, cycle, prefix):
+ BucketCountingCrawler.finished_prefix(self, cycle, prefix)
+ if self.hook_ds:
+ d = self.hook_ds.pop(0)
+ d.callback(None)
+class MyStorageServer(StorageServer):
+ def add_bucket_counter(self):
+ statefile = os.path.join(self.storedir, "bucket_counter.state")
+ self.bucket_counter = MyBucketCountingCrawler(self, statefile)
+ self.bucket_counter.setServiceParent(self)
class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
def setUp(self):
return d
+ def test_bucket_counter_eta(self):
+ basedir = "storage/BucketCounter/bucket_counter_eta"
+ fileutil.make_dirs(basedir)
+ ss = MyStorageServer(basedir, "\x00" * 20)
+ ss.bucket_counter.slow_start = 0
+ # these will be fired inside finished_prefix()
+ hooks = ss.bucket_counter.hook_ds = [defer.Deferred() for i in range(3)]
+ w = StorageStatus(ss)
+ d = defer.Deferred()
+ def _check_1(ignored):
+ # no ETA is available yet
+ html = w.renderSynchronously()
+ s = remove_tags(html)
+ self.failUnlessIn("complete (next work", s)
+ def _check_2(ignored):
+ # one prefix has finished, so an ETA based upon that elapsed time
+ # should be available.
+ html = w.renderSynchronously()
+ s = remove_tags(html)
+ self.failUnlessIn("complete (ETA ", s)
+ def _check_3(ignored):
+ # two prefixes have finished
+ html = w.renderSynchronously()
+ s = remove_tags(html)
+ self.failUnlessIn("complete (ETA ", s)
+ d.callback("done")
+ hooks[0].addCallback(_check_1).addErrback(d.errback)
+ hooks[1].addCallback(_check_2).addErrback(d.errback)
+ hooks[2].addCallback(_check_3).addErrback(d.errback)
+ ss.setServiceParent(self.s)
+ return d
class NoStatvfsServer(StorageServer):
def do_statvfs(self):
raise AttributeError
def render_count_crawler_status(self, ctx, storage):
s =
+ cycletime = s["estimated-time-per-cycle"]
+ cycletime_s = ""
+ if cycletime is not None:
+ cycletime_s = " (estimated cycle time %ds)" % cycletime
if s["cycle-in-progress"]:
pct = s["cycle-complete-percentage"]
soon = s["remaining-sleep-time"]
+ eta = s["estimated-cycle-complete-time-left"]
+ eta_s = ""
+ if eta is not None:
+ eta_s = " (ETA %ds)" % eta
return ctx.tag["Current crawl %.1f%% complete" % pct,
- " (next work in %s)" % abbreviate_time(soon)]
+ eta_s,
+ " (next work in %s)" % abbreviate_time(soon),
+ cycletime_s,
+ ]
soon = s["remaining-wait-time"]
- return ctx.tag["Next crawl in %s" % abbreviate_time(soon)]
+ return ctx.tag["Next crawl in %s" % abbreviate_time(soon),
+ cycletime_s]