From: Brian Warner Date: Tue, 4 Mar 2008 08:07:44 +0000 (-0700) Subject: webish: add primitive publish/retrieve status pages X-Git-Tag: allmydata-tahoe-0.9.0~77 X-Git-Url: https://git.rkrishnan.org/simplejson/components/%22file:/...?a=commitdiff_plain;h=68fbd89e66452847f2ebe30d5da6adf8d358fd91;p=tahoe-lafs%2Ftahoe-lafs.git webish: add primitive publish/retrieve status pages --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index c20d422b..3e5544a1 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -18,7 +18,7 @@ from allmydata.introducer import IntroducerClient from allmydata.util import hashutil, base32, testutil from allmydata.filenode import FileNode from allmydata.dirnode import NewDirectoryNode -from allmydata.mutable import MutableFileNode +from allmydata.mutable import MutableFileNode, MutableWatcher from allmydata.stats import StatsProvider from allmydata.interfaces import IURI, INewDirectoryURI, \ IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI @@ -67,6 +67,7 @@ class Client(node.Node, testutil.PollMixin): self.add_service(Uploader(helper_furl)) self.add_service(Downloader()) self.add_service(Checker()) + self.add_service(MutableWatcher()) # ControlServer and Helper are attached after Tub startup hotline_file = os.path.join(self.basedir, @@ -251,6 +252,11 @@ class Client(node.Node, testutil.PollMixin): assert IMutableFileURI.providedBy(u), u return MutableFileNode(self).init_from_uri(u) + def notify_publish(self, p): + self.getServiceNamed("mutable-watcher").notify_publish(p) + def notify_retrieve(self, r): + self.getServiceNamed("mutable-watcher").notify_retrieve(r) + def create_empty_dirnode(self): n = NewDirectoryNode(self) d = n.create() @@ -271,25 +277,39 @@ class Client(node.Node, testutil.PollMixin): def list_all_uploads(self): uploader = self.getServiceNamed("uploader") return uploader.list_all_uploads() - - def list_all_downloads(self): - downloader = self.getServiceNamed("downloader") - return downloader.list_all_downloads() - - def list_active_uploads(self): uploader = self.getServiceNamed("uploader") return uploader.list_active_uploads() - - def list_active_downloads(self): - downloader = self.getServiceNamed("downloader") - return downloader.list_active_downloads() - - def list_recent_uploads(self): uploader = self.getServiceNamed("uploader") return uploader.list_recent_uploads() + def list_all_downloads(self): + downloader = self.getServiceNamed("downloader") + return downloader.list_all_downloads() + def list_active_downloads(self): + downloader = self.getServiceNamed("downloader") + return downloader.list_active_downloads() def list_recent_downloads(self): downloader = self.getServiceNamed("downloader") return downloader.list_recent_downloads() + + def list_all_publish(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_all_publish() + def list_active_publish(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_active_publish() + def list_recent_publish(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_recent_publish() + + def list_all_retrieve(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_all_retrieve() + def list_active_retrieve(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_active_retrieve() + def list_recent_retrieve(self): + watcher = self.getServiceNamed("mutable-watcher") + return watcher.list_recent_retrieve() diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 82516957..b9ff96f4 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1513,6 +1513,10 @@ class IDownloadStatus(Interface): that number. This provides a handle to this particular download, so a web page can generate a suitable hyperlink.""" +class IPublishStatus(Interface): + pass +class IRetrieveStatus(Interface): + pass class NotCapableError(Exception): """You have tried to write to a read-only node.""" diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index 01b6f5f9..20d1bb8d 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -1,11 +1,13 @@ -import os, struct -from itertools import islice +import os, struct, time, weakref +from itertools import islice, count from zope.interface import implements from twisted.internet import defer from twisted.python import failure +from twisted.application import service from foolscap.eventual import eventually -from allmydata.interfaces import IMutableFileNode, IMutableFileURI +from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \ + IPublishStatus, IRetrieveStatus from allmydata.util import base32, hashutil, mathutil, idlib, log from allmydata.uri import WriteableSSKFileURI from allmydata import hashtree, codec, storage @@ -196,6 +198,48 @@ def pack_share(prefix, verification_key, signature, return final_share +class RetrieveStatus: + implements(IRetrieveStatus) + statusid_counter = count(0) + def __init__(self): + self.timings = {} + self.sharemap = None + self.active = True + self.storage_index = None + self.helper = False + self.size = None + self.status = "Not started" + self.progress = 0.0 + self.counter = self.statusid_counter.next() + + def get_storage_index(self): + return self.storage_index + def using_helper(self): + return self.helper + def get_size(self): + return self.size + def get_status(self): + return self.status + def get_progress(self): + return self.progress + def get_active(self): + return self.active + def get_counter(self): + return self.counter + + def set_storage_index(self, si): + self.storage_index = si + def set_helper(self, helper): + self.helper = helper + def set_size(self, size): + self.size = size + def set_status(self, status): + self.status = status + def set_progress(self, value): + self.progress = value + def set_active(self, value): + self.active = value + class Retrieve: def __init__(self, filenode): self._node = filenode @@ -210,6 +254,11 @@ class Retrieve: self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5] num = self._node._client.log("Retrieve(%s): starting" % prefix) self._log_number = num + self._status = RetrieveStatus() + self._status.set_storage_index(self._storage_index) + self._status.set_helper(False) + self._status.set_progress(0.0) + self._status.set_active(True) def log(self, msg, **kwargs): prefix = self._log_prefix @@ -704,8 +753,14 @@ class Retrieve: def _done(self, contents): self.log("DONE") self._running = False + self._status.set_active(False) + self._status.set_status("Done") + self._status.set_progress(1.0) + self._status.set_size(len(contents)) eventually(self._done_deferred.callback, contents) + def get_status(self): + return self._status class DictOfSets(dict): @@ -715,6 +770,48 @@ class DictOfSets(dict): else: self[key] = set([value]) +class PublishStatus: + implements(IPublishStatus) + statusid_counter = count(0) + def __init__(self): + self.timings = {} + self.sharemap = None + self.active = True + self.storage_index = None + self.helper = False + self.size = None + self.status = "Not started" + self.progress = 0.0 + self.counter = self.statusid_counter.next() + + def get_storage_index(self): + return self.storage_index + def using_helper(self): + return self.helper + def get_size(self): + return self.size + def get_status(self): + return self.status + def get_progress(self): + return self.progress + def get_active(self): + return self.active + def get_counter(self): + return self.counter + + def set_storage_index(self, si): + self.storage_index = si + def set_helper(self, helper): + self.helper = helper + def set_size(self, size): + self.size = size + def set_status(self, status): + self.status = status + def set_progress(self, value): + self.progress = value + def set_active(self, value): + self.active = value + class Publish: """I represent a single act of publishing the mutable file to the grid.""" @@ -724,6 +821,11 @@ class Publish: self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5] num = self._node._client.log("Publish(%s): starting" % prefix) self._log_number = num + self._status = PublishStatus() + self._status.set_storage_index(self._storage_index) + self._status.set_helper(False) + self._status.set_progress(0.0) + self._status.set_active(True) def log(self, *args, **kwargs): if 'parent' not in kwargs: @@ -754,6 +856,8 @@ class Publish: # 5: when enough responses are back, we're done self.log("starting publish, datalen is %s" % len(newdata)) + self._started = time.time() + self._status.set_size(len(newdata)) self._writekey = self._node.get_writekey() assert self._writekey, "need write capability to publish" @@ -796,7 +900,7 @@ class Publish: d.addCallback(self._send_shares, IV) d.addCallback(self._maybe_recover) - d.addCallback(lambda res: None) + d.addCallback(self._done) return d def _query_peers(self, total_shares): @@ -1293,6 +1397,17 @@ class Publish: # but dispatch_map will help us do it raise UncoordinatedWriteError("I was surprised!") + def _done(self, res): + now = time.time() + self._status.timings["total"] = now - self._started + self._status.set_active(False) + self._status.set_status("Done") + self._status.set_progress(1.0) + return None + + def get_status(self): + return self._status + # use client.create_mutable_file() to make one of these @@ -1374,6 +1489,7 @@ class MutableFileNode: def _publish(self, initial_contents): p = self.publish_class(self) + self._client.notify_publish(p) d = p.publish(initial_contents) d.addCallback(lambda res: self) return d @@ -1491,11 +1607,54 @@ class MutableFileNode: return d def download_to_data(self): - r = Retrieve(self) + r = self.retrieve_class(self) + self._client.notify_retrieve(r) return r.retrieve() def replace(self, newdata): - r = Retrieve(self) + r = self.retrieve_class(self) + self._client.notify_retrieve(r) d = r.retrieve() d.addCallback(lambda res: self._publish(newdata)) return d + +class MutableWatcher(service.MultiService): + MAX_PUBLISH_STATUSES = 20 + MAX_RETRIEVE_STATUSES = 20 + name = "mutable-watcher" + + def __init__(self): + service.MultiService.__init__(self) + self._all_publish = weakref.WeakKeyDictionary() + self._recent_publish_status = [] + self._all_retrieve = weakref.WeakKeyDictionary() + self._recent_retrieve_status = [] + + def notify_publish(self, p): + self._all_publish[p] = None + self._recent_publish_status.append(p.get_status()) + while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES: + self._recent_publish_status.pop(0) + + def list_all_publish(self): + return self._all_publish.keys() + def list_active_publish(self): + return [p.get_status() for p in self._all_publish.keys() + if p.get_status().get_active()] + def list_recent_publish(self): + return self._recent_publish_status + + + def notify_retrieve(self, r): + self._all_retrieve[r] = None + self._recent_retrieve_status.append(r.get_status()) + while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: + self._recent_retrieve_status.pop(0) + + def list_all_retrieve(self): + return self._all_retrieve.keys() + def list_active_retrieve(self): + return [p.get_status() for p in self._all_retrieve.keys() + if p.get_status().get_active()] + def list_recent_retrieve(self): + return self._recent_retrieve_status diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index bba90e7f..b3b403d5 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -79,10 +79,19 @@ class FakeClient(service.MultiService): return self._all_upload_status def list_active_downloads(self): return self._all_download_status + def list_active_publish(self): + return [] + def list_active_retrieve(self): + return [] + def list_recent_uploads(self): return self._all_upload_status def list_recent_downloads(self): return self._all_download_status + def list_recent_publish(self): + return [] + def list_recent_retrieve(self): + return [] class WebMixin(object): @@ -391,17 +400,17 @@ class Web(WebMixin, unittest.TestCase): ul_num = self.s.list_recent_uploads()[0].get_counter() d = self.GET("/status", followRedirect=True) def _check(res): - self.failUnless('Upload and Download Status' in res) - self.failUnless('"down-%d"' % dl_num in res) - self.failUnless('"up-%d"' % ul_num in res) + self.failUnless('Upload and Download Status' in res, res) + self.failUnless('"down-%d"' % dl_num in res, res) + self.failUnless('"up-%d"' % ul_num in res, res) d.addCallback(_check) d.addCallback(lambda res: self.GET("/status/down-%d" % dl_num)) def _check_dl(res): - self.failUnless("File Download Status" in res) + self.failUnless("File Download Status" in res, res) d.addCallback(_check_dl) d.addCallback(lambda res: self.GET("/status/up-%d" % ul_num)) def _check_ul(res): - self.failUnless("File Upload Status" in res) + self.failUnless("File Upload Status" in res, res) d.addCallback(_check_ul) return d diff --git a/src/allmydata/web/publish-status.xhtml b/src/allmydata/web/publish-status.xhtml new file mode 100644 index 00000000..657bde3c --- /dev/null +++ b/src/allmydata/web/publish-status.xhtml @@ -0,0 +1,21 @@ + + + AllMyData - Tahoe - Mutable File Publish Status + + + + + + +

