From b3cd4952bd2cbaccd4b806f27365527c12b72aa5 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Fri, 20 Feb 2009 21:04:08 -0700
Subject: [PATCH] storage: add bucket-counting share crawler, add its output
 (number of files+directories maintained by a storage server) and status to
 the webapi /storage page

---
 src/allmydata/storage/crawler.py       |  91 ++++++++++++--
 src/allmydata/storage/server.py        |   5 +
 src/allmydata/test/test_crawler.py     |  12 ++
 src/allmydata/test/test_storage.py     | 158 ++++++++++++++++++++++---
 src/allmydata/web/storage.py           |  38 ++++--
 src/allmydata/web/storage_status.xhtml |  25 ++--
 6 files changed, 280 insertions(+), 49 deletions(-)

diff --git a/src/allmydata/storage/crawler.py b/src/allmydata/storage/crawler.py
index 435bd2fa..2daeeba3 100644
--- a/src/allmydata/storage/crawler.py
+++ b/src/allmydata/storage/crawler.py
@@ -98,7 +98,6 @@ class ShareCrawler(service.MultiService):
 
         d = {}
         if self.state["current-cycle"] is None:
-            assert self.sleeping_between_cycles
             d["cycle-in-progress"] = False
             d["next-crawl-time"] = self.next_wake_time
             d["remaining-wait-time"] = self.next_wake_time - time.time()
@@ -145,7 +144,7 @@ class ShareCrawler(service.MultiService):
         except EnvironmentError:
             state = {"version": 1,
                      "last-cycle-finished": None,
-                     "current-cycle": 0,
+                     "current-cycle": None,
                      "last-complete-prefix": None,
                      "last-complete-bucket": None,
                      }
@@ -184,6 +183,11 @@ class ShareCrawler(service.MultiService):
 
     def startService(self):
         self.load_state()
+        # arrange things to look like we were just sleeping, so
+        # status/progress values work correctly
+        self.sleeping_between_cycles = True
+        self.current_sleep_time = 0
+        self.next_wake_time = time.time()
         self.timer = reactor.callLater(0, self.start_slice)
         service.MultiService.startService(self)
 
@@ -195,10 +199,12 @@ class ShareCrawler(service.MultiService):
 
     def start_slice(self):
         self.timer = None
+        self.sleeping_between_cycles = False
         self.current_sleep_time = None
         self.next_wake_time = None
         start_slice = time.time()
         try:
+            s = self.last_complete_prefix_index
             self.start_current_prefix(start_slice)
             finished_cycle = True
         except TimeSliceExceeded:
@@ -229,14 +235,15 @@ class ShareCrawler(service.MultiService):
         self.timer = reactor.callLater(sleep_time, self.start_slice)
 
     def start_current_prefix(self, start_slice):
-        if self.state["current-cycle"] is None:
-            assert self.state["last-cycle-finished"] is not None
-            self.state["current-cycle"] = self.state["last-cycle-finished"] + 1
-        cycle = self.state["current-cycle"]
+        state = self.state
+        if state["current-cycle"] is None:
+            if state["last-cycle-finished"] is None:
+                state["current-cycle"] = 0
+            else:
+                state["current-cycle"] = state["last-cycle-finished"] + 1
+        cycle = state["current-cycle"]
 
         for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
-            if time.time() > start_slice + self.cpu_slice:
-                raise TimeSliceExceeded()
             # if we want to yield earlier, just raise TimeSliceExceeded()
             prefix = self.prefixes[i]
             prefixdir = os.path.join(self.sharedir, prefix)
@@ -253,11 +260,13 @@ class ShareCrawler(service.MultiService):
                                    buckets, start_slice)
             self.last_complete_prefix_index = i
             self.save_state()
+            if time.time() > start_slice + self.cpu_slice:
+                raise TimeSliceExceeded()
         # yay! we finished the whole cycle
         self.last_complete_prefix_index = -1
-        self.state["last-complete-bucket"] = None
-        self.state["last-cycle-finished"] = cycle
-        self.state["current-cycle"] = None
+        state["last-complete-bucket"] = None
+        state["last-cycle-finished"] = cycle
+        state["current-cycle"] = None
         self.finished_cycle(cycle)
         self.save_state()
 
@@ -272,11 +281,14 @@ class ShareCrawler(service.MultiService):
         for bucket in buckets:
             if bucket <= self.state["last-complete-bucket"]:
                 continue
-            if time.time() > start_slice + self.cpu_slice:
-                raise TimeSliceExceeded()
             self.process_bucket(cycle, prefix, prefixdir, bucket)
             self.state["last-complete-bucket"] = bucket
