]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
offloaded: early code: most of client-side, defined the RemoteInterfaces
authorBrian Warner <warner@allmydata.com>
Wed, 9 Jan 2008 04:18:54 +0000 (21:18 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 9 Jan 2008 04:18:54 +0000 (21:18 -0700)
src/allmydata/client.py
src/allmydata/offloaded.py [new file with mode: 0644]
src/allmydata/upload.py

index f6670d0e79a6c1c98d7af01e0b6236b0c747eaed..d3632f005686eab76be49946c27f5e841ad85209 100644 (file)
@@ -41,7 +41,8 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self.init_lease_secret()
         self.init_storage()
         self.init_options()
-        self.add_service(Uploader())
+        helper_furl = self.get_config("helper.furl")
+        self.add_service(Uploader(helper_furl))
         self.add_service(Downloader())
         self.add_service(Checker())
 
diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
new file mode 100644 (file)
index 0000000..4bb3758
--- /dev/null
@@ -0,0 +1,53 @@
+
+from foolscap import RemoteInterface
+from foolscap.schema import DictOf, ChoiceOf
+from allmydata.interfaces import StorageIndex, Hash
+
+UploadResults = DictOf(str, str)
+
+class RIEncryptedUploadable(RemoteInterface):
+    __remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com"
+
+    def get_size():
+        return int
+
+    def set_segment_size(segment_size=long):
+        return None
+
+    def read_encrypted(offset=long, length=long):
+        return str
+
+    def get_plaintext_hashtree_leaves(first=int, last=int):
+        return ListOf(Hash)
+
+    def get_plaintext_hash():
+        return Hash
+
+
+class RIUploadHelper(RemoteInterface):
+    __remote_name__ = "RIUploadHelper.tahoe.allmydata.com"
+
+    def upload(reader=RIEncryptedUploadable):
+        return UploadResults
+
+
+class RIHelper(RemoteInterface):
+    __remote_name__ = "RIHelper.tahoe.allmydata.com"
+
+    def upload(si=StorageIndex):
+        """See if a file with a given storage index needs uploading. The
+        helper will ask the appropriate storage servers to see if the file
+        has already been uploaded. If so, the helper will return a set of
+        'upload results' that includes whatever hashes are needed to build
+        the read-cap, and perhaps a truncated sharemap.
+
+        If the file has not yet been uploaded (or if it was only partially
+        uploaded), the helper will return an empty upload-results dictionary
+        and also an RIUploadHelper object that will take care of the upload
+        process. The client should call upload() on this object and pass it a
+        reference to an RIEncryptedUploadable object that will provide
+        ciphertext. When the upload is finished, the upload() method will
+        finish and return the upload results.
+        """
+        return (UploadResults, ChoiceOf(RIUploadHelper, None))
+
index 70ee2185d00fb042b6d5910536c661a384b251bd..a997a68fb63b39bffcdfbf8fbfb3cf712f6dc797 100644 (file)
@@ -551,6 +551,66 @@ class LiteralUploader:
     def close(self):
         pass
 
+class AssistedUploader(FileUploader):
+
+    def __init__(self, helper, options={}):
+        self._helper = helper
+        self._options = options
+        self._log_number = self._client.log("AssistedUploader starting")
+
+    def set_params(self, encoding_parameters):
+        pass
+
+    def start(self, uploadable):
+        uploadable = IUploadable(uploadable)
+        eu = IEncryptedUploadable(EncryptAnUploadable(uploadable))
+        self._encuploadable = eu
+        d = eu.get_size()
+        d.addCallback(self._got_size)
+        # when we get the encryption key, that will also compute the storage
+        # index, so this only takes one pass.
+        # TODO: I'm not sure it's cool to switch back and forth between
+        # the Uploadable and the IEncryptedUploadable that wraps it.
+        d.addCallback(lambda res: u.get_encryption_key())
+        d.addCallback(self._got_encryption_key)
+        d.addCallback(lambda res: eu.get_storage_index())
+        d.addCallback(self._got_storage_index)
+        d.addCallback(self._contact_helper)
+        d.addCallback(self._build_readcap)
+        return d
+
+    def _got_size(self, size):
+        self._size = size
+
+    def _got_encryption_key(self, key):
+        self._key = key
+
+    def _got_storage_index(self, storage_index):
+        self._storage_index = storage_index
+
+    def _contact_helper(self, res):
+        d = self._helper.callRemote("upload", self._storage_index)
+        d.addCallback(self._contacted_helper)
+        return d
+    def _contacted_helper(self, upload_results, upload_helper):
+        if upload_helper:
+            # we need to upload the file
+            reu = RemoteEncryptedUploabable(self._encuploadable)
+            d = upload_helper.callRemote("upload", reu)
+            # this Deferred will fire with the upload results
+            return d
+        return upload_results
+
+    def _build_readcap(self, upload_results):
+        ur = upload_results
+        u = uri.CHKFileURI(key=self._key,
+                           uri_extension_hash=ur['uri_extension_hash'],
+                           needed_shares=self._needed_shares,
+                           total_shares=self._total_shares,
+                           size=self._size,
+                           )
+        return u.to_string()
+
 
 class ConvergentUploadMixin:
     # to use this, the class it is mixed in to must have a seekable
@@ -636,6 +696,19 @@ class Uploader(service.MultiService):
     # 'total' is the total number of shares created by encoding. If everybody
     # has room then this is is how many we will upload.
 
+    def __init__(self, helper_furl=None):
+        self._helper_furl = helper_furl
+        self._helper = None
+
+    def startService(self):
+        service.MultiService.startService(self)
+        if self._helper_furl:
+            self.parent.tub.connectTo(self._helper_furl,
+                                      self._got_helper)
+
+    def _got_helper(self, helper):
+        self._helper = helper
+
     def upload(self, uploadable, options={}, wait_for_numpeers=None):
         assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
         # this returns the URI
@@ -648,10 +721,14 @@ class Uploader(service.MultiService):
         uploadable = IUploadable(uploadable)
         d = uploadable.get_size()
         def _got_size(size):
-            uploader_class = self.uploader_class
             if size <= self.URI_LIT_SIZE_THRESHOLD:
-                uploader_class = LiteralUploader
-            uploader = uploader_class(self.parent, options, wait_for_numpeers)
+                uploader = LiteralUploader(self.parent, options,
+                                           wait_for_numpeers)
+            elif self._helper:
+                uploader = AssistedUploader(self.parent, options)
+            else:
+                uploader = self.uploader_class(self.parent, options,
+                                               wait_for_numpeers)
             uploader.set_params(self.parent.get_encoding_parameters()
                                 or self.DEFAULT_ENCODING_PARAMETERS)
             return uploader.start(uploadable)