Mutable File Publish Status

+ + + + diff --git a/src/allmydata/web/retrieve-status.xhtml b/src/allmydata/web/retrieve-status.xhtml new file mode 100644 index 00000000..1d2bb554 --- /dev/null +++ b/src/allmydata/web/retrieve-status.xhtml @@ -0,0 +1,21 @@ + + + AllMyData - Tahoe - Mutable File Retrieve Status + + + + + + +

Mutable File Retrieve Status

+ + + + diff --git a/src/allmydata/web/status.xhtml b/src/allmydata/web/status.xhtml index 62edf5f5..f0a24247 100644 --- a/src/allmydata/web/status.xhtml +++ b/src/allmydata/web/status.xhtml @@ -11,89 +11,47 @@

Upload and Download Status

-

Active Uploads:

- - - - - - - - - - - - - - - - - - - - -
Storage IndexHelper?Total SizeProgress (Hash)Progress (Ciphertext)Progress (Encode+Push)Status
No active uploads!
- -

Active Downloads:

- +

Active Operations:

+
+ - + + - +
Type Storage Index Helper? Total Size Progress Status
No active downloads!
No active operations!
-

Recent Uploads:

- - - - - - - - - - - - - - - - - - - - -
Storage IndexHelper?Total SizeProgress (Hash)Progress (Ciphertext)Progress (Encode+Push)Status
No recent uploads!
- -

