From c85f75bb08dd93b1dddfb0bb92d7dd597426be87 Mon Sep 17 00:00:00 2001 From: Zooko O'Whielacronx <zooko@zooko.com> Date: Tue, 6 Jan 2009 21:48:22 -0700 Subject: [PATCH] immutable: refactor uploader to do just encoding-and-uploading, not encryption This makes Uploader take an EncryptedUploadable object instead of an Uploadable object. I also changed it to return a verify cap instead of a tuple of the bits of data that one finds in a verify cap. This will facilitate hooking together an Uploader and a Downloader to make a Repairer. Also move offloaded.py into src/allmydata/immutable/. --- src/allmydata/client.py | 2 +- src/allmydata/immutable/encode.py | 6 +- src/allmydata/{ => immutable}/offloaded.py | 13 ++- src/allmydata/immutable/upload.py | 122 ++++++++++----------- src/allmydata/interfaces.py | 7 +- src/allmydata/test/test_encode.py | 16 +-- src/allmydata/test/test_helper.py | 11 +- src/allmydata/test/test_system.py | 4 +- 8 files changed, 90 insertions(+), 91 deletions(-) rename src/allmydata/{ => immutable}/offloaded.py (98%) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index a9b9a52f..d1465d6d 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -15,7 +15,7 @@ from allmydata.storage import StorageServer from allmydata.immutable.upload import Uploader from allmydata.immutable.download import Downloader from allmydata.immutable.filenode import FileNode, LiteralFileNode -from allmydata.offloaded import Helper +from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient from allmydata.util import hashutil, base32, pollmixin, cachedir diff --git a/src/allmydata/immutable/encode.py b/src/allmydata/immutable/encode.py index 7bc4ce0f..3ef9f3ae 100644 --- a/src/allmydata/immutable/encode.py +++ b/src/allmydata/immutable/encode.py @@ -197,6 +197,8 @@ class Encoder(object): self.landlords = landlords.copy() def start(self): + """ Returns a Deferred that will fire with the verify cap (an instance of + uri.CHKFileVerifierURI).""" self.log("%s starting" % (self,)) #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares) assert self._codec @@ -637,8 +639,8 @@ class Encoder(object): # update our sharemap self._shares_placed = set(self.landlords.keys()) - return (self.uri_extension_hash, self.required_shares, - self.num_shares, self.file_size) + return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash, + self.required_shares, self.num_shares, self.file_size) def err(self, f): self.log("upload failed", failure=f, level=log.UNUSUAL) diff --git a/src/allmydata/offloaded.py b/src/allmydata/immutable/offloaded.py similarity index 98% rename from src/allmydata/offloaded.py rename to src/allmydata/immutable/offloaded.py index 10097e3a..7ba3943e 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -9,6 +9,7 @@ import allmydata from allmydata import interfaces, storage, uri from allmydata.immutable import upload from allmydata.immutable.layout import ReadBucketProxy +from allmydata.util.assertutil import precondition from allmydata.util import idlib, log, observer, fileutil, hashutil @@ -205,10 +206,12 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # and inform the client when the upload has finished return self._finished_observers.when_fired() - def _finished(self, res): - (uri_extension_hash, needed_shares, total_shares, size) = res - r = self._results - r.uri_extension_hash = uri_extension_hash + def _finished(self, uploadresults): + precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr) + assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults + r = uploadresults + v = uri.from_string(r.verifycapstr) + r.uri_extension_hash = v.uri_extension_hash f_times = self._fetcher.get_times() r.timings["cumulative_fetch"] = f_times["cumulative_fetch"] r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched() @@ -216,7 +219,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): self._reader.close() os.unlink(self._encoding_file) self._finished_observers.fire(r) - self._helper.upload_finished(self._storage_index, size) + self._helper.upload_finished(self._storage_index, v.size) del self._reader def _failed(self, f): diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index fd4a601d..1790be93 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -13,7 +13,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \ storage_index_hash, plaintext_segment_hasher, convergence_hasher from allmydata import storage, hashtree, uri from allmydata.immutable import encode -from allmydata.util import base32, idlib, mathutil +from allmydata.util import base32, idlib, log, mathutil from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import get_versioned_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ @@ -662,27 +662,21 @@ class CHKUploader: kwargs["facility"] = "tahoe.upload" return self._client.log(*args, **kwargs) - def start(self, uploadable): + def start(self, encrypted_uploadable): """Start uploading the file. - This method returns a Deferred that will fire with the URI (a - string).""" + Returns a Deferred that will fire with the UploadResults instance. + """ self._started = time.time() - uploadable = IUploadable(uploadable) - self.log("starting upload of %s" % uploadable) + eu = IEncryptedUploadable(encrypted_uploadable) + self.log("starting upload of %s" % eu) - eu = EncryptAnUploadable(uploadable, self._log_number) eu.set_upload_status(self._upload_status) d = self.start_encrypted(eu) - def _uploaded(res): - d1 = uploadable.get_encryption_key() - d1.addCallback(lambda key: self._compute_uri(res, key)) - return d1 - d.addCallback(_uploaded) - def _done(res): + def _done(uploadresults): self._upload_status.set_active(False) - return res + return uploadresults d.addBoth(_done) return d @@ -696,6 +690,7 @@ class CHKUploader: return self._encoder.abort() def start_encrypted(self, encrypted): + """ Returns a Deferred that will fire with the UploadResults instance. """ eu = IEncryptedUploadable(encrypted) started = time.time() @@ -706,7 +701,6 @@ class CHKUploader: d.addCallback(self.set_shareholders, e) d.addCallback(lambda res: e.start()) d.addCallback(self._encrypted_done) - # this fires with the uri_extension_hash and other data return d def locate_all_shareholders(self, encoder, started): @@ -761,7 +755,8 @@ class CHKUploader: assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) encoder.set_shareholders(buckets) - def _encrypted_done(self, res): + def _encrypted_done(self, verifycap): + """ Returns a Deferred that will fire with the UploadResults instance. """ r = self._results for shnum in self._encoder.get_shares_placed(): peer_tracker = self._sharemap[shnum] @@ -779,19 +774,7 @@ class CHKUploader: r.timings["peer_selection"] = self._peer_selection_elapsed r.timings.update(self._encoder.get_times()) r.uri_extension_data = self._encoder.get_uri_extension_data() - return res - - def _compute_uri(self, (uri_extension_hash, - needed_shares, total_shares, size), - key): - u = uri.CHKFileURI(key=key, - uri_extension_hash=uri_extension_hash, - needed_shares=needed_shares, - total_shares=total_shares, - size=size, - ) - r = self._results - r.uri = u.to_string() + r.verifycapstr = verifycap.to_string() return r def get_upload_status(self): @@ -948,26 +931,23 @@ class AssistedUploader: kwargs["parent"] = self._log_number return log.msg(*args, **kwargs) - def start(self, uploadable): + def start(self, encrypted_uploadable, storage_index): + """Start uploading the file. + + Returns a Deferred that will fire with the UploadResults instance. + """ + precondition(isinstance(storage_index, str), storage_index) self._started = time.time() - u = IUploadable(uploadable) - eu = EncryptAnUploadable(u, self._log_number) + eu = IEncryptedUploadable(encrypted_uploadable) eu.set_upload_status(self._upload_status) self._encuploadable = eu + self._storage_index = storage_index d = eu.get_size() d.addCallback(self._got_size) d.addCallback(lambda res: eu.get_all_encoding_parameters()) d.addCallback(self._got_all_encoding_parameters) - # when we get the encryption key, that will also compute the storage - # index, so this only takes one pass. - # TODO: I'm not sure it's cool to switch back and forth between - # the Uploadable and the IEncryptedUploadable that wraps it. - d.addCallback(lambda res: u.get_encryption_key()) - d.addCallback(self._got_encryption_key) - d.addCallback(lambda res: eu.get_storage_index()) - d.addCallback(self._got_storage_index) d.addCallback(self._contact_helper) - d.addCallback(self._build_readcap) + d.addCallback(self._build_verifycap) def _done(res): self._upload_status.set_active(False) return res @@ -985,13 +965,6 @@ class AssistedUploader: self._total_shares = n self._segment_size = segment_size - def _got_encryption_key(self, key): - self._key = key - - def _got_storage_index(self, storage_index): - self._storage_index = storage_index - - def _contact_helper(self, res): now = self._time_contacting_helper_start = time.time() self._storage_index_elapsed = now - self._started @@ -1023,7 +996,7 @@ class AssistedUploader: self._upload_status.set_results(upload_results) return upload_results - def _build_readcap(self, upload_results): + def _build_verifycap(self, upload_results): self.log("upload finished, building readcap") self._upload_status.set_status("Building Readcap") r = upload_results @@ -1031,13 +1004,11 @@ class AssistedUploader: assert r.uri_extension_data["total_shares"] == self._total_shares assert r.uri_extension_data["segment_size"] == self._segment_size assert r.uri_extension_data["size"] == self._size - u = uri.CHKFileURI(key=self._key, - uri_extension_hash=r.uri_extension_hash, - needed_shares=self._needed_shares, - total_shares=self._total_shares, - size=self._size, - ) - r.uri = u.to_string() + r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, + uri_extension_hash=r.uri_extension_hash, + needed_shares=self._needed_shares, + total_shares=self._total_shares, size=self._size + ).to_string() now = time.time() r.file_size = self._size r.timings["storage_index"] = self._storage_index_elapsed @@ -1207,13 +1178,12 @@ class Data(FileHandle): assert convergence is None or isinstance(convergence, str), (convergence, type(convergence)) FileHandle.__init__(self, StringIO(data), convergence=convergence) -class Uploader(service.MultiService): +class Uploader(service.MultiService, log.PrefixingLogMixin): """I am a service that allows file uploading. I am a service-child of the Client. """ implements(IUploader) name = "uploader" - uploader_class = CHKUploader URI_LIT_SIZE_THRESHOLD = 55 MAX_UPLOAD_STATUSES = 10 @@ -1224,6 +1194,7 @@ class Uploader(service.MultiService): self._all_uploads = weakref.WeakKeyDictionary() # for debugging self._all_upload_statuses = weakref.WeakKeyDictionary() self._recent_upload_statuses = [] + log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload") service.MultiService.__init__(self) def startService(self): @@ -1233,7 +1204,7 @@ class Uploader(service.MultiService): self._got_helper) def _got_helper(self, helper): - log.msg("got helper connection, getting versions") + self.log("got helper connection, getting versions") default = { "http://allmydata.org/tahoe/protocols/helper/v1" : { }, "application-version": "unknown: no get_version()", @@ -1257,7 +1228,9 @@ class Uploader(service.MultiService): def upload(self, uploadable): - # this returns the URI + """ + Returns a Deferred that will fire with the UploadResults instance. + """ assert self.parent assert self.running @@ -1275,12 +1248,31 @@ class Uploader(service.MultiService): if size <= self.URI_LIT_SIZE_THRESHOLD: uploader = LiteralUploader(self.parent) - elif self._helper: - uploader = AssistedUploader(self._helper) + return uploader.start(uploadable) else: - uploader = self.uploader_class(self.parent) - self._add_upload(uploader) - return uploader.start(uploadable) + eu = EncryptAnUploadable(uploadable, self._parentmsgid) + d2 = defer.succeed(None) + if self._helper: + uploader = AssistedUploader(self._helper) + d2.addCallback(lambda x: eu.get_storage_index()) + d2.addCallback(lambda si: uploader.start(eu, si)) + else: + uploader = CHKUploader(self.parent) + d2.addCallback(lambda x: uploader.start(eu)) + + self._add_upload(uploader) + def turn_verifycap_into_read_cap(uploadresults): + # Generate the uri from the verifycap plus the key. + d3 = uploadable.get_encryption_key() + def put_readcap_into_results(key): + v = uri.from_string(uploadresults.verifycapstr) + r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size) + uploadresults.uri = r.to_string() + return uploadresults + d3.addCallback(put_readcap_into_results) + return d3 + d2.addCallback(turn_verifycap_into_read_cap) + return d2 d.addCallback(_got_size) def _done(res): uploadable.close() diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 9828db9a..178a9e94 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1212,10 +1212,9 @@ class IEncoder(Interface): set_encrypted_uploadable() and set_shareholders() must be called before this can be invoked. - This returns a Deferred that fires with a tuple of - (uri_extension_hash, needed_shares, total_shares, size) when the - upload process is complete. This information, plus the encryption - key, is sufficient to construct the URI. + This returns a Deferred that fires with a verify cap when the upload process is + complete. The verifycap, plus the encryption key, is sufficient to construct the read + cap. """ class IDecoder(Interface): diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 39e0adae..1bcec9aa 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -304,9 +304,9 @@ class Encode(unittest.TestCase): d.addCallback(_ready) def _check(res): - (uri_extension_hash, required_shares, num_shares, file_size) = res - self.failUnless(isinstance(uri_extension_hash, str)) - self.failUnlessEqual(len(uri_extension_hash), 32) + verifycap = res + self.failUnless(isinstance(verifycap.uri_extension_hash, str)) + self.failUnlessEqual(len(verifycap.uri_extension_hash), 32) for i,peer in enumerate(all_shareholders): self.failUnless(peer.closed) self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS) @@ -475,7 +475,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): def recover(self, (res, key, shareholders), AVAILABLE_SHARES, recover_mode, target=None): - (uri_extension_hash, required_shares, num_shares, file_size) = res + verifycap = res if "corrupt_key" in recover_mode: # we corrupt the key, so that the decrypted data is corrupted and @@ -485,10 +485,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): key = flip_bit(key) u = uri.CHKFileURI(key=key, - uri_extension_hash=uri_extension_hash, - needed_shares=required_shares, - total_shares=num_shares, - size=file_size) + uri_extension_hash=verifycap.uri_extension_hash, + needed_shares=verifycap.needed_shares, + total_shares=verifycap.total_shares, + size=verifycap.size) client = FakeClient() if not target: diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index fe815101..163b51ec 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -5,8 +5,9 @@ from twisted.application import service from foolscap import Tub, eventual from foolscap.logging import log -from allmydata import offloaded, storage -from allmydata.immutable import upload +from allmydata import storage +from allmydata.immutable import offloaded, upload +from allmydata import uri from allmydata.util import hashutil, fileutil, mathutil from pycryptopp.cipher.aes import AES @@ -27,8 +28,10 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper): "size": size, } self._results.uri_extension_data = ueb_data - return (hashutil.uri_extension_hash(""), - needed_shares, total_shares, size) + self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32, + needed_shares, total_shares, + size).to_string() + return self._results d2.addCallback(_got_parms) return d2 d.addCallback(_got_size) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 32968192..cdd5708d 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -8,8 +8,8 @@ from twisted.internet import threads # CLI tests use deferToThread from twisted.internet.error import ConnectionDone, ConnectionLost from twisted.internet.interfaces import IConsumer, IPushProducer import allmydata -from allmydata import uri, storage, offloaded -from allmydata.immutable import download, upload, filenode +from allmydata import uri, storage +from allmydata.immutable import download, filenode, offloaded, upload from allmydata.util import idlib, mathutil from allmydata.util import log, base32 from allmydata.scripts import runner -- 2.45.2