]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Flesh out "tahoe magic-folder status" command
authormeejah <meejah@meejah.ca>
Thu, 12 Nov 2015 23:16:28 +0000 (16:16 -0700)
committermeejah <meejah@meejah.ca>
Mon, 8 Feb 2016 17:12:32 +0000 (10:12 -0700)
Adds:

 - a JSON endpoint
 - CLI to display information
 - QueuedItem + IQueuedItem for uploader/downloader
 - IProgress interface + PercentProgress implementation
 - progress= args to many upload/download APIs

23 files changed:
src/allmydata/blacklist.py
src/allmydata/client.py
src/allmydata/dirnode.py
src/allmydata/frontends/magic_folder.py
src/allmydata/immutable/encode.py
src/allmydata/immutable/filenode.py
src/allmydata/immutable/literal.py
src/allmydata/immutable/offloaded.py
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/magicfolderdb.py
src/allmydata/mutable/filenode.py
src/allmydata/scripts/common_http.py
src/allmydata/scripts/magic_folder_cli.py
src/allmydata/test/common.py
src/allmydata/test/test_dirnode.py
src/allmydata/test/test_magic_folder.py
src/allmydata/test/test_web.py
src/allmydata/util/consumer.py
src/allmydata/util/deferredutil.py
src/allmydata/util/progress.py [new file with mode: 0644]
src/allmydata/web/magic_folder.py [new file with mode: 0644]
src/allmydata/web/root.py

index 9652c7025f1d7b2af4abb8413f56a41454ca9ceb..f0af41df40f893f2ee9a9ffcdb55b1337f6cd93e 100644 (file)
@@ -130,7 +130,7 @@ class ProhibitedNode:
     def get_best_readable_version(self):
         raise FileProhibited(self.reason)
 
     def get_best_readable_version(self):
         raise FileProhibited(self.reason)
 
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         raise FileProhibited(self.reason)
 
     def get_best_mutable_version(self):
         raise FileProhibited(self.reason)
 
     def get_best_mutable_version(self):
index 9a0a5de8334a19ef4d8546fc3eed5258df4fcfc2..a919d9a1c8953ea88bb69663cd39f8526dbd29e3 100644 (file)
@@ -515,6 +515,7 @@ class Client(node.Node, pollmixin.PollMixin):
             from allmydata.frontends import magic_folder
             umask = self.get_config("magic_folder", "download.umask", 0077)
             s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
             from allmydata.frontends import magic_folder
             umask = self.get_config("magic_folder", "download.umask", 0077)
             s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
+            self._magic_folder = s
             s.setServiceParent(self)
             s.startService()
 
             s.setServiceParent(self)
             s.startService()
 
index 5fddec41a1a723084fbe2c82b216a4b2680028e5..4c17c663edec31a1583c0246b9c374a5c7e00d78 100644 (file)
@@ -588,7 +588,7 @@ class DirectoryNode:
         return d
 
 
         return d
 
 
-    def add_file(self, namex, uploadable, metadata=None, overwrite=True):
+    def add_file(self, namex, uploadable, metadata=None, overwrite=True, progress=None):
         """I upload a file (using the given IUploadable), then attach the
         resulting FileNode to the directory at the given name. I return a
         Deferred that fires (with the IFileNode of the uploaded file) when
         """I upload a file (using the given IUploadable), then attach the
         resulting FileNode to the directory at the given name. I return a
         Deferred that fires (with the IFileNode of the uploaded file) when
@@ -596,7 +596,7 @@ class DirectoryNode:
         name = normalize(namex)
         if self.is_readonly():
             return defer.fail(NotWriteableError())
         name = normalize(namex)
         if self.is_readonly():
             return defer.fail(NotWriteableError())
-        d = self._uploader.upload(uploadable)
+        d = self._uploader.upload(uploadable, progress=progress)
         d.addCallback(lambda results:
                       self._create_and_validate_node(results.get_uri(), None,
                                                      name))
         d.addCallback(lambda results:
                       self._create_and_validate_node(results.get_uri(), None,
                                                      name))
index afe26081dc4909c134009ef00ab6c9476be4a9dc..8a2b07e1be1633c06ffdfeeb05930c12607cdfb9 100644 (file)
@@ -15,6 +15,7 @@ from allmydata.util import log
 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
 from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.deferredutil import HookMixin
 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
 from allmydata.util.assertutil import precondition, _assert
 from allmydata.util.deferredutil import HookMixin
+from allmydata.util.progress import PercentProgress
 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
      extend_filepath, unicode_from_filepath, unicode_segments_from, \
      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
      extend_filepath, unicode_from_filepath, unicode_segments_from, \
      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
@@ -126,10 +127,21 @@ class QueueMixin(HookMixin):
                                  % quote_local_unicode_path(self._local_path_u))
 
         self._deque = deque()
                                  % quote_local_unicode_path(self._local_path_u))
 
         self._deque = deque()
+        # do we also want to bound on "maximum age"?
+        self._process_history = deque(maxlen=10)
         self._lazy_tail = defer.succeed(None)
         self._stopped = False
         self._turn_delay = 0
 
         self._lazy_tail = defer.succeed(None)
         self._stopped = False
         self._turn_delay = 0
 
def get_status(self):
    """
    Returns an iterable of instances that implement IQueuedItem.

    Pending items (still waiting in the work queue) are yielded first,
    followed by recently completed items retained in the bounded
    process history.
    """
    for source in (self._deque, self._process_history):
        for item in source:
            yield item
+
     def _get_filepath(self, relpath_u):
         self._log("_get_filepath(%r)" % (relpath_u,))
         return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
     def _get_filepath(self, relpath_u):
         self._log("_get_filepath(%r)" % (relpath_u,))
         return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
@@ -162,8 +174,10 @@ class QueueMixin(HookMixin):
                 self._log("stopped")
                 return
             try:
                 self._log("stopped")
                 return
             try:
-                item = self._deque.pop()
-                self._log("popped %r" % (item,))
+                item = IQueuedItem(self._deque.pop())
+                self._process_history.append(item)
+
+                self._log("popped %r, now have %d" % (item, len(self._deque)))
                 self._count('objects_queued', -1)
             except IndexError:
                 self._log("deque is now empty")
                 self._count('objects_queued', -1)
             except IndexError:
                 self._log("deque is now empty")
@@ -177,6 +191,7 @@ class QueueMixin(HookMixin):
                 self._lazy_tail.addBoth(self._logcb, "got past _process")
                 self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                 self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
                 self._lazy_tail.addBoth(self._logcb, "got past _process")
                 self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
                 self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
+                self._lazy_tail.addErrback(log.err)
                 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                 self._lazy_tail.addBoth(self._logcb, "got past deferLater")
         except Exception as e:
                 self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
                 self._lazy_tail.addBoth(self._logcb, "got past deferLater")
         except Exception as e:
@@ -184,6 +199,44 @@ class QueueMixin(HookMixin):
             raise
 
 
             raise
 
 
+from zope.interface import Interface, implementer
+
class IQueuedItem(Interface):
    """
    Marker interface for items tracked by a QueueMixin's work queue
    (see QueuedItem for the concrete implementation).
    """
