From c85f75bb08dd93b1dddfb0bb92d7dd597426be87 Mon Sep 17 00:00:00 2001
From: Zooko O'Whielacronx <zooko@zooko.com>
Date: Tue, 6 Jan 2009 21:48:22 -0700
Subject: [PATCH] immutable: refactor uploader to do just
 encoding-and-uploading, not encryption. This makes Uploader take an
 EncryptedUploadable object instead of an Uploadable object.  I also changed
 it to return a verify cap instead of a tuple of the bits of data that one
 finds in a verify cap. This will facilitate hooking together an Uploader and
 a Downloader to make a Repairer. Also move offloaded.py into
 src/allmydata/immutable/.

---
 src/allmydata/client.py                    |   2 +-
 src/allmydata/immutable/encode.py          |   6 +-
 src/allmydata/{ => immutable}/offloaded.py |  13 ++-
 src/allmydata/immutable/upload.py          | 122 ++++++++++-----------
 src/allmydata/interfaces.py                |   7 +-
 src/allmydata/test/test_encode.py          |  16 +--
 src/allmydata/test/test_helper.py          |  11 +-
 src/allmydata/test/test_system.py          |   4 +-
 8 files changed, 90 insertions(+), 91 deletions(-)
 rename src/allmydata/{ => immutable}/offloaded.py (98%)

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index a9b9a52f..d1465d6d 100644
--- a/src/allmydata/client.py
+++ b/src/allmydata/client.py
@@ -15,7 +15,7 @@ from allmydata.storage import StorageServer
 from allmydata.immutable.upload import Uploader
 from allmydata.immutable.download import Downloader
 from allmydata.immutable.filenode import FileNode, LiteralFileNode
-from allmydata.offloaded import Helper
+from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
 from allmydata.util import hashutil, base32, pollmixin, cachedir
diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py
index 7bc4ce0f..3ef9f3ae 100644
--- a/src/allmydata/immutable/encode.py
+++ b/src/allmydata/immutable/encode.py
@@ -197,6 +197,8 @@ class Encoder(object):
         self.landlords = landlords.copy()
 
     def start(self):
+        """ Returns a Deferred that will fire with the verify cap (an instance of
+        uri.CHKFileVerifierURI)."""
         self.log("%s starting" % (self,))
         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
         assert self._codec
@@ -637,8 +639,8 @@ class Encoder(object):
 
         # update our sharemap
         self._shares_placed = set(self.landlords.keys())
-        return (self.uri_extension_hash, self.required_shares,
-                self.num_shares, self.file_size)
+        return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
+                                      self.required_shares, self.num_shares, self.file_size)
 
     def err(self, f):
         self.log("upload failed", failure=f, level=log.UNUSUAL)
diff --git a/src/allmydata/offloaded.py b/src/allmydata/immutable/offloaded.py
similarity index 98%
rename from src/allmydata/offloaded.py
rename to src/allmydata/immutable/offloaded.py
index 10097e3a..7ba3943e 100644
--- a/src/allmydata/offloaded.py
+++ b/src/allmydata/immutable/offloaded.py
@@ -9,6 +9,7 @@ import allmydata
 from allmydata import interfaces, storage, uri
 from allmydata.immutable import upload
 from allmydata.immutable.layout import ReadBucketProxy
+from allmydata.util.assertutil import precondition
 from allmydata.util import idlib, log, observer, fileutil, hashutil
 
 
@@ -205,10 +206,12 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # and inform the client when the upload has finished
         return self._finished_observers.when_fired()
 
-    def _finished(self, res):
-        (uri_extension_hash, needed_shares, total_shares, size) = res
-        r = self._results
-        r.uri_extension_hash = uri_extension_hash
+    def _finished(self, uploadresults):
+        precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
+        assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
+        r = uploadresults
+        v = uri.from_string(r.verifycapstr)
+        r.uri_extension_hash = v.uri_extension_hash
         f_times = self._fetcher.get_times()
         r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
         r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
@@ -216,7 +219,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         self._reader.close()
         os.unlink(self._encoding_file)
         self._finished_observers.fire(r)