+            # note: saving the state after every bucket is somewhat
+            # time-consuming, but lets us avoid losing more than one bucket's
+            # worth of progress.
             self.save_state()
+            if time.time() > start_slice + self.cpu_slice:
+                raise TimeSliceExceeded()
 
     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
         """Examine a single bucket. Subclasses should do whatever they want
@@ -317,3 +329,56 @@ class ShareCrawler(service.MultiService):
         """
         pass
 
+
+class BucketCountingCrawler(ShareCrawler):
+    """I keep track of how many buckets are being managed by this server.
+    This is equivalent to the number of distributed files and directories for
+    which I am providing storage. The actual number of files+directories in
+    the full grid is probably higher (especially when there are more servers
+    than 'N', the number of generated shares), because some files+directories
+    will have shares on other servers instead of me.
+    """
+
+    minimum_cycle_time = 60*60 # we don't need this more than once an hour
+
+    def __init__(self, server, statefile, num_sample_prefixes=1):
+        ShareCrawler.__init__(self, server, statefile)
+        self.num_sample_prefixes = num_sample_prefixes
+
+    def add_initial_state(self):
+        # ["share-counts"][cyclenum][prefix] = number
+        # ["last-complete-cycle"] = cyclenum # maintained by base class
+        # ["last-complete-share-count"] = number
+        # ["storage-index-samples"][prefix] = (cyclenum,
+        #                                      list of SI strings (base32))
+        self.state.setdefault("share-counts", {})
+        self.state.setdefault("last-complete-share-count", None)
+        self.state.setdefault("storage-index-samples", {})
+
+    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
+        # we override process_prefixdir() because we don't want to look at
+        # the individual buckets. We'll save state after each one. On my
+        # laptop, a mostly-empty storage server can process about 70
+        # prefixdirs in a 1.0s slice.
+        if cycle not in self.state["share-counts"]:
+            self.state["share-counts"][cycle] = {}
+        self.state["share-counts"][cycle][prefix] = len(buckets)
+        if prefix in self.prefixes[:self.num_sample_prefixes]:
+            self.state["storage-index-samples"][prefix] = (cycle, buckets)
+
+    def finished_cycle(self, cycle):
+        last_counts = self.state["share-counts"].get(cycle, [])
+        if len(last_counts) == len(self.prefixes):
+            # great, we have a whole cycle.
+            num_buckets = sum(last_counts.values())
+            self.state["last-complete-share-count"] = (cycle, num_buckets)
+            # get rid of old counts
+            for old_cycle in list(self.state["share-counts"].keys()):
+                if old_cycle != cycle:
+                    del self.state["share-counts"][old_cycle]
+        # get rid of old samples too
+        for prefix in list(self.state["storage-index-samples"].keys()):
+            old_cycle,buckets = self.state["storage-index-samples"][prefix]
+            if old_cycle != cycle:
+                del self.state["storage-index-samples"][prefix]
+
diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py
index 96e5b6ab..3b30fe96 100644
--- a/src/allmydata/storage/server.py
+++ b/src/allmydata/storage/server.py
@@ -14,6 +14,7 @@ from allmydata.storage.lease import LeaseInfo
 from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
      create_mutable_sharefile
 from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
+from allmydata.storage.crawler import BucketCountingCrawler
 
 # storage/
 # storage/shares/incoming
@@ -77,6 +78,10 @@ class StorageServer(service.MultiService, Referenceable):
                           "cancel": [],
                           }
 
+        statefile = os.path.join(storedir, "bucket_counter.state")
+        self.bucket_counter = BucketCountingCrawler(self, statefile)
+        self.bucket_counter.setServiceParent(self)
+
     def count(self, name, delta=1):
         if self.stats_provider:
             self.stats_provider.count("storage_server." + name, delta)
diff --git a/src/allmydata/test/test_crawler.py b/src/allmydata/test/test_crawler.py
index 113a94e3..bed132de 100644
--- a/src/allmydata/test/test_crawler.py
+++ b/src/allmydata/test/test_crawler.py
@@ -146,6 +146,15 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
         c = BucketEnumeratingCrawler(ss, statefile)
         c.setServiceParent(self.s)
 
+        # it should be legal to call get_state() and get_progress() right
+        # away, even before the first tick is performed. No work should have
+        # been done yet.
+        s = c.get_state()
+        p = c.get_progress()
+        self.failUnlessEqual(s["last-complete-prefix"], None)
+        self.failUnlessEqual(s["current-cycle"], None)
+        self.failUnlessEqual(p["cycle-in-progress"], False)
+
         d = c.finished_d
         def _check(ignored):
             self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
