]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/immutable/upload.py
immutable: refactor uploader to do just encoding-and-uploading, not encryption
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
index fd4a601d1d941a106e9d61a3fa7e57b1ce2a38c5..1790be93b6779a2afff3d7a0e967c183c3142edf 100644 (file)
@@ -13,7 +13,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
 from allmydata import storage, hashtree, uri
 from allmydata.immutable import encode
-from allmydata.util import base32, idlib, mathutil
+from allmydata.util import base32, idlib, log, mathutil
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import get_versioned_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -662,27 +662,21 @@ class CHKUploader:
             kwargs["facility"] = "tahoe.upload"
         return self._client.log(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable):
         """Start uploading the file.
 
-        This method returns a Deferred that will fire with the URI (a
-        string)."""
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
 
         self._started = time.time()
-        uploadable = IUploadable(uploadable)
-        self.log("starting upload of %s" % uploadable)
+        eu = IEncryptedUploadable(encrypted_uploadable)
+        self.log("starting upload of %s" % eu)
 
-        eu = EncryptAnUploadable(uploadable, self._log_number)
         eu.set_upload_status(self._upload_status)
         d = self.start_encrypted(eu)
-        def _uploaded(res):
-            d1 = uploadable.get_encryption_key()
-            d1.addCallback(lambda key: self._compute_uri(res, key))
-            return d1
-        d.addCallback(_uploaded)
-        def _done(res):
+        def _done(uploadresults):
             self._upload_status.set_active(False)
-            return res
+            return uploadresults
         d.addBoth(_done)
         return d
 
@@ -696,6 +690,7 @@ class CHKUploader:
         return self._encoder.abort()
 
     def start_encrypted(self, encrypted):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
@@ -706,7 +701,6 @@ class CHKUploader:
         d.addCallback(self.set_shareholders, e)
         d.addCallback(lambda res: e.start())
         d.addCallback(self._encrypted_done)
-        # this fires with the uri_extension_hash and other data
         return d
 
     def locate_all_shareholders(self, encoder, started):
@@ -761,7 +755,8 @@ class CHKUploader:
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         encoder.set_shareholders(buckets)
 
-    def _encrypted_done(self, res):
+    def _encrypted_done(self, verifycap):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         r = self._results
         for shnum in self._encoder.get_shares_placed():
             peer_tracker = self._sharemap[shnum]
@@ -779,19 +774,7 @@ class CHKUploader:
         r.timings["peer_selection"] = self._peer_selection_elapsed
         r.timings.update(self._encoder.get_times())
         r.uri_extension_data = self._encoder.get_uri_extension_data()
-        return res
-
-    def _compute_uri(self, (uri_extension_hash,
-                            needed_shares, total_shares, size),
-                     key):
-        u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=needed_shares,
-                           total_shares=total_shares,
-                           size=size,
-                           )
-        r = self._results
-        r.uri = u.to_string()
+        r.verifycapstr = verifycap.to_string()
         return r
 
     def get_upload_status(self):
@@ -948,26 +931,23 @@ class AssistedUploader:
             kwargs["parent"] = self._log_number
         return log.msg(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable, storage_index):
