From: meejah <>
Date: Thu, 12 Nov 2015 23:16:28 +0000 (-0700)
Subject: Flesh out "tahoe magic-folder status" command

Flesh out "tahoe magic-folder status" command


 - a JSON endpoint
 - CLI to display information
 - QueuedItem + IQueuedItem for uploader/downloader
 - IProgress interface + PercentProgress implementation
 - progress= args to many upload/download APIs

diff --git a/src/allmydata/ b/src/allmydata/
index 9652c702..f0af41df 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -130,7 +130,7 @@ class ProhibitedNode:
     def get_best_readable_version(self):
         raise FileProhibited(self.reason)
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         raise FileProhibited(self.reason)
     def get_best_mutable_version(self):
diff --git a/src/allmydata/ b/src/allmydata/
index 9a0a5de8..a919d9a1 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -515,6 +515,7 @@ class Client(node.Node, pollmixin.PollMixin):
             from allmydata.frontends import magic_folder
             umask = self.get_config("magic_folder", "download.umask", 0077)
             s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
+            self._magic_folder = s
diff --git a/src/allmydata/ b/src/allmydata/
index 5fddec41..4c17c663 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -588,7 +588,7 @@ class DirectoryNode:
         return d
-    def add_file(self, namex, uploadable, metadata=None, overwrite=True):
+    def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None):
         """I upload a file (using the given IUploadable), then attach the
         resulting FileNode to the directory at the given name. I return a
         Deferred that fires (with the IFileNode of the uploaded file) when
@@ -596,7 +596,7 @@ class DirectoryNode:
         name = normalize(namex)
         if self.is_readonly():
-        d = self._uploader.upload(uploadable)
+        d = self._uploader.upload(uploadable, progress=progress)
         d.addCallback(lambda results:
                       self._create_and_validate_node(results.get_uri(), None,
diff --git a/src/allmydata/frontends/ b/src/allmydata/frontends/
index afe26081..8a2b07e1 100644
--- a/src/allmydata/frontends/
+++ b/src/allmydata/frontends/
@@ -15,6 +15,7 @@ from allmydata.util import log
 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
 from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.deferredutil import HookMixin
+from allmydata.util.progress import PercentProgress
 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
      extend_filepath, unicode_from_filepath, unicode_segments_from, \
      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
@@ -126,10 +127,21 @@ class QueueMixin(HookMixin):
                                  % quote_local_unicode_path(self._local_path_u))
         self._deque = deque()
+        # do we also want to bound on "maximum age"?
+        self._process_history = deque(maxlen=10)
         self._lazy_tail = defer.succeed(None)
         self._stopped = False
         self._turn_delay = 0
+    def get_status(self):
+        """
+        Returns an iterable of instances that implement IQueuedItem
+        """
+        for item in self._deque:
+            yield item
+        for item in self._process_history:
+            yield item
     def _get_filepath(self, relpath_u):
         self._log("_get_filepath(%r)" % (relpath_u,))
         return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
@@ -162,8 +174,10 @@ class QueueMixin(HookMixin):
-                item = self._deque.pop()
-                self._log("popped %r" % (item,))
+                item = IQueuedItem(self._deque.pop())
+                self._process_history.append(item)
+                self._log("popped %r, now have %d" % (item, len(self._deque)))
                 self._count('objects_queued', -1)
             except IndexError:
                 self._log("deque is now empty")
@@ -177,6 +191,7 @@ class QueueMixin(HookMixin):
                 self._lazy_tail.addBoth(self._logcb, "got past _process")
                 self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                 self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
+                self._lazy_tail.addErrback(log.err)
                 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                 self._lazy_tail.addBoth(self._logcb, "got past deferLater")
         except Exception as e:
@@ -184,6 +199,44 @@ class QueueMixin(HookMixin):
+from zope.interface import Interface, implementer
+class IQueuedItem(Interface):
+    pass
+class QueuedItem(object):
+    def __init__(self, relpath_u, progress):
+        self.relpath_u = relpath_u
+        self.progress = progress
+        self._status_history = dict()
+    def set_status(self, status, current_time=None):
+        if current_time is None:
+            current_time = time.time()
+        self._status_history[status] = current_time
+    def status_time(self, state):
+        """
+        Returns None if there's no status-update for 'state', else returns
+        the timestamp when that state was reached.
+        """
+        return self._status_history.get(state, None)
+    def status_history(self):
+        """
+        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
+        """
+        hist = self._status_history.items()
+        hist.sort(lambda a, b: cmp(a[1], b[1]))
+        return hist
+class UploadItem(QueuedItem):
+    pass
 class Uploader(QueueMixin):
     def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
@@ -256,7 +309,12 @@ class Uploader(QueueMixin):
     def _extend_queue_and_keep_going(self, relpaths_u):
         self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
-        self._deque.extend(relpaths_u)
+        for relpath_u in relpaths_u:
+            progress = PercentProgress()
+            item = UploadItem(relpath_u, progress)
+            item.set_status('queued', self._clock.seconds())
+            self._deque.append(item)
         self._count('objects_queued', len(relpaths_u))
         if self.is_ready:
@@ -273,7 +331,7 @@ class Uploader(QueueMixin):
     def _add_pending(self, relpath_u):
-        self._log("add pending %r" % (relpath_u,))        
+        self._log("add pending %r" % (relpath_u,))
         if not magicpath.should_ignore_file(relpath_u):
@@ -327,10 +385,14 @@ class Uploader(QueueMixin):
     def _when_queue_is_empty(self):
         return defer.succeed(None)
-    def _process(self, relpath_u):
+    def _process(self, item):
         # Uploader
+        relpath_u = item.relpath_u
         self._log("_process(%r)" % (relpath_u,))
+        item.set_status('started', self._clock.seconds())
         if relpath_u is None:
+            item.set_status('invalid_path', self._clock.seconds())
         precondition(isinstance(relpath_u, unicode), relpath_u)
         precondition(not relpath_u.endswith(u'/'), relpath_u)
@@ -374,8 +436,12 @@ class Uploader(QueueMixin):
                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
                 empty_uploadable = Data("", self._client.convergence)
-                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
-                                                   metadata=metadata, overwrite=True)
+                d2 = self._upload_dirnode.add_file(
+                    encoded_path_u, empty_uploadable,
+                    metadata=metadata,
+                    overwrite=True,
+                    progress=item.progress,
+                )
                 def _add_db_entry(filenode):
                     filecap = filenode.get_uri()
@@ -397,7 +463,12 @@ class Uploader(QueueMixin):
                 uploadable = Data("", self._client.convergence)
                 encoded_path_u += magicpath.path2magic(u"/")
                 self._log("encoded_path_u =  %r" % (encoded_path_u,))
-                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
+                upload_d = self._upload_dirnode.add_file(
+                    encoded_path_u, uploadable,
+                    metadata={"version": 0},
+                    overwrite=True,
+                    progress=item.progress,
+                )
                 def _dir_succeeded(ign):
                     self._log("created subdirectory %r" % (relpath_u,))
@@ -428,8 +499,12 @@ class Uploader(QueueMixin):
                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
                 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
-                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
-                                                   metadata=metadata, overwrite=True)
+                d2 = self._upload_dirnode.add_file(
+                    encoded_path_u, uploadable,
+                    metadata=metadata,
+                    overwrite=True,
+                    progress=item.progress,
+                )
                 def _add_db_entry(filenode):
                     filecap = filenode.get_uri()
@@ -448,10 +523,12 @@ class Uploader(QueueMixin):
         def _succeeded(res):
+            item.set_status('success', self._clock.seconds())
             return res
         def _failed(f):
             self._log("%s while processing %r" % (f, relpath_u))
+            item.set_status('failure', self._clock.seconds())
             return f
         d.addCallbacks(_succeeded, _failed)
         return d
@@ -540,6 +617,13 @@ class WriteFileMixin(object):
         return abspath_u
+class DownloadItem(QueuedItem):
+    def __init__(self, relpath_u, progress, filenode, metadata):
+        super(DownloadItem, self).__init__(relpath_u, progress)
+        self.file_node = filenode
+        self.metadata = metadata
 class Downloader(QueueMixin, WriteFileMixin):
     REMOTE_SCAN_INTERVAL = 3  # facilitates tests
@@ -561,6 +645,7 @@ class Downloader(QueueMixin, WriteFileMixin):
     def start_downloading(self):
+        self._turn_delay = self.REMOTE_SCAN_INTERVAL
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)
@@ -685,7 +770,14 @@ class Downloader(QueueMixin, WriteFileMixin):
                 file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
                 if self._should_download(relpath_u, metadata['version']):
-                    self._deque.append( (relpath_u, file_node, metadata) )
+                    to_dl = DownloadItem(
+                        relpath_u,
+                        PercentProgress(file_node.get_size()),
+                        file_node,
+                        metadata,
+                    )
+                    to_dl.set_status('queued', self._clock.seconds())
+                    self._deque.append(to_dl)
                     self._log("Excluding %r" % (relpath_u,))
                     self._call_hook(None, 'processed', async=True)
@@ -703,42 +795,49 @@ class Downloader(QueueMixin, WriteFileMixin):
     def _process(self, item, now=None):
         # Downloader
         self._log("_process(%r)" % (item,))
-        if now is None:
-            now = time.time()
-        (relpath_u, file_node, metadata) = item
-        fp = self._get_filepath(relpath_u)
+        if now is None:  # XXX why can we pass in now?
+            now = time.time()  # self._clock.seconds()
+        self._log("started! %s" % (now,))
+        item.set_status('started', now)
+        fp = self._get_filepath(item.relpath_u)
         abspath_u = unicode_from_filepath(fp)
         conflict_path_u = self._get_conflicted_filename(abspath_u)
         d = defer.succeed(None)
         def do_update_db(written_abspath_u):
-            filecap = file_node.get_uri()
-            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+            filecap = item.file_node.get_uri()
+            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
             last_downloaded_uri = filecap
             last_downloaded_timestamp = now
             written_pathinfo = get_pathinfo(written_abspath_u)
-            if not written_pathinfo.exists and not metadata.get('deleted', False):
+            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
-            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
-                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
+            self._db.did_upload_version(
+                item.relpath_u, item.metadata['version'], last_uploaded_uri,
+                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
+            )
+            item.set_status('success', self._clock.seconds())
         def failed(f):
+            item.set_status('failure', self._clock.seconds())
             self._log("download failed: %s" % (str(f),))
             return f
         if os.path.isfile(conflict_path_u):
             def fail(res):
-                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
+                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
             is_conflict = False
-            db_entry = self._db.get_db_entry(relpath_u)
-            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
-            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+            db_entry = self._db.get_db_entry(item.relpath_u)
+            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
+            dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
             if db_entry:
                 if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                     if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
@@ -747,22 +846,22 @@ class Downloader(QueueMixin, WriteFileMixin):
                 elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                     is_conflict = True
-                elif self._is_upload_pending(relpath_u):
+                elif self._is_upload_pending(item.relpath_u):
                     is_conflict = True
-            if relpath_u.endswith(u"/"):
-                if metadata.get('deleted', False):
+            if item.relpath_u.endswith(u"/"):
+                if item.metadata.get('deleted', False):
                     self._log("rmdir(%r) ignored" % (abspath_u,))
                     self._log("mkdir(%r)" % (abspath_u,))
                     d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                     d.addCallback(lambda ign: abspath_u)
-                if metadata.get('deleted', False):
+                if item.metadata.get('deleted', False):
                     d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
-                    d.addCallback(lambda ign: file_node.download_best_version())
+                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                     d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index cf308dd3..8cc32d35 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -74,7 +74,7 @@ PiB=1024*TiB
 class Encoder(object):
-    def __init__(self, log_parent=None, upload_status=None):
+    def __init__(self, log_parent=None, upload_status=None, progress=None):
         self.uri_extension_data = {}
         self._codec = None
@@ -86,6 +86,7 @@ class Encoder(object):
         self._log_number = log.msg("creating Encoder %s" % self,
                                    facility="tahoe.encoder", parent=log_parent)
         self._aborted = False
+        self._progress = progress
     def __repr__(self):
         if hasattr(self, "_storage_index"):
@@ -105,6 +106,8 @@ class Encoder(object):
         def _got_size(size):
             self.log(format="file size: %(size)d", size=size)
             self.file_size = size
+            if self._progress:
+                self._progress.set_progress_total(self.file_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
@@ -436,6 +439,7 @@ class Encoder(object):
             shareid = shareids[i]
             d = self.send_block(shareid, segnum, block, lognum)
             block_hash = hashutil.block_hash(block)
             #from allmydata.util import base32
             #log.msg("creating block (shareid=%d, blocknum=%d) "
@@ -445,6 +449,14 @@ class Encoder(object):
         dl = self._gather_responses(dl)
+        def do_progress(ign):
+            done = self.segment_size * (segnum + 1)
+            if self._progress:
+                self._progress.set_progress(done)
+            return ign
+        dl.addCallback(do_progress)
         def _logit(res):
             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index 1c3780ba..77997767 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -8,7 +8,7 @@ from twisted.internet import defer
 from allmydata import uri
 from twisted.internet.interfaces import IConsumer
 from allmydata.interfaces import IImmutableFileNode, IUploadResults
-from allmydata.util import consumer
+from allmydata.util import consumer, progress
 from allmydata.check_results import CheckResults, CheckAndRepairResults
 from allmydata.util.dictutil import DictOfSets
 from allmydata.util.happinessutil import servers_of_happiness
@@ -245,11 +245,13 @@ class ImmutableFileNode:
     # we keep it here, we should also put this on CiphertextFileNode
     def __hash__(self):
         return self.u.__hash__()
     def __eq__(self, other):
         if isinstance(other, ImmutableFileNode):
             return self.u.__eq__(other.u)
             return False
     def __ne__(self, other):
         if isinstance(other, ImmutableFileNode):
             return self.u.__eq__(other.u)
@@ -273,12 +275,16 @@ class ImmutableFileNode:
     def get_uri(self):
         return self.u.to_string()
     def get_cap(self):
         return self.u
     def get_readcap(self):
         return self.u.get_readonly()
     def get_verify_cap(self):
         return self.u.get_verify_cap()
     def get_repair_cap(self):
         # CHK files can be repaired with just the verifycap
         return self.u.get_verify_cap()
@@ -288,6 +294,7 @@ class ImmutableFileNode:
     def get_size(self):
         return self.u.get_size()
     def get_current_size(self):
         return defer.succeed(self.get_size())
@@ -305,6 +312,7 @@ class ImmutableFileNode:
     def check_and_repair(self, monitor, verify=False, add_lease=False):
         return self._cnode.check_and_repair(monitor, verify, add_lease)
     def check(self, monitor, verify=False, add_lease=False):
         return self._cnode.check(monitor, verify, add_lease)
@@ -316,14 +324,13 @@ class ImmutableFileNode:
         return defer.succeed(self)
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         Download the best version of this file, returning its contents
         as a bytestring. Since there is only one version of an immutable
         file, we download and return the contents of this file.
-        d = consumer.download_to_data(self)
+        d = consumer.download_to_data(self, progress=progress)
         return d
     # for an immutable file, download_to_data (specified in IReadable)
diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index dc77b4ec..b18dccb6 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -113,7 +113,10 @@ class LiteralFileNode(_ImmutableFileNodeBase):
         return defer.succeed(self)
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
+        if progress is not None:
+            progress.set_progress_total(len(
+            progress.set_progress(len(
         return defer.succeed(
diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index 84390b1d..937dceae 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -137,7 +137,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     def __init__(self, storage_index,
                  helper, storage_broker, secret_holder,
                  incoming_file, encoding_file,
-                 log_number):
+                 log_number, progress=None):
+        upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress)
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index c6324046..99168f4e 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -21,7 +21,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
@@ -623,7 +623,7 @@ class EncryptAnUploadable:
     CHUNKSIZE = 50*1024
-    def __init__(self, original, log_parent=None):
+    def __init__(self, original, log_parent=None, progress=None):
                      "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
         self.original = IUploadable(original)
@@ -636,6 +636,7 @@ class EncryptAnUploadable:
         self._file_size = None
         self._ciphertext_bytes_read = 0
         self._status = None
+        self._progress = progress
     def set_upload_status(self, upload_status):
         self._status = IUploadStatus(upload_status)
@@ -656,6 +657,8 @@ class EncryptAnUploadable:
             self._file_size = size
             if self._status:
+            if self._progress:
+                self._progress.set_progress_total(size)
             return size
         return d
@@ -894,7 +897,7 @@ class UploadStatus:
 class CHKUploader:
     server_selector_class = Tahoe2ServerSelector
-    def __init__(self, storage_broker, secret_holder):
+    def __init__(self, storage_broker, secret_holder, progress=None):
         # server_selector needs storage_broker and secret_holder
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
@@ -904,6 +907,7 @@ class CHKUploader:
         self._upload_status = UploadStatus()
+        self._progress = progress
         # locate_all_shareholders() will create the following attribute:
         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
@@ -947,8 +951,11 @@ class CHKUploader:
         eu = IEncryptedUploadable(encrypted)
         started = time.time()
-        self._encoder = e = encode.Encoder(self._log_number,
-                                           self._upload_status)
+        self._encoder = e = encode.Encoder(
+            self._log_number,
+            self._upload_status,
+            progress=self._progress,
+        )
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
@@ -1073,12 +1080,13 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
 class LiteralUploader:
-    def __init__(self):
+    def __init__(self, progress=None):
         self._status = s = UploadStatus()
         s.set_progress(0, 1.0)
+        self._progress = progress
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
@@ -1086,6 +1094,8 @@ class LiteralUploader:
         def _got_size(size):
             self._size = size
+            if self._progress:
+                self._progress.set_progress_total(size)
             return read_this_many_bytes(uploadable, size)
         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
@@ -1109,6 +1119,8 @@ class LiteralUploader:
         self._status.set_progress(1, 1.0)
         self._status.set_progress(2, 1.0)
+        if self._progress:
+            self._progress.set_progress(self._size)
         return ur
     def close(self):
@@ -1503,12 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
     name = "uploader"
-    def __init__(self, helper_furl=None, stats_provider=None, history=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
         self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+        self._progress = progress
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
@@ -1542,12 +1555,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         return (self._helper_furl, bool(self._helper))
-    def upload(self, uploadable):
+    def upload(self, uploadable, progress=None):
         Returns a Deferred that will fire with the UploadResults instance.
         assert self.parent
         assert self.running
+        assert progress is None or IProgress.providedBy(progress)
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
@@ -1556,13 +1570,15 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
             precondition(isinstance(default_params, dict), default_params)
             precondition("max_segment_size" in default_params, default_params)
+            if progress:
+                progress.set_progress_total(size)
             if self.stats_provider:
                 self.stats_provider.count('uploader.files_uploaded', 1)
                 self.stats_provider.count('uploader.bytes_uploaded', size)
             if size <= self.URI_LIT_SIZE_THRESHOLD:
-                uploader = LiteralUploader()
+                uploader = LiteralUploader(progress=progress)
                 return uploader.start(uploadable)
                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
@@ -1575,7 +1591,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
                     storage_broker = self.parent.get_storage_broker()
                     secret_holder = self.parent._secret_holder
-                    uploader = CHKUploader(storage_broker, secret_holder)
+                    uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
                     d2.addCallback(lambda x: uploader.start(eu))
                 self._all_uploads[uploader] = None
diff --git a/src/allmydata/ b/src/allmydata/
index 31fc6144..40ef040b 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -1,5 +1,5 @@
-from zope.interface import Interface
+from zope.interface import Interface, Attribute
 from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
      ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
@@ -624,6 +624,38 @@ class MustNotBeUnknownRWError(CapConstraintError):
     """Cannot add an unknown child cap specified in a rw_uri field."""
+class IProgress(Interface):
+    """
+    Remembers progress measured in arbitrary units. Users of these
+    instances must call ``set_progress_total`` at least once before
+    progress can be valid, and must use the same units for both
+    ``set_progress_total`` and ``set_progress calls``.
+    See also:
+    :class:`allmydata.util.progress.PercentProgress`
+    """
+    progress = Attribute(
+        "Current amount of progress (in percentage)"
+    )
+    def set_progress(self, value):
+        """
+        Sets the current amount of progress.
+        Arbitrary units, but must match units used for
+        set_progress_total.
+        """
+    def set_progress_total(self, value):
+        """
+        Sets the total amount of expected progress
+        Arbitrary units, but must be same units as used when calling
+        set_progress() on this instance)..
+        """
 class IReadable(Interface):
     """I represent a readable object -- either an immutable file, or a
     specific version of a mutable file.
