d = {}
if self.state["current-cycle"] is None:
- assert self.sleeping_between_cycles
d["cycle-in-progress"] = False
d["next-crawl-time"] = self.next_wake_time
d["remaining-wait-time"] = self.next_wake_time - time.time()
except EnvironmentError:
state = {"version": 1,
"last-cycle-finished": None,
- "current-cycle": 0,
+ "current-cycle": None,
"last-complete-prefix": None,
"last-complete-bucket": None,
}
def startService(self):
self.load_state()
+ # arrange things to look like we were just sleeping, so
+ # status/progress values work correctly
+ self.sleeping_between_cycles = True
+ self.current_sleep_time = 0
+ self.next_wake_time = time.time()
self.timer = reactor.callLater(0, self.start_slice)
service.MultiService.startService(self)
def start_slice(self):
self.timer = None
+ self.sleeping_between_cycles = False
self.current_sleep_time = None
self.next_wake_time = None
start_slice = time.time()
try:
+ s = self.last_complete_prefix_index
self.start_current_prefix(start_slice)
finished_cycle = True
except TimeSliceExceeded:
self.timer = reactor.callLater(sleep_time, self.start_slice)
def start_current_prefix(self, start_slice):
- if self.state["current-cycle"] is None:
- assert self.state["last-cycle-finished"] is not None
- self.state["current-cycle"] = self.state["last-cycle-finished"] + 1
- cycle = self.state["current-cycle"]
+ state = self.state
+ if state["current-cycle"] is None:
+ if state["last-cycle-finished"] is None:
+ state["current-cycle"] = 0
+ else:
+ state["current-cycle"] = state["last-cycle-finished"] + 1
+ cycle = state["current-cycle"]
for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
- if time.time() > start_slice + self.cpu_slice:
- raise TimeSliceExceeded()
# if we want to yield earlier, just raise TimeSliceExceeded()
prefix = self.prefixes[i]
prefixdir = os.path.join(self.sharedir, prefix)
buckets, start_slice)
self.last_complete_prefix_index = i
self.save_state()
+ if time.time() > start_slice + self.cpu_slice:
+ raise TimeSliceExceeded()
# yay! we finished the whole cycle
self.last_complete_prefix_index = -1
- self.state["last-complete-bucket"] = None
- self.state["last-cycle-finished"] = cycle
- self.state["current-cycle"] = None
+ state["last-complete-bucket"] = None
+ state["last-cycle-finished"] = cycle
+ state["current-cycle"] = None
self.finished_cycle(cycle)
self.save_state()
for bucket in buckets:
if bucket <= self.state["last-complete-bucket"]:
continue
- if time.time() > start_slice + self.cpu_slice:
- raise TimeSliceExceeded()
self.process_bucket(cycle, prefix, prefixdir, bucket)
self.state["last-complete-bucket"] = bucket
+ # note: saving the state after every bucket is somewhat
+ # time-consuming, but lets us avoid losing more than one bucket's
+ # worth of progress.
self.save_state()
+ if time.time() > start_slice + self.cpu_slice:
+ raise TimeSliceExceeded()
def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
"""Examine a single bucket. Subclasses should do whatever they want
"""
pass
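+
+# For illustration, a sketch of a minimal subclass, roughly the
+# BucketEnumeratingCrawler the tests use (shown as a comment, not part of
+# this module; defer comes from twisted.internet, eventual from foolscap):
+#
+#   class BucketEnumeratingCrawler(ShareCrawler):
+#       cpu_slice = 500 # enough to finish a whole cycle in one slice
+#       def __init__(self, server, statefile):
+#           ShareCrawler.__init__(self, server, statefile)
+#           self.all_buckets = []
+#           self.finished_d = defer.Deferred()
+#       def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
+#           self.all_buckets.append(storage_index_b32)
+#       def finished_cycle(self, cycle):
+#           eventual.eventually(self.finished_d.callback, None)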
+
+class BucketCountingCrawler(ShareCrawler):
+ """I keep track of how many buckets are being managed by this server.
+ This is equivalent to the number of distributed files and directories for
+ which I am providing storage. The actual number of files+directories in
+ the full grid is probably higher (especially when there are more servers
+ than 'N', the number of generated shares), because some files+directories
+ will have shares on other servers instead of me.
+ """
+
+ minimum_cycle_time = 60*60 # we don't need this more than once an hour
+
+ def __init__(self, server, statefile, num_sample_prefixes=1):
+ ShareCrawler.__init__(self, server, statefile)
+ self.num_sample_prefixes = num_sample_prefixes
+
+ def add_initial_state(self):
+ # ["share-counts"][cyclenum][prefix] = number
+ # ["last-complete-cycle"] = cyclenum # maintained by base class
+ # ["last-complete-share-count"] = number
+ # ["storage-index-samples"][prefix] = (cyclenum,
+ # list of SI strings (base32))
+ self.state.setdefault("share-counts", {})
+ self.state.setdefault("last-complete-share-count", None)
+ self.state.setdefault("storage-index-samples", {})
+
+ def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
+ # we override process_prefixdir() because we don't want to look at
+ # the individual buckets; the base class saves our state after each
+ # prefixdir anyway. On my laptop, a mostly-empty storage server can
+ # process about 70 prefixdirs in a 1.0s slice.
+ if cycle not in self.state["share-counts"]:
+ self.state["share-counts"][cycle] = {}
+ self.state["share-counts"][cycle][prefix] = len(buckets)
+ if prefix in self.prefixes[:self.num_sample_prefixes]:
+ self.state["storage-index-samples"][prefix] = (cycle, buckets)
+
+ def finished_cycle(self, cycle):
+ # default to an empty dict so len() and .values() behave when this
+ # cycle has no counts recorded yet
+ last_counts = self.state["share-counts"].get(cycle, {})
+ if len(last_counts) == len(self.prefixes):
+ # great, we have a whole cycle.
+ num_buckets = sum(last_counts.values())
+ self.state["last-complete-share-count"] = (cycle, num_buckets)
+ # get rid of old counts
+ for old_cycle in list(self.state["share-counts"].keys()):
+ if old_cycle != cycle:
+ del self.state["share-counts"][old_cycle]
+ # get rid of old samples too
+ for prefix in list(self.state["storage-index-samples"].keys()):
+ old_cycle, buckets = self.state["storage-index-samples"][prefix]
+ if old_cycle != cycle:
+ del self.state["storage-index-samples"][prefix]
+
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
+from allmydata.storage.crawler import BucketCountingCrawler
# storage/
# storage/shares/incoming
"cancel": [],
}
+ statefile = os.path.join(storedir, "bucket_counter.state")
+ self.bucket_counter = BucketCountingCrawler(self, statefile)
+ self.bucket_counter.setServiceParent(self)
+
def count(self, name, delta=1):
if self.stats_provider:
self.stats_provider.count("storage_server." + name, delta)
c = BucketEnumeratingCrawler(ss, statefile)
c.setServiceParent(self.s)
+ # it should be legal to call get_state() and get_progress() right
+ # away, even before the first tick is performed. No work should have
+ # been done yet.
+ s = c.get_state()
+ p = c.get_progress()
+ self.failUnlessEqual(s["last-complete-prefix"], None)
+ self.failUnlessEqual(s["current-cycle"], None)
+ self.failUnlessEqual(p["cycle-in-progress"], False)
+
d = c.finished_d
def _check(ignored):
self.failUnlessEqual(sorted(sis), sorted(c.all_buckets))
self.failIf(c.running)
self.failIf(c.timer)
self.failIf(c.current_sleep_time)
+ s = c.get_state()
+ self.failUnlessEqual(s["last-cycle-finished"], 0)
+ self.failUnlessEqual(s["current-cycle"], None)
d.addCallback(_check)
return d
+
+import time, os.path, stat, re
+
from twisted.trial import unittest
from twisted.internet import defer
-import time, os.path, stat, re
+from twisted.application import service
+from foolscap import eventual
import itertools
from allmydata import interfaces
-from allmydata.util import fileutil, hashutil, base32
+from allmydata.util import fileutil, hashutil, base32, pollmixin
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader
ReadBucketProxy
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent
-from allmydata.web.storage import StorageStatus, abbreviate_if_known, \
- remove_prefix
+from allmydata.web.storage import StorageStatus, remove_prefix
class Marker:
pass
self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
+def remove_tags(s):
+ s = re.sub(r'<[^>]*>', ' ', s)
+ s = re.sub(r'\s+', ' ', s)
+ return s
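+# e.g. (illustrative) remove_tags("<li>Accepting new shares: Yes</li>")
+# returns " Accepting new shares: Yes "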
+
+class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
+
+ def setUp(self):
+ self.s = service.MultiService()
+ self.s.startService()
+ def tearDown(self):
+ return self.s.stopService()
+
+ def test_bucket_counter(self):
+ basedir = "storage/BucketCounter/bucket_counter"
+ fileutil.make_dirs(basedir)
+ ss = StorageServer(basedir, "\x00" * 20)
+ # to make sure we capture the bucket-counting-crawler in the middle
+ # of a cycle, we reach in and reduce its maximum slice time to 0.
+ orig_cpu_slice = ss.bucket_counter.cpu_slice
+ ss.bucket_counter.cpu_slice = 0
+ ss.setServiceParent(self.s)
+
+ w = StorageStatus(ss)
+
+ # this sample is before the crawler has started doing anything
+ html = w.renderSynchronously()
+ self.failUnless("<h1>Storage Server Status</h1>" in html, html)
+ s = remove_tags(html)
+ self.failUnless("Accepting new shares: Yes" in s, s)
+ self.failUnless("Reserved space: - 0 B (0)" in s, s)
+ self.failUnless("Total buckets: Not computed yet" in s, s)
+ self.failUnless("Next crawl in" in s, s)
+
+ # give the bucket-counting-crawler one tick to get started. The
+ # cpu_slice=0 will force it to yield right after it processes the
+ # first prefix
+
+ d = eventual.fireEventually()
+ def _check(ignored):
+ # are we really right after the first prefix?
+ state = ss.bucket_counter.get_state()
+ self.failUnlessEqual(state["last-complete-prefix"],
+ ss.bucket_counter.prefixes[0])
+ ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
+ html = w.renderSynchronously()
+ s = remove_tags(html)
+ self.failUnless(" Current crawl " in s, s)
+ self.failUnless(" (next work in " in s, s)
+ d.addCallback(_check)
+
+ # now give it enough time to complete a full cycle
+ def _watch():
+ return not ss.bucket_counter.get_progress()["cycle-in-progress"]
+ d.addCallback(lambda ignored: self.poll(_watch))
+ def _check2(ignored):
+ ss.bucket_counter.cpu_slice = orig_cpu_slice
+ html = w.renderSynchronously()
+ s = remove_tags(html)
+ self.failUnless("Total buckets: 0 (the number of" in s, s)
+ self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
+ d.addCallback(_check2)
+ return d
+
+ def test_bucket_counter_cleanup(self):
+ basedir = "storage/BucketCounter/bucket_counter_cleanup"
+ fileutil.make_dirs(basedir)
+ ss = StorageServer(basedir, "\x00" * 20)
+ # to make sure we capture the bucket-counting-crawler in the middle
+ # of a cycle, we reach in and reduce its maximum slice time to 0.
+ orig_cpu_slice = ss.bucket_counter.cpu_slice
+ ss.bucket_counter.cpu_slice = 0
+ ss.setServiceParent(self.s)
+
+ d = eventual.fireEventually()
+
+ def _after_first_prefix(ignored):
+ ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
+ # now sneak in and mess with its state, to make sure it cleans up
+ # properly at the end of the cycle
+ state = ss.bucket_counter.state
+ self.failUnlessEqual(state["last-complete-prefix"],
+ ss.bucket_counter.prefixes[0])
+ state["share-counts"][-12] = {}
+ state["storage-index-samples"]["bogusprefix!"] = (-12, [])
+ ss.bucket_counter.save_state()
+ d.addCallback(_after_first_prefix)
+
+ # now give it enough time to complete a cycle
+ def _watch():
+ return not ss.bucket_counter.get_progress()["cycle-in-progress"]
+ d.addCallback(lambda ignored: self.poll(_watch))
+ def _check2(ignored):
+ ss.bucket_counter.cpu_slice = orig_cpu_slice
+ s = ss.bucket_counter.get_state()
+ self.failIf(-12 in s["share-counts"], s["share-counts"].keys())
+ self.failIf("bogusprefix!" in s["storage-index-samples"],
+ s["storage-index-samples"].keys())
+ d.addCallback(_check2)
+ return d
+
class NoStatvfsServer(StorageServer):
def do_statvfs(self):
raise AttributeError
-class WebStatus(unittest.TestCase):
+class WebStatus(unittest.TestCase, pollmixin.PollMixin):
+
+ def setUp(self):
+ self.s = service.MultiService()
+ self.s.startService()
+ def tearDown(self):
+ return self.s.stopService()
def test_no_server(self):
w = StorageStatus(None)
html = w.renderSynchronously()
self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
-
- def remove_tags(self, s):
- s = re.sub(r'<[^>]*>', ' ', s)
- s = re.sub(r'\s+', ' ', s)
- return s
-
def test_status(self):
basedir = "storage/WebStatus/status"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20)
+ ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
- s = self.remove_tags(html)
+ s = remove_tags(html)
self.failUnless("Accepting new shares: Yes" in s, s)
- self.failUnless("Reserved space: - 0B" in s, s)
+ self.failUnless("Reserved space: - 0 B (0)" in s, s)
def test_status_no_statvfs(self):
# windows has no os.statvfs. Make sure the code handles that even on
# posix platforms.
basedir = "storage/WebStatus/status_no_statvfs"
fileutil.make_dirs(basedir)
ss = NoStatvfsServer(basedir, "\x00" * 20)
+ ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
- s = self.remove_tags(html)
+ s = remove_tags(html)
self.failUnless("Accepting new shares: Yes" in s, s)
self.failUnless("Total disk space: ?" in s, s)
basedir = "storage/WebStatus/readonly"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
+ ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
- s = self.remove_tags(html)
+ s = remove_tags(html)
self.failUnless("Accepting new shares: No" in s, s)
def test_reserved(self):
basedir = "storage/WebStatus/reserved"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
+ ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failUnless("<h1>Storage Server Status</h1>" in html, html)
- s = self.remove_tags(html)
- self.failUnless("Reserved space: - 10.00MB" in s, s)
+ s = remove_tags(html)
+ self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
+
+ def test_huge_reserved(self):
+ basedir = "storage/WebStatus/reserved"
+ fileutil.make_dirs(basedir)
+ ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e9)
+ ss.setServiceParent(self.s)
+ w = StorageStatus(ss)
+ html = w.renderSynchronously()
+ self.failUnless("<h1>Storage Server Status</h1>" in html, html)
+ s = remove_tags(html)
+ self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
def test_util(self):
- self.failUnlessEqual(abbreviate_if_known(None), "?")
- self.failUnlessEqual(abbreviate_if_known(10e6), "10.00MB")
+ w = StorageStatus(None)
+ self.failUnlessEqual(w.render_space(None, None), "?")
+ self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
from nevow import rend, tags as T
-from allmydata.web.common import getxmlfile, abbreviate_size
-
-def abbreviate_if_known(size):
- if size is None:
- return "?"
- return abbreviate_size(size)
+from allmydata.web.common import getxmlfile, abbreviate_time
+from allmydata.util.abbreviate import abbreviate_space
def remove_prefix(s, prefix):
if not s.startswith(prefix):
def render_bool(self, ctx, data):
return {True: "Yes", False: "No"}[bool(data)]
- def render_space(self, ctx, data):
- return abbreviate_if_known(data)
+ def render_space(self, ctx, size):
+ if size is None:
+ return "?"
+ return "%s (%d)" % (abbreviate_space(size), size)
def data_stats(self, ctx, data):
# FYI: 'data' appears to be self, rather than the StorageServer
# missing keys will cause an error, even if the renderer can tolerate
# None values. To overcome this, we either need a dict-like object
# that always returns None for unknown keys, or we must pre-populate
- # our dict with those missing keys (or find some way to override
- # Nevow's handling of dictionaries).
+ # our dict with those missing keys, or we should get rid of data_
+ # methods that return dicts (or find some way to override Nevow's
+ # handling of dictionaries).
d = dict([ (remove_prefix(k, "storage_server."), v)
for k,v in self.storage.get_stats().items() ])
d.setdefault("reserved_space", None)
d.setdefault("disk_avail", None)
return d
+
+ def data_last_complete_share_count(self, ctx, data):
+ s = self.storage.bucket_counter.get_state()
+ lcsc = s.get("last-complete-share-count")
+ if lcsc is None:
+ return "Not computed yet"
+ cycle, count = lcsc
+ return count
+
+ def render_count_crawler_status(self, ctx, storage):
+ s = self.storage.bucket_counter.get_progress()
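+ # (mid-cycle, the progress dict holds "cycle-complete-percentage" and
+ # "remaining-sleep-time"; between cycles it holds "remaining-wait-time"
+ # instead, as sketched in crawler.py's get_progress())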
+ if s["cycle-in-progress"]:
+ pct = s["cycle-complete-percentage"]
+ soon = s["remaining-sleep-time"]
+ return ctx.tag["Current crawl %.1f%% complete" % pct,
+ " (next work in %s)" % abbreviate_time(soon)]
+ else:
+ soon = s["remaining-wait-time"]
+ return ctx.tag["Next crawl in %s" % abbreviate_time(soon)]
<h1>Storage Server Status</h1>
- <ul n:data="stats">
- <li>Accepting new shares:
- <span n:render="bool" n:data="accepting_immutable_shares" /></li>
- </ul>
-
<table n:data="stats">
<tr><td>Total disk space:</td>
<td><span n:render="space" n:data="disk_total" /></td></tr>
<td>- <span n:render="space" n:data="reserved_space" /></td></tr>
<tr><td />
<td>======</td></tr>
- <tr><td>Space Available:</td>
- <td>< <span n:render="space" n:data="disk_avail" /></td></tr>
+ <tr><td>Space Available to Tahoe:</td>
+ <td><span n:render="space" n:data="disk_avail" /></td></tr>
</table>
+ <ul n:data="stats">
+ <li>Accepting new shares:
+ <span n:render="bool" n:data="accepting_immutable_shares" /></li>
+ </ul>
+
+ <ul>
+ <li>Total buckets:
+ <span n:render="string" n:data="last_complete_share_count" />
+ (the number of files and directories for which this server is holding
+ a share)
+ <ul>
+ <li n:render="count_crawler_status" />
+ </ul>
+ </li>
+ </ul>
+
</div>
</body>