Recent Downloads:

- +

Recent Operations:

+
+ - + + - +
Type Storage Index Helper? Total Size Progress Status
No recent downloads!
No recent operations!
Return to the Welcome Page
diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index ed24496f..bdbce443 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -9,7 +9,8 @@ from nevow.static import File as nevow_File # TODO: merge with static.File? from allmydata.util import base32, fileutil, idlib, observer, log import simplejson from allmydata.interfaces import IDownloadTarget, IDirectoryNode, IFileNode, \ - IMutableFileNode, IUploadStatus, IDownloadStatus + IMutableFileNode, IUploadStatus, IDownloadStatus, IPublishStatus, \ + IRetrieveStatus import allmydata # to display import path from allmydata import download from allmydata.upload import FileHandle, FileName @@ -1864,20 +1865,119 @@ class DownloadStatusPage(DownloadResultsRendererMixin, rend.Page): def render_status(self, ctx, data): return data.get_status() +class RetrieveStatusPage(rend.Page): + docFactory = getxmlfile("retrieve-status.xhtml") + + def render_si(self, ctx, data): + si_s = base32.b2a_or_none(data.get_storage_index()) + if si_s is None: + si_s = "(None)" + return si_s + + def render_helper(self, ctx, data): + return {True: "Yes", + False: "No"}[data.using_helper()] + + def render_current_size(self, ctx, data): + size = data.get_size() + if size is None: + size = "(unknown)" + return size + + def render_progress(self, ctx, data): + progress = data.get_progress() + # TODO: make an ascii-art bar + return "%.1f%%" % (100.0 * progress) + + def render_status(self, ctx, data): + return data.get_status() + +class PublishStatusPage(rend.Page): + docFactory = getxmlfile("publish-status.xhtml") + + def render_si(self, ctx, data): + si_s = base32.b2a_or_none(data.get_storage_index()) + if si_s is None: + si_s = "(None)" + return si_s + + def render_helper(self, ctx, data): + return {True: "Yes", + False: "No"}[data.using_helper()] + + def render_current_size(self, ctx, data): + size = data.get_size() + if size is None: + size = "(unknown)" + return size + + def render_progress(self, ctx, data): + progress = data.get_progress() + # TODO: make an ascii-art bar + return "%.1f%%" % (100.0 * progress) + + def render_status(self, ctx, data): + return data.get_status() + class Status(rend.Page): docFactory = getxmlfile("status.xhtml") addSlash = True - def data_active_uploads(self, ctx, data): - return [u for u in IClient(ctx).list_active_uploads()] - def data_active_downloads(self, ctx, data): - return [d for d in IClient(ctx).list_active_downloads()] - def data_recent_uploads(self, ctx, data): - return [u for u in IClient(ctx).list_recent_uploads() - if not u.get_active()] - def data_recent_downloads(self, ctx, data): - return [d for d in IClient(ctx).list_recent_downloads() - if not d.get_active()] + def data_active_operations(self, ctx, data): + active = (IClient(ctx).list_active_uploads() + + IClient(ctx).list_active_downloads() + + IClient(ctx).list_active_publish() + + IClient(ctx).list_active_retrieve()) + return active + + def data_recent_operations(self, ctx, data): + recent = [o for o in (IClient(ctx).list_recent_uploads() + + IClient(ctx).list_recent_downloads() + + IClient(ctx).list_recent_publish() + + IClient(ctx).list_recent_retrieve()) + if not o.get_active()] + return recent + + def render_row(self, ctx, data): + s = data + si_s = base32.b2a_or_none(s.get_storage_index()) + if si_s is None: + si_s = "(None)" + ctx.fillSlots("si", si_s) + ctx.fillSlots("helper", {True: "Yes", + False: "No"}[s.using_helper()]) + + size = s.get_size() + if size is None: + size = "(unknown)" + ctx.fillSlots("total_size", size) + + progress = data.get_progress() + if IUploadStatus.providedBy(data): + link = "up-%d" % data.get_counter() + ctx.fillSlots("type", "upload") + # TODO: make an ascii-art bar + (chk, ciphertext, encandpush) = progress + progress_s = ("hash: %.1f%%, ciphertext: %.1f%%, encode: %.1f%%" % + ( (100.0 * chk), + (100.0 * ciphertext), + (100.0 * encandpush) )) + ctx.fillSlots("progress", progress_s) + elif IDownloadStatus.providedBy(data): + link = "down-%d" % data.get_counter() + ctx.fillSlots("type", "download") + ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) + elif IPublishStatus.providedBy(data): + link = "publish-%d" % data.get_counter() + ctx.fillSlots("type", "publish") + ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) + else: + assert IRetrieveStatus.providedBy(data) + ctx.fillSlots("type", "retrieve") + link = "retrieve-%d" % data.get_counter() + ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) + ctx.fillSlots("status", T.a(href=link)[s.get_status()]) + return ctx.tag def childFactory(self, ctx, name): client = IClient(ctx) @@ -1897,41 +1997,20 @@ class Status(rend.Page): for s in client.list_all_downloads(): if s.get_counter() == count: return DownloadStatusPage(s) - - def _render_common(self, ctx, data): - s = data - si_s = base32.b2a_or_none(s.get_storage_index()) - if si_s is None: - si_s = "(None)" - ctx.fillSlots("si", si_s) - ctx.fillSlots("helper", {True: "Yes", - False: "No"}[s.using_helper()]) - size = s.get_size() - if size is None: - size = "(unknown)" - ctx.fillSlots("total_size", size) - if IUploadStatus.providedBy(data): - link = "up-%d" % data.get_counter() - else: - assert IDownloadStatus.providedBy(data) - link = "down-%d" % data.get_counter() - ctx.fillSlots("status", T.a(href=link)[s.get_status()]) - - def render_row_upload(self, ctx, data): - self._render_common(ctx, data) - (chk, ciphertext, encandpush) = data.get_progress() - # TODO: make an ascii-art bar - ctx.fillSlots("progress_hash", "%.1f%%" % (100.0 * chk)) - ctx.fillSlots("progress_ciphertext", "%.1f%%" % (100.0 * ciphertext)) - ctx.fillSlots("progress_encode", "%.1f%%" % (100.0 * encandpush)) - return ctx.tag - - def render_row_download(self, ctx, data): - self._render_common(ctx, data) - progress = data.get_progress() - # TODO: make an ascii-art bar - ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress)) - return ctx.tag + if stype == "publish": + for s in client.list_recent_publish(): + if s.get_counter() == count: + return PublishStatusPage(s) + for s in client.list_all_publish(): + if s.get_counter() == count: + return PublishStatusPage(s) + if stype == "retrieve": + for s in client.list_recent_retrieve(): + if s.get_counter() == count: + return RetrieveStatusPage(s) + for s in client.list_all_retrieve(): + if s.get_counter() == count: + return RetrieveStatusPage(s) class Root(rend.Page):