@@ -653,9 +685,12 @@ class IReadable(Interface):
     def get_size():
         """Return the length (in bytes) of this readable object."""
-    def download_to_data():
+    def download_to_data(progress=None):
         """Download all of the file contents. I return a Deferred that fires
-        with the contents as a byte string."""
+        with the contents as a byte string.
+        :param progress: None or IProgress implementer
+        """
     def read(consumer, offset=0, size=None):
         """Download a portion (possibly all) of the file's contents, making
@@ -915,11 +950,13 @@ class IFileNode(IFilesystemNode):
         the Deferred will errback with an UnrecoverableFileError.
-    def download_best_version():
+    def download_best_version(progress=None):
         """Download the contents of the version that would be returned
         by get_best_readable_version(). This is equivalent to calling
         download_to_data() on the IReadable given by that method.
+        progress is anything that implements IProgress
         I return a Deferred that fires with a byte string when the file
         has been fully downloaded. To support streaming download, use
         the 'read' method of IReadable. If no version is recoverable,
@@ -1065,7 +1102,7 @@ class IMutableFileNode(IFileNode):
         everything) to get increased visibility.
-    def upload(new_contents, servermap):
+    def upload(new_contents, servermap, progress=None):
         """Replace the contents of the file with new ones. This requires a
         servermap that was previously updated with MODE_WRITE.
