]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
clean up Helper to make later changes easier
authorBrian Warner <warner@lothar.com>
Tue, 22 May 2012 04:13:32 +0000 (21:13 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 22 May 2012 04:13:32 +0000 (21:13 -0700)
Fix up control flow inside the Helper, to make it more friendly for
later refactoring.

src/allmydata/immutable/offloaded.py
src/allmydata/test/test_helper.py

index 7ff19c38a44304c922688fafc976537ec33ac2d0..922750a948f7a499c93488faa18bc13cbfa79519 100644 (file)
@@ -160,6 +160,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
         self._finished_observers = observer.OneShotObserverList()
 
+        self._started = time.time()
         d = self._fetcher.when_done()
         d.addCallback(lambda res: self._reader.start())
         d.addCallback(lambda res: self.start_encrypted(self._reader))
@@ -171,31 +172,26 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
             kwargs['facility'] = "tahoe.helper.chk"
         return upload.CHKUploader.log(self, *args, **kwargs)
 
-    def start(self):
-        self._started = time.time()
-        # determine if we need to upload the file. If so, return ({},self) .
-        # If not, return (UploadResults,None) .
+    def remote_get_version(self):
+        return self.VERSION
+
+    def remote_upload(self, reader):
+        # reader is an RIEncryptedUploadable. I am specified to return an
+        # UploadResults dictionary.
+
+        # Log how much ciphertext we need to get.
         self.log("deciding whether to upload the file or not", level=log.NOISY)
         if os.path.exists(self._encoding_file):
             # we have the whole file, and we might be encoding it (or the
             # encode/upload might have failed, and we need to restart it).
             self.log("ciphertext already in place", level=log.UNUSUAL)
-            return (self._results, self)
-        if os.path.exists(self._incoming_file):
+        elif os.path.exists(self._incoming_file):
             # we have some of the file, but not all of it (otherwise we'd be
             # encoding). The caller might be useful.
             self.log("partial ciphertext already present", level=log.UNUSUAL)
-            return (self._results, self)
-        # we don't remember uploading this file
-        self.log("no ciphertext yet", level=log.NOISY)
-        return (self._results, self)
-
-    def remote_get_version(self):
-        return self.VERSION
-
-    def remote_upload(self, reader):
-        # reader is an RIEncryptedUploadable. I am specified to return an
-        # UploadResults dictionary.
+        else:
+            # we don't remember uploading this file
+            self.log("no ciphertext yet", level=log.NOISY)
 
         # let our fetcher pull ciphertext from the reader.
         self._fetcher.add_reader(reader)
@@ -491,7 +487,6 @@ class Helper(Referenceable):
                  { },
                 "application-version": str(allmydata.__full_version__),
                 }
