From: Brian Warner Date: Wed, 6 Feb 2008 08:52:25 +0000 (-0700) Subject: add upload-results timing info for helper uploads. This changes the Helper protocol... X-Git-Tag: allmydata-tahoe-0.8.0~131 X-Git-Url: https://git.rkrishnan.org/pf/$top_link?a=commitdiff_plain;h=124fb5ecdfcc74cb879aab756881076568c39fb4;p=tahoe-lafs%2Ftahoe-lafs.git add upload-results timing info for helper uploads. This changes the Helper protocol, and introduces a compatibility break --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 39f1e904..1e41ef12 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -669,10 +669,3 @@ class Encoder(object): def get_times(self): # return a dictionary of encode+push timings return self._times - def get_rates(self): - # return a dictionary of encode+push speeds - rates = { - "encode": self.file_size / self._times["cumulative_encoding"], - "push": self.file_size / self._times["cumulative_sending"], - } - return rates diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index ae1fc65c..0312ff0d 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1376,7 +1376,7 @@ class RIControlClient(RemoteInterface): return DictOf(Nodeid, float) -UploadResults = DictOf(str, str) +UploadResults = Any() #DictOf(str, str) class RIEncryptedUploadable(RemoteInterface): __remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com" diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 4da21e2c..7f2a34f4 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -1,5 +1,5 @@ -import os.path, stat +import os.path, stat, time from zope.interface import implements from twisted.application import service from twisted.internet import defer @@ -131,13 +131,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): def __init__(self, storage_index, helper, incoming_file, encoding_file, - log_number): + results, log_number): self._storage_index = storage_index self._helper = helper self._incoming_file = incoming_file self._encoding_file = encoding_file upload_id = idlib.b2a(storage_index)[:6] self._log_number = log_number + self._results = results self._helper.log("CHKUploadHelper starting for SI %s" % upload_id, parent=log_number) @@ -159,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): return upload.CHKUploader.log(self, *args, **kwargs) def start(self): + self._started = time.time() # determine if we need to upload the file. If so, return ({},self) . # If not, return (UploadResults,None) . self.log("deciding whether to upload the file or not", level=log.NOISY) @@ -166,15 +168,15 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # we have the whole file, and we might be encoding it (or the # encode/upload might have failed, and we need to restart it). self.log("ciphertext already in place", level=log.UNUSUAL) - return ({}, self) + return (self._results, self) if os.path.exists(self._incoming_file): # we have some of the file, but not all of it (otherwise we'd be # encoding). The caller might be useful. self.log("partial ciphertext already present", level=log.UNUSUAL) - return ({}, self) + return (self._results, self) # we don't remember uploading this file self.log("no ciphertext yet", level=log.NOISY) - return ({}, self) + return (self._results, self) def remote_upload(self, reader): # reader is an RIEncryptedUploadable. I am specified to return an @@ -190,10 +192,14 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): def _finished(self, res): (uri_extension_hash, needed_shares, total_shares, size) = res - upload_results = {'uri_extension_hash': uri_extension_hash} + r = self._results + r.uri_extension_hash = uri_extension_hash + f_times = self._fetcher.get_times() + r.timings["cumulative_fetch"] = f_times["cumulative_fetch"] + r.timings["total_fetch"] = f_times["total"] self._reader.close() os.unlink(self._encoding_file) - self._finished_observers.fire(upload_results) + self._finished_observers.fire(r) self._helper.upload_finished(self._storage_index) del self._reader @@ -248,6 +254,10 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): self._readers = [] self._started = False self._f = None + self._times = { + "cumulative_fetch": 0.0, + "total": 0.0, + } def log(self, *args, **kwargs): if "facility" not in kwargs: @@ -264,6 +274,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): if self._started: return self._started = True + started = time.time() if os.path.exists(self._encoding_file): self.log("ciphertext already present, bypassing fetch", @@ -276,7 +287,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): # else. have = os.stat(self._encoding_file)[stat.ST_SIZE] d = self.call("read_encrypted", have-1, 1) - d.addCallback(lambda ignored: self._done2()) + d.addCallback(self._done2, started) return # first, find out how large the file is going to be @@ -284,6 +295,7 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): d.addCallback(self._got_size) d.addCallback(self._start_reading) d.addCallback(self._done) + d.addCallback(self._done2, started) d.addErrback(self._failed) def _got_size(self, size): @@ -327,8 +339,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): # transfer that involves more than a few hundred chunks. # 'fire_when_done' lives a long time, but the Deferreds returned by # the inner _fetch() call do not. + start = time.time() d = defer.maybeDeferred(self._fetch) def _done(finished): + elapsed = time.time() - start + self._times["cumulative_fetch"] += elapsed if finished: self.log("finished reading ciphertext", level=log.NOISY) fire_when_done.callback(None) @@ -366,10 +381,11 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): size=os.stat(self._incoming_file)[stat.ST_SIZE], level=log.NOISY) os.rename(self._incoming_file, self._encoding_file) - return self._done2() - def _done2(self): + def _done2(self, _ignored, started): self.log("done2", level=log.NOISY) + elapsed = time.time() - started + self._times["total"] = elapsed self._readers = [] self._done_observers.fire(None) @@ -382,6 +398,8 @@ class CHKCiphertextFetcher(AskUntilSuccessMixin): def when_done(self): return self._done_observers.when_fired() + def get_times(self): + return self._times class LocalCiphertextReader(AskUntilSuccessMixin): @@ -449,6 +467,8 @@ class Helper(Referenceable, service.MultiService): return self.parent.log(*args, **kwargs) def remote_upload_chk(self, storage_index): + r = upload.UploadResults() + started = time.time() si_s = idlib.b2a(storage_index) lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s) incoming_file = os.path.join(self._chk_incoming, si_s) @@ -458,11 +478,14 @@ class Helper(Referenceable, service.MultiService): uh = self._active_uploads[storage_index] return uh.start() - d = self._check_for_chk_already_in_grid(storage_index, lp) - def _checked(upload_results): - if upload_results: + d = self._check_for_chk_already_in_grid(storage_index, r, lp) + def _checked(already_present): + elapsed = time.time() - started + r.timings['existence_check'] = elapsed + if already_present: + # the necessary results are placed in the UploadResults self.log("file already found in grid", parent=lp) - return (upload_results, None) + return (r, None) # the file is not present in the grid, by which we mean there are # less than 'N' shares available. @@ -477,7 +500,7 @@ class Helper(Referenceable, service.MultiService): self.log("creating new upload helper", parent=lp) uh = self.chk_upload_helper_class(storage_index, self, incoming_file, encoding_file, - lp) + r, lp) self._active_uploads[storage_index] = uh return uh.start() d.addCallback(_checked) @@ -488,7 +511,7 @@ class Helper(Referenceable, service.MultiService): d.addErrback(_err) return d - def _check_for_chk_already_in_grid(self, storage_index, lp): + def _check_for_chk_already_in_grid(self, storage_index, results, lp): # see if this file is already in the grid lp2 = self.log("doing a quick check+UEBfetch", parent=lp, level=log.NOISY) @@ -499,8 +522,8 @@ class Helper(Referenceable, service.MultiService): if res: (sharemap, ueb_data, ueb_hash) = res self.log("found file in grid", level=log.NOISY, parent=lp) - upload_results = {'uri_extension_hash': ueb_hash} - return upload_results + results.uri_extension_hash = ueb_hash + return True return False d.addCallback(_checked) return d diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 481664e9..e1e8b0c6 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -28,7 +28,8 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper): class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper): def start(self): - res = {'uri_extension_hash': hashutil.uri_extension_hash("")} + res = upload.UploadResults() + res.uri_extension_hash = hashutil.uri_extension_hash("") return (res, None) class FakeClient(service.MultiService): diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 69a42c60..72b27faa 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -4,7 +4,7 @@ from zope.interface import implements from twisted.python import failure from twisted.internet import defer from twisted.application import service -from foolscap import Referenceable +from foolscap import Referenceable, Copyable, RemoteCopy from foolscap import eventual from foolscap.logging import log @@ -36,14 +36,17 @@ class HaveAllPeersError(Exception): class TooFullError(Exception): pass -class UploadResults: +class UploadResults(Copyable, RemoteCopy): implements(IUploadResults) + typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com" + copytype = typeToCopy + + file_size = None uri = None sharemap = None # dict of shnum to placement string servermap = None # dict of peerid to set(shnums) def __init__(self): self.timings = {} # dict of name to number of seconds - self.rates = {} # dict of name to rates (in bytes per second) # our current uri_extension is 846 bytes for small files, a few bytes # more for larger ones (since the filesize is encoded in decimal in a @@ -597,15 +600,19 @@ class CHKUploader: def start_encrypted(self, encrypted): eu = IEncryptedUploadable(encrypted) + started = time.time() self._encoder = e = encode.Encoder(self._log_number) d = e.set_encrypted_uploadable(eu) - d.addCallback(self.locate_all_shareholders) + d.addCallback(self.locate_all_shareholders, started) d.addCallback(self.set_shareholders, e) d.addCallback(lambda res: e.start()) + d.addCallback(self._encrypted_done) # this fires with the uri_extension_hash and other data return d - def locate_all_shareholders(self, encoder): + def locate_all_shareholders(self, encoder, started): + peer_selection_started = now = time.time() + self._storage_index_elapsed = now - started storage_index = encoder.get_param("storage_index") upload_id = idlib.b2a(storage_index)[:6] self.log("using storage index %s" % upload_id) @@ -621,7 +628,7 @@ class CHKUploader: share_size, block_size, num_segments, n, desired) def _done(res): - self._peer_selection_finished = time.time() + self._peer_selection_elapsed = time.time() - peer_selection_started return res d.addCallback(_done) return d @@ -642,17 +649,8 @@ class CHKUploader: assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]) encoder.set_shareholders(buckets) - def _compute_uri(self, (uri_extension_hash, - needed_shares, total_shares, size), - key): - u = uri.CHKFileURI(key=key, - uri_extension_hash=uri_extension_hash, - needed_shares=needed_shares, - total_shares=total_shares, - size=size, - ) + def _encrypted_done(self, res): r = self._results - r.uri = u.to_string() r.sharemap = {} r.servermap = {} for shnum in self._encoder.get_shares_placed(): @@ -663,14 +661,25 @@ class CHKUploader: if peerid not in r.servermap: r.servermap[peerid] = set() r.servermap[peerid].add(shnum) - peer_selection_time = (self._peer_selection_finished - - self._peer_selection_started) now = time.time() + r.file_size = self._encoder.file_size r.timings["total"] = now - self._started - r.rates["total"] = 1.0 * self._encoder.file_size / r.timings["total"] - r.timings["peer_selection"] = peer_selection_time + r.timings["storage_index"] = self._storage_index_elapsed + r.timings["peer_selection"] = self._peer_selection_elapsed r.timings.update(self._encoder.get_times()) - r.rates.update(self._encoder.get_rates()) + return res + + def _compute_uri(self, (uri_extension_hash, + needed_shares, total_shares, size), + key): + u = uri.CHKFileURI(key=key, + uri_extension_hash=uri_extension_hash, + needed_shares=needed_shares, + total_shares=total_shares, + size=size, + ) + r = self._results + r.uri = u.to_string() return r @@ -703,7 +712,10 @@ class LiteralUploader: def start(self, uploadable): uploadable = IUploadable(uploadable) d = uploadable.get_size() - d.addCallback(lambda size: read_this_many_bytes(uploadable, size)) + def _got_size(size): + self._results.file_size = size + return read_this_many_bytes(uploadable, size) + d.addCallback(_got_size) d.addCallback(lambda data: uri.LiteralFileURI("".join(data))) d.addCallback(lambda u: u.to_string()) d.addCallback(self._build_results) @@ -794,7 +806,6 @@ class AssistedUploader: assert isinstance(default_encoding_parameters, dict) self._default_encoding_parameters = default_encoding_parameters self._log_number = log.msg("AssistedUploader starting") - self._results = UploadResults() def log(self, msg, parent=None, **kwargs): if parent is None: @@ -838,16 +849,16 @@ class AssistedUploader: self._storage_index = storage_index def _contact_helper(self, res): - now = self._time_contacting_helper = time.time() - self._results.timings["local_hashing"] = now - self._started + now = self._time_contacting_helper_start = time.time() + self._storage_index_elapsed = now - self._started self.log("contacting helper..") d = self._helper.callRemote("upload_chk", self._storage_index) d.addCallback(self._contacted_helper) return d def _contacted_helper(self, (upload_results, upload_helper)): now = time.time() - elapsed = now - self._time_contacting_helper - self._results.timings["contacting_helper"] = elapsed + elapsed = now - self._time_contacting_helper_start + self._elapsed_time_contacting_helper = elapsed if upload_helper: self.log("helper says we need to upload") # we need to upload the file @@ -883,18 +894,21 @@ class AssistedUploader: def _build_readcap(self, upload_results): self.log("upload finished, building readcap") - ur = upload_results + r = upload_results u = uri.CHKFileURI(key=self._key, - uri_extension_hash=ur['uri_extension_hash'], + uri_extension_hash=r.uri_extension_hash, needed_shares=self._needed_shares, total_shares=self._total_shares, size=self._size, ) - r = self._results r.uri = u.to_string() now = time.time() + r.file_size = self._size + r.timings["storage_index"] = self._storage_index_elapsed + r.timings["contacting_helper"] = self._elapsed_time_contacting_helper + if "total" in r.timings: + r.timings["helper_total"] = r.timings["total"] r.timings["total"] = now - self._started - r.rates["total"] = 1.0 * self._size / r.timings["total"] return r class NoParameterPreferencesMixin: diff --git a/src/allmydata/web/unlinked-upload.xhtml b/src/allmydata/web/unlinked-upload.xhtml index 6f9149d3..eb4283a7 100644 --- a/src/allmydata/web/unlinked-upload.xhtml +++ b/src/allmydata/web/unlinked-upload.xhtml @@ -18,9 +18,12 @@
  • Servermap:
  • Timings: