]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/immutable/upload.py
Flesh out "tahoe magic-folder status" command
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
index c63240463cd1a510d411e8939d746fb3a5cd0b4f..99168f4e07952efe53a18a9154a554dd0115cf70 100644 (file)
@@ -21,7 +21,7 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
-     DEFAULT_MAX_SEGMENT_SIZE
+     DEFAULT_MAX_SEGMENT_SIZE, IProgress
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 
@@ -623,7 +623,7 @@ class EncryptAnUploadable:
     implements(IEncryptedUploadable)
     CHUNKSIZE = 50*1024
 
-    def __init__(self, original, log_parent=None):
+    def __init__(self, original, log_parent=None, progress=None):
         precondition(original.default_params_set,
                      "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
         self.original = IUploadable(original)
@@ -636,6 +636,7 @@ class EncryptAnUploadable:
         self._file_size = None
         self._ciphertext_bytes_read = 0
         self._status = None
+        self._progress = progress
 
     def set_upload_status(self, upload_status):
         self._status = IUploadStatus(upload_status)
@@ -656,6 +657,8 @@ class EncryptAnUploadable:
             self._file_size = size
             if self._status:
                 self._status.set_size(size)
+            if self._progress:
+                self._progress.set_progress_total(size)
             return size
         d.addCallback(_got_size)
         return d
@@ -894,7 +897,7 @@ class UploadStatus:
 class CHKUploader:
     server_selector_class = Tahoe2ServerSelector
 
-    def __init__(self, storage_broker, secret_holder):
+    def __init__(self, storage_broker, secret_holder, progress=None):
         # server_selector needs storage_broker and secret_holder
         self._storage_broker = storage_broker
         self._secret_holder = secret_holder
@@ -904,6 +907,7 @@ class CHKUploader:
         self._upload_status = UploadStatus()
         self._upload_status.set_helper(False)
         self._upload_status.set_active(True)
+        self._progress = progress
 
         # locate_all_shareholders() will create the following attribute:
         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
@@ -947,8 +951,11 @@ class CHKUploader:
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
-        self._encoder = e = encode.Encoder(self._log_number,
-                                           self._upload_status)
+        self._encoder = e = encode.Encoder(
+            self._log_number,
+            self._upload_status,
+            progress=self._progress,
+        )
         d = e.set_encrypted_uploadable(eu)
         d.addCallback(self.locate_all_shareholders, started)
         d.addCallback(self.set_shareholders, e)
@@ -1073,12 +1080,13 @@ def read_this_many_bytes(uploadable, size, prepend_data=[]):
 
 class LiteralUploader:
 
-    def __init__(self):
+    def __init__(self, progress=None):
         self._status = s = UploadStatus()
         s.set_storage_index(None)
         s.set_helper(False)
         s.set_progress(0, 1.0)
         s.set_active(False)
+        self._progress = progress
 
     def start(self, uploadable):
         uploadable = IUploadable(uploadable)
@@ -1086,6 +1094,8 @@ class LiteralUploader:
         def _got_size(size):
             self._size = size
             self._status.set_size(size)
+            if self._progress:
+                self._progress.set_progress_total(size)
             return read_this_many_bytes(uploadable, size)
         d.addCallback(_got_size)
         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
@@ -1109,6 +1119,8 @@ class LiteralUploader:
         self._status.set_progress(1, 1.0)
         self._status.set_progress(2, 1.0)
         self._status.set_results(ur)
+        if self._progress:
+            self._progress.set_progress(self._size)
         return ur
 
     def close(self):
@@ -1503,12 +1515,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
     name = "uploader"
     URI_LIT_SIZE_THRESHOLD = 55
 
-    def __init__(self, helper_furl=None, stats_provider=None, history=None):
+    def __init__(self, helper_furl=None, stats_provider=None, history=None, progress=None):
         self._helper_furl = helper_furl
         self.stats_provider = stats_provider
         self._history = history
         self._helper = None
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+        self._progress = progress
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
@@ -1542,12 +1555,13 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
         return (self._helper_furl, bool(self._helper))
 
 
-    def upload(self, uploadable):
+    def upload(self, uploadable, progress=None):
         """
         Returns a Deferred that will fire with the UploadResults instance.
         """
         assert self.parent
         assert self.running
+        assert progress is None or IProgress.providedBy(progress)
 
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
@@ -1556,13 +1570,15 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
             precondition(isinstance(default_params, dict), default_params)
             precondition("max_segment_size" in default_params, default_params)
             uploadable.set_default_encoding_parameters(default_params)
+            if progress:
+                progress.set_progress_total(size)
 
             if self.stats_provider:
                 self.stats_provider.count('uploader.files_uploaded', 1)
                 self.stats_provider.count('uploader.bytes_uploaded', size)
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
-                uploader = LiteralUploader()
+                uploader = LiteralUploader(progress=progress)
                 return uploader.start(uploadable)
             else:
                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
@@ -1575,7 +1591,7 @@ class Uploader(service.MultiService, log.PrefixingLogMixin):
                 else:
                     storage_broker = self.parent.get_storage_broker()
                     secret_holder = self.parent._secret_holder
-                    uploader = CHKUploader(storage_broker, secret_holder)
+                    uploader = CHKUploader(storage_broker, secret_holder, progress=progress)
                     d2.addCallback(lambda x: uploader.start(eu))
 
                 self._all_uploads[uploader] = None