From: Brian Warner Date: Thu, 17 Apr 2008 20:02:22 +0000 (-0700) Subject: mutable WIP: clean up status handling, shrink the code a lot, improve test coverage X-Git-Tag: allmydata-tahoe-1.1.0~234 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/frontends/something?a=commitdiff_plain;h=e1838ba2171424f76801e939f3765af4c228f40d;p=tahoe-lafs%2Ftahoe-lafs.git mutable WIP: clean up status handling, shrink the code a lot, improve test coverage --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index f3b9c599..e21ed6ff 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -329,7 +329,8 @@ class Client(node.Node, testutil.PollMixin): d.addCallback(make_key_objs) return d else: - # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs + # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 + # secs signer = rsa.generate(key_size) verifier = signer.get_verifying_key() return verifier, signer @@ -339,66 +340,28 @@ class Client(node.Node, testutil.PollMixin): return uploader.upload(uploadable) - def list_all_uploads(self): + def list_all_upload_statuses(self): uploader = self.getServiceNamed("uploader") - return uploader.list_all_uploads() - def list_active_uploads(self): - uploader = self.getServiceNamed("uploader") - return uploader.list_active_uploads() - def list_recent_uploads(self): - uploader = self.getServiceNamed("uploader") - return uploader.list_recent_uploads() + return uploader.list_all_upload_statuses() - def list_all_downloads(self): - downloader = self.getServiceNamed("downloader") - return downloader.list_all_downloads() - def list_active_downloads(self): - downloader = self.getServiceNamed("downloader") - return downloader.list_active_downloads() - def list_recent_downloads(self): + def list_all_download_statuses(self): downloader = self.getServiceNamed("downloader") - return downloader.list_recent_downloads() - - def list_all_mapupdate(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_all_mapupdate() - def list_active_mapupdate(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_active_mapupdate() - def list_recent_mapupdate(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_recent_mapupdate() + return downloader.list_all_download_statuses() - def list_all_publish(self): + def list_all_mapupdate_statuses(self): watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_all_publish() - def list_active_publish(self): + return watcher.list_all_mapupdate_statuses() + def list_all_publish_statuses(self): watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_active_publish() - def list_recent_publish(self): + return watcher.list_all_publish_statuses() + def list_all_retrieve_statuses(self): watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_recent_publish() + return watcher.list_all_retrieve_statuses() - def list_all_retrieve(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_all_retrieve() - def list_active_retrieve(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_active_retrieve() - def list_recent_retrieve(self): - watcher = self.getServiceNamed("mutable-watcher") - return watcher.list_recent_retrieve() - - def list_active_helper_statuses(self): - try: - helper = self.getServiceNamed("helper") - except KeyError: - return [] - return helper.get_active_upload_statuses() - def list_recent_helper_statuses(self): + def list_all_helper_statuses(self): try: helper = self.getServiceNamed("helper") except KeyError: return [] - return helper.get_recent_upload_statuses() + return helper.get_all_upload_statuses() diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 12f94bc9..e08ef3c6 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -1045,9 +1045,10 @@ class Downloader(service.MultiService): def __init__(self, stats_provider=None): service.MultiService.__init__(self) - self._all_downloads = weakref.WeakKeyDictionary() self.stats_provider = stats_provider - self._recent_download_status = [] + self._all_downloads = weakref.WeakKeyDictionary() # for debugging + self._all_download_statuses = weakref.WeakKeyDictionary() + self._recent_download_statuses = [] def download(self, u, t): assert self.parent @@ -1067,10 +1068,7 @@ class Downloader(service.MultiService): dl = FileDownloader(self.parent, u, t) else: raise RuntimeError("I don't know how to download a %s" % u) - self._all_downloads[dl] = None - self._recent_download_status.append(dl.get_download_status()) - while len(self._recent_download_status) > self.MAX_DOWNLOAD_STATUSES: - self._recent_download_status.pop(0) + self._add_download(dl) d = dl.start() return d @@ -1082,11 +1080,14 @@ class Downloader(service.MultiService): def download_to_filehandle(self, uri, filehandle): return self.download(uri, FileHandle(filehandle)) - - def list_all_downloads(self): - return self._all_downloads.keys() - def list_active_downloads(self): - return [d.get_download_status() for d in self._all_downloads.keys() - if d.get_download_status().get_active()] - def list_recent_downloads(self): - return self._recent_download_status + def _add_download(self, downloader): + self._all_downloads[downloader] = None + s = downloader.get_download_status() + self._all_download_statuses[s] = None + self._recent_download_statuses.append(s) + while len(self._recent_download_statuses) > self.MAX_DOWNLOAD_STATUSES: + self._recent_download_statuses.pop(0) + + def list_all_download_statuses(self): + for ds in self._all_download_statuses: + yield ds diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index 77f05a42..5307de37 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -350,14 +350,6 @@ class MutableWatcher(service.MultiService): while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES: self._recent_mapupdate_status.pop(0) - def list_all_mapupdate(self): - return self._all_mapupdate_status.keys() - def list_active_mapupdate(self): - return [p for p in self._all_mapupdate_status.keys() if p.get_active()] - def list_recent_mapupdate(self): - return self._recent_mapupdate_status - - def notify_publish(self, p): self._all_publish_status[p] = None self._recent_publish_status.append(p) @@ -371,14 +363,6 @@ class MutableWatcher(service.MultiService): while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES: self._recent_publish_status.pop(0) - def list_all_publish(self): - return self._all_publish_status.keys() - def list_active_publish(self): - return [p for p in self._all_publish_status.keys() if p.get_active()] - def list_recent_publish(self): - return self._recent_publish_status - - def notify_retrieve(self, r): self._all_retrieve_status[r] = None self._recent_retrieve_status.append(r) @@ -388,9 +372,10 @@ class MutableWatcher(service.MultiService): while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: self._recent_retrieve_status.pop(0) - def list_all_retrieve(self): + + def list_all_mapupdate_statuses(self): + return self._all_mapupdate_status.keys() + def list_all_publish_statuses(self): + return self._all_publish_status.keys() + def list_all_retrieve_statuses(self): return self._all_retrieve_status.keys() - def list_active_retrieve(self): - return [p for p in self._all_retrieve_status.keys() if p.get_active()] - def list_recent_retrieve(self): - return self._recent_retrieve_status diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py index acbe5cee..1155505c 100644 --- a/src/allmydata/mutable/publish.py +++ b/src/allmydata/mutable/publish.py @@ -109,7 +109,6 @@ class Publish: self._status.set_helper(False) self._status.set_progress(0.0) self._status.set_active(True) - self._status.set_servermap(servermap) def log(self, *args, **kwargs): if 'parent' not in kwargs: @@ -160,6 +159,7 @@ class Publish: # initial publish self._new_seqnum = 1 self._servermap = ServerMap() + self._status.set_servermap(self._servermap) self.log(format="new seqnum will be %(seqnum)d", seqnum=self._new_seqnum, level=log.NOISY) diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 6cc2e9c4..1448e392 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -1,5 +1,5 @@ -import os, stat, time +import os, stat, time, weakref from zope.interface import implements from twisted.application import service from twisted.internet import defer @@ -490,6 +490,8 @@ class Helper(Referenceable, service.MultiService): fileutil.make_dirs(self._chk_incoming) fileutil.make_dirs(self._chk_encoding) self._active_uploads = {} + self._all_uploads = weakref.WeakKeyDictionary() # for debugging + self._all_upload_statuses = weakref.WeakKeyDictionary() self._recent_upload_statuses = [] self.stats_provider = stats_provider if stats_provider: @@ -588,6 +590,7 @@ class Helper(Referenceable, service.MultiService): incoming_file, encoding_file, r, lp) self._active_uploads[storage_index] = uh + self._add_upload(uh) return uh.start() d.addCallback(_checked) def _err(f): @@ -622,18 +625,21 @@ class Helper(Referenceable, service.MultiService): d.addCallback(_checked) return d + def _add_upload(self, uh): + self._all_uploads[uh] = None + s = uh.get_upload_status() + self._all_upload_statuses[s] = None + self._recent_upload_statuses.append(s) + while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES: + self._recent_upload_statuses.pop(0) + def upload_finished(self, storage_index, size): + # this is called with size=0 if the upload failed self.count("chk_upload_helper.encoded_bytes", size) uh = self._active_uploads[storage_index] del self._active_uploads[storage_index] s = uh.get_upload_status() s.set_active(False) - self._recent_upload_statuses.append(s) - while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES: - self._recent_upload_statuses.pop(0) - - def get_active_upload_statuses(self): - return [u.get_upload_status() for u in self._active_uploads.values()] - def get_recent_upload_statuses(self): - return self._recent_upload_statuses + def get_all_upload_statuses(self): + return self._all_upload_statuses diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 00ae1caa..4a84a654 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -1285,16 +1285,18 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): def _got_status(res): # find an interesting upload and download to look at. LIT files # are not interesting. - for dl in self.clients[0].list_recent_downloads(): - if dl.get_size() > 200: - self._down_status = dl.get_counter() - for ul in self.clients[0].list_recent_uploads(): - if ul.get_size() > 200: - self._up_status = ul.get_counter() - rs = self.clients[0].list_recent_retrieve()[0] + for ds in self.clients[0].list_all_download_statuses(): + if ds.get_size() > 200: + self._down_status = ds.get_counter() + for us in self.clients[0].list_all_upload_statuses(): + if us.get_size() > 200: + self._up_status = us.get_counter() + rs = self.clients[0].list_all_retrieve_statuses()[0] self._retrieve_status = rs.get_counter() - ps = self.clients[0].list_recent_publish()[0] + ps = self.clients[0].list_all_publish_statuses()[0] self._publish_status = ps.get_counter() + us = self.clients[0].list_all_mapupdate_statuses()[0] + self._update_status = us.get_counter() # and that there are some upload- and download- status pages return self.GET("status/up-%d" % self._up_status) @@ -1303,8 +1305,11 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): return self.GET("status/down-%d" % self._down_status) d.addCallback(_got_up) def _got_down(res): - return self.GET("status/publish-%d" % self._publish_status) + return self.GET("status/mapupdate-%d" % self._update_status) d.addCallback(_got_down) + def _got_update(res): + return self.GET("status/publish-%d" % self._publish_status) + d.addCallback(_got_update) def _got_publish(res): return self.GET("status/retrieve-%d" % self._retrieve_status) d.addCallback(_got_publish) diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index c9a7b96d..cdbad95b 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -8,8 +8,11 @@ from twisted.python import failure, log from allmydata import interfaces, provisioning, uri, webish, upload, download from allmydata.web import status from allmydata.util import fileutil -from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, FakeMutableFileNode, create_chk_filenode -from allmydata.interfaces import IURI, INewDirectoryURI, IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode +from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, \ + FakeMutableFileNode, create_chk_filenode +from allmydata.interfaces import IURI, INewDirectoryURI, \ + IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode +from allmydata.mutable import servermap, publish, retrieve # create a fake uploader/downloader, and a couple of fake dirnodes, then # create a webserver that works against them @@ -35,6 +38,9 @@ class FakeClient(service.MultiService): introducer_client = FakeIntroducerClient() _all_upload_status = [upload.UploadStatus()] _all_download_status = [download.DownloadStatus()] + _all_mapupdate_statuses = [servermap.UpdateStatus()] + _all_publish_statuses = [publish.PublishStatus()] + _all_retrieve_statuses = [retrieve.RetrieveStatus()] convergence = "some random string" def connected_to_introducer(self): @@ -72,35 +78,17 @@ class FakeClient(service.MultiService): d.addCallback(_got_data) return d - def list_all_uploads(self): - return [] - def list_all_downloads(self): - return [] - - def list_active_uploads(self): - return self._all_upload_status - def list_active_downloads(self): - return self._all_download_status - def list_active_publish(self): - return [] - def list_active_retrieve(self): - return [] - def list_active_mapupdate(self): - return [] - def list_recent_mapupdate(self): - return [] - - def list_recent_uploads(self): + def list_all_upload_statuses(self): return self._all_upload_status - def list_recent_downloads(self): + def list_all_download_statuses(self): return self._all_download_status - def list_recent_publish(self): - return [] - def list_recent_retrieve(self): - return [] - def list_active_helper_statuses(self): - return [] - def list_recent_helper_statuses(self): + def list_all_mapupdate_statuses(self): + return self._all_mapupdate_statuses + def list_all_publish_statuses(self): + return self._all_publish_statuses + def list_all_retrieve_statuses(self): + return self._all_retrieve_statuses + def list_all_helper_statuses(self): return [] class WebMixin(object): @@ -411,13 +399,19 @@ class Web(WebMixin, unittest.TestCase): return d def test_status(self): - dl_num = self.s.list_recent_downloads()[0].get_counter() - ul_num = self.s.list_recent_uploads()[0].get_counter() + dl_num = self.s.list_all_download_statuses()[0].get_counter() + ul_num = self.s.list_all_upload_statuses()[0].get_counter() + mu_num = self.s.list_all_mapupdate_statuses()[0].get_counter() + pub_num = self.s.list_all_publish_statuses()[0].get_counter() + ret_num = self.s.list_all_retrieve_statuses()[0].get_counter() d = self.GET("/status", followRedirect=True) def _check(res): self.failUnless('Upload and Download Status' in res, res) self.failUnless('"down-%d"' % dl_num in res, res) self.failUnless('"up-%d"' % ul_num in res, res) + self.failUnless('"mapupdate-%d"' % mu_num in res, res) + self.failUnless('"publish-%d"' % pub_num in res, res) + self.failUnless('"retrieve-%d"' % ret_num in res, res) d.addCallback(_check) d.addCallback(lambda res: self.GET("/status/down-%d" % dl_num)) def _check_dl(res): @@ -427,6 +421,19 @@ class Web(WebMixin, unittest.TestCase): def _check_ul(res): self.failUnless("File Upload Status" in res, res) d.addCallback(_check_ul) + d.addCallback(lambda res: self.GET("/status/mapupdate-%d" % mu_num)) + def _check_mapupdate(res): + self.failUnless("Mutable File Servermap Update Status" in res, res) + d.addCallback(_check_mapupdate) + d.addCallback(lambda res: self.GET("/status/publish-%d" % pub_num)) + def _check_publish(res): + self.failUnless("Mutable File Publish Status" in res, res) + d.addCallback(_check_publish) + d.addCallback(lambda res: self.GET("/status/retrieve-%d" % ret_num)) + def _check_retrieve(res): + self.failUnless("Mutable File Retrieve Status" in res, res) + d.addCallback(_check_retrieve) + return d def test_status_numbers(self): diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index c99270c9..714e7a53 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -1200,8 +1200,9 @@ class Uploader(service.MultiService): self._helper_furl = helper_furl self.stats_provider = stats_provider self._helper = None - self._all_uploads = weakref.WeakKeyDictionary() - self._recent_upload_status = [] + self._all_uploads = weakref.WeakKeyDictionary() # for debugging + self._all_upload_statuses = weakref.WeakKeyDictionary() + self._recent_upload_statuses = [] service.MultiService.__init__(self) def startService(self): @@ -1243,10 +1244,7 @@ class Uploader(service.MultiService): uploader = AssistedUploader(self._helper) else: uploader = self.uploader_class(self.parent) - self._all_uploads[uploader] = None - self._recent_upload_status.append(uploader.get_upload_status()) - while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES: - self._recent_upload_status.pop(0) + self._add_upload(uploader) return uploader.start(uploadable) d.addCallback(_got_size) def _done(res): @@ -1255,10 +1253,14 @@ class Uploader(service.MultiService): d.addBoth(_done) return d - def list_all_uploads(self): - return self._all_uploads.keys() - def list_active_uploads(self): - return [u.get_upload_status() for u in self._all_uploads.keys() - if u.get_upload_status().get_active()] - def list_recent_uploads(self): - return self._recent_upload_status + def _add_upload(self, uploader): + s = uploader.get_upload_status() + self._all_uploads[uploader] = None + self._all_upload_statuses[s] = None + self._recent_upload_statuses.append(s) + while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES: + self._recent_upload_statuses.pop(0) + + def list_all_upload_statuses(self): + for us in self._all_upload_statuses: + yield us diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py index 0b51b2b7..0cc7304a 100644 --- a/src/allmydata/web/status.py +++ b/src/allmydata/web/status.py @@ -1,5 +1,5 @@ -import time, pprint +import time, pprint, itertools import simplejson from twisted.internet import defer from nevow import rend, inevow, tags as T @@ -143,13 +143,9 @@ class UploadResultsRendererMixin(RateAndTimeMixin): d = self.upload_results() def _convert(r): file_size = r.file_size - if file_size is None: - return None time1 = r.timings.get("cumulative_encoding") - if time1 is None: - return None time2 = r.timings.get("cumulative_sending") - if time2 is None: + if (file_size is None or time1 is None or time2 is None): return None try: return 1.0 * file_size / (time1+time2) @@ -162,10 +158,8 @@ class UploadResultsRendererMixin(RateAndTimeMixin): d = self.upload_results() def _convert(r): fetch_size = r.ciphertext_fetched - if fetch_size is None: - return None time = r.timings.get("cumulative_fetch") - if time is None: + if (fetch_size is None or time is None): return None try: return 1.0 * fetch_size / time @@ -474,17 +468,11 @@ class RetrieveStatusPage(rend.Page, RateAndTimeMixin): def data_rate_total(self, ctx, data): return self._get_rate(data, "total") - def data_time_peer_selection(self, ctx, data): - return self.retrieve_status.timings.get("peer_selection") - def data_time_fetch(self, ctx, data): return self.retrieve_status.timings.get("fetch") def data_rate_fetch(self, ctx, data): return self._get_rate(data, "fetch") - def data_time_cumulative_verify(self, ctx, data): - return self.retrieve_status.timings.get("cumulative_verify") - def data_time_decode(self, ctx, data): return self.retrieve_status.timings.get("decode") def data_rate_decode(self, ctx, data): @@ -703,25 +691,27 @@ class Status(rend.Page): docFactory = getxmlfile("status.xhtml") addSlash = True + def _get_all_statuses(self, client): + return itertools.chain(client.list_all_upload_statuses(), + client.list_all_download_statuses(), + client.list_all_mapupdate_statuses(), + client.list_all_publish_statuses(), + client.list_all_retrieve_statuses(), + client.list_all_helper_statuses(), + ) + def data_active_operations(self, ctx, data): client = IClient(ctx) - active = (client.list_active_uploads() + - client.list_active_downloads() + - client.list_active_mapupdate() + - client.list_active_publish() + - client.list_active_retrieve() + - client.list_active_helper_statuses()) + active = [s + for s in self._get_all_statuses(client) + if s.get_active()] return active def data_recent_operations(self, ctx, data): client = IClient(ctx) - recent = [o for o in (client.list_recent_uploads() + - client.list_recent_downloads() + - client.list_recent_mapupdate() + - client.list_recent_publish() + - client.list_recent_retrieve() + - client.list_recent_helper_statuses()) - if not o.get_active()] + recent = [s + for s in self._get_all_statuses(client) + if not s.get_active()] recent.sort(lambda a,b: cmp(a.get_started(), b.get_started())) recent.reverse() return recent @@ -782,50 +772,26 @@ class Status(rend.Page): stype,count_s = name.split("-") count = int(count_s) if stype == "up": - for s in client.list_recent_uploads(): - if s.get_counter() == count: - return UploadStatusPage(s) - for u in client.list_all_uploads(): - # u is an uploader object - s = u.get_upload_status() + for s in itertools.chain(client.list_all_upload_statuses(), + client.list_all_helper_statuses()): + # immutable-upload helpers use the same status object as a + # regular immutable-upload if s.get_counter() == count: return UploadStatusPage(s) - for s in (client.list_active_helper_statuses() + - client.list_recent_helper_statuses()): - if s.get_counter() == count: - # immutable-upload helpers use the same status object as - # a regular immutable-upload - return UploadStatusPage(s) if stype == "down": - for s in client.list_recent_downloads(): - if s.get_counter() == count: - return DownloadStatusPage(s) - for d in client.list_all_downloads(): - s = d.get_download_status() + for s in client.list_all_download_statuses(): if s.get_counter() == count: return DownloadStatusPage(s) if stype == "mapupdate": - for s in client.list_recent_mapupdate(): - if s.get_counter() == count: - return MapupdateStatusPage(s) - for p in client.list_all_mapupdate(): - s = p.get_status() + for s in client.list_all_mapupdate_statuses(): if s.get_counter() == count: return MapupdateStatusPage(s) if stype == "publish": - for s in client.list_recent_publish(): - if s.get_counter() == count: - return PublishStatusPage(s) - for p in client.list_all_publish(): - s = p.get_status() + for s in client.list_all_publish_statuses(): if s.get_counter() == count: return PublishStatusPage(s) if stype == "retrieve": - for s in client.list_recent_retrieve(): - if s.get_counter() == count: - return RetrieveStatusPage(s) - for r in client.list_all_retrieve(): - s = r.get_status() + for s in client.list_all_retrieve_statuses(): if s.get_counter() == count: return RetrieveStatusPage(s)