@@ -1086,6 +1123,8 @@ class IMutableFileNode(IFileNode):
         operation. If I do not signal UncoordinatedWriteError, then I was
         able to write the new version without incident.
+        ``progress`` is either None or an IProgress provider
         I return a Deferred that fires (with a PublishStatus object) when the
         publish has completed. I will update the servermap in-place with the
         location of all new shares.
@@ -1276,12 +1315,14 @@ class IDirectoryNode(IFilesystemNode):
         equivalent to calling set_node() multiple times, but is much more
-    def add_file(name, uploadable, metadata=None, overwrite=True):
+    def add_file(name, uploadable, metadata=None, overwrite=True, progress=None):
         """I upload a file (using the given IUploadable), then attach the
         resulting ImmutableFileNode to the directory at the given name. I set
         metadata the same way as set_uri and set_node. The child name must be
         a unicode string.
+        ``progress`` either provides IProgress or is None
         I return a Deferred that fires (with the IFileNode of the uploaded
         file) when the operation completes."""
diff --git a/src/allmydata/ b/src/allmydata/
index 982b5fe2..661c774d 100644
--- a/src/allmydata/
+++ b/src/allmydata/
@@ -65,6 +65,7 @@ class MagicFolderDB(object):
         row = self.cursor.fetchone()
         if not row:
