offloaded: more test coverage on client side, change interfaces a bit
authorBrian Warner <warner@lothar.com>
Fri, 11 Jan 2008 11:53:37 +0000 (04:53 -0700)
committerBrian Warner <warner@lothar.com>
Fri, 11 Jan 2008 11:53:37 +0000 (04:53 -0700)
src/allmydata/interfaces.py
src/allmydata/offloaded.py
src/allmydata/test/test_helper.py
src/allmydata/upload.py

index ec29fdd951ef532c138eafb9ad027a4ebce5aed8..b37e42400a5bb6f9117a7bee9721bdd0177bd889 100644 (file)
@@ -1278,7 +1278,7 @@ class RIEncryptedUploadable(RemoteInterface):
         return Hash
 
 
-class RIUploadHelper(RemoteInterface):
+class RICHKUploadHelper(RemoteInterface):
     __remote_name__ = "RIUploadHelper.tahoe.allmydata.com"
 
     def upload(reader=RIEncryptedUploadable):
@@ -1288,7 +1288,7 @@ class RIUploadHelper(RemoteInterface):
 class RIHelper(RemoteInterface):
     __remote_name__ = "RIHelper.tahoe.allmydata.com"
 
-    def upload(si=StorageIndex):
+    def upload_chk(si=StorageIndex):
         """See if a file with a given storage index needs uploading. The
         helper will ask the appropriate storage servers to see if the file
         has already been uploaded. If so, the helper will return a set of
@@ -1297,10 +1297,10 @@ class RIHelper(RemoteInterface):
 
         If the file has not yet been uploaded (or if it was only partially
         uploaded), the helper will return an empty upload-results dictionary
-        and also an RIUploadHelper object that will take care of the upload
-        process. The client should call upload() on this object and pass it a
-        reference to an RIEncryptedUploadable object that will provide
-        ciphertext. When the upload is finished, the upload() method will
-        finish and return the upload results.
+        and also an RICHKUploadHelper object that will take care of the
+        upload process. The client should call upload() on this object and
+        pass it a reference to an RIEncryptedUploadable object that will
+        provide ciphertext. When the upload is finished, the upload() method
+        will finish and return the upload results.
         """
