storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import storage, hashtree, uri
from allmydata.immutable import encode
-from allmydata.util import base32, idlib, mathutil
+from allmydata.util import base32, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
kwargs["facility"] = "tahoe.upload"
return self._client.log(*args, **kwargs)
- def start(self, uploadable):
+ def start(self, encrypted_uploadable):
"""Start uploading the file.
- This method returns a Deferred that will fire with the URI (a
- string)."""
+ Returns a Deferred that will fire with the UploadResults instance.
+ """
self._started = time.time()
- uploadable = IUploadable(uploadable)
- self.log("starting upload of %s" % uploadable)
+ eu = IEncryptedUploadable(encrypted_uploadable)
+ self.log("starting upload of %s" % eu)
- eu = EncryptAnUploadable(uploadable, self._log_number)
eu.set_upload_status(self._upload_status)
d = self.start_encrypted(eu)
- def _uploaded(res):
- d1 = uploadable.get_encryption_key()
- d1.addCallback(lambda key: self._compute_uri(res, key))
- return d1
- d.addCallback(_uploaded)
- def _done(res):
+ def _done(uploadresults):
self._upload_status.set_active(False)
- return res
+ return uploadresults
d.addBoth(_done)
return d
return self._encoder.abort()
def start_encrypted(self, encrypted):
+ """ Returns a Deferred that will fire with the UploadResults instance. """
eu = IEncryptedUploadable(encrypted)
started = time.time()
d.addCallback(self.set_shareholders, e)
d.addCallback(lambda res: e.start())
d.addCallback(self._encrypted_done)
- # this fires with the uri_extension_hash and other data
return d
def locate_all_shareholders(self, encoder, started):
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets)
- def _encrypted_done(self, res):
+ def _encrypted_done(self, verifycap):
+ """ Returns a Deferred that will fire with the UploadResults instance. """
r = self._results
for shnum in self._encoder.get_shares_placed():
peer_tracker = self._sharemap[shnum]
r.timings["peer_selection"] = self._peer_selection_elapsed
r.timings.update(self._encoder.get_times())
r.uri_extension_data = self._encoder.get_uri_extension_data()
- return res
-
- def _compute_uri(self, (uri_extension_hash,
- needed_shares, total_shares, size),
- key):
- u = uri.CHKFileURI(key=key,
- uri_extension_hash=uri_extension_hash,
- needed_shares=needed_shares,
- total_shares=total_shares,
- size=size,
- )
- r = self._results
- r.uri = u.to_string()
+ r.verifycapstr = verifycap.to_string()
return r
def get_upload_status(self):
kwargs["parent"] = self._log_number
return log.msg(*args, **kwargs)
- def start(self, uploadable):
+ def start(self, encrypted_uploadable, storage_index):
+ """Start uploading the file.
+
+ Returns a Deferred that will fire with the UploadResults instance.
+ """
+ precondition(isinstance(storage_index, str), storage_index)
self._started = time.time()
- u = IUploadable(uploadable)
- eu = EncryptAnUploadable(u, self._log_number)
+ eu = IEncryptedUploadable(encrypted_uploadable)
eu.set_upload_status(self._upload_status)
self._encuploadable = eu
+ self._storage_index = storage_index
d = eu.get_size()
d.addCallback(self._got_size)
d.addCallback(lambda res: eu.get_all_encoding_parameters())
d.addCallback(self._got_all_encoding_parameters)
- # when we get the encryption key, that will also compute the storage
- # index, so this only takes one pass.
- # TODO: I'm not sure it's cool to switch back and forth between
- # the Uploadable and the IEncryptedUploadable that wraps it.
- d.addCallback(lambda res: u.get_encryption_key())
- d.addCallback(self._got_encryption_key)
- d.addCallback(lambda res: eu.get_storage_index())
- d.addCallback(self._got_storage_index)
d.addCallback(self._contact_helper)
- d.addCallback(self._build_readcap)
+ d.addCallback(self._build_verifycap)
def _done(res):
self._upload_status.set_active(False)
return res
self._total_shares = n
self._segment_size = segment_size
- def _got_encryption_key(self, key):
- self._key = key
-
- def _got_storage_index(self, storage_index):
- self._storage_index = storage_index
-
-
def _contact_helper(self, res):
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
self._upload_status.set_results(upload_results)
return upload_results
- def _build_readcap(self, upload_results):
+ def _build_verifycap(self, upload_results):
self.log("upload finished, building readcap")
self._upload_status.set_status("Building Readcap")
r = upload_results
assert r.uri_extension_data["total_shares"] == self._total_shares
assert r.uri_extension_data["segment_size"] == self._segment_size
assert r.uri_extension_data["size"] == self._size
- u = uri.CHKFileURI(key=self._key,
- uri_extension_hash=r.uri_extension_hash,
- needed_shares=self._needed_shares,
- total_shares=self._total_shares,
- size=self._size,
- )
- r.uri = u.to_string()
+ r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
+ uri_extension_hash=r.uri_extension_hash,
+ needed_shares=self._needed_shares,
+ total_shares=self._total_shares, size=self._size
+ ).to_string()
now = time.time()
r.file_size = self._size
r.timings["storage_index"] = self._storage_index_elapsed
assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
FileHandle.__init__(self, StringIO(data), convergence=convergence)
-class Uploader(service.MultiService):
+class Uploader(service.MultiService, log.PrefixingLogMixin):
"""I am a service that allows file uploading. I am a service-child of the
Client.
"""
implements(IUploader)
name = "uploader"
- uploader_class = CHKUploader
URI_LIT_SIZE_THRESHOLD = 55
MAX_UPLOAD_STATUSES = 10
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
self._all_upload_statuses = weakref.WeakKeyDictionary()
self._recent_upload_statuses = []
+ log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
def startService(self):
self._got_helper)
def _got_helper(self, helper):
- log.msg("got helper connection, getting versions")
+ self.log("got helper connection, getting versions")
default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
{ },
"application-version": "unknown: no get_version()",
def upload(self, uploadable):
- # this returns the URI
+ """
+ Returns a Deferred that will fire with the UploadResults instance.
+ """
assert self.parent
assert self.running
if size <= self.URI_LIT_SIZE_THRESHOLD:
uploader = LiteralUploader(self.parent)
- elif self._helper:
- uploader = AssistedUploader(self._helper)
+ return uploader.start(uploadable)
else:
- uploader = self.uploader_class(self.parent)
- self._add_upload(uploader)
- return uploader.start(uploadable)
+ eu = EncryptAnUploadable(uploadable, self._parentmsgid)
+ d2 = defer.succeed(None)
+ if self._helper:
+ uploader = AssistedUploader(self._helper)
+ d2.addCallback(lambda x: eu.get_storage_index())
+ d2.addCallback(lambda si: uploader.start(eu, si))
+ else:
+ uploader = CHKUploader(self.parent)
+ d2.addCallback(lambda x: uploader.start(eu))
+
+ self._add_upload(uploader)
+ def turn_verifycap_into_read_cap(uploadresults):
+ # Generate the uri from the verifycap plus the key.
+ d3 = uploadable.get_encryption_key()
+ def put_readcap_into_results(key):
+ v = uri.from_string(uploadresults.verifycapstr)
+ r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
+ uploadresults.uri = r.to_string()
+ return uploadresults
+ d3.addCallback(put_readcap_into_results)
+ return d3
+ d2.addCallback(turn_verifycap_into_read_cap)
+ return d2
d.addCallback(_got_size)
def _done(res):
uploadable.close()