+
+
@implementer(IQueuedItem)
class QueuedItem(object):
    """
    A single queued work item: a relative path plus a progress
    instance, with a record of when each processing status was reached.
    """
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u  # unicode path relative to the magic-folder
        self.progress = progress    # progress tracker (e.g. PercentProgress)
        # maps status name -> timestamp of when that status was reached;
        # a repeated set_status() for the same name overwrites the timestamp
        self._status_history = dict()

    def set_status(self, status, current_time=None):
        """
        Record that this item reached 'status' at 'current_time'
        (defaults to time.time()).
        """
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (status, timestamp) sorted by timestamp.
        """
        # sorted() with a key works on both Python 2 and 3, unlike
        # list.sort(cmp_func) / the cmp builtin, which Python 3 removed
        # (and dict.items() is a non-sortable view there anyway).
        return sorted(self._status_history.items(), key=lambda item: item[1])
+
+
class UploadItem(QueuedItem):
    """
    A file queued for upload by the Uploader; all behaviour lives on
    QueuedItem.
    """
+
+
 class Uploader(QueueMixin):
     def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                  immediate=False):
 class Uploader(QueueMixin):
     def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                  immediate=False):
@@ -256,7 +309,12 @@ class Uploader(QueueMixin):
 
     def _extend_queue_and_keep_going(self, relpaths_u):
         self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
 
     def _extend_queue_and_keep_going(self, relpaths_u):
         self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
-        self._deque.extend(relpaths_u)
+        for relpath_u in relpaths_u:
+            progress = PercentProgress()
+            item = UploadItem(relpath_u, progress)
+            item.set_status('queued', self._clock.seconds())
+            self._deque.append(item)
+
         self._count('objects_queued', len(relpaths_u))
 
         if self.is_ready:
         self._count('objects_queued', len(relpaths_u))
 
         if self.is_ready:
@@ -273,7 +331,7 @@ class Uploader(QueueMixin):
         self._extend_queue_and_keep_going(self._pending)
 
     def _add_pending(self, relpath_u):
         self._extend_queue_and_keep_going(self._pending)
 
     def _add_pending(self, relpath_u):
-        self._log("add pending %r" % (relpath_u,))        
+        self._log("add pending %r" % (relpath_u,))
         if not magicpath.should_ignore_file(relpath_u):
             self._pending.add(relpath_u)
 
         if not magicpath.should_ignore_file(relpath_u):
             self._pending.add(relpath_u)
 
@@ -327,10 +385,14 @@ class Uploader(QueueMixin):
     def _when_queue_is_empty(self):
         return defer.succeed(None)
 
     def _when_queue_is_empty(self):
         return defer.succeed(None)
 
-    def _process(self, relpath_u):
+    def _process(self, item):
         # Uploader
         # Uploader
+        relpath_u = item.relpath_u
         self._log("_process(%r)" % (relpath_u,))
         self._log("_process(%r)" % (relpath_u,))
+        item.set_status('started', self._clock.seconds())
+
         if relpath_u is None:
         if relpath_u is None:
+            item.set_status('invalid_path', self._clock.seconds())
             return
         precondition(isinstance(relpath_u, unicode), relpath_u)
         precondition(not relpath_u.endswith(u'/'), relpath_u)
             return
         precondition(isinstance(relpath_u, unicode), relpath_u)
         precondition(not relpath_u.endswith(u'/'), relpath_u)
@@ -374,8 +436,12 @@ class Uploader(QueueMixin):
                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
 
                 empty_uploadable = Data("", self._client.convergence)
                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
 
                 empty_uploadable = Data("", self._client.convergence)
-                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
-                                                   metadata=metadata, overwrite=True)
+                d2 = self._upload_dirnode.add_file(
+                    encoded_path_u, empty_uploadable,
+                    metadata=metadata,
+                    overwrite=True,
+                    progress=item.progress,
+                )
 
                 def _add_db_entry(filenode):
                     filecap = filenode.get_uri()
 
                 def _add_db_entry(filenode):
                     filecap = filenode.get_uri()
@@ -397,7 +463,12 @@ class Uploader(QueueMixin):
                 uploadable = Data("", self._client.convergence)
                 encoded_path_u += magicpath.path2magic(u"/")
                 self._log("encoded_path_u =  %r" % (encoded_path_u,))
                 uploadable = Data("", self._client.convergence)
                 encoded_path_u += magicpath.path2magic(u"/")
                 self._log("encoded_path_u =  %r" % (encoded_path_u,))
-                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
+                upload_d = self._upload_dirnode.add_file(
+                    encoded_path_u, uploadable,
+                    metadata={"version": 0},
+                    overwrite=True,
+                    progress=item.progress,
+                )
                 def _dir_succeeded(ign):
                     self._log("created subdirectory %r" % (relpath_u,))
                     self._count('directories_created')
                 def _dir_succeeded(ign):
                     self._log("created subdirectory %r" % (relpath_u,))
                     self._count('directories_created')
@@ -428,8 +499,12 @@ class Uploader(QueueMixin):
                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
 
                 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                     metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
 
                 uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
-                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
-                                                   metadata=metadata, overwrite=True)
+                d2 = self._upload_dirnode.add_file(
+                    encoded_path_u, uploadable,
+                    metadata=metadata,
+                    overwrite=True,
+                    progress=item.progress,
+                )
 
                 def _add_db_entry(filenode):
                     filecap = filenode.get_uri()
 
                 def _add_db_entry(filenode):
                     filecap = filenode.get_uri()
@@ -448,10 +523,12 @@ class Uploader(QueueMixin):
 
         def _succeeded(res):
             self._count('objects_succeeded')
 
         def _succeeded(res):
             self._count('objects_succeeded')
+            item.set_status('success', self._clock.seconds())
             return res
         def _failed(f):
             self._count('objects_failed')
             self._log("%s while processing %r" % (f, relpath_u))
             return res
         def _failed(f):
             self._count('objects_failed')
             self._log("%s while processing %r" % (f, relpath_u))
+            item.set_status('failure', self._clock.seconds())
             return f
         d.addCallbacks(_succeeded, _failed)
         return d
             return f
         d.addCallbacks(_succeeded, _failed)
         return d
@@ -540,6 +617,13 @@ class WriteFileMixin(object):
         return abspath_u
 
 
         return abspath_u
 
 
class DownloadItem(QueuedItem):
    """
    A file queued for download by the Downloader, carrying the remote
    filenode to read from and its metadata alongside the usual queue
    state (relative path, progress, status history).
    """
    def __init__(self, relpath_u, progress, filenode, metadata):
        self.metadata = metadata
        self.file_node = filenode
        super(DownloadItem, self).__init__(relpath_u, progress)
+
+
 class Downloader(QueueMixin, WriteFileMixin):
     REMOTE_SCAN_INTERVAL = 3  # facilitates tests
 
 class Downloader(QueueMixin, WriteFileMixin):
     REMOTE_SCAN_INTERVAL = 3  # facilitates tests
 
@@ -561,6 +645,7 @@ class Downloader(QueueMixin, WriteFileMixin):
 
     def start_downloading(self):
         self._log("start_downloading")
 
     def start_downloading(self):
         self._log("start_downloading")
+        self._turn_delay = self.REMOTE_SCAN_INTERVAL
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)
 
         files = self._db.get_all_relpaths()
         self._log("all files %s" % files)
 
@@ -685,7 +770,14 @@ class Downloader(QueueMixin, WriteFileMixin):
                 file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
 
                 if self._should_download(relpath_u, metadata['version']):
                 file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
 
                 if self._should_download(relpath_u, metadata['version']):
-                    self._deque.append( (relpath_u, file_node, metadata) )
+                    to_dl = DownloadItem(
+                        relpath_u,
+                        PercentProgress(file_node.get_size()),
+                        file_node,
+                        metadata,
+                    )
+                    to_dl.set_status('queued', self._clock.seconds())
+                    self._deque.append(to_dl)
                 else:
                     self._log("Excluding %r" % (relpath_u,))
                     self._call_hook(None, 'processed', async=True)
                 else:
                     self._log("Excluding %r" % (relpath_u,))
                     self._call_hook(None, 'processed', async=True)
@@ -703,42 +795,49 @@ class Downloader(QueueMixin, WriteFileMixin):
     def _process(self, item, now=None):
         # Downloader
         self._log("_process(%r)" % (item,))
     def _process(self, item, now=None):
         # Downloader
         self._log("_process(%r)" % (item,))
-        if now is None:
-            now = time.time()
-        (relpath_u, file_node, metadata) = item
-        fp = self._get_filepath(relpath_u)
+        if now is None:  # XXX why can we pass in now?
+            now = time.time()  # self._clock.seconds()
+
+        self._log("started! %s" % (now,))
+        item.set_status('started', now)
+        fp = self._get_filepath(item.relpath_u)
         abspath_u = unicode_from_filepath(fp)
         conflict_path_u = self._get_conflicted_filename(abspath_u)
 
         d = defer.succeed(None)
 
         def do_update_db(written_abspath_u):
         abspath_u = unicode_from_filepath(fp)
         conflict_path_u = self._get_conflicted_filename(abspath_u)
 
         d = defer.succeed(None)
 
         def do_update_db(written_abspath_u):
-            filecap = file_node.get_uri()
-            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+            filecap = item.file_node.get_uri()
+            last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
             last_downloaded_uri = filecap
             last_downloaded_timestamp = now
             written_pathinfo = get_pathinfo(written_abspath_u)
 
             last_downloaded_uri = filecap
             last_downloaded_timestamp = now
             written_pathinfo = get_pathinfo(written_abspath_u)
 
-            if not written_pathinfo.exists and not metadata.get('deleted', False):
+            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
 
                 raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
 
-            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
-                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
+            self._db.did_upload_version(
+                item.relpath_u, item.metadata['version'], last_uploaded_uri,
+                last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
+            )
             self._count('objects_downloaded')
             self._count('objects_downloaded')
+            item.set_status('success', self._clock.seconds())
+
         def failed(f):
         def failed(f):
+            item.set_status('failure', self._clock.seconds())
             self._log("download failed: %s" % (str(f),))
             self._count('objects_failed')
             return f
 
         if os.path.isfile(conflict_path_u):
             def fail(res):
             self._log("download failed: %s" % (str(f),))
             self._count('objects_failed')
             return f
 
         if os.path.isfile(conflict_path_u):
             def fail(res):
-                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
+                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
             d.addCallback(fail)
         else:
             is_conflict = False
             d.addCallback(fail)
         else:
             is_conflict = False
-            db_entry = self._db.get_db_entry(relpath_u)
-            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
-            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
+            db_entry = self._db.get_db_entry(item.relpath_u)
+            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
+            dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
             if db_entry:
                 if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                     if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
             if db_entry:
                 if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                     if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
@@ -747,22 +846,22 @@ class Downloader(QueueMixin, WriteFileMixin):
                 elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                     is_conflict = True
                     self._count('objects_conflicted')
                 elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                     is_conflict = True
                     self._count('objects_conflicted')
-                elif self._is_upload_pending(relpath_u):
+                elif self._is_upload_pending(item.relpath_u):
                     is_conflict = True
                     self._count('objects_conflicted')
 
                     is_conflict = True
                     self._count('objects_conflicted')
 
-            if relpath_u.endswith(u"/"):
-                if metadata.get('deleted', False):
+            if item.relpath_u.endswith(u"/"):
+                if item.metadata.get('deleted', False):
                     self._log("rmdir(%r) ignored" % (abspath_u,))
                 else:
                     self._log("mkdir(%r)" % (abspath_u,))
                     d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                     d.addCallback(lambda ign: abspath_u)
             else:
                     self._log("rmdir(%r) ignored" % (abspath_u,))
                 else:
                     self._log("mkdir(%r)" % (abspath_u,))
                     d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                     d.addCallback(lambda ign: abspath_u)
             else:
-                if metadata.get('deleted', False):
+                if item.metadata.get('deleted', False):
                     d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                 else:
                     d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                 else:
-                    d.addCallback(lambda ign: file_node.download_best_version())
+                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                     d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                                is_conflict=is_conflict))
 
                     d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                                is_conflict=is_conflict))
 
index cf308dd3ef861171546d5eb71f42bdc46729b891..8cc32d35a436e16f412939cdb65bf7c5c40de044 100644 (file)
@@ -74,7 +74,7 @@ PiB=1024*TiB
 class Encoder(object):
     implements(IEncoder)
 
 class Encoder(object):
     implements(IEncoder)
 
-    def __init__(self, log_parent=None, upload_status=None):
+    def __init__(self, log_parent=None, upload_status=None, progress=None):
         object.__init__(self)
         self.uri_extension_data = {}
         self._codec = None
         object.__init__(self)
         self.uri_extension_data = {}
         self._codec = None
@@ -86,6 +86,7 @@ class Encoder(object):
         self._log_number = log.msg("creating Encoder %s" % self,
                                    facility="tahoe.encoder", parent=log_parent)
         self._aborted = False
         self._log_number = log.msg("creating Encoder %s" % self,
                                    facility="tahoe.encoder", parent=log_parent)
         self._aborted = False
+        self._progress = progress
 
     def __repr__(self):
         if hasattr(self, "_storage_index"):
 
     def __repr__(self):
         if hasattr(self, "_storage_index"):
@@ -105,6 +106,8 @@ class Encoder(object):
         def _got_size(size):
             self.log(format="file size: %(size)d", size=size)
             self.file_size = size
         def _got_size(size):
             self.log(format="file size: %(size)d", size=size)
             self.file_size = size
+            if self._progress:
+                self._progress.set_progress_total(self.file_size)
         d.addCallback(_got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
         d.addCallback(self._got_all_encoding_parameters)
         d.addCallback(_got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
         d.addCallback(self._got_all_encoding_parameters)
@@ -436,6 +439,7 @@ class Encoder(object):
             shareid = shareids[i]
             d = self.send_block(shareid, segnum, block, lognum)
             dl.append(d)
             shareid = shareids[i]
             d = self.send_block(shareid, segnum, block, lognum)
             dl.append(d)
+
             block_hash = hashutil.block_hash(block)
             #from allmydata.util import base32
             #log.msg("creating block (shareid=%d, blocknum=%d) "
             block_hash = hashutil.block_hash(block)
             #from allmydata.util import base32
             #log.msg("creating block (shareid=%d, blocknum=%d) "
@@ -445,6 +449,14 @@ class Encoder(object):
             self.block_hashes[shareid].append(block_hash)
 
         dl = self._gather_responses(dl)
             self.block_hashes[shareid].append(block_hash)
 
         dl = self._gather_responses(dl)
+
+        def do_progress(ign):
+            done = self.segment_size * (segnum + 1)
+            if self._progress:
+                self._progress.set_progress(done)
+            return ign
+        dl.addCallback(do_progress)
+
         def _logit(res):
             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                      (self,
         def _logit(res):
             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                      (self,
index 1c3780ba6f39bb8a4e9e4a063fe62f690c57ca34..779977670068dc5fe99ac7bd98a9a5310f7b5881 100644 (file)
@@ -8,7 +8,7 @@ from twisted.internet import defer
 from allmydata import uri
 from twisted.internet.interfaces import IConsumer
 from allmydata.interfaces import IImmutableFileNode, IUploadResults
 from allmydata import uri
 from twisted.internet.interfaces import IConsumer
 from allmydata.interfaces import IImmutableFileNode, IUploadResults
-from allmydata.util import consumer
+from allmydata.util import consumer, progress
 from allmydata.check_results import CheckResults, CheckAndRepairResults
 from allmydata.util.dictutil import DictOfSets
 from allmydata.util.happinessutil import servers_of_happiness
 from allmydata.check_results import CheckResults, CheckAndRepairResults
 from allmydata.util.dictutil import DictOfSets
 from allmydata.util.happinessutil import servers_of_happiness
@@ -245,11 +245,13 @@ class ImmutableFileNode:
     # we keep it here, we should also put this on CiphertextFileNode
     def __hash__(self):
         return self.u.__hash__()
     # we keep it here, we should also put this on CiphertextFileNode
     def __hash__(self):
         return self.u.__hash__()
+
     def __eq__(self, other):
         if isinstance(other, ImmutableFileNode):
             return self.u.__eq__(other.u)
         else:
             return False
     def __eq__(self, other):
         if isinstance(other, ImmutableFileNode):
             return self.u.__eq__(other.u)
         else:
             return False
+
     def __ne__(self, other):
         if isinstance(other, ImmutableFileNode):
             return self.u.__eq__(other.u)
     def __ne__(self, other):
         if isinstance(other, ImmutableFileNode):
             return self.u.__eq__(other.u)
@@ -273,12 +275,16 @@ class ImmutableFileNode:
 
     def get_uri(self):
         return self.u.to_string()
 
     def get_uri(self):
         return self.u.to_string()
+
     def get_cap(self):
         return self.u
     def get_cap(self):
         return self.u
+
     def get_readcap(self):
         return self.u.get_readonly()
     def get_readcap(self):
         return self.u.get_readonly()
+
     def get_verify_cap(self):
         return self.u.get_verify_cap()
     def get_verify_cap(self):
         return self.u.get_verify_cap()
+
     def get_repair_cap(self):
         # CHK files can be repaired with just the verifycap
         return self.u.get_verify_cap()
     def get_repair_cap(self):
         # CHK files can be repaired with just the verifycap
         return self.u.get_verify_cap()
@@ -288,6 +294,7 @@ class ImmutableFileNode:
 
     def get_size(self):
         return self.u.get_size()
 
     def get_size(self):
         return self.u.get_size()
+
     def get_current_size(self):
         return defer.succeed(self.get_size())
 
     def get_current_size(self):
         return defer.succeed(self.get_size())
 
@@ -305,6 +312,7 @@ class ImmutableFileNode:
 
     def check_and_repair(self, monitor, verify=False, add_lease=False):
         return self._cnode.check_and_repair(monitor, verify, add_lease)
 
     def check_and_repair(self, monitor, verify=False, add_lease=False):
         return self._cnode.check_and_repair(monitor, verify, add_lease)
+
     def check(self, monitor, verify=False, add_lease=False):
         return self._cnode.check(monitor, verify, add_lease)
 
     def check(self, monitor, verify=False, add_lease=False):
         return self._cnode.check(monitor, verify, add_lease)
 
@@ -316,14 +324,13 @@ class ImmutableFileNode:
         """
         return defer.succeed(self)
 
         """
         return defer.succeed(self)
 
-
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         """
         Download the best version of this file, returning its contents
         as a bytestring. Since there is only one version of an immutable
         file, we download and return the contents of this file.
         """
         """
         Download the best version of this file, returning its contents
         as a bytestring. Since there is only one version of an immutable
         file, we download and return the contents of this file.
         """
-        d = consumer.download_to_data(self)
+        d = consumer.download_to_data(self, progress=progress)
         return d
 
     # for an immutable file, download_to_data (specified in IReadable)
         return d
 
     # for an immutable file, download_to_data (specified in IReadable)
index dc77b4ec87265c40d0c2a0701a1f1ea221325783..b18dccb6c6d58f3d392c2d787d43616a8a4c9519 100644 (file)
@@ -113,7 +113,10 @@ class LiteralFileNode(_ImmutableFileNodeBase):
         return defer.succeed(self)
 
 
         return defer.succeed(self)
 
 
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
+        if progress is not None:
+            progress.set_progress_total(len(self.u.data))
+            progress.set_progress(len(self.u.data))
         return defer.succeed(self.u.data)
 
 
         return defer.succeed(self.u.data)
 
 
index 84390b1d9a44515a67949e2f3222a4ba03d51169..937dceae8eaf687fbbfe03bb13104d2661282d5b 100644 (file)
@@ -137,7 +137,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     def __init__(self, storage_index,
                  helper, storage_broker, secret_holder,
                  incoming_file, encoding_file,
     def __init__(self, storage_index,
                  helper, storage_broker, secret_holder,
                  incoming_file, encoding_file,
-                 log_number):
+                 log_number, progress=None):
+        upload.CHKUploader.__init__(self, storage_broker, secret_holder, progress=progress)
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
         self._storage_index = storage_index
         self._helper = helper
         self._incoming_file = incoming_file
index c63240463cd1a510d411e8939d746fb3a5cd0b4f..99168f4e07952efe53a18a9154a554dd0115cf70 100644 (file)
@@ -21,7 +21,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
-     DEFAULT_MAX_SEGMENT_SIZE
+     DEFAULT_MAX_SEGMENT_SIZE, IProgress
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
@@ -623,7 +623,7 @@ class EncryptAnUploadable:
     implements(IEncryptedUploadable)
     CHUNKSIZE = 50*1024
 
     implements(IEncryptedUploadable)
     CHUNKSIZE = 50*1024
 
-    def __init__(self, original, log_parent=None):
+    def __init__(self, original, log_parent=None, progress=None):
         precondition(original.default_params_set,
                      "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
         self.original = IUploadable(original)
         precondition(original.default_params_set,
                      "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
         self.original = IUploadable(original)
@@ -636,6 +636,7 @@ class EncryptAnUploadable:
         self._file_size = None
         self._ciphertext_bytes_read = 0
         self._status = None
         self._file_size = None
         self._ciphertext_bytes_read = 0
         self._status = None
+        self._progress = progress
 
     def set_upload_status(self, upload_status):
         self._status = IUploadStatus(upload_status)
 
     def set_upload_status(self, upload_status):
         self._status = IUploadStatus(upload_status)
@@ -656,6 +657,8 @@ class EncryptAnUploadable:
             self._file_size = size
             if self._status:
                 self._status.set_size(size)
             self._file_size = size
             if self._status:
                 self._status.set_size(size)
+            if self._progress:
+                self._progress.set_progress_total(size)
             return size
         d.addCallback(_got_size)
         return d
             return size
         d.addCallback(_got_size)
         return d
@@ -894,7 +897,7 @@ class UploadStatus:
 class CHKUploader:
     server_selector_class = Tahoe2ServerSelector
 
 class CHKUploader:
     server_selector_class = Tahoe2ServerSelector
 
-    def __init__(self, storage_broker, secret_holder):
+    def __init__(self, storage_broker, secret_holder, progress=None):
         # server_selector needs storage_broker and secret_holder
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
         # server_selector needs storage_broker and secret_holder
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
@@ -904,6 +907,7 @@ class CHKUploader:
         self._upload_status = UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_active(True)
         self._upload_status = UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_active(True)
+        self._progress = progress
 
         # locate_all_shareholders() will create the following attribute:
         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
 
         # locate_all_shareholders() will create the following attribute:
         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
@@ -947,8 +951,11 @@ class CHKUploader:
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
-        self._encoder = e = encode.Encoder(self._log_number,
-                                           self._upload_status)
+        self._encoder = e = encode.Encoder(
+            self._log_number,
+            self._upload_status,
+            progress=self._progress,
+        )
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
@@ -1073,12 +1080,13 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
 
 class LiteralUploader:
 
 
 class LiteralUploader:
 
-    def __init__(self):
+    def __init__(self, progress=None):
         self._status = s = UploadStatus()
         s.set_storage_index(None)
         s.set_helper(False)
         s.set_progress(0, 1.0)
         s.set_active(False)
         self._status = s = UploadStatus()
         s.set_storage_index(None)
         s.set_helper(False)
         s.set_progress(0, 1.0)
         s.set_active(False)
+        self._progress = progress
 
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
 
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
@@ -1086,6 +1094,8 @@ class LiteralUploader:
         def _got_size(size):
             self._size = size
             self._status.set_size(size)
         def _got_size(size):
             self._size = size
             self._status.set_size(size)
+            if self._progress:
+                self._progress.set_progress_total(size)
             return read_this_many_bytes(uploadable, size)
         d.addCallback(_got_size)
         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
             return read_this_many_bytes(uploadable, size)
         d.addCallback(_got_size)
         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
@@ -1109,6 +1119,8 @@ class LiteralUploader:
         self._status.set_progress(1, 1.0)
         self._status.set_progress(2, 1.0)
         self._status.set_results(ur)
         self._status.set_progress(1, 1.0)
         self._status.set_progress(2, 1.0)
         self._status.set_results(ur)
+        if self._progress:
+            self._progress.set_progress(self._size)
         return ur
 
     def close(self):
         return ur
 
     def close(self):
@@ -1503,12 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
     name = "uploader"
     URI_LIT_SIZE_THRESHOLD = 55
 
     name = "uploader"
     URI_LIT_SIZE_THRESHOLD = 55
 
-    def __init__(self, helper_furl=None, stats_provider=None, history=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
         self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
         self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+        self._progress = progress
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
@@ -1542,12 +1555,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         return (self._helper_furl, bool(self._helper))
 
 
         return (self._helper_furl, bool(self._helper))
 
 
-    def upload(self, uploadable):
+    def upload(self, uploadable, progress=None):
         """
         Returns a Deferred that will fire with the UploadResults instance.
         """
         assert self.parent
         assert self.running
         """
         Returns a Deferred that will fire with the UploadResults instance.
         """
         assert self.parent
         assert self.running
+        assert progress is None or IProgress.providedBy(progress)
 
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
 
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
@@ -1556,13 +1570,15 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
             precondition(isinstance(default_params, dict), default_params)
             precondition("max_segment_size" in default_params, default_params)
             uploadable.set_default_encoding_parameters(default_params)
             precondition(isinstance(default_params, dict), default_params)
             precondition("max_segment_size" in default_params, default_params)
             uploadable.set_default_encoding_parameters(default_params)
+            if progress:
+                progress.set_progress_total(size)
 
             if self.stats_provider:
                 self.stats_provider.count('uploader.files_uploaded', 1)
                 self.stats_provider.count('uploader.bytes_uploaded', size)
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
 
             if self.stats_provider:
                 self.stats_provider.count('uploader.files_uploaded', 1)
                 self.stats_provider.count('uploader.bytes_uploaded', size)
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
-                uploader = LiteralUploader()
+                uploader = LiteralUploader(progress=progress)
                 return uploader.start(uploadable)
             else:
                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                 return uploader.start(uploadable)
             else:
                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
@@ -1575,7 +1591,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
                 else:
                     storage_broker = self.parent.get_storage_broker()
                     secret_holder = self.parent._secret_holder
                 else:
                     storage_broker = self.parent.get_storage_broker()
                     secret_holder = self.parent._secret_holder
-                    uploader = CHKUploader(storage_broker, secret_holder)
+                    uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
                     d2.addCallback(lambda x: uploader.start(eu))
 
                 self._all_uploads[uploader] = None
                     d2.addCallback(lambda x: uploader.start(eu))
 
                 self._all_uploads[uploader] = None
index 31fc614443cd8d750090b41c11f954a51b1a0924..40ef040b6a21f124f4046bbc48da9222947a07e2 100644 (file)
@@ -1,5 +1,5 @@
 
 
-from zope.interface import Interface
+from zope.interface import Interface, Attribute
 from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
      ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
 
 from foolscap.api import StringConstraint, ListOf, TupleOf, SetOf, DictOf, \
      ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
 
@@ -624,6 +624,38 @@ class MustNotBeUnknownRWError(CapConstraintError):
     """Cannot add an unknown child cap specified in a rw_uri field."""
 
 
     """Cannot add an unknown child cap specified in a rw_uri field."""
 
 
+class IProgress(Interface):
+    """
+    Remembers progress measured in arbitrary units. Users of these
+    instances must call ``set_progress_total`` at least once before
+    progress can be valid, and must use the same units for both
+    ``set_progress_total`` and ``set_progress`` calls.
+
+    See also:
+    :class:`allmydata.util.progress.PercentProgress`
+    """
+
+    progress = Attribute(
+        "Current amount of progress (in percentage)"
+    )
+
+    def set_progress(self, value):
+        """
+        Sets the current amount of progress.
+
+        Arbitrary units, but must match units used for
+        set_progress_total.
+        """
+
+    def set_progress_total(self, value):
+        """
+        Sets the total amount of expected progress.
+
+        Arbitrary units, but must be same units as used when calling
+        set_progress() on this instance.
+        """
+
+
 class IReadable(Interface):
     """I represent a readable object -- either an immutable file, or a
     specific version of a mutable file.
 class IReadable(Interface):
     """I represent a readable object -- either an immutable file, or a
     specific version of a mutable file.
@@ -653,9 +685,12 @@ class IReadable(Interface):
     def get_size():
         """Return the length (in bytes) of this readable object."""
 
     def get_size():
         """Return the length (in bytes) of this readable object."""
 
-    def download_to_data():
+    def download_to_data(progress=None):
         """Download all of the file contents. I return a Deferred that fires
         """Download all of the file contents. I return a Deferred that fires
-        with the contents as a byte string."""
+        with the contents as a byte string.
+
+        :param progress: None or IProgress implementer
+        """
 
     def read(consumer, offset=0, size=None):
         """Download a portion (possibly all) of the file's contents, making
 
     def read(consumer, offset=0, size=None):
         """Download a portion (possibly all) of the file's contents, making
@@ -915,11 +950,13 @@ class IFileNode(IFilesystemNode):
         the Deferred will errback with an UnrecoverableFileError.
         """
 
         the Deferred will errback with an UnrecoverableFileError.
         """
 
-    def download_best_version():
+    def download_best_version(progress=None):
         """Download the contents of the version that would be returned
         by get_best_readable_version(). This is equivalent to calling
         download_to_data() on the IReadable given by that method.
 
         """Download the contents of the version that would be returned
         by get_best_readable_version(). This is equivalent to calling
         download_to_data() on the IReadable given by that method.
 
+        progress is anything that implements IProgress
+
         I return a Deferred that fires with a byte string when the file
         has been fully downloaded. To support streaming download, use
         the 'read' method of IReadable. If no version is recoverable,
         I return a Deferred that fires with a byte string when the file
         has been fully downloaded. To support streaming download, use
         the 'read' method of IReadable. If no version is recoverable,
@@ -1065,7 +1102,7 @@ class IMutableFileNode(IFileNode):
         everything) to get increased visibility.
         """
 
         everything) to get increased visibility.
         """
 
-    def upload(new_contents, servermap):
+    def upload(new_contents, servermap, progress=None):
         """Replace the contents of the file with new ones. This requires a
         servermap that was previously updated with MODE_WRITE.
 
         """Replace the contents of the file with new ones. This requires a
         servermap that was previously updated with MODE_WRITE.
 
@@ -1086,6 +1123,8 @@ class IMutableFileNode(IFileNode):
         operation. If I do not signal UncoordinatedWriteError, then I was
         able to write the new version without incident.
 
         operation. If I do not signal UncoordinatedWriteError, then I was
         able to write the new version without incident.
 
+        ``progress`` is either None or an IProgress provider
+
         I return a Deferred that fires (with a PublishStatus object) when the
         publish has completed. I will update the servermap in-place with the
         location of all new shares.
         I return a Deferred that fires (with a PublishStatus object) when the
         publish has completed. I will update the servermap in-place with the
         location of all new shares.
@@ -1276,12 +1315,14 @@ class IDirectoryNode(IFilesystemNode):
         equivalent to calling set_node() multiple times, but is much more
         efficient."""
 
         equivalent to calling set_node() multiple times, but is much more
         efficient."""
 
-    def add_file(name, uploadable, metadata=None, overwrite=True):
+    def add_file(name, uploadable, metadata=None, overwrite=True, progress=None):
         """I upload a file (using the given IUploadable), then attach the
         resulting ImmutableFileNode to the directory at the given name. I set
         metadata the same way as set_uri and set_node. The child name must be
         a unicode string.
 
         """I upload a file (using the given IUploadable), then attach the
         resulting ImmutableFileNode to the directory at the given name. I set
         metadata the same way as set_uri and set_node. The child name must be
         a unicode string.
 
+        ``progress`` either provides IProgress or is None
+
         I return a Deferred that fires (with the IFileNode of the uploaded
         file) when the operation completes."""
 
         I return a Deferred that fires (with the IFileNode of the uploaded
         file) when the operation completes."""
 
index 982b5fe298f8b02f52182d6b608b292d8bbdccf1..661c774d26535875a60a2322314eec7efb47da16 100644 (file)
@@ -65,6 +65,7 @@ class MagicFolderDB(object):
                   (relpath_u,))
         row = self.cursor.fetchone()
         if not row:
                   (relpath_u,))
         row = self.cursor.fetchone()
         if not row:
+            print "found nothing for", relpath_u
             return None
         else:
             (size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row
             return None
         else:
             (size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row
index 50bbdd34e06910c8166357aac2177446c80a229f..084c28f66b4e9ed399f1498b00a20985c9c73975 100644 (file)
@@ -403,21 +403,21 @@ class MutableFileNode:
         return d.addCallback(_get_version, version)
 
 
         return d.addCallback(_get_version, version)
 
 
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         """
         I return a Deferred that fires with the contents of the best
         version of this mutable file.
         """
         """
         I return a Deferred that fires with the contents of the best
         version of this mutable file.
         """
-        return self._do_serialized(self._download_best_version)
+        return self._do_serialized(self._download_best_version, progress=progress)
 
 
 
 
-    def _download_best_version(self):
+    def _download_best_version(self, progress=None):
         """
         I am the serialized sibling of download_best_version.
         """
         d = self.get_best_readable_version()
         d.addCallback(self._record_size)
         """
         I am the serialized sibling of download_best_version.
         """
         d = self.get_best_readable_version()
         d.addCallback(self._record_size)
-        d.addCallback(lambda version: version.download_to_data())
+        d.addCallback(lambda version: version.download_to_data(progress=progress))
 
         # It is possible that the download will fail because there
         # aren't enough shares to be had. If so, we will try again after
 
         # It is possible that the download will fail because there
         # aren't enough shares to be had. If so, we will try again after
@@ -432,7 +432,7 @@ class MutableFileNode:
 
             d = self.get_best_mutable_version()
             d.addCallback(self._record_size)
 
             d = self.get_best_mutable_version()
             d.addCallback(self._record_size)
-            d.addCallback(lambda version: version.download_to_data())
+            d.addCallback(lambda version: version.download_to_data(progress=progress))
             return d
 
         d.addErrback(_maybe_retry)
             return d
 
         d.addErrback(_maybe_retry)
@@ -935,13 +935,13 @@ class MutableFileVersion:
         return self._servermap.size_of_version(self._version)
 
 
         return self._servermap.size_of_version(self._version)
 
 
-    def download_to_data(self, fetch_privkey=False):
+    def download_to_data(self, fetch_privkey=False, progress=None):
         """
         I return a Deferred that fires with the contents of this
         readable object as a byte string.
 
         """
         """
         I return a Deferred that fires with the contents of this
         readable object as a byte string.
 
         """
-        c = consumer.MemoryConsumer()
+        c = consumer.MemoryConsumer(progress=progress)
         d = self.read(c, fetch_privkey=fetch_privkey)
         d.addCallback(lambda mc: "".join(mc.chunks))
         return d
         d = self.read(c, fetch_privkey=fetch_privkey)
         d.addCallback(lambda mc: "".join(mc.chunks))
         return d
index 7b965525deced23e202d361180943818e6ea78a6..1964672bd2b416973592fb26ca85c317b62cae3c 100644 (file)
@@ -31,6 +31,7 @@ class BadResponse(object):
     def __init__(self, url, err):
         self.status = -1
         self.reason = "Error trying to connect to %s: %s" % (url, err)
     def __init__(self, url, err):
         self.status = -1
         self.reason = "Error trying to connect to %s: %s" % (url, err)
+        self.error = err
     def read(self):
         return ""
 
     def read(self):
         return ""
 
index 5d5c61dc591a1c378536353ef870d07078ac8f15..4c9a469beff980523c9e6e406a6af358cdc74a9f 100644 (file)
@@ -1,7 +1,13 @@
 
 import os
 
 import os
+import urllib
+from sys import stderr
 from types import NoneType
 from cStringIO import StringIO
 from types import NoneType
 from cStringIO import StringIO
+from datetime import datetime
+
+import humanize
+import simplejson
 
 from twisted.python import usage
 
 
 from twisted.python import usage
 
@@ -12,10 +18,13 @@ from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions
 import tahoe_mv
 from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \
     quote_local_unicode_path
 import tahoe_mv
 from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \
     quote_local_unicode_path
+from allmydata.scripts.common_http import do_http, format_http_success, \
+    format_http_error, BadResponse
 from allmydata.util import fileutil
 from allmydata.util import configutil
 from allmydata import uri
 
 from allmydata.util import fileutil
 from allmydata.util import configutil
 from allmydata import uri
 
+
 INVITE_SEPARATOR = "+"
 
 class CreateOptions(BasedirOptions):
 INVITE_SEPARATOR = "+"
 
 class CreateOptions(BasedirOptions):
@@ -200,21 +209,167 @@ class StatusOptions(BasedirOptions):
     nickname = None
     synopsis = ""
     stdin = StringIO("")
     nickname = None
     synopsis = ""
     stdin = StringIO("")
+
     def parseArgs(self):
         BasedirOptions.parseArgs(self)
         node_url_file = os.path.join(self['node-directory'], u"node.url")
     def parseArgs(self):
         BasedirOptions.parseArgs(self)
         node_url_file = os.path.join(self['node-directory'], u"node.url")
-        self['node-url'] = open(node_url_file, "r").read().strip()
+        with open(node_url_file, "r") as f:
+            self['node-url'] = f.read().strip()
+
+
+def _get_json_for_fragment(options, fragment):
+    nodeurl = options['node-url']
+    if nodeurl.endswith('/'):
+        nodeurl = nodeurl[:-1]
+
+    url = u'%s/%s' % (nodeurl, fragment)
+    resp = do_http('GET', url)
+    if isinstance(resp, BadResponse):
+        # specifically NOT using format_http_error() here because the
+        # URL is pretty sensitive (we're doing /uri/<key>).
+        raise RuntimeError(
+            "Failed to get json from '%s': %s" % (nodeurl, resp.error)
+        )
+
+    data = resp.read()
+    parsed = simplejson.loads(data)
+    if not parsed:
+        raise RuntimeError("No data from '%s'" % (nodeurl,))
+    return parsed
+
+
+def _get_json_for_cap(options, cap):
+    return _get_json_for_fragment(
+        options,
+        'uri/%s?t=json' % urllib.quote(cap),
+    )
+
+def _print_item_status(item, now, longest):
+    paddedname = (' ' * (longest - len(item['path']))) + item['path']
+    if 'failure_at' in item:
+        ts = datetime.fromtimestamp(item['started_at'])
+        prog = 'Failed %s (%s)' % (humanize.naturaltime(now - ts), ts)
+    elif item['percent_done'] < 100.0:
+        if 'started_at' not in item:
+            prog = 'not yet started'
+        else:
+            so_far = now - datetime.fromtimestamp(item['started_at'])
+            if so_far.seconds > 0.0:
+                rate = item['percent_done'] / so_far.seconds
+                if rate != 0:
+                    time_left = (100.0 - item['percent_done']) / rate
+                    prog = '%2.1f%% done, around %s left' % (
+                        item['percent_done'],
+                        humanize.naturaldelta(time_left),
+                    )
+                else:
+                    time_left = None
+                    prog = '%2.1f%% done' % (item['percent_done'],)
+            else:
+                prog = 'just started'
+    else:
+        prog = ''
+        for verb in ['finished', 'started', 'queued']:
+            keyname = verb + '_at'
+            if keyname in item:
+                when = datetime.fromtimestamp(item[keyname])
+                prog = '%s %s' % (verb, humanize.naturaltime(now - when))
+                break
+
+    print "  %s: %s" % (paddedname, prog)
 
 def status(options):
 
 def status(options):
-    # XXX todo: use http interface to ask about our magic-folder upload status
+    nodedir = options["node-directory"]
+    with open(os.path.join(nodedir, u"private", u"magic_folder_dircap")) as f:
+        dmd_cap = f.read().strip()
+    with open(os.path.join(nodedir, u"private", u"collective_dircap")) as f:
+        collective_readcap = f.read().strip()
+
+    try:
+        captype, dmd = _get_json_for_cap(options, dmd_cap)
+        if captype != 'dirnode':
+            print >>stderr, "magic_folder_dircap isn't a directory capability"
+            return 2
+    except RuntimeError as e:
+        print >>stderr, str(e)
+        return 1
+
+    now = datetime.now()
+
+    print "Local files:"
+    for (name, child) in dmd['children'].items():
+        captype, meta = child
+        status = 'good'
+        size = meta['size']
+        created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
+        version = meta['metadata']['version']
+        nice_size = humanize.naturalsize(size)
+        nice_created = humanize.naturaltime(now - created)
+        if captype != 'filenode':
+            print "%20s: error, should be a filecap" % name
+            continue
+        print "  %s (%s): %s, version=%s, created %s" % (name, nice_size, status, version, nice_created)
+
+    captype, collective = _get_json_for_cap(options, collective_readcap)
+    print
+    print "Remote files:"
+    for (name, data) in collective['children'].items():
+        if data[0] != 'dirnode':
+            print "Error: '%s': expected a dirnode, not '%s'" % (name, data[0])
+        print "  %s's remote:" % name
+        dmd = _get_json_for_cap(options, data[1]['ro_uri'])
+        if dmd[0] != 'dirnode':
+            print "Error: should be a dirnode"
+            continue
+        for (n, d) in dmd[1]['children'].items():
+            if d[0] != 'filenode':
+                print "Error: expected '%s' to be a filenode." % (n,)
+
+            meta = d[1]
+            status = 'good'
+            size = meta['size']
+            created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
+            version = meta['metadata']['version']
+            nice_size = humanize.naturalsize(size)
+            nice_created = humanize.naturaltime(now - created)
+            print "    %s (%s): %s, version=%s, created %s" % (n, nice_size, status, version, nice_created)
+
+    magicdata = _get_json_for_fragment(options, 'magic_folder?t=json')
+    if len(magicdata):
+        uploads = [item for item in magicdata if item['kind'] == 'upload']
+        downloads = [item for item in magicdata if item['kind'] == 'download']
+        longest = max([len(item['path']) for item in magicdata])
+
+        if True: # maybe --show-completed option or something?
+            uploads = [item for item in uploads if item['status'] != 'success']
+            downloads = [item for item in downloads if item['status'] != 'success']
+
+        if len(uploads):
+            print
+            print "Uploads:"
+            for item in uploads:
+                _print_item_status(item, now, longest)
+
+        if len(downloads):
+            print
+            print "Downloads:"
+            for item in downloads:
+                _print_item_status(item, now, longest)
+
+        for item in magicdata:
+            if item['status'] == 'failure':
+                print "Failed:", item
+
     return 0
 
     return 0
 
+
 class MagicFolderCommand(BaseOptions):
     subCommands = [
         ["create", None, CreateOptions, "Create a Magic Folder."],
         ["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
         ["join", None, JoinOptions, "Join a Magic Folder."],
         ["leave", None, LeaveOptions, "Leave a Magic Folder."],
 class MagicFolderCommand(BaseOptions):
     subCommands = [
         ["create", None, CreateOptions, "Create a Magic Folder."],
         ["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
         ["join", None, JoinOptions, "Join a Magic Folder."],
         ["leave", None, LeaveOptions, "Leave a Magic Folder."],
+        ["status", None, StatusOptions, "Display status of uploads/downloads."],
     ]
     def postOptions(self):
         if not hasattr(self, 'subOptions'):
     ]
     def postOptions(self):
         if not hasattr(self, 'subOptions'):
@@ -234,6 +389,7 @@ subDispatch = {
     "invite": invite,
     "join": join,
     "leave": leave,
     "invite": invite,
     "join": join,
     "leave": leave,
+    "status": status,
 }
 
 def do_magic_folder(options):
 }
 
 def do_magic_folder(options):
index 6bf2f974370a5f66780a4ca19936d9161d57f74d..723254b5c049446dd3ebc87d1f5bf132a5939b42 100644 (file)
@@ -151,8 +151,8 @@ class FakeCHKFileNode:
         return defer.succeed(self)
 
 
         return defer.succeed(self)
 
 
-    def download_to_data(self):
-        return download_to_data(self)
+    def download_to_data(self, progress=None):
+        return download_to_data(self, progress=progress)
 
 
     download_best_version = download_to_data
 
 
     download_best_version = download_to_data
@@ -329,11 +329,11 @@ class FakeMutableFileNode:
         d.addCallback(_done)
         return d
 
         d.addCallback(_done)
         return d
 
-    def download_best_version(self):
-        return defer.succeed(self._download_best_version())
+    def download_best_version(self, progress=None):
+        return defer.succeed(self._download_best_version(progress=progress))
 
 
 
 
-    def _download_best_version(self, ignored=None):
+    def _download_best_version(self, ignored=None, progress=None):
         if isinstance(self.my_uri, uri.LiteralFileURI):
             return self.my_uri.data
         if self.storage_index not in self.all_contents:
         if isinstance(self.my_uri, uri.LiteralFileURI):
             return self.my_uri.data
         if self.storage_index not in self.all_contents:
index c65114fb1bcc13917d1adc499e68fb8b4ec49833..65366709cc9afd96b843936a0b61b8ba0d38ae20 100644 (file)
@@ -1519,7 +1519,7 @@ class FakeMutableFile:
     def get_write_uri(self):
         return self.uri.to_string()
 
     def get_write_uri(self):
         return self.uri.to_string()
 
-    def download_best_version(self):
+    def download_best_version(self, progress=None):
         return defer.succeed(self.data)
 
     def get_writekey(self):
         return defer.succeed(self.data)
 
     def get_writekey(self):
index e862fcedee385fb257ac54f58d0fa07af869468f..d974e5debd8652ea61bd7481efc72575de5ef664 100644 (file)
@@ -1,5 +1,6 @@
 
 import os, sys
 
 import os, sys
+import shutil
 
 from twisted.trial import unittest
 from twisted.internet import defer, task
 
 from twisted.trial import unittest
 from twisted.internet import defer, task
index ec329515e26d2e95d82ce28d911bb44fb195a343..824f18c16355d8c3a1bd17db4f2b12a45a420f64 100644 (file)
@@ -83,7 +83,7 @@ class FakeUploader(service.Service):
     helper_furl = None
     helper_connected = False
 
     helper_furl = None
     helper_connected = False
 
-    def upload(self, uploadable):
+    def upload(self, uploadable, **kw):
         d = uploadable.get_size()
         d.addCallback(lambda size: uploadable.read(size))
         def _got_data(datav):
         d = uploadable.get_size()
         d.addCallback(lambda size: uploadable.read(size))
         def _got_data(datav):
index 4128c2006029e17a37146c95509285916b67b39e..a48fb59da9d5af3be9b198764b7ce51e807e8a1f 100644 (file)
@@ -8,9 +8,12 @@ from twisted.internet.interfaces import IConsumer
 
 class MemoryConsumer:
     implements(IConsumer)
 
 class MemoryConsumer:
     implements(IConsumer)
-    def __init__(self):
+
+    def __init__(self, progress=None):
         self.chunks = []
         self.done = False
         self.chunks = []
         self.done = False
+        self._progress = progress
+
     def registerProducer(self, p, streaming):
         self.producer = p
         if streaming:
     def registerProducer(self, p, streaming):
         self.producer = p
         if streaming:
@@ -19,12 +22,19 @@ class MemoryConsumer:
         else:
             while not self.done:
                 p.resumeProducing()
         else:
             while not self.done:
                 p.resumeProducing()
+
     def write(self, data):
         self.chunks.append(data)
     def write(self, data):
         self.chunks.append(data)
+        if self._progress is not None:
+            self._progress.set_progress(sum([len(c) for c in self.chunks]))
+
     def unregisterProducer(self):
         self.done = True
 
     def unregisterProducer(self):
         self.done = True
 
-def download_to_data(n, offset=0, size=None):
-    d = n.read(MemoryConsumer(), offset, size)
+def download_to_data(n, offset=0, size=None, progress=None):
+    """
+    :param progress: if set, an IProgress provider; its ``set_progress()``
+        method is called with the total number of bytes downloaded so far
+    """
+    d = n.read(MemoryConsumer(progress=progress), offset, size)
     d.addCallback(lambda mc: "".join(mc.chunks))
     return d
     d.addCallback(lambda mc: "".join(mc.chunks))
     return d
index 9a212378f8838a7482f36014bb2c2e96fd7e3466..b05263041e8769489c4d8f5b641026183c42959c 100644 (file)
@@ -116,7 +116,7 @@ class HookMixin:
         """
         hook = self._hooks[name]
         if hook is None:
         """
         hook = self._hooks[name]
         if hook is None:
-            return None
+            return res  # pass on error/result
 
         (d, ignore_count) = hook
         self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
 
         (d, ignore_count) = hook
         self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
diff --git a/src/allmydata/util/progress.py b/src/allmydata/util/progress.py
new file mode 100644 (file)
index 0000000..3618c88
--- /dev/null
@@ -0,0 +1,37 @@
+"""
+Utilities relating to computing progress information.
+
+Ties in with the "consumer" module also
+"""
+
+from allmydata.interfaces import IProgress
+from zope.interface import implementer
+
+
+@implementer(IProgress)
+class PercentProgress(object):
+    """
+    Represents progress as a percentage, from 0.0 to 100.0
+    """
+
+    def __init__(self, total_size=None):
+        self._value = 0.0
+        self.set_progress_total(total_size)
+
+    def set_progress(self, value):
+        "IProgress API"
+        self._value = value
+
+    def set_progress_total(self, size):
+        "IProgress API"
+        if size is not None:
+            size = float(size)
+        self._total_size = size
+
+    @property
+    def progress(self):
+        if self._total_size is None:
+            return 0  # or 1.0?
+        if self._total_size <= 0.0:
+            return 0
+        return (self._value / self._total_size) * 100.0
diff --git a/src/allmydata/web/magic_folder.py b/src/allmydata/web/magic_folder.py
new file mode 100644 (file)
index 0000000..5d2f3e5
--- /dev/null
@@ -0,0 +1,60 @@
+import simplejson
+
+from nevow import rend, url, tags as T
+from nevow.inevow import IRequest
+
+from allmydata.web.common import getxmlfile, get_arg, WebError
+
+
+class MagicFolderWebApi(rend.Page):
+    """
+    I provide the web-based API for Magic Folder status etc.
+    """
+
+    def __init__(self, client):
+        ##rend.Page.__init__(self, storage)
+        super(MagicFolderWebApi, self).__init__(client)
+        self.client = client
+
+    def _render_json(self, req):
+        req.setHeader("content-type", "application/json")
+
+        data = []
+        for item in self.client._magic_folder.uploader.get_status():
+            d = dict(
+                path=item.relpath_u,
+                status=item.status_history()[-1][0],
+                kind='upload',
+            )
+            for (status, ts) in item.status_history():
+                d[status + '_at'] = ts
+            d['percent_done'] = item.progress.progress
+            data.append(d)
+
+        for item in self.client._magic_folder.downloader.get_status():
+            d = dict(
+                path=item.relpath_u,
+                status=item.status_history()[-1][0],
+                kind='download',
+            )
+            for (status, ts) in item.status_history():
+                d[status + '_at'] = ts
+            d['percent_done'] = item.progress.progress
+            data.append(d)
+
+        return simplejson.dumps(data)
+
+    def renderHTTP(self, ctx):
+        req = IRequest(ctx)
+        t = get_arg(req, "t", None)
+
+        if t is None:
+            return rend.Page.renderHTTP(self, ctx)
+
+        t = t.strip()
+        if t == 'json':
+            return self._render_json(req)
+
+        raise WebError("'%s' invalid type for 't' arg" % (t,), 400)
+
+
index d8d789cf37c08e14b0de8758754ea84f69bccd95..6b28481050b24326e819fea322dc867dcac9398b 100644 (file)
@@ -12,7 +12,7 @@ from allmydata import get_package_versions_string
 from allmydata.util import log
 from allmydata.interfaces import IFileNode
 from allmydata.web import filenode, directory, unlinked, status, operations
 from allmydata.util import log
 from allmydata.interfaces import IFileNode
 from allmydata.web import filenode, directory, unlinked, status, operations
-from allmydata.web import storage
+from allmydata.web import storage, magic_folder
 from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \
      get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr
 
 from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \
      get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr
 
@@ -154,6 +154,9 @@ class Root(rend.Page):
         self.child_uri = URIHandler(client)
         self.child_cap = URIHandler(client)
 
         self.child_uri = URIHandler(client)
         self.child_cap = URIHandler(client)
 
+        # handler for "/magic_folder" URIs
+        self.child_magic_folder = magic_folder.MagicFolderWebApi(client)
+
         self.child_file = FileHandler(client)
         self.child_named = FileHandler(client)
         self.child_status = status.Status(client.get_history())
         self.child_file = FileHandler(client)
         self.child_named = FileHandler(client)
         self.child_status = status.Status(client.get_history())