+        """Start uploading the file.
+
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
+        precondition(isinstance(storage_index, str), storage_index)
         self._started = time.time()
-        u = IUploadable(uploadable)
-        eu = EncryptAnUploadable(u, self._log_number)
+        eu = IEncryptedUploadable(encrypted_uploadable)
         eu.set_upload_status(self._upload_status)
         self._encuploadable = eu
+        self._storage_index = storage_index
         d = eu.get_size()
         d.addCallback(self._got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
         d.addCallback(self._got_all_encoding_parameters)
-        # when we get the encryption key, that will also compute the storage
-        # index, so this only takes one pass.
-        # TODO: I'm not sure it's cool to switch back and forth between
-        # the Uploadable and the IEncryptedUploadable that wraps it.
-        d.addCallback(lambda res: u.get_encryption_key())
-        d.addCallback(self._got_encryption_key)
-        d.addCallback(lambda res: eu.get_storage_index())
-        d.addCallback(self._got_storage_index)
         d.addCallback(self._contact_helper)
-        d.addCallback(self._build_readcap)
+        d.addCallback(self._build_verifycap)
         def _done(res):
             self._upload_status.set_active(False)
             return res
@@ -985,13 +965,6 @@ class AssistedUploader:
         self._total_shares = n
         self._segment_size = segment_size
 
-    def _got_encryption_key(self, key):
-        self._key = key
-
-    def _got_storage_index(self, storage_index):
-        self._storage_index = storage_index
-
-
     def _contact_helper(self, res):
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
@@ -1023,7 +996,7 @@ class AssistedUploader:
         self._upload_status.set_results(upload_results)
         return upload_results
 
-    def _build_readcap(self, upload_results):
+    def _build_verifycap(self, upload_results):
         self.log("upload finished, building readcap")
         self._upload_status.set_status("Building Readcap")
         r = upload_results
@@ -1031,13 +1004,11 @@ class AssistedUploader:
         assert r.uri_extension_data["total_shares"] == self._total_shares
         assert r.uri_extension_data["segment_size"] == self._segment_size
         assert r.uri_extension_data["size"] == self._size
-        u = uri.CHKFileURI(key=self._key,
-                           uri_extension_hash=r.uri_extension_hash,
-                           needed_shares=self._needed_shares,
-                           total_shares=self._total_shares,
-                           size=self._size,
-                           )
-        r.uri = u.to_string()
+        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
+                                             uri_extension_hash=r.uri_extension_hash,
+                                             needed_shares=self._needed_shares,
+                                             total_shares=self._total_shares, size=self._size
+                                             ).to_string()
         now = time.time()
         r.file_size = self._size
         r.timings["storage_index"] = self._storage_index_elapsed
@@ -1207,13 +1178,12 @@ class Data(FileHandle):
         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
         FileHandle.__init__(self, StringIO(data), convergence=convergence)
 
-class Uploader(service.MultiService):
+class Uploader(service.MultiService, log.PrefixingLogMixin):
     """I am a service that allows file uploading. I am a service-child of the
     Client.
     """
     implements(IUploader)
     name = "uploader"
-    uploader_class = CHKUploader
     URI_LIT_SIZE_THRESHOLD = 55
     MAX_UPLOAD_STATUSES = 10
 
@@ -1224,6 +1194,7 @@ class Uploader(service.MultiService):
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
         self._all_upload_statuses = weakref.WeakKeyDictionary()
         self._recent_upload_statuses = []
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
     def startService(self):
@@ -1233,7 +1204,7 @@ class Uploader(service.MultiService):
                                       self._got_helper)
 
     def _got_helper(self, helper):
-        log.msg("got helper connection, getting versions")
+        self.log("got helper connection, getting versions")
         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                     { },
                     "application-version": "unknown: no get_version()",
@@ -1257,7 +1228,9 @@ class Uploader(service.MultiService):
 
 
     def upload(self, uploadable):
-        # this returns the URI
+        """
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
         assert self.parent
         assert self.running
 
@@ -1275,12 +1248,31 @@ class Uploader(service.MultiService):
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
                 uploader = LiteralUploader(self.parent)
-            elif self._helper:
-                uploader = AssistedUploader(self._helper)
+                return uploader.start(uploadable)
             else:
-                uploader = self.uploader_class(self.parent)
-            self._add_upload(uploader)
-            return uploader.start(uploadable)
+                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
+                d2 = defer.succeed(None)
+                if self._helper:
+                    uploader = AssistedUploader(self._helper)
+                    d2.addCallback(lambda x: eu.get_storage_index())
+                    d2.addCallback(lambda si: uploader.start(eu, si))
+                else:
+                    uploader = CHKUploader(self.parent)
+                    d2.addCallback(lambda x: uploader.start(eu))
+
+                self._add_upload(uploader)
+                def turn_verifycap_into_read_cap(uploadresults):
+                    # Generate the uri from the verifycap plus the key.
+                    d3 = uploadable.get_encryption_key()
+                    def put_readcap_into_results(key):
+                        v = uri.from_string(uploadresults.verifycapstr)
+                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
+                        uploadresults.uri = r.to_string()
+                        return uploadresults
+                    d3.addCallback(put_readcap_into_results)
+                    return d3
+                d2.addCallback(turn_verifycap_into_read_cap)
+                return d2
         d.addCallback(_got_size)
         def _done(res):
             uploadable.close()