offloaded: create a Helper if 'run_helper' is non-empty
authorBrian Warner <warner@allmydata.com>
Thu, 10 Jan 2008 03:25:05 +0000 (20:25 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 10 Jan 2008 03:25:05 +0000 (20:25 -0700)
src/allmydata/client.py
src/allmydata/offloaded.py

index d3632f005686eab76be49946c27f5e841ad85209..7ef870087735cce59ef431685b9dc90eb312c8d9 100644 (file)
@@ -14,6 +14,7 @@ from allmydata.storage import StorageServer
 from allmydata.upload import Uploader
 from allmydata.download import Downloader
 from allmydata.checker import Checker
+from allmydata.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer import IntroducerClient
 from allmydata.util import hashutil, idlib, testutil
@@ -45,6 +46,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self.add_service(Uploader(helper_furl))
         self.add_service(Downloader())
         self.add_service(Checker())
+        # ControlServer and Helper are attached after Tub startup
 
         self.introducer_furl = self.get_config("introducer.furl", required=True)
 
@@ -135,6 +137,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         ic.setServiceParent(self)
 
         self.register_control()
+        self.register_helper()
 
     def register_control(self):
         c = ControlServer()
@@ -142,6 +145,20 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         control_url = self.tub.registerReference(c)
         self.write_private_config("control.furl", control_url + "\n")
 
+    def register_helper(self):
+        run_helper = self.get_config("run_helper")
+        if not run_helper:
+            return
+        h = Helper(os.path.join(self.basedir, "helper"))
+        h.setServiceParent(self)
+        helper_furl = self.tub.registerReference(h)
+        # TODO: this is confusing. BASEDIR/private/helper.furl is created by
+        # the helper. BASEDIR/helper.furl is consumed by the client who wants
+        # to use the helper. I like having the filename be the same, since
+        # that makes 'cp' work smoothly, but the difference between config
+        # inputs and generated outputs is hard to see.
+        self.write_private_config("helper.furl", helper_furl + "\n")
+
     def remote_get_versions(self):
         return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION)
 
index d23cb4921437f22b3e4aab75561ab78cba1b56b3..94d7354d7798edd087144d88ff3d8670b3caccad 100644 (file)
@@ -1,7 +1,10 @@
 
-from foolscap import RemoteInterface
+from zope.interface import implements
+from twisted.application import service
+from foolscap import RemoteInterface, Referenceable
 from foolscap.schema import DictOf, ChoiceOf, ListOf
 from allmydata.interfaces import StorageIndex, Hash
+from allmydata.util import hashutil
 
 UploadResults = DictOf(str, str)
 
@@ -51,3 +54,66 @@ class RIHelper(RemoteInterface):
         """
         return (UploadResults, ChoiceOf(RIUploadHelper, None))
 
+
+class CHKUploadHelper(Referenceable):
+    """I am the helper-server -side counterpart to AssistedUploader. I handle
+    peer selection, encoding, and share pushing. I read ciphertext from the
+    remote AssistedUploader.
+    """
+    implements(RIUploadHelper)
+
+    def __init__(self, storage_index, helper):
+        self._finished = False
+        self._storage_index = storage_index
+        self._helper = helper
+        self._log_number = self._helper.log("CHKUploadHelper starting")
+
+    def log(self, msg, parent=None):
+        if parent is None:
+            parent = self._log_number
+        return self._client.log(msg, parent=parent)
+
+    def start(self):
+        # determine if we need to upload the file. If so, return ({},self) .
+        # If not, return (UploadResults,None) .
+        return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self)
+
+    def remote_upload(self, reader):
+        # reader is an RIEncryptedUploadable. I am specified to return an
+        # UploadResults dictionary.
+        return {'uri_extension_hash': hashutil.uri_extension_hash("")}
+
+    def finished(self, res):
+        self._finished = True
+        self._helper.upload_finished(self._storage_index)
+
+
+class Helper(Referenceable, service.MultiService):
+    implements(RIHelper)
+    # this is the non-distributed version. When we need to have multiple
+    # helpers, this object will query the farm to see if anyone has the
+    # storage_index of interest, and send the request off to them.
+
+    chk_upload_helper_class = CHKUploadHelper
+
+    def __init__(self, basedir):
+        self._basedir = basedir
+        self._active_uploads = {}
+        service.MultiService.__init__(self)
+
+    def log(self, msg, **kwargs):
+        if 'facility' not in kwargs:
+            kwargs['facility'] = "helper"
+        return self.parent.log(msg, **kwargs)
+
+    def remote_upload(self, storage_index):
+        # TODO: look on disk
+        if storage_index in self._active_uploads:
+            uh = self._active_uploads[storage_index]
+        else:
+            uh = CHKUploadHelper(storage_index, self)
+            self._active_uploads[storage_index] = uh
+        return uh.start()
+
+    def upload_finished(self, storage_index):
+        del self._active_uploads[storage_index]