From: Brian Warner <>
Date: Tue, 22 May 2012 04:13:32 +0000 (-0700)
Subject: clean up Helper to make later changes easier

clean up Helper to make later changes easier

Fix up control flow inside the Helper, to make it more friendly for
later refactoring.

diff --git a/src/allmydata/immutable/ b/src/allmydata/immutable/
index 7ff19c38..922750a9 100644
--- a/src/allmydata/immutable/
+++ b/src/allmydata/immutable/
@@ -160,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
         self._finished_observers = observer.OneShotObserverList()
+        self._started = time.time()
         d = self._fetcher.when_done()
         d.addCallback(lambda res: self._reader.start())
         d.addCallback(lambda res: self.start_encrypted(self._reader))
@@ -171,31 +172,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
             kwargs['facility'] = "tahoe.helper.chk"
         return upload.CHKUploader.log(self, *args, **kwargs)
-    def start(self):
-        self._started = time.time()
-        # determine if we need to upload the file. If so, return ({},self) .
-        # If not, return (UploadResults,None) .
+    def remote_get_version(self):
+        return self.VERSION
+    def remote_upload(self, reader):
+        # reader is an RIEncryptedUploadable. I am specified to return an
+        # UploadResults dictionary.
+        # Log how much ciphertext we need to get.
         self.log("deciding whether to upload the file or not", level=log.NOISY)
         if os.path.exists(self._encoding_file):
             # we have the whole file, and we might be encoding it (or the
             # encode/upload might have failed, and we need to restart it).
             self.log("ciphertext already in place", level=log.UNUSUAL)
-            return (self._results, self)
-        if os.path.exists(self._incoming_file):
+        elif os.path.exists(self._incoming_file):
             # we have some of the file, but not all of it (otherwise we'd be
             # encoding). The caller might be useful.
             self.log("partial ciphertext already present", level=log.UNUSUAL)
-            return (self._results, self)
-        # we don't remember uploading this file
-        self.log("no ciphertext yet", level=log.NOISY)
-        return (self._results, self)
-    def remote_get_version(self):
-        return self.VERSION
-    def remote_upload(self, reader):
-        # reader is an RIEncryptedUploadable. I am specified to return an
-        # UploadResults dictionary.
+        else:
+            # we don't remember uploading this file
+            self.log("no ciphertext yet", level=log.NOISY)
         # let our fetcher pull ciphertext from the reader.
@@ -491,7 +487,6 @@ class Helper(Referenceable):
                  { },
                 "application-version": str(allmydata.__full_version__),