-        self._helper.upload_finished(self._storage_index, size)
+        self._helper.upload_finished(self._storage_index, v.size)
         del self._reader
 
     def _failed(self, f):
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index fd4a601d..1790be93 100644
--- a/src/allmydata/immutable/upload.py
+++ b/src/allmydata/immutable/upload.py
@@ -13,7 +13,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
 from allmydata import storage, hashtree, uri
 from allmydata.immutable import encode
-from allmydata.util import base32, idlib, mathutil
+from allmydata.util import base32, idlib, log, mathutil
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import get_versioned_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -662,27 +662,21 @@ class CHKUploader:
             kwargs["facility"] = "tahoe.upload"
         return self._client.log(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable):
         """Start uploading the file.
 
-        This method returns a Deferred that will fire with the URI (a
-        string)."""
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
 
         self._started = time.time()
-        uploadable = IUploadable(uploadable)
-        self.log("starting upload of %s" % uploadable)
+        eu = IEncryptedUploadable(encrypted_uploadable)
+        self.log("starting upload of %s" % eu)
 
-        eu = EncryptAnUploadable(uploadable, self._log_number)
         eu.set_upload_status(self._upload_status)
         d = self.start_encrypted(eu)
-        def _uploaded(res):
-            d1 = uploadable.get_encryption_key()
-            d1.addCallback(lambda key: self._compute_uri(res, key))
-            return d1
-        d.addCallback(_uploaded)
-        def _done(res):
+        def _done(uploadresults):
             self._upload_status.set_active(False)
-            return res
+            return uploadresults
         d.addBoth(_done)
         return d
 
@@ -696,6 +690,7 @@ class CHKUploader:
         return self._encoder.abort()
 
     def start_encrypted(self, encrypted):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         eu = IEncryptedUploadable(encrypted)
 
         started = time.time()
@@ -706,7 +701,6 @@ class CHKUploader:
         d.addCallback(self.set_shareholders, e)
         d.addCallback(lambda res: e.start())
         d.addCallback(self._encrypted_done)
-        # this fires with the uri_extension_hash and other data
         return d
 
     def locate_all_shareholders(self, encoder, started):
@@ -761,7 +755,8 @@ class CHKUploader:
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         encoder.set_shareholders(buckets)
 
-    def _encrypted_done(self, res):
+    def _encrypted_done(self, verifycap):
+        """ Returns a Deferred that will fire with the UploadResults instance. """
         r = self._results
         for shnum in self._encoder.get_shares_placed():
             peer_tracker = self._sharemap[shnum]
@@ -779,19 +774,7 @@ class CHKUploader:
         r.timings["peer_selection"] = self._peer_selection_elapsed
         r.timings.update(self._encoder.get_times())
         r.uri_extension_data = self._encoder.get_uri_extension_data()
-        return res
-
-    def _compute_uri(self, (uri_extension_hash,
-                            needed_shares, total_shares, size),
-                     key):
-        u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=needed_shares,
-                           total_shares=total_shares,
-                           size=size,
-                           )
-        r = self._results
-        r.uri = u.to_string()
+        r.verifycapstr = verifycap.to_string()
         return r
 
     def get_upload_status(self):
@@ -948,26 +931,23 @@ class AssistedUploader:
             kwargs["parent"] = self._log_number
         return log.msg(*args, **kwargs)
 