+            print "found nothing for", relpath_u
             return None
             (size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row
diff --git a/src/allmydata/mutable/ b/src/allmydata/mutable/
index 50bbdd34..084c28f6 100644
--- a/src/allmydata/mutable/
+++ b/src/allmydata/mutable/
@@ -403,21 +403,21 @@ class MutableFileNode:
         return d.addCallback(_get_version, version)
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         I return a Deferred that fires with the contents of the best
         version of this mutable file.
-        return self._do_serialized(self._download_best_version)
+        return self._do_serialized(self._download_best_version, progress=progress)
-    def _download_best_version(self):
+    def _download_best_version(self, progress=None):
         I am the serialized sibling of download_best_version.
         d = self.get_best_readable_version()
-        d.addCallback(lambda version: version.download_to_data())
+        d.addCallback(lambda version: version.download_to_data(progress=progress))
         # It is possible that the download will fail because there
         # aren't enough shares to be had. If so, we will try again after
@@ -432,7 +432,7 @@ class MutableFileNode:
             d = self.get_best_mutable_version()
-            d.addCallback(lambda version: version.download_to_data())
+            d.addCallback(lambda version: version.download_to_data(progress=progress))
             return d
@@ -935,13 +935,13 @@ class MutableFileVersion:
         return self._servermap.size_of_version(self._version)
-    def download_to_data(self, fetch_privkey=False):
+    def download_to_data(self, fetch_privkey=False, progress=None):
         I return a Deferred that fires with the contents of this
         readable object as a byte string.
-        c = consumer.MemoryConsumer()
+        c = consumer.MemoryConsumer(progress=progress)
         d =, fetch_privkey=fetch_privkey)
         d.addCallback(lambda mc: "".join(mc.chunks))
         return d
diff --git a/src/allmydata/scripts/ b/src/allmydata/scripts/
index 7b965525..1964672b 100644
--- a/src/allmydata/scripts/
+++ b/src/allmydata/scripts/
@@ -31,6 +31,7 @@ class BadResponse(object):
     def __init__(self, url, err):
         self.status = -1
         self.reason = "Error trying to connect to %s: %s" % (url, err)
+        self.error = err
     def read(self):
         return ""
diff --git a/src/allmydata/scripts/ b/src/allmydata/scripts/
index 5d5c61dc..4c9a469b 100644
--- a/src/allmydata/scripts/
+++ b/src/allmydata/scripts/
@@ -1,7 +1,13 @@
 import os
+import urllib
+from sys import stderr
 from types import NoneType
 from cStringIO import StringIO
+from datetime import datetime
+import humanize
+import simplejson
 from twisted.python import usage
@@ -12,10 +18,13 @@ from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions
 import tahoe_mv
 from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \
+from allmydata.scripts.common_http import do_http, format_http_success, \
+    format_http_error, BadResponse
 from allmydata.util import fileutil
 from allmydata.util import configutil
 from allmydata import uri
 class CreateOptions(BasedirOptions):
@@ -200,21 +209,167 @@ class StatusOptions(BasedirOptions):
     nickname = None
     synopsis = ""
     stdin = StringIO("")
     def parseArgs(self):
         node_url_file = os.path.join(self['node-directory'], u"node.url")
