From: Brian Warner Date: Tue, 22 May 2012 04:13:32 +0000 (-0700) Subject: clean up Helper to make later changes easier X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=0df833eac9aa24fe892dd8622c0707bfdbc35f89;p=tahoe-lafs%2Ftahoe-lafs.git clean up Helper to make later changes easier Fix up control flow inside the Helper, to make it more friendly for later refactoring. --- diff --git a/src/allmydata/immutable/offloaded.py b/src/allmydata/immutable/offloaded.py index 7ff19c38..922750a9 100644 --- a/src/allmydata/immutable/offloaded.py +++ b/src/allmydata/immutable/offloaded.py @@ -160,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): self._reader = LocalCiphertextReader(self, storage_index, encoding_file) self._finished_observers = observer.OneShotObserverList() + self._started = time.time() d = self._fetcher.when_done() d.addCallback(lambda res: self._reader.start()) d.addCallback(lambda res: self.start_encrypted(self._reader)) @@ -171,31 +172,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): kwargs['facility'] = "tahoe.helper.chk" return upload.CHKUploader.log(self, *args, **kwargs) - def start(self): - self._started = time.time() - # determine if we need to upload the file. If so, return ({},self) . - # If not, return (UploadResults,None) . + def remote_get_version(self): + return self.VERSION + + def remote_upload(self, reader): + # reader is an RIEncryptedUploadable. I am specified to return an + # UploadResults dictionary. + + # Log how much ciphertext we need to get. self.log("deciding whether to upload the file or not", level=log.NOISY) if os.path.exists(self._encoding_file): # we have the whole file, and we might be encoding it (or the # encode/upload might have failed, and we need to restart it). self.log("ciphertext already in place", level=log.UNUSUAL) - return (self._results, self) - if os.path.exists(self._incoming_file): + elif os.path.exists(self._incoming_file): # we have some of the file, but not all of it (otherwise we'd be # encoding). The caller might be useful. self.log("partial ciphertext already present", level=log.UNUSUAL) - return (self._results, self) - # we don't remember uploading this file - self.log("no ciphertext yet", level=log.NOISY) - return (self._results, self) - - def remote_get_version(self): - return self.VERSION - - def remote_upload(self, reader): - # reader is an RIEncryptedUploadable. I am specified to return an - # UploadResults dictionary. + else: + # we don't remember uploading this file + self.log("no ciphertext yet", level=log.NOISY) # let our fetcher pull ciphertext from the reader. self._fetcher.add_reader(reader) @@ -491,7 +487,6 @@ class Helper(Referenceable): { }, "application-version": str(allmydata.__full_version__), } - chk_upload_helper_class = CHKUploadHelper MAX_UPLOAD_STATUSES = 10 def __init__(self, basedir, storage_broker, secret_holder, @@ -567,44 +562,15 @@ class Helper(Referenceable): def remote_upload_chk(self, storage_index): self.count("chk_upload_helper.upload_requests") r = upload.UploadResults() - si_s = si_b2a(storage_index) - lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s) - incoming_file = os.path.join(self._chk_incoming, si_s) - encoding_file = os.path.join(self._chk_encoding, si_s) + lp = self.log(format="helper: upload_chk query for SI %(si)s", + si=si_b2a(storage_index)) if storage_index in self._active_uploads: self.log("upload is currently active", parent=lp) uh = self._active_uploads[storage_index] - return uh.start() - - d = self._check_for_chk_already_in_grid(storage_index, r, lp) - def _checked(already_present): - if already_present: - # the necessary results are placed in the UploadResults - self.count("chk_upload_helper.upload_already_present") - self.log("file already found in grid", parent=lp) - return (r, None) - - self.count("chk_upload_helper.upload_need_upload") - # the file is not present in the grid, by which we mean there are - # less than 'N' shares available. - self.log("unable to find file in the grid", parent=lp, - level=log.NOISY) - # We need an upload helper. Check our active uploads again in - # case there was a race. - if storage_index in self._active_uploads: - self.log("upload is currently active", parent=lp) - uh = self._active_uploads[storage_index] - else: - self.log("creating new upload helper", parent=lp) - uh = self.chk_upload_helper_class(storage_index, self, - self._storage_broker, - self._secret_holder, - incoming_file, encoding_file, - r, lp) - self._active_uploads[storage_index] = uh - self._add_upload(uh) - return uh.start() - d.addCallback(_checked) + return (None, uh) + + d = self._check_chk(storage_index, r, lp) + d.addCallback(self._did_chk_check, storage_index, r, lp) def _err(f): self.log("error while checking for chk-already-in-grid", failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg") @@ -612,7 +578,7 @@ class Helper(Referenceable): d.addErrback(_err) return d - def _check_for_chk_already_in_grid(self, storage_index, results, lp): + def _check_chk(self, storage_index, results, lp): # see if this file is already in the grid lp2 = self.log("doing a quick check+UEBfetch", parent=lp, level=log.NOISY) @@ -628,11 +594,46 @@ class Helper(Referenceable): results.uri_extension_data = ueb_data results.preexisting_shares = len(sharemap) results.pushed_shares = 0 - return True - return False + return results + return None d.addCallback(_checked) return d + def _did_chk_check(self, already_present, storage_index, r, lp): + if already_present: + # the necessary results are placed in the UploadResults + self.count("chk_upload_helper.upload_already_present") + self.log("file already found in grid", parent=lp) + return (already_present, None) + + self.count("chk_upload_helper.upload_need_upload") + # the file is not present in the grid, by which we mean there are + # less than 'N' shares available. + self.log("unable to find file in the grid", parent=lp, + level=log.NOISY) + # We need an upload helper. Check our active uploads again in + # case there was a race. + if storage_index in self._active_uploads: + self.log("upload is currently active", parent=lp) + uh = self._active_uploads[storage_index] + else: + self.log("creating new upload helper", parent=lp) + uh = self._make_chk_upload_helper(storage_index, r, lp) + self._active_uploads[storage_index] = uh + self._add_upload(uh) + return (None, uh) + + def _make_chk_upload_helper(self, storage_index, r, lp): + si_s = si_b2a(storage_index) + incoming_file = os.path.join(self._chk_incoming, si_s) + encoding_file = os.path.join(self._chk_encoding, si_s) + uh = CHKUploadHelper(storage_index, self, + self._storage_broker, + self._secret_holder, + incoming_file, encoding_file, + r, lp) + return uh + def _add_upload(self, uh): self._all_uploads[uh] = None if self._history: diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index 399b2ec8..c0851f8c 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -1,4 +1,5 @@ import os +from twisted.internet import defer from twisted.trial import unittest from twisted.application import service @@ -37,8 +38,20 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper): d.addCallback(_got_size) return d -class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper): - def start(self): +class Helper_fake_upload(offloaded.Helper): + def _make_chk_upload_helper(self, storage_index, r, lp): + si_s = si_b2a(storage_index) + incoming_file = os.path.join(self._chk_incoming, si_s) + encoding_file = os.path.join(self._chk_encoding, si_s) + uh = CHKUploadHelper_fake(storage_index, self, + self._storage_broker, + self._secret_holder, + incoming_file, encoding_file, + r, lp) + return uh + +class Helper_already_uploaded(Helper_fake_upload): + def _check_chk(self, storage_index, results, lp): res = upload.UploadResults() res.uri_extension_hash = hashutil.uri_extension_hash("") @@ -53,7 +66,7 @@ class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper): "size": len(DATA), } res.uri_extension_data = ueb_data - return (res, None) + return defer.succeed(res) class FakeClient(service.MultiService): DEFAULT_ENCODING_PARAMETERS = {"k":25, @@ -101,13 +114,12 @@ class AssistedUpload(unittest.TestCase): # bogus host/port t.setLocation("bogus:1234") - def setUpHelper(self, basedir): + def setUpHelper(self, basedir, helper_class=Helper_fake_upload): fileutil.make_dirs(basedir) - self.helper = h = offloaded.Helper(basedir, - self.storage_broker, - self.secret_holder, - None, None) - h.chk_upload_helper_class = CHKUploadHelper_fake + self.helper = h = helper_class(basedir, + self.storage_broker, + self.secret_holder, + None, None) self.helper_furl = self.tub.registerReference(h) def tearDown(self): @@ -196,8 +208,7 @@ class AssistedUpload(unittest.TestCase): def test_already_uploaded(self): self.basedir = "helper/AssistedUpload/test_already_uploaded" - self.setUpHelper(self.basedir) - self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded + self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded) u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s)