From: Brian Warner Date: Tue, 22 May 2012 04:14:00 +0000 (-0700) Subject: add HelperUploadResults X-Git-Url: https://git.rkrishnan.org/pf/content/vdrive?a=commitdiff_plain;h=b71234c538701246d0466f41d63731898e873f5c;p=tahoe-lafs%2Ftahoe-lafs.git add HelperUploadResults This splits the pb.Copyable on-wire object (HelperUploadResults) out from the local results object (UploadResults). To maintain compatibility with older Helpers, we have to leave pb.Copyable classes alone and unmodified, but we want to change UploadResults to use IServers instead of serverids. So by using a different class on the wire, and translating to/from it on either end, we can accomplish both. --- diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index 922750a9..8faf516a 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -137,14 +137,13 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): def __init__(self, storage_index, helper, storage_broker, secret_holder, incoming_file, encoding_file, - results, log_number): + log_number): self._storage_index = storage_index self._helper = helper self._incoming_file = incoming_file self._encoding_file = encoding_file self._upload_id = si_b2a(storage_index)[:5] self._log_number = log_number - self._results = results self._upload_status = upload.UploadStatus() self._upload_status.set_helper(False) self._upload_status.set_storage_index(storage_index) @@ -201,19 +200,31 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # and inform the client when the upload has finished return self._finished_observers.when_fired() - def _finished(self, uploadresults): - precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr) - assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults - r = uploadresults - v = uri.from_string(r.verifycapstr) - r.uri_extension_hash = v.uri_extension_hash + def _finished(self, ur): + precondition(isinstance(ur.verifycapstr, str), ur.verifycapstr) + assert interfaces.IUploadResults.providedBy(ur), ur + v = uri.from_string(ur.verifycapstr) f_times = self._fetcher.get_times() - r.timings["cumulative_fetch"] = f_times["cumulative_fetch"] - r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched() - r.timings["total_fetch"] = f_times["total"] + + hur = upload.HelperUploadResults() + hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"], + "total_fetch": f_times["total"], + } + for k in ur.timings: + hur.timings[k] = ur.timings[k] + hur.uri_extension_hash = v.uri_extension_hash + hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched() + hur.preexisting_shares = ur.preexisting_shares + hur.sharemap = ur.sharemap + hur.servermap = ur.servermap + hur.pushed_shares = ur.pushed_shares + hur.file_size = ur.file_size + hur.uri_extension_data = ur.uri_extension_data + hur.verifycapstr = ur.verifycapstr + self._reader.close() os.unlink(self._encoding_file) - self._finished_observers.fire(r) + self._finished_observers.fire(hur) self._helper.upload_finished(self._storage_index, v.size) del self._reader @@ -561,7 +572,6 @@ class Helper(Referenceable): def remote_upload_chk(self, storage_index): self.count("chk_upload_helper.upload_requests") - r = upload.UploadResults() lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_b2a(storage_index)) if storage_index in self._active_uploads: @@ -569,8 +579,8 @@ class Helper(Referenceable): uh = self._active_uploads[storage_index] return (None, uh) - d = self._check_chk(storage_index, r, lp) - d.addCallback(self._did_chk_check, storage_index, r, lp) + d = self._check_chk(storage_index, lp) + d.addCallback(self._did_chk_check, storage_index, lp) def _err(f): self.log("error while checking for chk-already-in-grid", failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg") @@ -578,7 +588,7 @@ class Helper(Referenceable): d.addErrback(_err) return d - def _check_chk(self, storage_index, results, lp): + def _check_chk(self, storage_index, lp): # see if this file is already in the grid lp2 = self.log("doing a quick check+UEBfetch", parent=lp, level=log.NOISY) @@ -589,17 +599,18 @@ class Helper(Referenceable): if res: (sharemap, ueb_data, ueb_hash) = res self.log("found file in grid", level=log.NOISY, parent=lp) - results.uri_extension_hash = ueb_hash - results.sharemap = sharemap - results.uri_extension_data = ueb_data - results.preexisting_shares = len(sharemap) - results.pushed_shares = 0 - return results + hur = upload.HelperUploadResults() + hur.uri_extension_hash = ueb_hash + hur.sharemap = sharemap + hur.uri_extension_data = ueb_data + hur.preexisting_shares = len(sharemap) + hur.pushed_shares = 0 + return hur return None d.addCallback(_checked) return d - def _did_chk_check(self, already_present, storage_index, r, lp): + def _did_chk_check(self, already_present, storage_index, lp): if already_present: # the necessary results are placed in the UploadResults self.count("chk_upload_helper.upload_already_present") @@ -618,12 +629,12 @@ class Helper(Referenceable): uh = self._active_uploads[storage_index] else: self.log("creating new upload helper", parent=lp) - uh = self._make_chk_upload_helper(storage_index, r, lp) + uh = self._make_chk_upload_helper(storage_index, lp) self._active_uploads[storage_index] = uh self._add_upload(uh) return (None, uh) - def _make_chk_upload_helper(self, storage_index, r, lp): + def _make_chk_upload_helper(self, storage_index, lp): si_s = si_b2a(storage_index) incoming_file = os.path.join(self._chk_incoming, si_s) encoding_file = os.path.join(self._chk_encoding, si_s) @@ -631,7 +642,7 @@ class Helper(Referenceable): self._storage_broker, self._secret_holder, incoming_file, encoding_file, - r, lp) + lp) return uh def _add_upload(self, uh): diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 9d33cb42..958962ab 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -32,8 +32,10 @@ from cStringIO import StringIO class TooFullError(Exception): pass -class UploadResults(Copyable, RemoteCopy): - implements(IUploadResults) +# HelperUploadResults are what we get from the Helper, and to retain +# backwards compatibility with old Helpers we can't change the format. We +# convert them into a local UploadResults upon receipt. +class HelperUploadResults(Copyable, RemoteCopy): # note: don't change this string, it needs to match the value used on the # helper, and it does *not* need to match the fully-qualified # package/module/class name @@ -55,6 +57,19 @@ class UploadResults(Copyable, RemoteCopy): self.preexisting_shares = None # count of shares already present self.pushed_shares = None # count of shares we pushed +class UploadResults: + implements(IUploadResults) + + def __init__(self): + self.timings = {} # dict of name to number of seconds + self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)} + self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)} + self.file_size = None + self.ciphertext_fetched = None # how much the helper fetched + self.uri = None + self.preexisting_shares = None # count of shares already present + self.pushed_shares = None # count of shares we pushed + # our current uri_extension is 846 bytes for small files, a few bytes # more for larger ones (since the filesize is encoded in decimal in a @@ -1179,7 +1194,7 @@ class AssistedUploader: d.addCallback(self._contacted_helper) return d - def _contacted_helper(self, (upload_results, upload_helper)): + def _contacted_helper(self, (helper_upload_results, upload_helper)): now = time.time() elapsed = now - self._time_contacting_helper_start self._elapsed_time_contacting_helper = elapsed @@ -1197,7 +1212,7 @@ class AssistedUploader: return d self.log("helper says file is already uploaded", level=log.OPERATIONAL) self._upload_status.set_progress(1, 1.0) - return upload_results + return helper_upload_results def _convert_old_upload_results(self, upload_results): # pre-1.3.0 helpers return upload results which contain a mapping @@ -1216,30 +1231,41 @@ class AssistedUploader: if str in [type(v) for v in sharemap.values()]: upload_results.sharemap = None - def _build_verifycap(self, upload_results): + def _build_verifycap(self, helper_upload_results): self.log("upload finished, building readcap", level=log.OPERATIONAL) - self._convert_old_upload_results(upload_results) + self._convert_old_upload_results(helper_upload_results) self._upload_status.set_status("Building Readcap") - r = upload_results - assert r.uri_extension_data["needed_shares"] == self._needed_shares - assert r.uri_extension_data["total_shares"] == self._total_shares - assert r.uri_extension_data["segment_size"] == self._segment_size - assert r.uri_extension_data["size"] == self._size - r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, - uri_extension_hash=r.uri_extension_hash, - needed_shares=self._needed_shares, - total_shares=self._total_shares, size=self._size - ).to_string() + hur = helper_upload_results + assert hur.uri_extension_data["needed_shares"] == self._needed_shares + assert hur.uri_extension_data["total_shares"] == self._total_shares + assert hur.uri_extension_data["segment_size"] == self._segment_size + assert hur.uri_extension_data["size"] == self._size + ur = UploadResults() + # hur.verifycap doesn't exist if already found + v = uri.CHKFileVerifierURI(self._storage_index, + uri_extension_hash=hur.uri_extension_hash, + needed_shares=self._needed_shares, + total_shares=self._total_shares, + size=self._size) + ur.verifycapstr = v.to_string() + ur.timings = hur.timings + ur.uri_extension_data = hur.uri_extension_data + ur.uri_extension_hash = hur.uri_extension_hash + ur.preexisting_shares = hur.preexisting_shares + ur.pushed_shares = hur.pushed_shares + ur.sharemap = hur.sharemap + ur.servermap = hur.servermap # not if already found + ur.ciphertext_fetched = hur.ciphertext_fetched # not if already found now = time.time() - r.file_size = self._size - r.timings["storage_index"] = self._storage_index_elapsed - r.timings["contacting_helper"] = self._elapsed_time_contacting_helper - if "total" in r.timings: - r.timings["helper_total"] = r.timings["total"] - r.timings["total"] = now - self._started + ur.file_size = self._size + ur.timings["storage_index"] = self._storage_index_elapsed + ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper + if "total" in ur.timings: + ur.timings["helper_total"] = ur.timings["total"] + ur.timings["total"] = now - self._started self._upload_status.set_status("Finished") - self._upload_status.set_results(r) - return r + self._upload_status.set_results(ur) + return ur def get_upload_status(self): return self._upload_status diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index c0851f8c..6d7093b5 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -22,24 +22,31 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper): def _got_size(size): d2 = eu.get_all_encoding_parameters() def _got_parms(parms): + # just pretend we did the upload needed_shares, happy, total_shares, segsize = parms ueb_data = {"needed_shares": needed_shares, "total_shares": total_shares, "segment_size": segsize, "size": size, } - self._results.uri_extension_data = ueb_data - self._results.verifycapstr = uri.CHKFileVerifierURI(self._storage_index, "x"*32, - needed_shares, total_shares, - size).to_string() - return self._results + + r = upload.UploadResults() + r.preexisting_shares = 0 + r.pushed_shares = total_shares + r.file_size = size + r.uri_extension_data = ueb_data + v = uri.CHKFileVerifierURI(self._storage_index, "x"*32, + needed_shares, total_shares, + size) + r.verifycapstr = v.to_string() + return r d2.addCallback(_got_parms) return d2 d.addCallback(_got_size) return d class Helper_fake_upload(offloaded.Helper): - def _make_chk_upload_helper(self, storage_index, r, lp): + def _make_chk_upload_helper(self, storage_index, lp): si_s = si_b2a(storage_index) incoming_file = os.path.join(self._chk_incoming, si_s) encoding_file = os.path.join(self._chk_encoding, si_s) @@ -47,12 +54,12 @@ class Helper_fake_upload(offloaded.Helper): self._storage_broker, self._secret_holder, incoming_file, encoding_file, - r, lp) + lp) return uh class Helper_already_uploaded(Helper_fake_upload): - def _check_chk(self, storage_index, results, lp): - res = upload.UploadResults() + def _check_chk(self, storage_index, lp): + res = upload.HelperUploadResults() res.uri_extension_hash = hashutil.uri_extension_hash("") # we're pretending that the file they're trying to upload was already