@@ -405,6 +414,9 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
             self.failIf(c.running)
             self.failIf(c.timer)
             self.failIf(c.current_sleep_time)
+            s = c.get_state()
+            self.failUnlessEqual(s["last-cycle-finished"], 0)
+            self.failUnlessEqual(s["current-cycle"], None)
         d.addCallback(_check)
         return d
 
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index d8788098..435aac20 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -1,10 +1,14 @@
+
+import time, os.path, stat, re
+
 from twisted.trial import unittest
 
 from twisted.internet import defer
-import time, os.path, stat, re
+from twisted.application import service
+from foolscap import eventual
 import itertools
 from allmydata import interfaces
-from allmydata.util import fileutil, hashutil, base32
+from allmydata.util import fileutil, hashutil, base32, pollmixin
 from allmydata.storage.server import StorageServer, storage_index_to_dir
 from allmydata.storage.mutable import MutableShareFile
 from allmydata.storage.immutable import BucketWriter, BucketReader
@@ -14,8 +18,7 @@ from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
      ReadBucketProxy
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent
-from allmydata.web.storage import StorageStatus, abbreviate_if_known, \
-     remove_prefix
+from allmydata.web.storage import StorageStatus, remove_prefix
 
 class Marker:
     pass
@@ -1290,33 +1293,135 @@ class Stats(unittest.TestCase):
         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
 
+def remove_tags(s):
+    s = re.sub(r'<[^>]*>', ' ', s)
+    s = re.sub(r'\s+', ' ', s)
+    return s
+
+class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
+
+    def setUp(self):
+        self.s = service.MultiService()
+        self.s.startService()
+    def tearDown(self):
+        return self.s.stopService()
+
+    def test_bucket_counter(self):
+        basedir = "storage/BucketCounter/bucket_counter"
+        fileutil.make_dirs(basedir)
+        ss = StorageServer(basedir, "\x00" * 20)
+        # to make sure we capture the bucket-counting-crawler in the middle
+        # of a cycle, we reach in and reduce its maximum slice time to 0.
+        orig_cpu_slice = ss.bucket_counter.cpu_slice
+        ss.bucket_counter.cpu_slice = 0
+        ss.setServiceParent(self.s)
+
+        w = StorageStatus(ss)
+
+        # this sample is before the crawler has started doing anything
+        html = w.renderSynchronously()
+        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
+        s = remove_tags(html)
+        self.failUnless("Accepting new shares: Yes" in s, s)
+        self.failUnless("Reserved space: - 0 B (0)" in s, s)
+        self.failUnless("Total buckets: Not computed yet" in s, s)
+        self.failUnless("Next crawl in" in s, s)
+
+        # give the bucket-counting-crawler one tick to get started. The
+        # cpu_slice=0 will force it to yield right after it processes the
+        # first prefix
+
+        d = eventual.fireEventually()
+        def _check(ignored):
+            # are we really right after the first prefix?
+            state = ss.bucket_counter.get_state()
+            self.failUnlessEqual(state["last-complete-prefix"],
+                                 ss.bucket_counter.prefixes[0])
+            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
+            html = w.renderSynchronously()
+            s = remove_tags(html)
+            self.failUnless(" Current crawl " in s, s)
+            self.failUnless(" (next work in " in s, s)
+        d.addCallback(_check)
+
+        # now give it enough time to complete a full cycle
+        def _watch():
+            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
+        d.addCallback(lambda ignored: self.poll(_watch))
+        def _check2(ignored):
+            ss.bucket_counter.cpu_slice = orig_cpu_slice
+            html = w.renderSynchronously()
+            s = remove_tags(html)
+            self.failUnless("Total buckets: 0 (the number of" in s, s)
+            self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
+        d.addCallback(_check2)
+        return d
+
+    def test_bucket_counter_cleanup(self):
+        basedir = "storage/BucketCounter/bucket_counter_cleanup"
+        fileutil.make_dirs(basedir)
+        ss = StorageServer(basedir, "\x00" * 20)
+        # to make sure we capture the bucket-counting-crawler in the middle
+        # of a cycle, we reach in and reduce its maximum slice time to 0.
+        orig_cpu_slice = ss.bucket_counter.cpu_slice
+        ss.bucket_counter.cpu_slice = 0
+        ss.setServiceParent(self.s)
+
+        d = eventual.fireEventually()
+
+        def _after_first_prefix(ignored):
+            ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
+            # now sneak in and mess with its state, to make sure it cleans up
+            # properly at the end of the cycle
+            state = ss.bucket_counter.state
+            self.failUnlessEqual(state["last-complete-prefix"],
+                                 ss.bucket_counter.prefixes[0])
+            state["share-counts"][-12] = {}
+            state["storage-index-samples"]["bogusprefix!"] = (-12, [])
+            ss.bucket_counter.save_state()
+        d.addCallback(_after_first_prefix)
+
+        # now give it enough time to complete a cycle
+        def _watch():
+            return not ss.bucket_counter.get_progress()["cycle-in-progress"]
+        d.addCallback(lambda ignored: self.poll(_watch))
+        def _check2(ignored):
+            ss.bucket_counter.cpu_slice = orig_cpu_slice
+            s = ss.bucket_counter.get_state()
+            self.failIf(-12 in s["share-counts"], s["share-counts"].keys())
+            self.failIf("bogusprefix!" in s["storage-index-samples"],
+                        s["storage-index-samples"].keys())
+        d.addCallback(_check2)
+        return d
+
 class NoStatvfsServer(StorageServer):
     def do_statvfs(self):
         raise AttributeError
 
-class WebStatus(unittest.TestCase):
+class WebStatus(unittest.TestCase, pollmixin.PollMixin):
+
+    def setUp(self):
+        self.s = service.MultiService()
+        self.s.startService()
+    def tearDown(self):
+        return self.s.stopService()
 
     def test_no_server(self):
         w = StorageStatus(None)
         html = w.renderSynchronously()
         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
 
-
-    def remove_tags(self, s):
-        s = re.sub(r'<[^>]*>', ' ', s)
-        s = re.sub(r'\s+', ' ', s)
-        return s
-
     def test_status(self):
         basedir = "storage/WebStatus/status"
         fileutil.make_dirs(basedir)
         ss = StorageServer(basedir, "\x00" * 20)
+        ss.setServiceParent(self.s)
         w = StorageStatus(ss)
         html = w.renderSynchronously()
         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
-        s = self.remove_tags(html)
+        s = remove_tags(html)
         self.failUnless("Accepting new shares: Yes" in s, s)
-        self.failUnless("Reserved space: - 0B" in s, s)
+        self.failUnless("Reserved space: - 0 B (0)" in s, s)
 
     def test_status_no_statvfs(self):
         # windows has no os.statvfs . Make sure the code handles that even on
@@ -1324,10 +1429,11 @@ class WebStatus(unittest.TestCase):
         basedir = "storage/WebStatus/status_no_statvfs"
         fileutil.make_dirs(basedir)
         ss = NoStatvfsServer(basedir, "\x00" * 20)
+        ss.setServiceParent(self.s)
         w = StorageStatus(ss)
         html = w.renderSynchronously()
         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
-        s = self.remove_tags(html)
+        s = remove_tags(html)
         self.failUnless("Accepting new shares: Yes" in s, s)
         self.failUnless("Total disk space: ?" in s, s)
 
@@ -1335,25 +1441,39 @@ class WebStatus(unittest.TestCase):
         basedir = "storage/WebStatus/readonly"
         fileutil.make_dirs(basedir)
         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
+        ss.setServiceParent(self.s)
         w = StorageStatus(ss)
         html = w.renderSynchronously()
         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
-        s = self.remove_tags(html)
+        s = remove_tags(html)
         self.failUnless("Accepting new shares: No" in s, s)
 
     def test_reserved(self):
         basedir = "storage/WebStatus/reserved"
         fileutil.make_dirs(basedir)
         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
+        ss.setServiceParent(self.s)
         w = StorageStatus(ss)
         html = w.renderSynchronously()
         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
-        s = self.remove_tags(html)
-        self.failUnless("Reserved space: - 10.00MB" in s, s)
+        s = remove_tags(html)
+        self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
+
+    def test_huge_reserved(self):
+        basedir = "storage/WebStatus/reserved"
+        fileutil.make_dirs(basedir)
+        ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
+        ss.setServiceParent(self.s)
+        w = StorageStatus(ss)
+        html = w.renderSynchronously()
+        self.failUnless("<h1>Storage Server Status</h1>" in html, html)
+        s = remove_tags(html)
+        self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
 
     def test_util(self):
-        self.failUnlessEqual(abbreviate_if_known(None), "?")
-        self.failUnlessEqual(abbreviate_if_known(10e6), "10.00MB")
+        w = StorageStatus(None)
+        self.failUnlessEqual(w.render_space(None, None), "?")
+        self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
 
