"""
pass
-
-class BucketCountingCrawler(ShareCrawler):
-    """
-    I keep track of how many sharesets, each corresponding to a storage index,
-    are being managed by this server. This is equivalent to the number of
-    distributed files and directories for which I am providing storage. The
-    actual number of files and directories in the full grid is probably higher
-    (especially when there are more servers than 'N', the number of generated
-    shares), because some files and directories will have shares on other
-    servers instead of me. Also note that the number of sharesets will differ
-    from the number of shares in small grids, when more than one share is
-    placed on a single server.
-    """
-
-    minimum_cycle_time = 60*60 # we don't need this more than once an hour
-
-    def add_initial_state(self):
-        # ["bucket-counts"][cyclenum][prefix] = number
-        # ["last-complete-cycle"] = cyclenum # maintained by base class
-        # ["last-complete-bucket-count"] = number
-        self.state.setdefault("bucket-counts", {})
-        self.state.setdefault("last-complete-bucket-count", None)
-
-    def process_prefix(self, cycle, prefix, start_slice):
-        # We don't need to look at the individual sharesets.
-        d = self.backend.get_sharesets_for_prefix(prefix)
-        def _got_sharesets(sharesets):
-            if cycle not in self.state["bucket-counts"]:
-                self.state["bucket-counts"][cycle] = {}
-            self.state["bucket-counts"][cycle][prefix] = len(sharesets)
-        d.addCallback(_got_sharesets)
-        return d
-
-    def finished_cycle(self, cycle):
-        last_counts = self.state["bucket-counts"].get(cycle, [])
-        if len(last_counts) == len(self.prefixes):
-            # great, we have a whole cycle.
-            num_sharesets = sum(last_counts.values())
-            self.state["last-complete-bucket-count"] = num_sharesets
-            # get rid of old counts
-            for old_cycle in list(self.state["bucket-counts"].keys()):
-                if old_cycle != cycle:
-                    del self.state["bucket-counts"][old_cycle]
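With this class gone, the total shareset count no longer lives in crawler
state. A minimal before/after sketch of the lookup, matching the web-status
hunks later in this patch (it assumes the "storage_server.total_bucket_count"
stats key is published by the backend's stats machinery, which this excerpt
does not show):

    # Old: read the bucket-counting crawler's persisted state.
    count = server.get_bucket_counter().get_state()["last-complete-bucket-count"]

    # New: read the aggregate stats dict; absence of the key means the
    # count has not been computed yet.
    count = server.get_stats().get("storage_server.total_bucket_count")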
from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
-from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.accountant import Accountant
from allmydata.storage.expiration import ExpirationPolicy
class StorageServer(service.MultiService):
    implements(IStatsProducer)
    name = 'storage'
-    BucketCounterClass = BucketCountingCrawler
    DEFAULT_EXPIRATION_POLICY = ExpirationPolicy(enabled=False)
    def __init__(self, serverid, backend, statedir,
                          "cancel": [],
                          }
-        self.init_bucket_counter()
        self.init_accountant(expiration_policy or self.DEFAULT_EXPIRATION_POLICY)
    def init_accountant(self, expiration_policy):
    def get_expiration_policy(self):
        return self.accountant.get_accounting_crawler().get_expiration_policy()
-    def get_bucket_counter(self):
-        return self.bucket_counter
-
    def get_serverid(self):
        return self._serverid
    def __repr__(self):
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.get_serverid()),)
-    def init_bucket_counter(self):
-        statefile = os.path.join(self._statedir, "bucket_counter.state")
-        self.bucket_counter = self.BucketCounterClass(self.backend, statefile,
-                                                      clock=self.clock)
-        self.bucket_counter.setServiceParent(self)
-
    def count(self, name, delta=1):
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)
        return s
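Note that nothing in this patch removes an existing bucket_counter.state
file; once init_bucket_counter is gone, the file is simply never read or
written again. A hedged cleanup sketch (the filename comes from the deleted
code above; the statedir value is a placeholder, and whether to delete the
file on upgrade is a deployment decision, not part of this patch):

    import os

    statedir = "/path/to/storage/statedir"  # wherever StorageServer keeps state
    statefile = os.path.join(statedir, "bucket_counter.state")
    if os.path.exists(statefile):
        os.remove(statefile)  # the crawler that wrote this no longer exists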
-class BucketCounterTest(WithDiskBackend, CrawlerTestMixin, ReallyEqualMixin, unittest.TestCase):
-    def test_bucket_counter(self):
-        server = self.create("test_bucket_counter", detached=True)
-        bucket_counter = server.bucket_counter
-
-        # finish as fast as possible
-        bucket_counter.slow_start = 0
-        bucket_counter.cpu_slice = 100.0
-
-        d = server.bucket_counter.set_hook('after_prefix')
-
-        server.setServiceParent(self.sparent)
-
-        w = StorageStatus(server)
-
-        # this sample is before the crawler has started doing anything
-        html = w.renderSynchronously()
-        self.failUnlessIn("<h1>Storage Server Status</h1>", html)
-        s = remove_tags(html)
-        self.failUnlessIn("Accepting new shares: Yes", s)
-        self.failUnlessIn("Reserved space: - 0 B (0)", s)
-        self.failUnlessIn("Total sharesets: Not computed yet", s)
-        self.failUnlessIn("Next crawl in", s)
-
-        def _after_first_prefix(prefix):
-            server.bucket_counter.save_state()
-            state = bucket_counter.get_state()
-            self.failUnlessEqual(prefix, state["last-complete-prefix"])
-            self.failUnlessEqual(prefix, bucket_counter.prefixes[0])
-
-            html = w.renderSynchronously()
-            s = remove_tags(html)
-            self.failUnlessIn(" Current crawl ", s)
-            self.failUnlessIn(" (next work in ", s)
-
-            return bucket_counter.set_hook('after_cycle')
-        d.addCallback(_after_first_prefix)
-
-        def _after_first_cycle(cycle):
-            self.failUnlessEqual(cycle, 0)
-            progress = bucket_counter.get_progress()
-            self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
-        d.addCallback(_after_first_cycle)
-        d.addBoth(self._wait_for_yield, bucket_counter)
-
-        def _after_yield(ign):
-            html = w.renderSynchronously()
-            s = remove_tags(html)
-            self.failUnlessIn("Total sharesets: 0 (the number of", s)
-            self.failUnless("Next crawl in 59 minutes" in s or "Next crawl in 60 minutes" in s, s)
-        d.addCallback(_after_yield)
-        return d
-
-    def test_bucket_counter_cleanup(self):
-        server = self.create("test_bucket_counter_cleanup", detached=True)
-        bucket_counter = server.bucket_counter
-
-        # finish as fast as possible
-        bucket_counter.slow_start = 0
-        bucket_counter.cpu_slice = 100.0
-
-        d = bucket_counter.set_hook('after_prefix')
-
-        server.setServiceParent(self.sparent)
-
-        def _after_first_prefix(prefix):
-            bucket_counter.save_state()
-            state = bucket_counter.state
-            self.failUnlessEqual(prefix, state["last-complete-prefix"])
-            self.failUnlessEqual(prefix, bucket_counter.prefixes[0])
-
-            # now sneak in and mess with its state, to make sure it cleans up
-            # properly at the end of the cycle
-            state["bucket-counts"][-12] = {}
-            bucket_counter.save_state()
-
-            return bucket_counter.set_hook('after_cycle')
-        d.addCallback(_after_first_prefix)
-
-        def _after_first_cycle(cycle):
-            self.failUnlessEqual(cycle, 0)
-            progress = bucket_counter.get_progress()
-            self.failUnlessReallyEqual(progress["cycle-in-progress"], False)
-
-            s = bucket_counter.get_state()
-            self.failIf(-12 in s["bucket-counts"], s["bucket-counts"].keys())
-        d.addCallback(_after_first_cycle)
-        d.addBoth(self._wait_for_yield, bucket_counter)
-        return d
-
-    def test_bucket_counter_eta(self):
-        server = self.create("test_bucket_counter_eta", detached=True)
-        bucket_counter = server.bucket_counter
-
-        # finish as fast as possible
-        bucket_counter.slow_start = 0
-        bucket_counter.cpu_slice = 100.0
-
-        d = bucket_counter.set_hook('after_prefix')
-
-        server.setServiceParent(self.sparent)
-
-        w = StorageStatus(server)
-
-        def _check_1(prefix1):
-            # no ETA is available yet
-            html = w.renderSynchronously()
-            s = remove_tags(html)
-            self.failUnlessIn("complete (next work", s)
-
-            return bucket_counter.set_hook('after_prefix')
-        d.addCallback(_check_1)
-
-        def _check_2(prefix2):
-            # an ETA based upon elapsed time should be available.
-            html = w.renderSynchronously()
-            s = remove_tags(html)
-            self.failUnlessIn("complete (ETA ", s)
-        d.addCallback(_check_2)
-        d.addBoth(self._wait_for_yield, bucket_counter)
-        return d
-
-
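Deleting this suite leaves the new "Not computed yet" fallback untested in
this excerpt. A hypothetical regression test for it (the test name is
invented; self.create, StorageStatus, remove_tags, and renderSynchronously
are all taken from the deleted code above) might look like:

    def test_bucket_count_not_computed(self):
        server = self.create("test_bucket_count_not_computed", detached=True)
        w = StorageStatus(server)
        html = w.renderSynchronously()
        s = remove_tags(html)
        # no "storage_server.total_bucket_count" stat has been published
        # yet, so the page should show the fallback string
        self.failUnlessIn("Total sharesets: Not computed yet", s)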
class AccountingCrawlerTest(CrawlerTestMixin, WebRenderingMixin, ReallyEqualMixin):
    def make_shares(self, server):
        aa = server.get_accountant().get_anonymous_account()
    def render_JSON(self, req):
        req.setHeader("content-type", "text/plain")
        accounting_crawler = self.storage.get_accounting_crawler()
-        bucket_counter = self.storage.get_bucket_counter()
        d = {"stats": self.storage.get_stats(),
-             "bucket-counter": bucket_counter.get_state(),
+             "bucket-counter": None,
             "lease-checker": accounting_crawler.get_state(),
             "lease-checker-progress": accounting_crawler.get_progress(),
             }
        return d
    def data_last_complete_bucket_count(self, ctx, data):
-        s = self.storage.get_bucket_counter().get_state()
-        count = s.get("last-complete-bucket-count")
-        if count is None:
+        s = self.storage.get_stats()
+        if "storage_server.total_bucket_count" not in s:
            return "Not computed yet"
-        return count
+        return s["storage_server.total_bucket_count"]
    def render_count_crawler_status(self, ctx, storage):
-        p = self.storage.get_bucket_counter().get_progress()
-        return ctx.tag[self.format_crawler_progress(p)]
+        return ctx.tag
    def format_crawler_progress(self, p):
        cycletime = p["estimated-time-per-cycle"]
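Because render_JSON keeps the "bucket-counter" key but pins it to None, the
JSON schema seen by consumers stays stable. A small illustrative sketch
(values invented, and it assumes the returned dict is ultimately serialized
with json.dumps, which this excerpt does not show):

    import json

    d = {"stats": {"storage_server.total_bucket_count": 0},  # illustrative
         "bucket-counter": None,  # key retained for compatibility
         "lease-checker": {},
         "lease-checker-progress": {}}
    print(json.dumps(d, indent=1))
    # "bucket-counter" serializes as null; consumers should treat that as
    # "no crawler state available" rather than as an error.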