-        self['node-url'] = open(node_url_file, "r").read().strip()
+        with open(node_url_file, "r") as f:
+            self['node-url'] =
+def _get_json_for_fragment(options, fragment):
+    nodeurl = options['node-url']
+    if nodeurl.endswith('/'):
+        nodeurl = nodeurl[:-1]
+    url = u'%s/%s' % (nodeurl, fragment)
+    resp = do_http(method, url)
+    if isinstance(resp, BadResponse):
+        # specifically NOT using format_http_error() here because the
+        # URL is pretty sensitive (we're doing /uri/<key>).
+        raise RuntimeError(
+            "Failed to get json from '%s': %s" % (nodeurl, resp.error)
+        )
+    data =
+    parsed = simplejson.loads(data)
+    if not parsed:
+        raise RuntimeError("No data from '%s'" % (nodeurl,))
+    return parsed
+def _get_json_for_cap(options, cap):
+    return _get_json_for_fragment(
+        options,
+        'uri/%s?t=json' % urllib.quote(cap),
+    )
+def _print_item_status(item, now, longest):
+    paddedname = (' ' * (longest - len(item['path']))) + item['path']
+    if 'failure_at' in item:
+        ts = datetime.fromtimestamp(item['started_at'])
+        prog = 'Failed %s (%s)' % (humanize.naturaltime(now - ts), ts)
+    elif item['percent_done'] < 100.0:
+        if 'started_at' not in item:
+            prog = 'not yet started'
+        else:
+            so_far = now - datetime.fromtimestamp(item['started_at'])
+            if so_far.seconds > 0.0:
+                rate = item['percent_done'] / so_far.seconds
+                if rate != 0:
+                    time_left = (100.0 - item['percent_done']) / rate
+                    prog = '%2.1f%% done, around %s left' % (
+                        item['percent_done'],
+                        humanize.naturaldelta(time_left),
+                    )
+                else:
+                    time_left = None
+                    prog = '%2.1f%% done' % (item['percent_done'],)
+            else:
+                prog = 'just started'
+    else:
+        prog = ''
+        for verb in ['finished', 'started', 'queued']:
+            keyname = verb + '_at'
+            if keyname in item:
+                when = datetime.fromtimestamp(item[keyname])
+                prog = '%s %s' % (verb, humanize.naturaltime(now - when))
+                break
+    print "  %s: %s" % (paddedname, prog)
 def status(options):
