From: Brian Warner Date: Tue, 12 Feb 2008 22:36:05 +0000 (-0700) Subject: add upload-status objects, to track upload progress X-Git-Tag: allmydata-tahoe-0.8.0~91 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/reliability?a=commitdiff_plain;h=d0ce8694c180bd9ae6d9c35773c25380e3394c1b;p=tahoe-lafs%2Ftahoe-lafs.git add upload-status objects, to track upload progress --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index d662e173..993b12f1 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -10,7 +10,7 @@ from allmydata.util import mathutil, hashutil, idlib, log from allmydata.util.assertutil import _assert, precondition from allmydata.codec import CRSEncoder from allmydata.interfaces import IEncoder, IStorageBucketWriter, \ - IEncryptedUploadable + IEncryptedUploadable, IUploadStatus """ The goal of the encoder is to turn the original file into a series of @@ -74,10 +74,13 @@ PiB=1024*TiB class Encoder(object): implements(IEncoder) - def __init__(self, log_parent=None): + def __init__(self, log_parent=None, upload_status=None): object.__init__(self) self.uri_extension_data = {} self._codec = None + self._status = None + if upload_status: + self._status = IUploadStatus(upload_status) precondition(log_parent is None or isinstance(log_parent, int), log_parent) self._log_number = log.msg("creating Encoder %s" % self, @@ -247,6 +250,18 @@ class Encoder(object): d.addCallbacks(lambda res: self.done(), self.err) return d + def set_status(self, status): + if self._status: + self._status.set_status(status) + + def set_encode_and_push_progress(self, sent_segments=None, extra=0.0): + if self._status: + # we treat the final hash+close as an extra segment + if sent_segments is None: + sent_segments = self.num_segments + progress = float(sent_segments + extra) / (self.num_segments + 1) + self._status.set_progress(2, progress) + def abort(self): self.log("aborting upload", level=log.UNUSUAL) assert self._codec, "don't call abort before start" @@ -269,6 +284,7 @@ class Encoder(object): def start_all_shareholders(self): self.log("starting shareholders", level=log.NOISY) + self.set_status("Starting shareholders") dl = [] for shareid in self.landlords: d = self.landlords[shareid].start() @@ -409,6 +425,9 @@ class Encoder(object): shareids=shareids, landlords=self.landlords) start = time.time() dl = [] + self.set_status("Sending segment %d of %d" % (segnum+1, + self.num_segments)) + self.set_encode_and_push_progress(segnum) lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY) for i in range(len(shares)): subshare = shares[i] @@ -488,6 +507,8 @@ class Encoder(object): def finish_hashing(self): self._start_hashing_and_close_timestamp = time.time() + self.set_status("Finishing hashes") + self.set_encode_and_push_progress(extra=0.0) crypttext_hash = self._crypttext_hasher.digest() self.uri_extension_data["crypttext_hash"] = crypttext_hash d = self._uploadable.get_plaintext_hash() @@ -509,6 +530,8 @@ class Encoder(object): def send_plaintext_hash_tree_to_all_shareholders(self): self.log("sending plaintext hash tree", level=log.NOISY) + self.set_status("Sending Plaintext Hash Tree") + self.set_encode_and_push_progress(extra=0.2) dl = [] for shareid in self.landlords.keys(): d = self.send_plaintext_hash_tree(shareid, @@ -526,6 +549,8 @@ class Encoder(object): def send_crypttext_hash_tree_to_all_shareholders(self): self.log("sending crypttext hash tree", level=log.NOISY) + self.set_status("Sending Crypttext Hash Tree") + self.set_encode_and_push_progress(extra=0.3) t = HashTree(self._crypttext_hashes) all_hashes = list(t) self.uri_extension_data["crypttext_root_hash"] = t[0] @@ -544,6 +569,8 @@ class Encoder(object): def send_all_subshare_hash_trees(self): self.log("sending subshare hash trees", level=log.NOISY) + self.set_status("Sending Subshare Hash Trees") + self.set_encode_and_push_progress(extra=0.4) dl = [] for shareid,hashes in enumerate(self.subshare_hashes): # hashes is a list of the hashes of all subshares that were sent @@ -571,6 +598,8 @@ class Encoder(object): # not include the top-level hash root (which is stored securely in # the URI instead). self.log("sending all share hash trees", level=log.NOISY) + self.set_status("Sending Share Hash Trees") + self.set_encode_and_push_progress(extra=0.6) dl = [] for h in self.share_root_hashes: assert h @@ -597,6 +626,8 @@ class Encoder(object): def send_uri_extension_to_all_shareholders(self): lp = self.log("sending uri_extension", level=log.NOISY) + self.set_status("Sending URI Extensions") + self.set_encode_and_push_progress(extra=0.8) for k in ('crypttext_root_hash', 'crypttext_hash', 'plaintext_root_hash', 'plaintext_hash', ): @@ -623,6 +654,8 @@ class Encoder(object): def close_all_shareholders(self): self.log("closing shareholders", level=log.NOISY) + self.set_status("Closing Shareholders") + self.set_encode_and_push_progress(extra=0.9) dl = [] for shareid in self.landlords: d = self.landlords[shareid].close() @@ -632,6 +665,8 @@ class Encoder(object): def done(self): self.log("upload done", level=log.OPERATIONAL) + self.set_status("Done") + self.set_encode_and_push_progress(extra=1.0) # done now = time.time() h_and_c_elapsed = now - self._start_hashing_and_close_timestamp self._times["hashes_and_close"] = h_and_c_elapsed @@ -645,6 +680,7 @@ class Encoder(object): def err(self, f): self.log("upload failed", failure=f, level=log.UNUSUAL) + self.set_status("Failed") # we need to abort any remaining shareholders, so they'll delete the # partial share, allowing someone else to upload it again. self.log("aborting shareholders", level=log.UNUSUAL) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index bf61994d..64ae6469 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1104,6 +1104,13 @@ class IDownloader(Interface): when the download is finished, or errbacks if something went wrong.""" class IEncryptedUploadable(Interface): + def set_upload_status(upload_status): + """Provide an IUploadStatus object that should be filled with status + information. The IEncryptedUploadable is responsible for setting + key-determination progress ('chk'), size, storage_index, and + ciphertext-fetch progress. It may delegate some of this + responsibility to others, in particular to the IUploadable.""" + def get_size(): """This behaves just like IUploadable.get_size().""" @@ -1165,6 +1172,11 @@ class IEncryptedUploadable(Interface): """Just like IUploadable.close().""" class IUploadable(Interface): + def set_upload_status(upload_status): + """Provide an IUploadStatus object that should be filled with status + information. The IUploadable is responsible for setting + key-determination progress ('chk').""" + def set_default_encoding_parameters(params): """Set the default encoding parameters, which must be a dict mapping strings to ints. The meaningful keys are 'k', 'happy', 'n', and @@ -1362,6 +1374,32 @@ class IClient(Interface): IDirectoryNode-providing instances, like NewDirectoryNode. """ +class IUploadStatus(Interface): + def get_storage_index(): + """Return a string with the (binary) storage index in use on this + upload. Returns None if the storage index has not yet been + calculated.""" + def get_size(): + """Return an integer with the number of bytes that will eventually + be uploaded for this file. Returns None if the size is not yet known. + """ + def using_helper(): + """Return True if this upload is using a Helper, False if not.""" + def get_status(): + """Return a string describing the current state of the upload + process.""" + def get_progress(): + """Returns a tuple of floats, (chk, ciphertext, encode_and_push), + each from 0.0 to 1.0 . 'chk' describes how much progress has been + made towards hashing the file to determine a CHK encryption key: if + non-convergent encryption is in use, this will be trivial, otherwise + the whole file must be hashed. 'ciphertext' describes how much of the + ciphertext has been pushed to the helper, and is '1.0' for non-helper + uploads. 'encode_and_push' describes how much of the encode-and-push + process has finished: for helper uploads this is dependent upon the + helper providing progress reports. It might be reasonable to add all + three numbers and report the sum to the user.""" + class NotCapableError(Exception): """You have tried to write to a read-only node.""" diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 319e05ae..acc24138 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -139,6 +139,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): upload_id = idlib.b2a(storage_index)[:6] self._log_number = log_number self._results = results + self._upload_status = upload.UploadStatus() + self._upload_status.set_helper(False) self._helper.log("CHKUploadHelper starting for SI %s" % upload_id, parent=log_number) @@ -416,6 +418,10 @@ class LocalCiphertextReader(AskUntilSuccessMixin): self._upload_helper = upload_helper self._storage_index = storage_index self._encoding_file = encoding_file + self._status = None + + def set_upload_status(self, upload_status): + self._status = interfaces.IUploadStatus(upload_status) def start(self): self._size = os.stat(self._encoding_file)[stat.ST_SIZE] diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 2b150d0b..3ea412bc 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -1,5 +1,5 @@ -import os, time +import os, time, weakref from zope.interface import implements from twisted.python import failure from twisted.internet import defer @@ -16,7 +16,7 @@ from allmydata import encode, storage, hashtree, uri from allmydata.util import idlib, mathutil from allmydata.util.assertutil import precondition from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ - IEncryptedUploadable, RIEncryptedUploadable + IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus from pycryptopp.cipher.aes import AES from cStringIO import StringIO @@ -113,12 +113,13 @@ class PeerTracker: class Tahoe2PeerSelector: - def __init__(self, upload_id, logparent=None): + def __init__(self, upload_id, logparent=None, upload_status=None): self.upload_id = upload_id self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 self.error_count = 0 self.num_peers_contacted = 0 self.last_failure_msg = None + self._status = IUploadStatus(upload_status) self._log_parent = log.msg("%s starting" % self, parent=logparent) def __repr__(self): @@ -132,6 +133,9 @@ class Tahoe2PeerSelector: shares for us """ + if self._status: + self._status.set_status("Contacting Peers..") + self.total_shares = total_shares self.shares_of_happiness = shares_of_happiness @@ -204,6 +208,11 @@ class Tahoe2PeerSelector: shares_to_ask = set([self.homeless_shares.pop(0)]) self.query_count += 1 self.num_peers_contacted += 1 + if self._status: + self._status.set_status("Contacting Peers [%s] (first query)," + " %d shares left.." + % (idlib.shortnodeid_b2a(peer.peerid), + len(self.homeless_shares))) d = peer.query(shares_to_ask) d.addBoth(self._got_response, peer, shares_to_ask, self.contacted_peers) @@ -220,6 +229,11 @@ class Tahoe2PeerSelector: shares_to_ask = set(self.homeless_shares[:num_shares]) self.homeless_shares[:num_shares] = [] self.query_count += 1 + if self._status: + self._status.set_status("Contacting Peers [%s] (second query)," + " %d shares left.." + % (idlib.shortnodeid_b2a(peer.peerid), + len(self.homeless_shares))) d = peer.query(shares_to_ask) d.addBoth(self._got_response, peer, shares_to_ask, self.contacted_peers2) @@ -250,6 +264,8 @@ class Tahoe2PeerSelector: raise encode.NotEnoughPeersError(msg) else: # we placed enough to be happy, so we're done + if self._status: + self._status.set_status("Placed all shares") return self.use_peers def _got_response(self, res, peer, shares_to_ask, put_peer_here): @@ -339,6 +355,12 @@ class EncryptAnUploadable: self._plaintext_segment_hashes = [] self._encoding_parameters = None self._file_size = None + self._ciphertext_bytes_read = 0 + self._status = None + + def set_upload_status(self, upload_status): + self._status = IUploadStatus(upload_status) + self.original.set_upload_status(upload_status) def log(self, *args, **kwargs): if "facility" not in kwargs: @@ -351,6 +373,8 @@ class EncryptAnUploadable: d = self.original.get_size() def _got_size(size): self._file_size = size + if self._status: + self._status.set_size(size) return size d.addCallback(_got_size) return d @@ -384,7 +408,8 @@ class EncryptAnUploadable: # specify that it is truncated to the same 128 bits as the AES key. assert len(storage_index) == 16 # SHA-256 truncated to 128b self._storage_index = storage_index - + if self._status: + self._status.set_storage_index(storage_index) return e d.addCallback(_got) return d @@ -432,6 +457,8 @@ class EncryptAnUploadable: def read_encrypted(self, length, hash_only): # make sure our parameters have been set up first d = self.get_all_encoding_parameters() + # and size + d.addCallback(lambda ignored: self.get_size()) d.addCallback(lambda ignored: self._get_encryptor()) # then fetch and encrypt the plaintext. The unusual structure here # (passing a Deferred *into* a function) is needed to avoid @@ -481,10 +508,12 @@ class EncryptAnUploadable: cryptdata = [] # we use data.pop(0) instead of 'for chunk in data' to save # memory: each chunk is destroyed as soon as we're done with it. + bytes_processed = 0 while data: chunk = data.pop(0) log.msg(" read_encrypted handling %dB-sized chunk" % len(chunk), level=log.NOISY) + bytes_processed += len(chunk) self._plaintext_hasher.update(chunk) self._update_segment_hash(chunk) # TODO: we have to encrypt the data (even if hash_only==True) @@ -499,6 +528,10 @@ class EncryptAnUploadable: cryptdata.append(ciphertext) del ciphertext del chunk + self._ciphertext_bytes_read += bytes_processed + if self._status: + progress = float(self._ciphertext_bytes_read) / self._file_size + self._status.set_progress(1, progress) return cryptdata @@ -526,6 +559,38 @@ class EncryptAnUploadable: def close(self): return self.original.close() +class UploadStatus: + implements(IUploadStatus) + + def __init__(self): + self.storage_index = None + self.size = None + self.helper = False + self.status = "Not started" + self.progress = [0.0, 0.0, 0.0] + + def get_storage_index(self): + return self.storage_index + def get_size(self): + return self.size + def using_helper(self): + return self.helper + def get_status(self): + return self.status + def get_progress(self): + return tuple(self.progress) + + def set_storage_index(self, si): + self.storage_index = si + def set_size(self, size): + self.size = size + def set_helper(self, helper): + self.helper = helper + def set_status(self, status): + self.status = status + def set_progress(self, which, value): + # [0]: chk, [1]: ciphertext, [2]: encode+push + self.progress[which] = value class CHKUploader: peer_selector_class = Tahoe2PeerSelector @@ -535,6 +600,9 @@ class CHKUploader: self._log_number = self._client.log("CHKUploader starting") self._encoder = None self._results = UploadResults() + self._storage_index = None + self._upload_status = UploadStatus() + self._upload_status.set_helper(False) def log(self, *args, **kwargs): if "parent" not in kwargs: @@ -554,6 +622,7 @@ class CHKUploader: self.log("starting upload of %s" % uploadable) eu = EncryptAnUploadable(uploadable) + eu.set_upload_status(self._upload_status) d = self.start_encrypted(eu) def _uploaded(res): d1 = uploadable.get_encryption_key() @@ -575,7 +644,8 @@ class CHKUploader: eu = IEncryptedUploadable(encrypted) started = time.time() - self._encoder = e = encode.Encoder(self._log_number) + self._encoder = e = encode.Encoder(self._log_number, + self._upload_status) d = e.set_encrypted_uploadable(eu) d.addCallback(self.locate_all_shareholders, started) d.addCallback(self.set_shareholders, e) @@ -588,9 +658,11 @@ class CHKUploader: peer_selection_started = now = time.time() self._storage_index_elapsed = now - started storage_index = encoder.get_param("storage_index") + self._storage_index = storage_index upload_id = idlib.b2a(storage_index)[:6] self.log("using storage index %s" % upload_id) - peer_selector = self.peer_selector_class(upload_id, self._log_number) + peer_selector = self.peer_selector_class(upload_id, self._log_number, + self._upload_status) share_size = encoder.get_param("share_size") block_size = encoder.get_param("block_size") @@ -657,6 +729,8 @@ class CHKUploader: r.uri = u.to_string() return r + def get_upload_status(self): + return self._upload_status def read_this_many_bytes(uploadable, size, prepend_data=[]): if size == 0: @@ -680,11 +754,17 @@ class LiteralUploader: def __init__(self, client): self._client = client self._results = UploadResults() + self._status = s = UploadStatus() + s.set_storage_index(None) + s.set_helper(False) + s.set_progress(0, 1.0) def start(self, uploadable): uploadable = IUploadable(uploadable) d = uploadable.get_size() def _got_size(size): + self._size = size + self._status.set_size(size) self._results.file_size = size return read_this_many_bytes(uploadable, size) d.addCallback(_got_size) @@ -695,21 +775,41 @@ class LiteralUploader: def _build_results(self, uri): self._results.uri = uri + self._status.set_status("Done") + self._status.set_progress(1, 1.0) + self._status.set_progress(2, 1.0) return self._results def close(self): pass + def get_upload_status(self): + return self._status + class RemoteEncryptedUploadable(Referenceable): implements(RIEncryptedUploadable) - def __init__(self, encrypted_uploadable): + def __init__(self, encrypted_uploadable, upload_status): self._eu = IEncryptedUploadable(encrypted_uploadable) self._offset = 0 self._bytes_sent = 0 + self._status = IUploadStatus(upload_status) + # we are responsible for updating the status string while we run, and + # for setting the ciphertext-fetch progress. + self._size = None + + def get_size(self): + if self._size is not None: + return defer.succeed(self._size) + d = self._eu.get_size() + def _got_size(size): + self._size = size + return size + d.addCallback(_got_size) + return d def remote_get_size(self): - return self._eu.get_size() + return self.get_size() def remote_get_all_encoding_parameters(self): return self._eu.get_all_encoding_parameters() @@ -771,6 +871,9 @@ class AssistedUploader: def __init__(self, helper): self._helper = helper self._log_number = log.msg("AssistedUploader starting") + self._storage_index = None + self._upload_status = s = UploadStatus() + s.set_helper(True) def log(self, msg, parent=None, **kwargs): if parent is None: @@ -781,6 +884,7 @@ class AssistedUploader: self._started = time.time() u = IUploadable(uploadable) eu = EncryptAnUploadable(u) + eu.set_upload_status(self._upload_status) self._encuploadable = eu d = eu.get_size() d.addCallback(self._got_size) @@ -800,6 +904,7 @@ class AssistedUploader: def _got_size(self, size): self._size = size + self._upload_status.set_size(size) def _got_all_encoding_parameters(self, params): k, happy, n, segment_size = params @@ -819,6 +924,7 @@ class AssistedUploader: now = self._time_contacting_helper_start = time.time() self._storage_index_elapsed = now - self._started self.log("contacting helper..") + self._upload_status.set_status("Contacting Helper") d = self._helper.callRemote("upload_chk", self._storage_index) d.addCallback(self._contacted_helper) return d @@ -829,16 +935,23 @@ class AssistedUploader: self._elapsed_time_contacting_helper = elapsed if upload_helper: self.log("helper says we need to upload") + self._upload_status.set_status("Uploading Ciphertext") # we need to upload the file - reu = RemoteEncryptedUploadable(self._encuploadable) - d = upload_helper.callRemote("upload", reu) + reu = RemoteEncryptedUploadable(self._encuploadable, + self._upload_status) + # let it pre-compute the size for progress purposes + d = reu.get_size() + d.addCallback(lambda ignored: + upload_helper.callRemote("upload", reu)) # this Deferred will fire with the upload results return d self.log("helper says file is already uploaded") + self._upload_status.set_progress(1, 1.0) return upload_results def _build_readcap(self, upload_results): self.log("upload finished, building readcap") + self._upload_status.set_status("Building Readcap") r = upload_results assert r.uri_extension_data["needed_shares"] == self._needed_shares assert r.uri_extension_data["total_shares"] == self._total_shares @@ -858,8 +971,12 @@ class AssistedUploader: if "total" in r.timings: r.timings["helper_total"] = r.timings["total"] r.timings["total"] = now - self._started + self._upload_status.set_status("Done") return r + def get_upload_status(self): + return self._upload_status + class BaseUploadable: default_max_segment_size = 1*MiB # overridden by max_segment_size default_encoding_param_k = 3 # overridden by encoding_parameters @@ -872,6 +989,10 @@ class BaseUploadable: encoding_param_n = None _all_encoding_parameters = None + _status = None + + def set_upload_status(self, upload_status): + self._status = IUploadStatus(upload_status) def set_default_encoding_parameters(self, default_params): assert isinstance(default_params, dict) @@ -915,25 +1036,38 @@ class FileHandle(BaseUploadable): self._filehandle = filehandle self._key = None self._contenthashkey = contenthashkey + self._size = None def _get_encryption_key_content_hash(self): if self._key is not None: return defer.succeed(self._key) - d = self.get_all_encoding_parameters() + d = self.get_size() + # that sets self._size as a side-effect + d.addCallback(lambda size: self.get_all_encoding_parameters()) def _got(params): k, happy, n, segsize = params f = self._filehandle enckey_hasher = content_hash_key_hasher(k, n, segsize) f.seek(0) BLOCKSIZE = 64*1024 + bytes_read = 0 while True: data = f.read(BLOCKSIZE) if not data: break enckey_hasher.update(data) + # TODO: setting progress in a non-yielding loop is kind of + # pointless, but I'm anticipating (perhaps prematurely) the + # day when we use a slowjob or twisted's CooperatorService to + # make this yield time to other jobs. + bytes_read += len(data) + if self._status: + self._status.set_progress(0, float(bytes_read)/self._size) f.seek(0) self._key = enckey_hasher.digest() + if self._status: + self._status.set_progress(0, 1.0) assert len(self._key) == 16 return self._key d.addCallback(_got) @@ -951,8 +1085,11 @@ class FileHandle(BaseUploadable): return self._get_encryption_key_random() def get_size(self): + if self._size is not None: + return defer.succeed(self._size) self._filehandle.seek(0,2) size = self._filehandle.tell() + self._size = size self._filehandle.seek(0) return defer.succeed(size) @@ -985,6 +1122,7 @@ class Uploader(service.MultiService): def __init__(self, helper_furl=None): self._helper_furl = helper_furl self._helper = None + self._all_uploads = weakref.WeakKeyDictionary() service.MultiService.__init__(self) def startService(self): @@ -1021,6 +1159,7 @@ class Uploader(service.MultiService): uploader = AssistedUploader(self._helper) else: uploader = self.uploader_class(self.parent) + self._all_uploads[uploader.get_upload_status()] = None return uploader.start(uploadable) d.addCallback(_got_size) def _done(res):