-    chk_upload_helper_class = CHKUploadHelper
     def __init__(self, basedir, storage_broker, secret_holder,
@@ -567,44 +562,15 @@ class Helper(Referenceable):
     def remote_upload_chk(self, storage_index):
         r = upload.UploadResults()
-        si_s = si_b2a(storage_index)
-        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
-        incoming_file = os.path.join(self._chk_incoming, si_s)
-        encoding_file = os.path.join(self._chk_encoding, si_s)
+        lp = self.log(format="helper: upload_chk query for SI %(si)s",
+                      si=si_b2a(storage_index))
         if storage_index in self._active_uploads:
             self.log("upload is currently active", parent=lp)
             uh = self._active_uploads[storage_index]
-            return uh.start()
-        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
-        def _checked(already_present):
-            if already_present:
-                # the necessary results are placed in the UploadResults
-                self.count("chk_upload_helper.upload_already_present")
-                self.log("file already found in grid", parent=lp)
-                return (r, None)
-            self.count("chk_upload_helper.upload_need_upload")
-            # the file is not present in the grid, by which we mean there are
-            # less than 'N' shares available.
-            self.log("unable to find file in the grid", parent=lp,
-                     level=log.NOISY)
-            # We need an upload helper. Check our active uploads again in
-            # case there was a race.
-            if storage_index in self._active_uploads:
-                self.log("upload is currently active", parent=lp)
-                uh = self._active_uploads[storage_index]
-            else:
-                self.log("creating new upload helper", parent=lp)
-                uh = self.chk_upload_helper_class(storage_index, self,
-                                                  self._storage_broker,
-                                                  self._secret_holder,
-                                                  incoming_file, encoding_file,
-                                                  r, lp)
-                self._active_uploads[storage_index] = uh
-                self._add_upload(uh)
-            return uh.start()
-        d.addCallback(_checked)
+            return (None, uh)
+        d = self._check_chk(storage_index, r, lp)
+        d.addCallback(self._did_chk_check, storage_index, r, lp)
         def _err(f):
             self.log("error while checking for chk-already-in-grid",
                      failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
@@ -612,7 +578,7 @@ class Helper(Referenceable):
         return d
-    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
+    def _check_chk(self, storage_index, results, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
@@ -628,11 +594,46 @@ class Helper(Referenceable):
                 results.uri_extension_data = ueb_data
                 results.preexisting_shares = len(sharemap)
                 results.pushed_shares = 0
-                return True
-            return False
+                return results
+            return None
         return d
+    def _did_chk_check(self, already_present, storage_index, r, lp):
+        if already_present:
+            # the necessary results are placed in the UploadResults
+            self.count("chk_upload_helper.upload_already_present")
+            self.log("file already found in grid", parent=lp)
+            return (already_present, None)
+        self.count("chk_upload_helper.upload_need_upload")
+        # the file is not present in the grid, by which we mean there are
+        # less than 'N' shares available.
+        self.log("unable to find file in the grid", parent=lp,
+                 level=log.NOISY)
+        # We need an upload helper. Check our active uploads again in
+        # case there was a race.
+        if storage_index in self._active_uploads:
+            self.log("upload is currently active", parent=lp)
+            uh = self._active_uploads[storage_index]
+        else:
+            self.log("creating new upload helper", parent=lp)
+            uh = self._make_chk_upload_helper(storage_index, r, lp)
+            self._active_uploads[storage_index] = uh
+            self._add_upload(uh)
+        return (None, uh)
+    def _make_chk_upload_helper(self, storage_index, r, lp):
+        si_s = si_b2a(storage_index)
+        incoming_file = os.path.join(self._chk_incoming, si_s)
+        encoding_file = os.path.join(self._chk_encoding, si_s)
+        uh = CHKUploadHelper(storage_index, self,
+                             self._storage_broker,
+                             self._secret_holder,
+                             incoming_file, encoding_file,
+                             r, lp)
+        return uh
     def _add_upload(self, uh):
         self._all_uploads[uh] = None
         if self._history:
diff --git a/src/allmydata/test/ b/src/allmydata/test/
index 399b2ec8..c0851f8c 100644
--- a/src/allmydata/test/
+++ b/src/allmydata/test/
@@ -1,4 +1,5 @@
 import os
+from twisted.internet import defer
 from twisted.trial import unittest
 from twisted.application import service
@@ -37,8 +38,20 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
         return d
-class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
-    def start(self):
+class Helper_fake_upload(offloaded.Helper):
+    def _make_chk_upload_helper(self, storage_index, r, lp):
+        si_s = si_b2a(storage_index)
+        incoming_file = os.path.join(self._chk_incoming, si_s)
+        encoding_file = os.path.join(self._chk_encoding, si_s)
+        uh = CHKUploadHelper_fake(storage_index, self,
+                                  self._storage_broker,
+                                  self._secret_holder,
+                                  incoming_file, encoding_file,
+                                  r, lp)
+        return uh
+class Helper_already_uploaded(Helper_fake_upload):
+    def _check_chk(self, storage_index, results, lp):
         res = upload.UploadResults()
         res.uri_extension_hash = hashutil.uri_extension_hash("")
@@ -53,7 +66,7 @@ class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
                     "size": len(DATA),
         res.uri_extension_data = ueb_data
-        return (res, None)
+        return defer.succeed(res)
 class FakeClient(service.MultiService):
@@ -101,13 +114,12 @@ class AssistedUpload(unittest.TestCase):
         # bogus host/port
-    def setUpHelper(self, basedir):
+    def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
-        self.helper = h = offloaded.Helper(basedir,
-                                           self.storage_broker,
-                                           self.secret_holder,
-                                           None, None)
-        h.chk_upload_helper_class = CHKUploadHelper_fake
+        self.helper = h = helper_class(basedir,
+                                       self.storage_broker,
+                                       self.secret_holder,
+                                       None, None)
         self.helper_furl = self.tub.registerReference(h)
     def tearDown(self):
@@ -196,8 +208,7 @@ class AssistedUpload(unittest.TestCase):
     def test_already_uploaded(self):
         self.basedir = "helper/AssistedUpload/test_already_uploaded"
-        self.setUpHelper(self.basedir)
-        self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
+        self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
         u = upload.Uploader(self.helper_furl)