diff --git a/src/allmydata/web/storage.py b/src/allmydata/web/storage.py
index a8698a47..fcdcef02 100644
--- a/src/allmydata/web/storage.py
+++ b/src/allmydata/web/storage.py
@@ -1,11 +1,7 @@
 
 from nevow import rend, tags as T
-from allmydata.web.common import getxmlfile, abbreviate_size
-
-def abbreviate_if_known(size):
-    if size is None:
-        return "?"
-    return abbreviate_size(size)
+from allmydata.web.common import getxmlfile, abbreviate_time
+from allmydata.util.abbreviate import abbreviate_space
 
 def remove_prefix(s, prefix):
     if not s.startswith(prefix):
@@ -29,8 +25,10 @@ class StorageStatus(rend.Page):
     def render_bool(self, ctx, data):
         return {True: "Yes", False: "No"}[bool(data)]
 
-    def render_space(self, ctx, data):
-        return abbreviate_if_known(data)
+    def render_space(self, ctx, size):
+        if size is None:
+            return "?"
+        return "%s (%d)" % (abbreviate_space(size), size)
 
     def data_stats(self, ctx, data):
         # FYI: 'data' appears to be self, rather than the StorageServer
@@ -51,8 +49,9 @@ class StorageStatus(rend.Page):
         # missing keys will cause an error, even if the renderer can tolerate
         # None values. To overcome this, we either need a dict-like object
         # that always returns None for unknown keys, or we must pre-populate
-        # our dict with those missing keys (or find some way to override
-        # Nevow's handling of dictionaries).
+        # our dict with those missing keys, or we should get rid of data_
+        # methods that return dicts (or find some way to override Nevow's
+        # handling of dictionaries).
 
         d = dict([ (remove_prefix(k, "storage_server."), v)
                    for k,v in self.storage.get_stats().items() ])
@@ -61,3 +60,22 @@ class StorageStatus(rend.Page):
         d.setdefault("reserved_space", None)
         d.setdefault("disk_avail", None)
         return d
+
+    def data_last_complete_share_count(self, ctx, data):
+        s = self.storage.bucket_counter.get_state()
+        lcsc = s.get("last-complete-share-count")
+        if lcsc is None:
+            return "Not computed yet"
+        cycle, count = lcsc
+        return count
+
+    def render_count_crawler_status(self, ctx, storage):
+        s = self.storage.bucket_counter.get_progress()
+        if s["cycle-in-progress"]:
+            pct = s["cycle-complete-percentage"]
+            soon = s["remaining-sleep-time"]
+            return ctx.tag["Current crawl %.1f%% complete" % pct,
+                           " (next work in %s)" % abbreviate_time(soon)]
+        else:
+            soon = s["remaining-wait-time"]
+            return ctx.tag["Next crawl in %s" % abbreviate_time(soon)]
diff --git a/src/allmydata/web/storage_status.xhtml b/src/allmydata/web/storage_status.xhtml
index 49139446..c98e6c6e 100644
--- a/src/allmydata/web/storage_status.xhtml
+++ b/src/allmydata/web/storage_status.xhtml
@@ -10,11 +10,6 @@
 
   <h1>Storage Server Status</h1>
 
-  <ul n:data="stats">
-    <li>Accepting new shares:
-       <span n:render="bool" n:data="accepting_immutable_shares" /></li>
-  </ul>
-
   <table n:data="stats">
     <tr><td>Total disk space:</td>
         <td><span n:render="space" n:data="disk_total" /></td></tr>
@@ -24,10 +19,26 @@
         <td>- <span n:render="space" n:data="reserved_space" /></td></tr>
     <tr><td />
         <td>======</td></tr>
-    <tr><td>Space Available:</td>
-        <td>&lt; <span n:render="space" n:data="disk_avail" /></td></tr>
+    <tr><td>Space Available to Tahoe:</td>
+        <td><span n:render="space" n:data="disk_avail" /></td></tr>
   </table>
 
+  <ul n:data="stats">
+    <li>Accepting new shares:
+       <span n:render="bool" n:data="accepting_immutable_shares" /></li>
+  </ul>
+
+  <ul>
+    <li>Total buckets:
+       <span n:render="string" n:data="last_complete_share_count" />
+       (the number of files and directories for which this server is holding
+        a share)
+      <ul>
+        <li n:render="count_crawler_status" />
+      </ul>
+    </li>
+  </ul>
+
 </div>
 
 </body>
-- 
2.45.2