From: Brian Warner Date: Thu, 10 Jan 2008 03:25:05 +0000 (-0700) Subject: offloaded: create a Helper if 'run_helper' is non-empty X-Git-Url: https://git.rkrishnan.org/vdrive/%22file:/reliability?a=commitdiff_plain;h=2ad84eeed8129bd828fed6c489a5efd4153d9cd4;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: create a Helper if 'run_helper' is non-empty --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index d3632f00..7ef87008 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -14,6 +14,7 @@ from allmydata.storage import StorageServer from allmydata.upload import Uploader from allmydata.download import Downloader from allmydata.checker import Checker +from allmydata.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer import IntroducerClient from allmydata.util import hashutil, idlib, testutil @@ -45,6 +46,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self.add_service(Uploader(helper_furl)) self.add_service(Downloader()) self.add_service(Checker()) + # ControlServer and Helper are attached after Tub startup self.introducer_furl = self.get_config("introducer.furl", required=True) @@ -135,6 +137,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin): ic.setServiceParent(self) self.register_control() + self.register_helper() def register_control(self): c = ControlServer() @@ -142,6 +145,20 @@ class Client(node.Node, Referenceable, testutil.PollMixin): control_url = self.tub.registerReference(c) self.write_private_config("control.furl", control_url + "\n") + def register_helper(self): + run_helper = self.get_config("run_helper") + if not run_helper: + return + h = Helper(os.path.join(self.basedir, "helper")) + h.setServiceParent(self) + helper_furl = self.tub.registerReference(h) + # TODO: this is confusing. BASEDIR/private/helper.furl is created by + # the helper. BASEDIR/helper.furl is consumed by the client who wants + # to use the helper. I like having the filename be the same, since + # that makes 'cp' work smoothly, but the difference between config + # inputs and generated outputs is hard to see. + self.write_private_config("helper.furl", helper_furl + "\n") + def remote_get_versions(self): return str(allmydata.__version__), str(self.OLDEST_SUPPORTED_VERSION) diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index d23cb492..94d7354d 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -1,7 +1,10 @@ -from foolscap import RemoteInterface +from zope.interface import implements +from twisted.application import service +from foolscap import RemoteInterface, Referenceable from foolscap.schema import DictOf, ChoiceOf, ListOf from allmydata.interfaces import StorageIndex, Hash +from allmydata.util import hashutil UploadResults = DictOf(str, str) @@ -51,3 +54,66 @@ class RIHelper(RemoteInterface): """ return (UploadResults, ChoiceOf(RIUploadHelper, None)) + +class CHKUploadHelper(Referenceable): + """I am the helper-server -side counterpart to AssistedUploader. I handle + peer selection, encoding, and share pushing. I read ciphertext from the + remote AssistedUploader. + """ + implements(RIUploadHelper) + + def __init__(self, storage_index, helper): + self._finished = False + self._storage_index = storage_index + self._helper = helper + self._log_number = self._helper.log("CHKUploadHelper starting") + + def log(self, msg, parent=None): + if parent is None: + parent = self._log_number + return self._client.log(msg, parent=parent) + + def start(self): + # determine if we need to upload the file. If so, return ({},self) . + # If not, return (UploadResults,None) . + return ({'uri_extension_hash': hashutil.uri_extension_hash("")},self) + + def remote_upload(self, reader): + # reader is an RIEncryptedUploadable. I am specified to return an + # UploadResults dictionary. + return {'uri_extension_hash': hashutil.uri_extension_hash("")} + + def finished(self, res): + self._finished = True + self._helper.upload_finished(self._storage_index) + + +class Helper(Referenceable, service.MultiService): + implements(RIHelper) + # this is the non-distributed version. When we need to have multiple + # helpers, this object will query the farm to see if anyone has the + # storage_index of interest, and send the request off to them. + + chk_upload_helper_class = CHKUploadHelper + + def __init__(self, basedir): + self._basedir = basedir + self._active_uploads = {} + service.MultiService.__init__(self) + + def log(self, msg, **kwargs): + if 'facility' not in kwargs: + kwargs['facility'] = "helper" + return self.parent.log(msg, **kwargs) + + def remote_upload(self, storage_index): + # TODO: look on disk + if storage_index in self._active_uploads: + uh = self._active_uploads[storage_index] + else: + uh = CHKUploadHelper(storage_index, self) + self._active_uploads[storage_index] = uh + return uh.start() + + def upload_finished(self, storage_index): + del self._active_uploads[storage_index]