-    # XXX todo: use http interface to ask about our magic-folder upload status
+    nodedir = options["node-directory"]
+    with open(os.path.join(nodedir, u"private", u"magic_folder_dircap")) as f:
+        dmd_cap =
+    with open(os.path.join(nodedir, u"private", u"collective_dircap")) as f:
+        collective_readcap =
+    try:
+        captype, dmd = _get_json_for_cap(options, dmd_cap)
+        if captype != 'dirnode':
+            print >>stderr, "magic_folder_dircap isn't a directory capability"
+            return 2
+    except RuntimeError as e:
+        print >>stderr, str(e)
+        return 1
+    now =
+    print "Local files:"
+    for (name, child) in dmd['children'].items():
+        captype, meta = child
+        status = 'good'
+        size = meta['size']
+        created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
+        version = meta['metadata']['version']
+        nice_size = humanize.naturalsize(size)
+        nice_created = humanize.naturaltime(now - created)
+        if captype != 'filenode':
+            print "%20s: error, should be a filecap" % name
+            continue
+        print "  %s (%s): %s, version=%s, created %s" % (name, nice_size, status, version, nice_created)
+    captype, collective = _get_json_for_cap(options, collective_readcap)
+    print
+    print "Remote files:"
+    for (name, data) in collective['children'].items():
+        if data[0] != 'dirnode':
+            print "Error: '%s': expected a dirnode, not '%s'" % (name, data[0])
+        print "  %s's remote:" % name
+        dmd = _get_json_for_cap(options, data[1]['ro_uri'])
+        if dmd[0] != 'dirnode':
+            print "Error: should be a dirnode"
+            continue
+        for (n, d) in dmd[1]['children'].items():
+            if d[0] != 'filenode':
+                print "Error: expected '%s' to be a filenode." % (n,)
+            meta = d[1]
+            status = 'good'
+            size = meta['size']
+            created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
+            version = meta['metadata']['version']
+            nice_size = humanize.naturalsize(size)
+            nice_created = humanize.naturaltime(now - created)
+            print "    %s (%s): %s, version=%s, created %s" % (n, nice_size, status, version, nice_created)
+    magicdata = _get_json_for_fragment(options, 'magic_folder?t=json')
+    if len(magicdata):
+        uploads = [item for item in magicdata if item['kind'] == 'upload']
+        downloads = [item for item in magicdata if item['kind'] == 'download']
+        longest = max([len(item['path']) for item in magicdata])
+        if True: # maybe --show-completed option or something?
+            uploads = [item for item in uploads if item['status'] != 'success']
+            downloads = [item for item in downloads if item['status'] != 'success']
+        if len(uploads):
+            print
+            print "Uploads:"
+            for item in uploads:
+                _print_item_status(item, now, longest)
+        if len(downloads):
+            print
+            print "Downloads:"
+            for item in downloads:
+                _print_item_status(item, now, longest)
+        for item in magicdata:
+            if item['status'] == 'failure':
+                print "Failed:", item
     return 0
 class MagicFolderCommand(BaseOptions):
     subCommands = [
         ["create", None, CreateOptions, "Create a Magic Folder."],
         ["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
         ["join", None, JoinOptions, "Join a Magic Folder."],
         ["leave", None, LeaveOptions, "Leave a Magic Folder."],
+        ["status", None, StatusOptions, "Display stutus of uploads/downloads."],
     def postOptions(self):
         if not hasattr(self, 'subOptions'):
@@ -234,6 +389,7 @@ subDispatch = {
     "invite": invite,
     "join": join,
     "leave": leave,
+    "status": status,
 def do_magic_folder(options):
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 6bf2f974..723254b5 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -151,8 +151,8 @@ class FakeCHKFileNode:
         return defer.succeed(self)
-    def download_to_data(self):
-        return download_to_data(self)
+    def download_to_data(self, progress=None):
+        return download_to_data(self, progress=progress)
     download_best_version = download_to_data
@@ -329,11 +329,11 @@ class FakeMutableFileNode:
         return d
-    def download_best_version(self):
-        return defer.succeed(self._download_best_version())
+    def download_best_version(self, progress=None):
+        return defer.succeed(self._download_best_version(progress=progress))
-    def _download_best_version(self, ignored=None):
+    def _download_best_version(self, ignored=None, progress=None):
         if isinstance(self.my_uri, uri.LiteralFileURI):
         if self.storage_index not in self.all_contents:
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index c65114fb..65366709 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -1519,7 +1519,7 @@ class FakeMutableFile:
     def get_write_uri(self):
         return self.uri.to_string()
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         return defer.succeed(
     def get_writekey(self):
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index e862fced..d974e5de 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -1,5 +1,6 @@
 import os, sys
+import shutil
 from twisted.trial import unittest
 from twisted.internet import defer, task
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index ec329515..824f18c1 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -83,7 +83,7 @@ class FakeUploader(service.Service):
     helper_furl = None
     helper_connected = False
-    def upload(self, uploadable):
+    def upload(self, uploadable, **kw):
         d = uploadable.get_size()
         d.addCallback(lambda size:
         def _got_data(datav):
diff --git a/src/allmydata/util/ b/src/allmydata/util/
index 4128c200..a48fb59d 100644
--- a/src/allmydata/util/
+++ b/src/allmydata/util/
@@ -8,9 +8,12 @@ from twisted.internet.interfaces import IConsumer
 class MemoryConsumer:
-    def __init__(self):
+    def __init__(self, progress=None):
         self.chunks = []
         self.done = False
+        self._progress = progress
     def registerProducer(self, p, streaming):
         self.producer = p
         if streaming:
@@ -19,12 +22,19 @@ class MemoryConsumer:
             while not self.done:
     def write(self, data):
+        if self._progress is not None:
+            self._progress.set_progress(sum([len(c) for c in self.chunks]))
     def unregisterProducer(self):
         self.done = True
-def download_to_data(n, offset=0, size=None):
-    d =, offset, size)
+def download_to_data(n, offset=0, size=None, progress=None):
+    """
+    :param on_progress: if set, a single-arg callable that receives total bytes downloaded
+    """
+    d =, offset, size)
     d.addCallback(lambda mc: "".join(mc.chunks))
     return d
diff --git a/src/allmydata/util/ b/src/allmydata/util/
index 9a212378..b0526304 100644
--- a/src/allmydata/util/
+++ b/src/allmydata/util/
@@ -116,7 +116,7 @@ class HookMixin:
         hook = self._hooks[name]
         if hook is None:
-            return None
+            return res  # pass on error/result
         (d, ignore_count) = hook
         self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
diff --git a/src/allmydata/util/ b/src/allmydata/util/
new file mode 100644
index 00000000..3618c88d
--- /dev/null
+++ b/src/allmydata/util/
@@ -0,0 +1,37 @@
+Utilities relating to computing progress information.
+Ties in with the "consumer" module also
+from allmydata.interfaces import IProgress
+from zope.interface import implementer
+class PercentProgress(object):
+    """
+    Represents progress as a percentage, from 0.0 to 100.0
+    """
+    def __init__(self, total_size=None):
+        self._value = 0.0
+        self.set_progress_total(total_size)
+    def set_progress(self, value):
+        "IProgress API"
+        self._value = value
+    def set_progress_total(self, size):
+        "IProgress API"
+        if size is not None:
+            size = float(size)
+        self._total_size = size
+    @property
+    def progress(self):
+        if self._total_size is None:
+            return 0  # or 1.0?
+        if self._total_size <= 0.0:
+            return 0
+        return (self._value / self._total_size) * 100.0
diff --git a/src/allmydata/web/ b/src/allmydata/web/
new file mode 100644
index 00000000..5d2f3e5e
--- /dev/null
+++ b/src/allmydata/web/
@@ -0,0 +1,60 @@
+import simplejson
+from nevow import rend, url, tags as T
+from nevow.inevow import IRequest
+from allmydata.web.common import getxmlfile, get_arg, WebError
+class MagicFolderWebApi(rend.Page):
+    """
+    I provide the web-based API for Magic Folder status etc.
+    """
+    def __init__(self, client):
+        ##rend.Page.__init__(self, storage)
+        super(MagicFolderWebApi, self).__init__(client)
+        self.client = client
+    def _render_json(self, req):
+        req.setHeader("content-type", "application/json")
+        data = []
+        for item in self.client._magic_folder.uploader.get_status():
+            d = dict(
+                path=item.relpath_u,
+                status=item.status_history()[-1][0],
+                kind='upload',
+            )
+            for (status, ts) in item.status_history():
+                d[status + '_at'] = ts
+            d['percent_done'] = item.progress.progress
+            data.append(d)
+        for item in self.client._magic_folder.downloader.get_status():
+            d = dict(
+                path=item.relpath_u,
+                status=item.status_history()[-1][0],
+                kind='download',
+            )
+            for (status, ts) in item.status_history():
+                d[status + '_at'] = ts
+            d['percent_done'] = item.progress.progress
+            data.append(d)
+        return simplejson.dumps(data)
+    def renderHTTP(self, ctx):
+        req = IRequest(ctx)
+        t = get_arg(req, "t", None)
+        if t is None:
+            return rend.Page.renderHTTP(self, ctx)
+        t = t.strip()
+        if t == 'json':
+            return self._render_json(req)
+        raise WebError("'%s' invalid type for 't' arg" % (t,), 400)
diff --git a/src/allmydata/web/ b/src/allmydata/web/
index d8d789cf..6b284810 100644
--- a/src/allmydata/web/
+++ b/src/allmydata/web/
@@ -12,7 +12,7 @@ from allmydata import get_package_versions_string
 from allmydata.util import log
 from allmydata.interfaces import IFileNode
 from allmydata.web import filenode, directory, unlinked, status, operations
-from allmydata.web import storage
+from allmydata.web import storage, magic_folder
 from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \
      get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr
@@ -154,6 +154,9 @@ class Root(rend.Page):
         self.child_uri = URIHandler(client)
         self.child_cap = URIHandler(client)
+        # handler for "/magic_folder" URIs
+        self.child_magic_folder = magic_folder.MagicFolderWebApi(client)
         self.child_file = FileHandler(client)
         self.child_named = FileHandler(client)
         self.child_status = status.Status(client.get_history())