-    def start(self, uploadable):
+    def start(self, encrypted_uploadable, storage_index):
+        """Start uploading the file.
+
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
+        precondition(isinstance(storage_index, str), storage_index)
         self._started = time.time()
-        u = IUploadable(uploadable)
-        eu = EncryptAnUploadable(u, self._log_number)
+        eu = IEncryptedUploadable(encrypted_uploadable)
         eu.set_upload_status(self._upload_status)
         self._encuploadable = eu
+        self._storage_index = storage_index
         d = eu.get_size()
         d.addCallback(self._got_size)
         d.addCallback(lambda res: eu.get_all_encoding_parameters())
         d.addCallback(self._got_all_encoding_parameters)
-        # when we get the encryption key, that will also compute the storage
-        # index, so this only takes one pass.
-        # TODO: I'm not sure it's cool to switch back and forth between
-        # the Uploadable and the IEncryptedUploadable that wraps it.
-        d.addCallback(lambda res: u.get_encryption_key())
-        d.addCallback(self._got_encryption_key)
-        d.addCallback(lambda res: eu.get_storage_index())
-        d.addCallback(self._got_storage_index)
         d.addCallback(self._contact_helper)
-        d.addCallback(self._build_readcap)
+        d.addCallback(self._build_verifycap)
         def _done(res):
             self._upload_status.set_active(False)
             return res
@@ -985,13 +965,6 @@ class AssistedUploader:
         self._total_shares = n
         self._segment_size = segment_size
 
-    def _got_encryption_key(self, key):
-        self._key = key
-
-    def _got_storage_index(self, storage_index):
-        self._storage_index = storage_index
-
-
     def _contact_helper(self, res):
         now = self._time_contacting_helper_start = time.time()
         self._storage_index_elapsed = now - self._started
@@ -1023,7 +996,7 @@ class AssistedUploader:
         self._upload_status.set_results(upload_results)
         return upload_results
 
-    def _build_readcap(self, upload_results):
+    def _build_verifycap(self, upload_results):
         self.log("upload finished, building readcap")
         self._upload_status.set_status("Building Readcap")
         r = upload_results
@@ -1031,13 +1004,11 @@ class AssistedUploader:
         assert r.uri_extension_data["total_shares"] == self._total_shares
         assert r.uri_extension_data["segment_size"] == self._segment_size
         assert r.uri_extension_data["size"] == self._size
-        u = uri.CHKFileURI(key=self._key,
-                           uri_extension_hash=r.uri_extension_hash,
-                           needed_shares=self._needed_shares,
-                           total_shares=self._total_shares,
-                           size=self._size,
-                           )
-        r.uri = u.to_string()
+        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
+                                             uri_extension_hash=r.uri_extension_hash,
+                                             needed_shares=self._needed_shares,
+                                             total_shares=self._total_shares, size=self._size
+                                             ).to_string()
         now = time.time()
         r.file_size = self._size
         r.timings["storage_index"] = self._storage_index_elapsed
@@ -1207,13 +1178,12 @@ class Data(FileHandle):
         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
         FileHandle.__init__(self, StringIO(data), convergence=convergence)
 
-class Uploader(service.MultiService):
+class Uploader(service.MultiService, log.PrefixingLogMixin):
     """I am a service that allows file uploading. I am a service-child of the
     Client.
     """
     implements(IUploader)
     name = "uploader"
-    uploader_class = CHKUploader
     URI_LIT_SIZE_THRESHOLD = 55
     MAX_UPLOAD_STATUSES = 10
 
@@ -1224,6 +1194,7 @@ class Uploader(service.MultiService):
         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
         self._all_upload_statuses = weakref.WeakKeyDictionary()
         self._recent_upload_statuses = []
+        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
         service.MultiService.__init__(self)
 
     def startService(self):
@@ -1233,7 +1204,7 @@ class Uploader(service.MultiService):
                                       self._got_helper)
 
     def _got_helper(self, helper):
-        log.msg("got helper connection, getting versions")
+        self.log("got helper connection, getting versions")
         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                     { },
                     "application-version": "unknown: no get_version()",
@@ -1257,7 +1228,9 @@ class Uploader(service.MultiService):
 
 
     def upload(self, uploadable):
-        # this returns the URI
+        """
+        Returns a Deferred that will fire with the UploadResults instance.
+        """
         assert self.parent
         assert self.running
 
@@ -1275,12 +1248,31 @@ class Uploader(service.MultiService):
 
             if size <= self.URI_LIT_SIZE_THRESHOLD:
                 uploader = LiteralUploader(self.parent)
-            elif self._helper:
-                uploader = AssistedUploader(self._helper)
+                return uploader.start(uploadable)
             else:
-                uploader = self.uploader_class(self.parent)
-            self._add_upload(uploader)
-            return uploader.start(uploadable)
+                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
+                d2 = defer.succeed(None)
+                if self._helper:
+                    uploader = AssistedUploader(self._helper)
+                    d2.addCallback(lambda x: eu.get_storage_index())
+                    d2.addCallback(lambda si: uploader.start(eu, si))
+                else:
+                    uploader = CHKUploader(self.parent)
+                    d2.addCallback(lambda x: uploader.start(eu))
+
+                self._add_upload(uploader)
+                def turn_verifycap_into_read_cap(uploadresults):
+                    # Generate the uri from the verifycap plus the key.
+                    d3 = uploadable.get_encryption_key()
+                    def put_readcap_into_results(key):
+                        v = uri.from_string(uploadresults.verifycapstr)
+                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
+                        uploadresults.uri = r.to_string()
+                        return uploadresults
+                    d3.addCallback(put_readcap_into_results)
+                    return d3
+                d2.addCallback(turn_verifycap_into_read_cap)
+                return d2
         d.addCallback(_got_size)
         def _done(res):
             uploadable.close()
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 9828db9a..178a9e94 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -1212,10 +1212,9 @@ class IEncoder(Interface):
         set_encrypted_uploadable() and set_shareholders() must be called
         before this can be invoked.
 
-        This returns a Deferred that fires with a tuple of
-        (uri_extension_hash, needed_shares, total_shares, size) when the
-        upload process is complete. This information, plus the encryption
-        key, is sufficient to construct the URI.
+        This returns a Deferred that fires with a verify cap when the upload process is
+        complete. The verifycap, plus the encryption key, is sufficient to construct the read
+        cap.
         """
 
 class IDecoder(Interface):
diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py
index 39e0adae..1bcec9aa 100644
--- a/src/allmydata/test/test_encode.py
+++ b/src/allmydata/test/test_encode.py
@@ -304,9 +304,9 @@ class Encode(unittest.TestCase):
         d.addCallback(_ready)
 
         def _check(res):
-            (uri_extension_hash, required_shares, num_shares, file_size) = res
-            self.failUnless(isinstance(uri_extension_hash, str))
-            self.failUnlessEqual(len(uri_extension_hash), 32)
+            verifycap = res
+            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
+            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
             for i,peer in enumerate(all_shareholders):
                 self.failUnless(peer.closed)
                 self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
@@ -475,7 +475,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
 
     def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
                 recover_mode, target=None):
-        (uri_extension_hash, required_shares, num_shares, file_size) = res
+        verifycap = res
 
         if "corrupt_key" in recover_mode:
             # we corrupt the key, so that the decrypted data is corrupted and
@@ -485,10 +485,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
             key = flip_bit(key)
 
         u = uri.CHKFileURI(key=key,
-                           uri_extension_hash=uri_extension_hash,
-                           needed_shares=required_shares,
-                           total_shares=num_shares,
-                           size=file_size)
+                           uri_extension_hash=verifycap.uri_extension_hash,
+                           needed_shares=verifycap.needed_shares,
+                           total_shares=verifycap.total_shares,
+                           size=verifycap.size)
 
         client = FakeClient()
         if not target:
diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py
index fe815101..163b51ec 100644
--- a/src/allmydata/test/test_helper.py
+++ b/src/allmydata/test/test_helper.py
@@ -5,8 +5,9 @@ from twisted.application import service
 from foolscap import Tub, eventual
 from foolscap.logging import log
 
-from allmydata import offloaded, storage
-from allmydata.immutable import upload
+from allmydata import storage
+from allmydata.immutable import offloaded, upload
+from allmydata import uri
 from allmydata.util import hashutil, fileutil, mathutil
 from pycryptopp.cipher.aes import AES
 
@@ -27,8 +28,10 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
                             "size": size,
                             }
                 self._results.uri_extension_data = ueb_data
-                return (hashutil.uri_extension_hash(""),
-                        needed_shares, total_shares, size)
+                self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32,
+                                                                 needed_shares, total_shares,
+                                                                 size).to_string()
+                return self._results
             d2.addCallback(_got_parms)
             return d2
         d.addCallback(_got_size)
diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py
index 32968192..cdd5708d 100644
--- a/src/allmydata/test/test_system.py
+++ b/src/allmydata/test/test_system.py
@@ -8,8 +8,8 @@ from twisted.internet import threads # CLI tests use deferToThread
 from twisted.internet.error import ConnectionDone, ConnectionLost
 from twisted.internet.interfaces import IConsumer, IPushProducer
 import allmydata
-from allmydata import uri, storage, offloaded
-from allmydata.immutable import download, upload, filenode
+from allmydata import uri, storage
+from allmydata.immutable import download, filenode, offloaded, upload
 from allmydata.util import idlib, mathutil
 from allmydata.util import log, base32
 from allmydata.scripts import runner
-- 
2.45.2