--- allmydata/encode.py
+++ allmydata/encode.py
from allmydata.util.assertutil import _assert, precondition
from allmydata.codec import CRSEncoder
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
- IEncryptedUploadable
+ IEncryptedUploadable, IUploadStatus
"""
The goal of the encoder is to turn the original file into a series of
class Encoder(object):
implements(IEncoder)
- def __init__(self, log_parent=None):
+ def __init__(self, log_parent=None, upload_status=None):
object.__init__(self)
self.uri_extension_data = {}
self._codec = None
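+ # an optional IUploadStatus-providing object lets callers watch our
+ # progress; every status update below is a no-op when it is absent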
+ self._status = None
+ if upload_status:
+ self._status = IUploadStatus(upload_status)
precondition(log_parent is None or isinstance(log_parent, int),
log_parent)
self._log_number = log.msg("creating Encoder %s" % self,
d.addCallbacks(lambda res: self.done(), self.err)
return d
+ def set_status(self, status):
+ if self._status:
+ self._status.set_status(status)
+
+ def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
+ if self._status:
+ # we treat the final hash+close as an extra segment
+ if sent_segments is None:
+ sent_segments = self.num_segments
+ progress = float(sent_segments + extra) / (self.num_segments + 1)
+ self._status.set_progress(2, progress)
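+ # worked example: with num_segments=3, send_segment(0..2) reports
+ # 0/4, 1/4, and 2/4, and the hash/close steps then advance 'extra'
+ # from 0.0 to 1.0, so done() reports (3+1)/4 = 1.0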
+
def abort(self):
self.log("aborting upload", level=log.UNUSUAL)
assert self._codec, "don't call abort before start"
def start_all_shareholders(self):
self.log("starting shareholders", level=log.NOISY)
+ self.set_status("Starting shareholders")
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].start()
shareids=shareids, landlords=self.landlords)
start = time.time()
dl = []
+ self.set_status("Sending segment %d of %d" % (segnum+1,
+ self.num_segments))
+ self.set_encode_and_push_progress(segnum)
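+ # segnum is 0-based: the display string above is 1-based, while the
+ # progress call counts only the segments already sent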
lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
for i in range(len(shares)):
subshare = shares[i]
def finish_hashing(self):
self._start_hashing_and_close_timestamp = time.time()
+ self.set_status("Finishing hashes")
+ self.set_encode_and_push_progress(extra=0.0)
crypttext_hash = self._crypttext_hasher.digest()
self.uri_extension_data["crypttext_hash"] = crypttext_hash
d = self._uploadable.get_plaintext_hash()
def send_plaintext_hash_tree_to_all_shareholders(self):
self.log("sending plaintext hash tree", level=log.NOISY)
+ self.set_status("Sending Plaintext Hash Tree")
+ self.set_encode_and_push_progress(extra=0.2)
dl = []
for shareid in self.landlords.keys():
d = self.send_plaintext_hash_tree(shareid,
def send_crypttext_hash_tree_to_all_shareholders(self):
self.log("sending crypttext hash tree", level=log.NOISY)
+ self.set_status("Sending Crypttext Hash Tree")
+ self.set_encode_and_push_progress(extra=0.3)
t = HashTree(self._crypttext_hashes)
all_hashes = list(t)
self.uri_extension_data["crypttext_root_hash"] = t[0]
def send_all_subshare_hash_trees(self):
self.log("sending subshare hash trees", level=log.NOISY)
+ self.set_status("Sending Subshare Hash Trees")
+ self.set_encode_and_push_progress(extra=0.4)
dl = []
for shareid,hashes in enumerate(self.subshare_hashes):
# hashes is a list of the hashes of all subshares that were sent
# not include the top-level hash root (which is stored securely in
# the URI instead).
self.log("sending all share hash trees", level=log.NOISY)
+ self.set_status("Sending Share Hash Trees")
+ self.set_encode_and_push_progress(extra=0.6)
dl = []
for h in self.share_root_hashes:
assert h
def send_uri_extension_to_all_shareholders(self):
lp = self.log("sending uri_extension", level=log.NOISY)
+ self.set_status("Sending URI Extensions")
+ self.set_encode_and_push_progress(extra=0.8)
for k in ('crypttext_root_hash', 'crypttext_hash',
'plaintext_root_hash', 'plaintext_hash',
):
def close_all_shareholders(self):
self.log("closing shareholders", level=log.NOISY)
+ self.set_status("Closing Shareholders")
+ self.set_encode_and_push_progress(extra=0.9)
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].close()
def done(self):
self.log("upload done", level=log.OPERATIONAL)
+ self.set_status("Done")
+ self.set_encode_and_push_progress(extra=1.0) # done
now = time.time()
h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
self._times["hashes_and_close"] = h_and_c_elapsed
def err(self, f):
self.log("upload failed", failure=f, level=log.UNUSUAL)
+ self.set_status("Failed")
# we need to abort any remaining shareholders, so they'll delete the
# partial share, allowing someone else to upload it again.
self.log("aborting shareholders", level=log.UNUSUAL)
--- allmydata/upload.py
+++ allmydata/upload.py
-import os, time
+import os, time, weakref
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from allmydata.util import idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
- IEncryptedUploadable, RIEncryptedUploadable
+ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
class Tahoe2PeerSelector:
- def __init__(self, upload_id, logparent=None):
+ def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
self.error_count = 0
self.num_peers_contacted = 0
self.last_failure_msg = None
+ self._status = None
+ if upload_status:
+ self._status = IUploadStatus(upload_status)
self._log_parent = log.msg("%s starting" % self, parent=logparent)
def __repr__(self):
shares for us
"""
+ if self._status:
+ self._status.set_status("Contacting Peers..")
+
self.total_shares = total_shares
self.shares_of_happiness = shares_of_happiness
shares_to_ask = set([self.homeless_shares.pop(0)])
self.query_count += 1
self.num_peers_contacted += 1
+ if self._status:
+ self._status.set_status("Contacting Peers [%s] (first query),"
+ " %d shares left.."
+ % (idlib.shortnodeid_b2a(peer.peerid),
+ len(self.homeless_shares)))
d = peer.query(shares_to_ask)
d.addBoth(self._got_response, peer, shares_to_ask,
self.contacted_peers)
shares_to_ask = set(self.homeless_shares[:num_shares])
self.homeless_shares[:num_shares] = []
self.query_count += 1
+ if self._status:
+ self._status.set_status("Contacting Peers [%s] (second query),"
+ " %d shares left.."
+ % (idlib.shortnodeid_b2a(peer.peerid),
+ len(self.homeless_shares)))
d = peer.query(shares_to_ask)
d.addBoth(self._got_response, peer, shares_to_ask,
self.contacted_peers2)
raise encode.NotEnoughPeersError(msg)
else:
# we placed enough to be happy, so we're done
+ if self._status:
+ self._status.set_status("Placed all shares")
return self.use_peers
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
self._plaintext_segment_hashes = []
self._encoding_parameters = None
self._file_size = None
+ self._ciphertext_bytes_read = 0
+ self._status = None
+
+ def set_upload_status(self, upload_status):
+ self._status = IUploadStatus(upload_status)
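+ # also hand the status to the wrapped uploadable, so it can report
+ # the phase-0 (encryption key / CHK hashing) progress itself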
+ self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
if "facility" not in kwargs:
d = self.original.get_size()
def _got_size(size):
self._file_size = size
+ if self._status:
+ self._status.set_size(size)
return size
d.addCallback(_got_size)
return d
# specify that it is truncated to the same 128 bits as the AES key.
assert len(storage_index) == 16 # SHA-256 truncated to 128b
self._storage_index = storage_index
-
+ if self._status:
+ self._status.set_storage_index(storage_index)
return e
d.addCallback(_got)
return d
def read_encrypted(self, length, hash_only):
# make sure our parameters have been set up first
d = self.get_all_encoding_parameters()
+ # ... and that the file size is known, so the progress updates below
+ # can report the ciphertext read as a fraction of the whole file
+ d.addCallback(lambda ignored: self.get_size())
d.addCallback(lambda ignored: self._get_encryptor())
# then fetch and encrypt the plaintext. The unusual structure here
# (passing a Deferred *into* a function) is needed to avoid
cryptdata = []
# we use data.pop(0) instead of 'for chunk in data' to save
# memory: each chunk is destroyed as soon as we're done with it.
+ bytes_processed = 0
while data:
chunk = data.pop(0)
log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk),
level=log.NOISY)
+ bytes_processed += len(chunk)
self._plaintext_hasher.update(chunk)
self._update_segment_hash(chunk)
# TODO: we have to encrypt the data (even if hash_only==True)
cryptdata.append(ciphertext)
del ciphertext
del chunk
+ self._ciphertext_bytes_read += bytes_processed
+ if self._status and self._file_size:
+ # skip the update for zero-length files to avoid dividing by zero
+ progress = float(self._ciphertext_bytes_read) / self._file_size
+ self._status.set_progress(1, progress)
return cryptdata
def close(self):
return self.original.close()
+class UploadStatus:
+ implements(IUploadStatus)
+
+ def __init__(self):
+ self.storage_index = None
+ self.size = None
+ self.helper = False
+ self.status = "Not started"
+ self.progress = [0.0, 0.0, 0.0]
+
+ def get_storage_index(self):
+ return self.storage_index
+ def get_size(self):
+ return self.size
+ def using_helper(self):
+ return self.helper
+ def get_status(self):
+ return self.status
+ def get_progress(self):
+ return tuple(self.progress)
+
+ def set_storage_index(self, si):
+ self.storage_index = si
+ def set_size(self, size):
+ self.size = size
+ def set_helper(self, helper):
+ self.helper = helper
+ def set_status(self, status):
+ self.status = status
+ def set_progress(self, which, value):
+ # [0]: chk, [1]: ciphertext, [2]: encode+push
+ self.progress[which] = value
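+
+ # A status consumer (e.g. a web status page) might poll one of these
+ # objects roughly like the following sketch (illustrative only, not
+ # part of this patch):
+ #
+ # st = uploader.get_upload_status()
+ # chk, ciphertext, push = st.get_progress()
+ # print "%s: %3d%% pushed" % (st.get_status(), int(100 * push))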
class CHKUploader:
peer_selector_class = Tahoe2PeerSelector
self._log_number = self._client.log("CHKUploader starting")
self._encoder = None
self._results = UploadResults()
+ self._storage_index = None
+ self._upload_status = UploadStatus()
+ self._upload_status.set_helper(False)
def log(self, *args, **kwargs):
if "parent" not in kwargs:
self.log("starting upload of %s" % uploadable)
eu = EncryptAnUploadable(uploadable)
+ eu.set_upload_status(self._upload_status)
d = self.start_encrypted(eu)
def _uploaded(res):
d1 = uploadable.get_encryption_key()
eu = IEncryptedUploadable(encrypted)
started = time.time()
- self._encoder = e = encode.Encoder(self._log_number)
+ self._encoder = e = encode.Encoder(self._log_number,
+ self._upload_status)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
peer_selection_started = now = time.time()
self._storage_index_elapsed = now - started
storage_index = encoder.get_param("storage_index")
+ self._storage_index = storage_index
upload_id = idlib.b2a(storage_index)[:6]
self.log("using storage index %s" % upload_id)
- peer_selector = self.peer_selector_class(upload_id, self._log_number)
+ peer_selector = self.peer_selector_class(upload_id, self._log_number,
+ self._upload_status)
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
r.uri = u.to_string()
return r
+ def get_upload_status(self):
+ return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
if size == 0:
def __init__(self, client):
self._client = client
self._results = UploadResults()
+ self._status = s = UploadStatus()
+ s.set_storage_index(None)
+ s.set_helper(False)
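+ # no encryption key needs to be computed for this kind of upload,
+ # so phase 0 ("chk") begins complete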
+ s.set_progress(0, 1.0)
def start(self, uploadable):
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):
+ self._size = size
+ self._status.set_size(size)
self._results.file_size = size
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
def _build_results(self, uri):
self._results.uri = uri
+ self._status.set_status("Done")
+ self._status.set_progress(1, 1.0)
+ self._status.set_progress(2, 1.0)
return self._results
def close(self):
pass
+ def get_upload_status(self):
+ return self._status
+
class RemoteEncryptedUploadable(Referenceable):
implements(RIEncryptedUploadable)
- def __init__(self, encrypted_uploadable):
+ def __init__(self, encrypted_uploadable, upload_status):
self._eu = IEncryptedUploadable(encrypted_uploadable)
self._offset = 0
self._bytes_sent = 0
+ self._status = IUploadStatus(upload_status)
+ # we are responsible for updating the status string while we run, and
+ # for setting the ciphertext-fetch progress.
+ self._size = None
+
+ def get_size(self):
+ if self._size is not None:
+ return defer.succeed(self._size)
+ d = self._eu.get_size()
+ def _got_size(size):
+ self._size = size
+ return size
+ d.addCallback(_got_size)
+ return d
def remote_get_size(self):
- return self._eu.get_size()
+ return self.get_size()
def remote_get_all_encoding_parameters(self):
return self._eu.get_all_encoding_parameters()
def __init__(self, helper):
self._helper = helper
self._log_number = log.msg("AssistedUploader starting")
+ self._storage_index = None
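+ # helper-assisted uploads are marked so that status displays can
+ # distinguish them from direct uploads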
+ self._upload_status = s = UploadStatus()
+ s.set_helper(True)
def log(self, msg, parent=None, **kwargs):
if parent is None:
self._started = time.time()
u = IUploadable(uploadable)
eu = EncryptAnUploadable(u)
+ eu.set_upload_status(self._upload_status)
self._encuploadable = eu
d = eu.get_size()
d.addCallback(self._got_size)
def _got_size(self, size):
self._size = size
+ self._upload_status.set_size(size)
def _got_all_encoding_parameters(self, params):
k, happy, n, segment_size = params
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
self.log("contacting helper..")
+ self._upload_status.set_status("Contacting Helper")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
return d
self._elapsed_time_contacting_helper = elapsed
if upload_helper:
self.log("helper says we need to upload")
+ self._upload_status.set_status("Uploading Ciphertext")
# we need to upload the file
- reu = RemoteEncryptedUploadable(self._encuploadable)
- d = upload_helper.callRemote("upload", reu)
+ reu = RemoteEncryptedUploadable(self._encuploadable,
+ self._upload_status)
+ # let it pre-compute the size for progress purposes
+ d = reu.get_size()
+ d.addCallback(lambda ignored:
+ upload_helper.callRemote("upload", reu))
# this Deferred will fire with the upload results
return d
self.log("helper says file is already uploaded")
+ self._upload_status.set_progress(1, 1.0)
return upload_results
def _build_readcap(self, upload_results):
self.log("upload finished, building readcap")
+ self._upload_status.set_status("Building Readcap")
r = upload_results
assert r.uri_extension_data["needed_shares"] == self._needed_shares
assert r.uri_extension_data["total_shares"] == self._total_shares
if "total" in r.timings:
r.timings["helper_total"] = r.timings["total"]
r.timings["total"] = now - self._started
+ self._upload_status.set_status("Done")
return r
+ def get_upload_status(self):
+ return self._upload_status
+
class BaseUploadable:
default_max_segment_size = 1*MiB # overridden by max_segment_size
default_encoding_param_k = 3 # overridden by encoding_parameters
encoding_param_n = None
_all_encoding_parameters = None
+ _status = None
+
+ def set_upload_status(self, upload_status):
+ self._status = IUploadStatus(upload_status)
def set_default_encoding_parameters(self, default_params):
assert isinstance(default_params, dict)
self._filehandle = filehandle
self._key = None
self._contenthashkey = contenthashkey
+ self._size = None
def _get_encryption_key_content_hash(self):
if self._key is not None:
return defer.succeed(self._key)
- d = self.get_all_encoding_parameters()
+ d = self.get_size()
+ # that sets self._size as a side-effect
+ d.addCallback(lambda size: self.get_all_encoding_parameters())
def _got(params):
k, happy, n, segsize = params
f = self._filehandle
enckey_hasher = content_hash_key_hasher(k, n, segsize)
f.seek(0)
BLOCKSIZE = 64*1024
+ bytes_read = 0
while True:
data = f.read(BLOCKSIZE)
if not data:
break
enckey_hasher.update(data)
+ # TODO: setting progress in a non-yielding loop is kind of
+ # pointless, but I'm anticipating (perhaps prematurely) the
+ # day when we use a slowjob or twisted's CooperatorService to
+ # make this yield time to other jobs.
+ bytes_read += len(data)
+ if self._status:
+ self._status.set_progress(0, float(bytes_read)/self._size)
f.seek(0)
self._key = enckey_hasher.digest()
+ if self._status:
+ self._status.set_progress(0, 1.0)
assert len(self._key) == 16
return self._key
d.addCallback(_got)
return self._get_encryption_key_random()
def get_size(self):
+ if self._size is not None:
+ return defer.succeed(self._size)
self._filehandle.seek(0,2)
size = self._filehandle.tell()
+ self._size = size
self._filehandle.seek(0)
return defer.succeed(size)
def __init__(self, helper_furl=None):
self._helper_furl = helper_furl
self._helper = None
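+ # weakly track the status of every upload we start: an entry stays
+ # listed only while the status object itself is referenced elsewhere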
+ self._all_uploads = weakref.WeakKeyDictionary()
service.MultiService.__init__(self)
def startService(self):
uploader = AssistedUploader(self._helper)
else:
uploader = self.uploader_class(self.parent)
+ self._all_uploads[uploader.get_upload_status()] = None
return uploader.start(uploadable)
d.addCallback(_got_size)
def _done(res):