From: Brian Warner Date: Thu, 15 Jan 2009 00:36:20 +0000 (-0700) Subject: mutable: move recent operation history management code (MutableWatcher) into history... X-Git-Tag: allmydata-tahoe-1.3.0~193 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/%22doc.html/COPYING.GPL?a=commitdiff_plain;h=2fe099a0b3942555698c7e18bd14fb07f3c2e3c0;p=tahoe-lafs%2Ftahoe-lafs.git mutable: move recent operation history management code (MutableWatcher) into history.py, have History provide stats --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 321ec2fa..41c2f7e7 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -22,7 +22,7 @@ from allmydata.util import hashutil, base32, pollmixin, cachedir from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.uri import LiteralFileURI from allmydata.dirnode import NewDirectoryNode -from allmydata.mutable.filenode import MutableFileNode, MutableWatcher +from allmydata.mutable.filenode import MutableFileNode from allmydata.stats import StatsProvider from allmydata.history import History from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \ @@ -189,14 +189,13 @@ class Client(node.Node, pollmixin.PollMixin): convergence_s = self.get_or_create_private_config('convergence', _make_secret) self.convergence = base32.a2b(convergence_s) self._node_cache = weakref.WeakValueDictionary() # uri -> node - self.add_service(History()) + self.add_service(History(self.stats_provider)) self.add_service(Uploader(helper_furl, self.stats_provider)) download_cachedir = os.path.join(self.basedir, "private", "cache", "download") self.download_cache = cachedir.CacheDirectoryManager(download_cachedir) self.download_cache.setServiceParent(self) self.add_service(Downloader(self.stats_provider)) - self.add_service(MutableWatcher(self.stats_provider)) def _publish(res): # we publish an empty object so that the introducer can count how # many clients are connected and see what versions they're @@ -373,14 +372,6 @@ class Client(node.Node, pollmixin.PollMixin): self._node_cache[u_s] = node return self._node_cache[u_s] - def notify_publish(self, publish_status, size): - self.getServiceNamed("mutable-watcher").notify_publish(publish_status, - size) - def notify_retrieve(self, retrieve_status): - self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status) - def notify_mapupdate(self, update_status): - self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status) - def create_empty_dirnode(self): n = NewDirectoryNode(self) d = n.create(self._generate_pubprivkeys) @@ -421,14 +412,11 @@ class Client(node.Node, pollmixin.PollMixin): return self.get_history().list_all_download_statuses() def list_all_mapupdate_statuses(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_all_mapupdate_statuses() + return self.get_history().list_all_mapupdate_statuses() def list_all_publish_statuses(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_all_publish_statuses() + return self.get_history().list_all_publish_statuses() def list_all_retrieve_statuses(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_all_retrieve_statuses() + return self.get_history().list_all_retrieve_statuses() def list_all_helper_statuses(self): try: diff --git a/src/allmydata/history.py b/src/allmydata/history.py index 23798928..40b6cf09 100644 --- a/src/allmydata/history.py +++ b/src/allmydata/history.py @@ -8,13 +8,26 @@ class History(service.Service): name = "history" MAX_DOWNLOAD_STATUSES = 10 MAX_UPLOAD_STATUSES = 10 + MAX_MAPUPDATE_STATUSES = 20 + MAX_PUBLISH_STATUSES = 20 + MAX_RETRIEVE_STATUSES = 20 + + def __init__(self, stats_provider=None): + self.stats_provider = stats_provider - def __init__(self): self.all_downloads_statuses = weakref.WeakKeyDictionary() self.recent_download_statuses = [] self.all_upload_statuses = weakref.WeakKeyDictionary() self.recent_upload_statuses = [] + self.all_mapupdate_status = weakref.WeakKeyDictionary() + self.recent_mapupdate_status = [] + self.all_publish_status = weakref.WeakKeyDictionary() + self.recent_publish_status = [] + self.all_retrieve_status = weakref.WeakKeyDictionary() + self.recent_retrieve_status = [] + + def add_download(self, download_status): self.all_downloads_statuses[download_status] = None self.recent_download_statuses.append(download_status) @@ -34,3 +47,47 @@ class History(service.Service): def list_all_upload_statuses(self): for us in self.all_upload_statuses: yield us + + + + def notify_mapupdate(self, p): + self.all_mapupdate_status[p] = None + self.recent_mapupdate_status.append(p) + while len(self.recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES: + self.recent_mapupdate_status.pop(0) + + def notify_publish(self, p, size): + self.all_publish_status[p] = None + self.recent_publish_status.append(p) + if self.stats_provider: + self.stats_provider.count('mutable.files_published', 1) + # We must be told bytes_published as an argument, since the + # publish_status does not yet know how much data it will be asked + # to send. When we move to MDMF we'll need to find a better way + # to handle this. + self.stats_provider.count('mutable.bytes_published', size) + while len(self.recent_publish_status) > self.MAX_PUBLISH_STATUSES: + self.recent_publish_status.pop(0) + + def notify_retrieve(self, r): + self.all_retrieve_status[r] = None + self.recent_retrieve_status.append(r) + if self.stats_provider: + self.stats_provider.count('mutable.files_retrieved', 1) + self.stats_provider.count('mutable.bytes_retrieved', r.get_size()) + while len(self.recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: + self.recent_retrieve_status.pop(0) + + + def list_all_mapupdate_statuses(self): + for s in self.all_mapupdate_status: + yield s + def list_all_publish_statuses(self): + for s in self.all_publish_status: + yield s + def list_all_retrieve_statuses(self): + for s in self.all_retrieve_status: + yield s + + + diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py index f1bba512..c0dd701e 100644 --- a/src/allmydata/mutable/checker.py +++ b/src/allmydata/mutable/checker.py @@ -24,7 +24,9 @@ class MutableChecker: def check(self, verify=False): servermap = ServerMap() u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK) - self._node._client.notify_mapupdate(u.get_status()) + history = self._node._client.get_history() + if history: + history.notify_mapupdate(u.get_status()) d = u.update() d.addCallback(self._got_mapupdate_results) if verify: diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py index 1951ff1d..5c2e183d 100644 --- a/src/allmydata/mutable/filenode.py +++ b/src/allmydata/mutable/filenode.py @@ -1,6 +1,5 @@ -import weakref, random -from twisted.application import service +import random from zope.interface import implements from twisted.internet import defer, reactor @@ -407,7 +406,9 @@ class MutableFileNode: return self._update_servermap(servermap, mode) def _update_servermap(self, servermap, mode): u = ServermapUpdater(self, Monitor(), servermap, mode) - self._client.notify_mapupdate(u.get_status()) + history = self._client.get_history() + if history: + history.notify_mapupdate(u.get_status()) return u.update() def download_version(self, servermap, version, fetch_privkey=False): @@ -416,7 +417,9 @@ class MutableFileNode: def _try_once_to_download_version(self, servermap, version, fetch_privkey=False): r = Retrieve(self, servermap, version, fetch_privkey) - self._client.notify_retrieve(r.get_status()) + history = self._client.get_history() + if history: + history.notify_retrieve(r.get_status()) return r.download() def upload(self, new_contents, servermap): @@ -424,61 +427,7 @@ class MutableFileNode: def _upload(self, new_contents, servermap): assert self._pubkey, "update_servermap must be called before publish" p = Publish(self, servermap) - self._client.notify_publish(p.get_status(), len(new_contents)) + history = self._client.get_history() + if history: + history.notify_publish(p.get_status(), len(new_contents)) return p.publish(new_contents) - - - - -class MutableWatcher(service.MultiService): - MAX_MAPUPDATE_STATUSES = 20 - MAX_PUBLISH_STATUSES = 20 - MAX_RETRIEVE_STATUSES = 20 - name = "mutable-watcher" - - def __init__(self, stats_provider=None): - service.MultiService.__init__(self) - self.stats_provider = stats_provider - self._all_mapupdate_status = weakref.WeakKeyDictionary() - self._recent_mapupdate_status = [] - self._all_publish_status = weakref.WeakKeyDictionary() - self._recent_publish_status = [] - self._all_retrieve_status = weakref.WeakKeyDictionary() - self._recent_retrieve_status = [] - - - def notify_mapupdate(self, p): - self._all_mapupdate_status[p] = None - self._recent_mapupdate_status.append(p) - while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES: - self._recent_mapupdate_status.pop(0) - - def notify_publish(self, p, size): - self._all_publish_status[p] = None - self._recent_publish_status.append(p) - if self.stats_provider: - self.stats_provider.count('mutable.files_published', 1) - # We must be told bytes_published as an argument, since the - # publish_status does not yet know how much data it will be asked - # to send. When we move to MDMF we'll need to find a better way - # to handle this. - self.stats_provider.count('mutable.bytes_published', size) - while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES: - self._recent_publish_status.pop(0) - - def notify_retrieve(self, r): - self._all_retrieve_status[r] = None - self._recent_retrieve_status.append(r) - if self.stats_provider: - self.stats_provider.count('mutable.files_retrieved', 1) - self.stats_provider.count('mutable.bytes_retrieved', r.get_size()) - while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: - self._recent_retrieve_status.pop(0) - - - def list_all_mapupdate_statuses(self): - return self._all_mapupdate_status.keys() - def list_all_publish_statuses(self): - return self._all_publish_status.keys() - def list_all_retrieve_statuses(self): - return self._all_retrieve_status.keys() diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 329912bc..f3c6e469 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -194,12 +194,8 @@ class FakeClient: d.addCallback(lambda res: n) return d - def notify_retrieve(self, r): - pass - def notify_publish(self, p, size): - pass - def notify_mapupdate(self, u): - pass + def get_history(self): + return None def create_node_from_uri(self, u): u = IURI(u) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index cdd5708d..172bd603 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1199,11 +1199,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase): for us in self.clients[0].list_all_upload_statuses(): if us.get_size() > 200: self._up_status = us.get_counter() - rs = self.clients[0].list_all_retrieve_statuses()[0] + rs = list(self.clients[0].list_all_retrieve_statuses())[0] self._retrieve_status = rs.get_counter() - ps = self.clients[0].list_all_publish_statuses()[0] + ps = list(self.clients[0].list_all_publish_statuses())[0] self._publish_status = ps.get_counter() - us = self.clients[0].list_all_mapupdate_statuses()[0] + us = list(self.clients[0].list_all_mapupdate_statuses())[0] self._update_status = us.get_counter() # and that there are some upload- and download- status pages