-        return (UploadResults, ChoiceOf(RIUploadHelper, None))
+        return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
index 604f306fbbed7182cab8af176e97bf624e00997c..be60ad817eadd70f3cf341514c041d6339cfd7ac 100644 (file)
@@ -13,7 +13,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     peer selection, encoding, and share pushing. I read ciphertext from the
     remote AssistedUploader.
     """
-    implements(interfaces.RIUploadHelper)
+    implements(interfaces.RICHKUploadHelper)
 
     def __init__(self, storage_index, helper):
         self._finished = False
@@ -30,7 +30,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     def start(self):
         # determine if we need to upload the file. If so, return ({},self) .
         # If not, return (UploadResults,None) .
-        return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
+        #return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
+        return ({}, self)
 
     def remote_upload(self, reader):
         # reader is an RIEncryptedUploadable. I am specified to return an
@@ -39,6 +40,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         eu = CiphertextReader(reader, self._storage_index)
         d = self.start_encrypted(eu)
         def _done(res):
+            self.finished(self._storage_index)
             (uri_extension_hash, needed_shares, total_shares, size) = res
             return {'uri_extension_hash': uri_extension_hash}
         d.addCallback(_done)
@@ -78,8 +80,10 @@ class CiphertextReader:
 class Helper(Referenceable, service.MultiService):
     implements(interfaces.RIHelper)
     # this is the non-distributed version. When we need to have multiple
-    # helpers, this object will query the farm to see if anyone has the
-    # storage_index of interest, and send the request off to them.
+    # helpers, this object will become the HelperCoordinator, and will query
+    # the farm of Helpers to see if anyone has the storage_index of interest,
+    # and send the request off to them. If nobody has it, we'll choose a
+    # helper at random.
 
     chk_upload_helper_class = CHKUploadHelper
 
@@ -93,7 +97,7 @@ class Helper(Referenceable, service.MultiService):
             kwargs['facility'] = "helper"
         return self.parent.log(msg, **kwargs)
 
-    def remote_upload(self, storage_index):
+    def remote_upload_chk(self, storage_index):
         # TODO: look on disk
         if storage_index in self._active_uploads:
             uh = self._active_uploads[storage_index]
index a5d5077102db7a9450a0943312010f4a729b7cec..18dfd99d8b09d345c6cbdd4ba32d81e533e477aa 100644 (file)
@@ -8,12 +8,20 @@ from foolscap.logging import log
 from allmydata import upload, offloaded
 from allmydata.util import hashutil
 
-class FakeCHKUploadHelper(offloaded.CHKUploadHelper):
-    def remote_upload(self, reader):
-        return {'uri_extension_hash': hashutil.uri_extension_hash("")}
+class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
+    def start_encrypted(self, eu):
+        needed_shares, happy, total_shares = self._encoding_parameters
+        d = eu.get_size()
+        def _got_size(size):
+            return (hashutil.uri_extension_hash(""),
+                    needed_shares, total_shares, size)
+        d.addCallback(_got_size)
+        return d
 
-class FakeHelper(offloaded.Helper):
-    chk_upload_helper_class = FakeCHKUploadHelper
+class CHKUploadHelper_already_uploaded(offloaded.CHKUploadHelper):
+    def start(self):
+        res = {'uri_extension_hash': hashutil.uri_extension_hash("")}
+        return (res, None)
 
 class FakeClient(service.MultiService):
     def log(self, msg, **kwargs):
@@ -42,8 +50,8 @@ class AssistedUpload(unittest.TestCase):
         # bogus host/port
         t.setLocation("bogus:1234")
 
-
-        h = FakeHelper(".")
+        self.helper = h = offloaded.Helper(".")
+        h.chk_upload_helper_class = CHKUploadHelper_fake
         h.setServiceParent(self.s)
         self.helper_furl = t.registerReference(h)
 
@@ -75,3 +83,26 @@ class AssistedUpload(unittest.TestCase):
 
         return d
 
+
+    def test_already_uploaded(self):
+        self.helper.chk_upload_helper_class = CHKUploadHelper_already_uploaded
+        u = upload.Uploader(self.helper_furl)
+        u.setServiceParent(self.s)
+
+        # wait a few turns
+        d = eventual.fireEventually()
+        d.addCallback(eventual.fireEventually)
+        d.addCallback(eventual.fireEventually)
+
+        def _ready(res):
+            assert u._helper
+
+            DATA = "I need help\n" * 1000
+            return u.upload_data(DATA)
+        d.addCallback(_ready)
+        def _uploaded(uri):
+            assert "CHK" in uri
+        d.addCallback(_uploaded)
+
+        return d
+
index 5c3a491d3f2c901ddef6c08b4e78c76ad96c7f0e..307894781f8516bcce9088c3059e5129bae24455 100644 (file)
@@ -611,16 +611,19 @@ class AssistedUploader:
         self._storage_index = storage_index
 
     def _contact_helper(self, res):
-        d = self._helper.callRemote("upload", self._storage_index)
+        self.log("contacting helper..")
+        d = self._helper.callRemote("upload_chk", self._storage_index)
         d.addCallback(self._contacted_helper)
         return d
     def _contacted_helper(self, (upload_results, upload_helper)):
         if upload_helper:
+            self.log("helper says we need to upload")
             # we need to upload the file
             reu = RemoteEncryptedUploabable(self._encuploadable)
             d = upload_helper.callRemote("upload", reu)
             # this Deferred will fire with the upload results
             return d
+        self.log("helper says file is already uploaded")
         return upload_results
 
     def _build_readcap(self, upload_results):