-    chk_upload_helper_class = CHKUploadHelper
     MAX_UPLOAD_STATUSES = 10
 
     def __init__(self, basedir, storage_broker, secret_holder,
@@ -567,44 +562,15 @@ class Helper(Referenceable):
     def remote_upload_chk(self, storage_index):
         self.count("chk_upload_helper.upload_requests")
         r = upload.UploadResults()
-        si_s = si_b2a(storage_index)
-        lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
-        incoming_file = os.path.join(self._chk_incoming, si_s)
-        encoding_file = os.path.join(self._chk_encoding, si_s)
+        lp = self.log(format="helper: upload_chk query for SI %(si)s",
+                      si=si_b2a(storage_index))
         if storage_index in self._active_uploads:
             self.log("upload is currently active", parent=lp)
             uh = self._active_uploads[storage_index]
-            return uh.start()
-
-        d = self._check_for_chk_already_in_grid(storage_index, r, lp)
-        def _checked(already_present):
-            if already_present:
-                # the necessary results are placed in the UploadResults
-                self.count("chk_upload_helper.upload_already_present")
-                self.log("file already found in grid", parent=lp)
-                return (r, None)
-
-            self.count("chk_upload_helper.upload_need_upload")
-            # the file is not present in the grid, by which we mean there are
-            # less than 'N' shares available.
-            self.log("unable to find file in the grid", parent=lp,
-                     level=log.NOISY)
-            # We need an upload helper. Check our active uploads again in
-            # case there was a race.
-            if storage_index in self._active_uploads:
-                self.log("upload is currently active", parent=lp)
-                uh = self._active_uploads[storage_index]
-            else:
-                self.log("creating new upload helper", parent=lp)
-                uh = self.chk_upload_helper_class(storage_index, self,
-                                                  self._storage_broker,
-                                                  self._secret_holder,
-                                                  incoming_file, encoding_file,
-                                                  r, lp)
-                self._active_uploads[storage_index] = uh
-                self._add_upload(uh)
-            return uh.start()
-        d.addCallback(_checked)
+            return (None, uh)
+
+        d = self._check_chk(storage_index, r, lp)
+        d.addCallback(self._did_chk_check, storage_index, r, lp)
         def _err(f):
             self.log("error while checking for chk-already-in-grid",
                      failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
@@ -612,7 +578,7 @@ class Helper(Referenceable):
         d.addErrback(_err)
         return d
 
-    def _check_for_chk_already_in_grid(self, storage_index, results, lp):
+    def _check_chk(self, storage_index, results, lp):
         # see if this file is already in the grid
         lp2 = self.log("doing a quick check+UEBfetch",
                        parent=lp, level=log.NOISY)
@@ -628,11 +594,46 @@ class Helper(Referenceable):
                 results.uri_extension_data = ueb_data
                 results.preexisting_shares = len(sharemap)
                 results.pushed_shares = 0
-                return True
-            return False
+                return results
+            return None
         d.addCallback(_checked)
         return d
 
+    def _did_chk_check(self, already_present, storage_index, r, lp):
+        if already_present:
+            # the necessary results are placed in the UploadResults
+            self.count("chk_upload_helper.upload_already_present")
+            self.log("file already found in grid", parent=lp)
+            return (already_present, None)
+
+        self.count("chk_upload_helper.upload_need_upload")
+        # the file is not present in the grid, by which we mean there are
+        # less than 'N' shares available.
+        self.log("unable to find file in the grid", parent=lp,
+                 level=log.NOISY)
+        # We need an upload helper. Check our active uploads again in
+        # case there was a race.
+        if storage_index in self._active_uploads:
+            self.log("upload is currently active", parent=lp)
+            uh = self._active_uploads[storage_index]
+        else:
+            self.log("creating new upload helper", parent=lp)
+            uh = self._make_chk_upload_helper(storage_index, r, lp)
+            self._active_uploads[storage_index] = uh
+            self._add_upload(uh)
+        return (None, uh)
+
+    def _make_chk_upload_helper(self, storage_index, r, lp):
+        si_s = si_b2a(storage_index)
+        incoming_file = os.path.join(self._chk_incoming, si_s)
+        encoding_file = os.path.join(self._chk_encoding, si_s)
+        uh = CHKUploadHelper(storage_index, self,
+                             self._storage_broker,
+                             self._secret_holder,
+                             incoming_file, encoding_file,
+                             r, lp)
+        return uh
+
     def _add_upload(self, uh):
         self._all_uploads[uh] = None
         if self._history:
index 399b2ec88f75ddd4b70267cdd178e0ee07d4f57f..c0851f8cae3a6a5b75439d4d603befd927bdb007 100644 (file)
@@ -1,4 +1,5 @@
 import os
+from twisted.internet import defer
 from twisted.trial import unittest
 from twisted.application import service
 
@@ -37,8 +38,20 @@ class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
         d.addCallback(_got_size)
         return d
 
-class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
-    def start(self):
+class Helper_fake_upload(offloaded.Helper):
+    def _make_chk_upload_helper(self, storage_index, r, lp):
+        si_s = si_b2a(storage_index)
+        incoming_file = os.path.join(self._chk_incoming, si_s)
+        encoding_file = os.path.join(self._chk_encoding, si_s)
+        uh = CHKUploadHelper_fake(storage_index, self,
+                                  self._storage_broker,
+                                  self._secret_holder,
+                                  incoming_file, encoding_file,
+                                  r, lp)
+        return uh
+
+class Helper_already_uploaded(Helper_fake_upload):
+    def _check_chk(self, storage_index, results, lp):
         res = upload.UploadResults()
         res.uri_extension_hash = hashutil.uri_extension_hash("")
 
@@ -53,7 +66,7 @@ class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
                     "size": len(DATA),
                     }
         res.uri_extension_data = ueb_data
-        return (res, None)
+        return defer.succeed(res)
 
 class FakeClient(service.MultiService):
     DEFAULT_ENCODING_PARAMETERS = {"k":25,
@@ -101,13 +114,12 @@ class AssistedUpload(unittest.TestCase):
         # bogus host/port
         t.setLocation("bogus:1234")
 
-    def setUpHelper(self, basedir):
+    def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
         fileutil.make_dirs(basedir)
-        self.helper = h = offloaded.Helper(basedir,
-                                           self.storage_broker,
-                                           self.secret_holder,
-                                           None, None)
-        h.chk_upload_helper_class = CHKUploadHelper_fake
+        self.helper = h = helper_class(basedir,
+                                       self.storage_broker,
+                                       self.secret_holder,
+                                       None, None)
         self.helper_furl = self.tub.registerReference(h)
 
     def tearDown(self):
@@ -196,8 +208,7 @@ class AssistedUpload(unittest.TestCase):
 
     def test_already_uploaded(self):
         self.basedir = "helper/AssistedUpload/test_already_uploaded"
-        self.setUpHelper(self.basedir)
-        self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
+        self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded)
         u = upload.Uploader(self.helper_furl)
         u.setServiceParent(self.s)