from allmydata.util import hashutil, base32, testutil
from allmydata.filenode import FileNode
from allmydata.dirnode import NewDirectoryNode
-from allmydata.mutable import MutableFileNode
+from allmydata.mutable import MutableFileNode, MutableWatcher
from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
self.add_service(Uploader(helper_furl))
self.add_service(Downloader())
self.add_service(Checker())
+ self.add_service(MutableWatcher())
# ControlServer and Helper are attached after Tub startup
hotline_file = os.path.join(self.basedir,
assert IMutableFileURI.providedBy(u), u
return MutableFileNode(self).init_from_uri(u)
+ def notify_publish(self, p):
+ self.getServiceNamed("mutable-watcher").notify_publish(p)
+ def notify_retrieve(self, r):
+ self.getServiceNamed("mutable-watcher").notify_retrieve(r)
+
def create_empty_dirnode(self):
n = NewDirectoryNode(self)
d = n.create()
def list_all_uploads(self):
uploader = self.getServiceNamed("uploader")
return uploader.list_all_uploads()
-
- def list_all_downloads(self):
- downloader = self.getServiceNamed("downloader")
- return downloader.list_all_downloads()
-
-
def list_active_uploads(self):
uploader = self.getServiceNamed("uploader")
return uploader.list_active_uploads()
-
- def list_active_downloads(self):
- downloader = self.getServiceNamed("downloader")
- return downloader.list_active_downloads()
-
-
def list_recent_uploads(self):
uploader = self.getServiceNamed("uploader")
return uploader.list_recent_uploads()
+ def list_all_downloads(self):
+ downloader = self.getServiceNamed("downloader")
+ return downloader.list_all_downloads()
+ def list_active_downloads(self):
+ downloader = self.getServiceNamed("downloader")
+ return downloader.list_active_downloads()
def list_recent_downloads(self):
downloader = self.getServiceNamed("downloader")
return downloader.list_recent_downloads()
+
+ def list_all_publish(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_all_publish()
+ def list_active_publish(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_active_publish()
+ def list_recent_publish(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_recent_publish()
+
+ def list_all_retrieve(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_all_retrieve()
+ def list_active_retrieve(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_active_retrieve()
+ def list_recent_retrieve(self):
+ watcher = self.getServiceNamed("mutable-watcher")
+ return watcher.list_recent_retrieve()
that number. This provides a handle to this particular download, so a
web page can generate a suitable hyperlink."""
+class IPublishStatus(Interface):
+ pass
+class IRetrieveStatus(Interface):
+ pass
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
-import os, struct
-from itertools import islice
+import os, struct, time, weakref
+from itertools import islice, count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
+from twisted.application import service
from foolscap.eventual import eventually
-from allmydata.interfaces import IMutableFileNode, IMutableFileURI
+from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
+ IPublishStatus, IRetrieveStatus
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata.uri import WriteableSSKFileURI
from allmydata import hashtree, codec, storage
return final_share
+class RetrieveStatus:
+ implements(IRetrieveStatus)
+ statusid_counter = count(0)
+ def __init__(self):
+ self.timings = {}
+ self.sharemap = None
+ self.active = True
+ self.storage_index = None
+ self.helper = False
+ self.size = None
+ self.status = "Not started"
+ self.progress = 0.0
+ self.counter = self.statusid_counter.next()
+
+ def get_storage_index(self):
+ return self.storage_index
+ def using_helper(self):
+ return self.helper
+ def get_size(self):
+ return self.size
+ def get_status(self):
+ return self.status
+ def get_progress(self):
+ return self.progress
+ def get_active(self):
+ return self.active
+ def get_counter(self):
+ return self.counter
+
+ def set_storage_index(self, si):
+ self.storage_index = si
+ def set_helper(self, helper):
+ self.helper = helper
+ def set_size(self, size):
+ self.size = size
+ def set_status(self, status):
+ self.status = status
+ def set_progress(self, value):
+ self.progress = value
+ def set_active(self, value):
+ self.active = value
+
class Retrieve:
def __init__(self, filenode):
self._node = filenode
self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._node._client.log("Retrieve(%s): starting" % prefix)
self._log_number = num
+ self._status = RetrieveStatus()
+ self._status.set_storage_index(self._storage_index)
+ self._status.set_helper(False)
+ self._status.set_progress(0.0)
+ self._status.set_active(True)
def log(self, msg, **kwargs):
prefix = self._log_prefix
def _done(self, contents):
self.log("DONE")
self._running = False
+ self._status.set_active(False)
+ self._status.set_status("Done")
+ self._status.set_progress(1.0)
+ self._status.set_size(len(contents))
eventually(self._done_deferred.callback, contents)
+ def get_status(self):
+ return self._status
class DictOfSets(dict):
else:
self[key] = set([value])
+class PublishStatus:
+ implements(IPublishStatus)
+ statusid_counter = count(0)
+ def __init__(self):
+ self.timings = {}
+ self.sharemap = None
+ self.active = True
+ self.storage_index = None
+ self.helper = False
+ self.size = None
+ self.status = "Not started"
+ self.progress = 0.0
+ self.counter = self.statusid_counter.next()
+
+ def get_storage_index(self):
+ return self.storage_index
+ def using_helper(self):
+ return self.helper
+ def get_size(self):
+ return self.size
+ def get_status(self):
+ return self.status
+ def get_progress(self):
+ return self.progress
+ def get_active(self):
+ return self.active
+ def get_counter(self):
+ return self.counter
+
+ def set_storage_index(self, si):
+ self.storage_index = si
+ def set_helper(self, helper):
+ self.helper = helper
+ def set_size(self, size):
+ self.size = size
+ def set_status(self, status):
+ self.status = status
+ def set_progress(self, value):
+ self.progress = value
+ def set_active(self, value):
+ self.active = value
+
class Publish:
"""I represent a single act of publishing the mutable file to the grid."""
self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]
num = self._node._client.log("Publish(%s): starting" % prefix)
self._log_number = num
+ self._status = PublishStatus()
+ self._status.set_storage_index(self._storage_index)
+ self._status.set_helper(False)
+ self._status.set_progress(0.0)
+ self._status.set_active(True)
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
# 5: when enough responses are back, we're done
self.log("starting publish, datalen is %s" % len(newdata))
+ self._started = time.time()
+ self._status.set_size(len(newdata))
self._writekey = self._node.get_writekey()
assert self._writekey, "need write capability to publish"
d.addCallback(self._send_shares, IV)
d.addCallback(self._maybe_recover)
- d.addCallback(lambda res: None)
+ d.addCallback(self._done)
return d
def _query_peers(self, total_shares):
# but dispatch_map will help us do it
raise UncoordinatedWriteError("I was surprised!")
+ def _done(self, res):
+ now = time.time()
+ self._status.timings["total"] = now - self._started
+ self._status.set_active(False)
+ self._status.set_status("Done")
+ self._status.set_progress(1.0)
+ return None
+
+ def get_status(self):
+ return self._status
+
# use client.create_mutable_file() to make one of these
def _publish(self, initial_contents):
p = self.publish_class(self)
+ self._client.notify_publish(p)
d = p.publish(initial_contents)
d.addCallback(lambda res: self)
return d
return d
def download_to_data(self):
- r = Retrieve(self)
+ r = self.retrieve_class(self)
+ self._client.notify_retrieve(r)
return r.retrieve()
def replace(self, newdata):
- r = Retrieve(self)
+ r = self.retrieve_class(self)
+ self._client.notify_retrieve(r)
d = r.retrieve()
d.addCallback(lambda res: self._publish(newdata))
return d
+
+class MutableWatcher(service.MultiService):
+ MAX_PUBLISH_STATUSES = 20
+ MAX_RETRIEVE_STATUSES = 20
+ name = "mutable-watcher"
+
+ def __init__(self):
+ service.MultiService.__init__(self)
+ self._all_publish = weakref.WeakKeyDictionary()
+ self._recent_publish_status = []
+ self._all_retrieve = weakref.WeakKeyDictionary()
+ self._recent_retrieve_status = []
+
+ def notify_publish(self, p):
+ self._all_publish[p] = None
+ self._recent_publish_status.append(p.get_status())
+ while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
+ self._recent_publish_status.pop(0)
+
+ def list_all_publish(self):
+ return self._all_publish.keys()
+ def list_active_publish(self):
+ return [p.get_status() for p in self._all_publish.keys()
+ if p.get_status().get_active()]
+ def list_recent_publish(self):
+ return self._recent_publish_status
+
+
+ def notify_retrieve(self, r):
+ self._all_retrieve[r] = None
+ self._recent_retrieve_status.append(r.get_status())
+ while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
+ self._recent_retrieve_status.pop(0)
+
+ def list_all_retrieve(self):
+ return self._all_retrieve.keys()
+ def list_active_retrieve(self):
+ return [p.get_status() for p in self._all_retrieve.keys()
+ if p.get_status().get_active()]
+ def list_recent_retrieve(self):
+ return self._recent_retrieve_status
return self._all_upload_status
def list_active_downloads(self):
return self._all_download_status
+ def list_active_publish(self):
+ return []
+ def list_active_retrieve(self):
+ return []
+
def list_recent_uploads(self):
return self._all_upload_status
def list_recent_downloads(self):
return self._all_download_status
+ def list_recent_publish(self):
+ return []
+ def list_recent_retrieve(self):
+ return []
class WebMixin(object):
ul_num = self.s.list_recent_uploads()[0].get_counter()
d = self.GET("/status", followRedirect=True)
def _check(res):
- self.failUnless('Upload and Download Status' in res)
- self.failUnless('"down-%d"' % dl_num in res)
- self.failUnless('"up-%d"' % ul_num in res)
+ self.failUnless('Upload and Download Status' in res, res)
+ self.failUnless('"down-%d"' % dl_num in res, res)
+ self.failUnless('"up-%d"' % ul_num in res, res)
d.addCallback(_check)
d.addCallback(lambda res: self.GET("/status/down-%d" % dl_num))
def _check_dl(res):
- self.failUnless("File Download Status" in res)
+ self.failUnless("File Download Status" in res, res)
d.addCallback(_check_dl)
d.addCallback(lambda res: self.GET("/status/up-%d" % ul_num))
def _check_ul(res):
- self.failUnless("File Upload Status" in res)
+ self.failUnless("File Upload Status" in res, res)
d.addCallback(_check_ul)
return d
--- /dev/null
+<html xmlns:n="http://nevow.com/ns/nevow/0.1">
+ <head>
+ <title>AllMyData - Tahoe - Mutable File Publish Status</title>
+ <!-- <link href="http://www.allmydata.com/common/css/styles.css"
+ rel="stylesheet" type="text/css"/> -->
+ <link href="/webform_css" rel="stylesheet" type="text/css"/>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+ </head>
+ <body>
+
+<h1>Mutable File Publish Status</h1>
+
+<ul>
+ <li>Storage Index: <span n:render="si"/></li>
+ <li>Helper?: <span n:render="helper"/></li>
+ <li>Current Size: <span n:render="current_size"/></li>
+ <li>Progress: <span n:render="progress"/></li>
+ <li>Status: <span n:render="status"/></li>
+</ul>
+
+</body></html>
--- /dev/null
+<html xmlns:n="http://nevow.com/ns/nevow/0.1">
+ <head>
+ <title>AllMyData - Tahoe - Mutable File Retrieve Status</title>
+ <!-- <link href="http://www.allmydata.com/common/css/styles.css"
+ rel="stylesheet" type="text/css"/> -->
+ <link href="/webform_css" rel="stylesheet" type="text/css"/>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+ </head>
+ <body>
+
+<h1>Mutable File Retrieve Status</h1>
+
+<ul>
+ <li>Storage Index: <span n:render="si"/></li>
+ <li>Helper?: <span n:render="helper"/></li>
+ <li>Current Size: <span n:render="current_size"/></li>
+ <li>Progress: <span n:render="progress"/></li>
+ <li>Status: <span n:render="status"/></li>
+</ul>
+
+</body></html>
<h1>Upload and Download Status</h1>
-<h2>Active Uploads:</h2>
-<table n:render="sequence" n:data="active_uploads" border="1">
- <tr n:pattern="header">
- <td>Storage Index</td>
- <td>Helper?</td>
- <td>Total Size</td>
- <td>Progress (Hash)</td>
- <td>Progress (Ciphertext)</td>
- <td>Progress (Encode+Push)</td>
- <td>Status</td>
- </tr>
- <tr n:pattern="item" n:render="row_upload">
- <td><n:slot name="si"/></td>
- <td><n:slot name="helper"/></td>
- <td><n:slot name="total_size"/></td>
- <td><n:slot name="progress_hash"/></td>
- <td><n:slot name="progress_ciphertext"/></td>
- <td><n:slot name="progress_encode"/></td>
- <td><n:slot name="status"/></td>
- </tr>
- <tr n:pattern="empty"><td>No active uploads!</td></tr>
-</table>
-
-<h2>Active Downloads:</h2>
-<table n:render="sequence" n:data="active_downloads" border="1">
+<h2>Active Operations:</h2>
+<table n:render="sequence" n:data="active_operations" border="1">
<tr n:pattern="header">
+ <td>Type</td>
<td>Storage Index</td>
<td>Helper?</td>
<td>Total Size</td>
<td>Progress</td>
<td>Status</td>
</tr>
- <tr n:pattern="item" n:render="row_download">
+ <tr n:pattern="item" n:render="row">
+ <td><n:slot name="type"/></td>
<td><n:slot name="si"/></td>
<td><n:slot name="helper"/></td>
<td><n:slot name="total_size"/></td>
<td><n:slot name="progress"/></td>
<td><n:slot name="status"/></td>
</tr>
- <tr n:pattern="empty"><td>No active downloads!</td></tr>
+ <tr n:pattern="empty"><td>No active operations!</td></tr>
</table>
-<h2>Recent Uploads:</h2>
-<table n:render="sequence" n:data="recent_uploads" border="1">
- <tr n:pattern="header">
- <td>Storage Index</td>
- <td>Helper?</td>
- <td>Total Size</td>
- <td>Progress (Hash)</td>
- <td>Progress (Ciphertext)</td>
- <td>Progress (Encode+Push)</td>
- <td>Status</td>
- </tr>
- <tr n:pattern="item" n:render="row_upload">
- <td><n:slot name="si"/></td>
- <td><n:slot name="helper"/></td>
- <td><n:slot name="total_size"/></td>
- <td><n:slot name="progress_hash"/></td>
- <td><n:slot name="progress_ciphertext"/></td>
- <td><n:slot name="progress_encode"/></td>
- <td><n:slot name="status"/></td>
- </tr>
- <tr n:pattern="empty"><td>No recent uploads!</td></tr>
-</table>
-
-<h2>Recent Downloads:</h2>
-<table n:render="sequence" n:data="recent_downloads" border="1">
+<h2>Recent Operations:</h2>
+<table n:render="sequence" n:data="recent_operations" border="1">
<tr n:pattern="header">
+ <td>Type</td>
<td>Storage Index</td>
<td>Helper?</td>
<td>Total Size</td>
<td>Progress</td>
<td>Status</td>
</tr>
- <tr n:pattern="item" n:render="row_download">
+ <tr n:pattern="item" n:render="row">
+ <td><n:slot name="type"/></td>
<td><n:slot name="si"/></td>
<td><n:slot name="helper"/></td>
<td><n:slot name="total_size"/></td>
<td><n:slot name="progress"/></td>
<td><n:slot name="status"/></td>
</tr>
- <tr n:pattern="empty"><td>No recent downloads!</td></tr>
+ <tr n:pattern="empty"><td>No recent operations!</td></tr>
</table>
<div>Return to the <a href="/">Welcome Page</a></div>
from allmydata.util import base32, fileutil, idlib, observer, log
import simplejson
from allmydata.interfaces import IDownloadTarget, IDirectoryNode, IFileNode, \
- IMutableFileNode, IUploadStatus, IDownloadStatus
+ IMutableFileNode, IUploadStatus, IDownloadStatus, IPublishStatus, \
+ IRetrieveStatus
import allmydata # to display import path
from allmydata import download
from allmydata.upload import FileHandle, FileName
def render_status(self, ctx, data):
return data.get_status()
+class RetrieveStatusPage(rend.Page):
+ docFactory = getxmlfile("retrieve-status.xhtml")
+
+ def render_si(self, ctx, data):
+ si_s = base32.b2a_or_none(data.get_storage_index())
+ if si_s is None:
+ si_s = "(None)"
+ return si_s
+
+ def render_helper(self, ctx, data):
+ return {True: "Yes",
+ False: "No"}[data.using_helper()]
+
+ def render_current_size(self, ctx, data):
+ size = data.get_size()
+ if size is None:
+ size = "(unknown)"
+ return size
+
+ def render_progress(self, ctx, data):
+ progress = data.get_progress()
+ # TODO: make an ascii-art bar
+ return "%.1f%%" % (100.0 * progress)
+
+ def render_status(self, ctx, data):
+ return data.get_status()
+
+class PublishStatusPage(rend.Page):
+ docFactory = getxmlfile("publish-status.xhtml")
+
+ def render_si(self, ctx, data):
+ si_s = base32.b2a_or_none(data.get_storage_index())
+ if si_s is None:
+ si_s = "(None)"
+ return si_s
+
+ def render_helper(self, ctx, data):
+ return {True: "Yes",
+ False: "No"}[data.using_helper()]
+
+ def render_current_size(self, ctx, data):
+ size = data.get_size()
+ if size is None:
+ size = "(unknown)"
+ return size
+
+ def render_progress(self, ctx, data):
+ progress = data.get_progress()
+ # TODO: make an ascii-art bar
+ return "%.1f%%" % (100.0 * progress)
+
+ def render_status(self, ctx, data):
+ return data.get_status()
+
class Status(rend.Page):
docFactory = getxmlfile("status.xhtml")
addSlash = True
- def data_active_uploads(self, ctx, data):
- return [u for u in IClient(ctx).list_active_uploads()]
- def data_active_downloads(self, ctx, data):
- return [d for d in IClient(ctx).list_active_downloads()]
- def data_recent_uploads(self, ctx, data):
- return [u for u in IClient(ctx).list_recent_uploads()
- if not u.get_active()]
- def data_recent_downloads(self, ctx, data):
- return [d for d in IClient(ctx).list_recent_downloads()
- if not d.get_active()]
+ def data_active_operations(self, ctx, data):
+ active = (IClient(ctx).list_active_uploads() +
+ IClient(ctx).list_active_downloads() +
+ IClient(ctx).list_active_publish() +
+ IClient(ctx).list_active_retrieve())
+ return active
+
+ def data_recent_operations(self, ctx, data):
+ recent = [o for o in (IClient(ctx).list_recent_uploads() +
+ IClient(ctx).list_recent_downloads() +
+ IClient(ctx).list_recent_publish() +
+ IClient(ctx).list_recent_retrieve())
+ if not o.get_active()]
+ return recent
+
+ def render_row(self, ctx, data):
+ s = data
+ si_s = base32.b2a_or_none(s.get_storage_index())
+ if si_s is None:
+ si_s = "(None)"
+ ctx.fillSlots("si", si_s)
+ ctx.fillSlots("helper", {True: "Yes",
+ False: "No"}[s.using_helper()])
+
+ size = s.get_size()
+ if size is None:
+ size = "(unknown)"
+ ctx.fillSlots("total_size", size)
+
+ progress = data.get_progress()
+ if IUploadStatus.providedBy(data):
+ link = "up-%d" % data.get_counter()
+ ctx.fillSlots("type", "upload")
+ # TODO: make an ascii-art bar
+ (chk, ciphertext, encandpush) = progress
+ progress_s = ("hash: %.1f%%, ciphertext: %.1f%%, encode: %.1f%%" %
+ ( (100.0 * chk),
+ (100.0 * ciphertext),
+ (100.0 * encandpush) ))
+ ctx.fillSlots("progress", progress_s)
+ elif IDownloadStatus.providedBy(data):
+ link = "down-%d" % data.get_counter()
+ ctx.fillSlots("type", "download")
+ ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
+ elif IPublishStatus.providedBy(data):
+ link = "publish-%d" % data.get_counter()
+ ctx.fillSlots("type", "publish")
+ ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
+ else:
+ assert IRetrieveStatus.providedBy(data)
+ ctx.fillSlots("type", "retrieve")
+ link = "retrieve-%d" % data.get_counter()
+ ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
+ ctx.fillSlots("status", T.a(href=link)[s.get_status()])
+ return ctx.tag
def childFactory(self, ctx, name):
client = IClient(ctx)
for s in client.list_all_downloads():
if s.get_counter() == count:
return DownloadStatusPage(s)
-
- def _render_common(self, ctx, data):
- s = data
- si_s = base32.b2a_or_none(s.get_storage_index())
- if si_s is None:
- si_s = "(None)"
- ctx.fillSlots("si", si_s)
- ctx.fillSlots("helper", {True: "Yes",
- False: "No"}[s.using_helper()])
- size = s.get_size()
- if size is None:
- size = "(unknown)"
- ctx.fillSlots("total_size", size)
- if IUploadStatus.providedBy(data):
- link = "up-%d" % data.get_counter()
- else:
- assert IDownloadStatus.providedBy(data)
- link = "down-%d" % data.get_counter()
- ctx.fillSlots("status", T.a(href=link)[s.get_status()])
-
- def render_row_upload(self, ctx, data):
- self._render_common(ctx, data)
- (chk, ciphertext, encandpush) = data.get_progress()
- # TODO: make an ascii-art bar
- ctx.fillSlots("progress_hash", "%.1f%%" % (100.0 * chk))
- ctx.fillSlots("progress_ciphertext", "%.1f%%" % (100.0 * ciphertext))
- ctx.fillSlots("progress_encode", "%.1f%%" % (100.0 * encandpush))
- return ctx.tag
-
- def render_row_download(self, ctx, data):
- self._render_common(ctx, data)
- progress = data.get_progress()
- # TODO: make an ascii-art bar
- ctx.fillSlots("progress", "%.1f%%" % (100.0 * progress))
- return ctx.tag
+ if stype == "publish":
+ for s in client.list_recent_publish():
+ if s.get_counter() == count:
+ return PublishStatusPage(s)
+ for s in client.list_all_publish():
+ if s.get_counter() == count:
+ return PublishStatusPage(s)
+ if stype == "retrieve":
+ for s in client.list_recent_retrieve():
+ if s.get_counter() == count:
+ return RetrieveStatusPage(s)
+ for s in client.list_all_retrieve():
+ if s.get_counter() == count:
+ return RetrieveStatusPage(s)
class Root(rend.Page):