From: Brian Warner Date: Wed, 9 Jan 2008 04:18:54 +0000 (-0700) Subject: offloaded: early code: most of client-side, defined the RemoteInterfaces X-Git-Url: https://git.rkrishnan.org/pf/using.html?a=commitdiff_plain;h=db71bdae9c29384567ed04a0927455d55e7971e2;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: early code: most of client-side, defined the RemoteInterfaces --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index f6670d0e..d3632f00 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -41,7 +41,8 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self.init_lease_secret() self.init_storage() self.init_options() - self.add_service(Uploader()) + helper_furl = self.get_config("helper.furl") + self.add_service(Uploader(helper_furl)) self.add_service(Downloader()) self.add_service(Checker()) diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py new file mode 100644 index 00000000..4bb3758b --- /dev/null +++ b/src/allmydata/offloaded.py @@ -0,0 +1,53 @@ + +from foolscap import RemoteInterface +from foolscap.schema import DictOf, ChoiceOf +from allmydata.interfaces import StorageIndex, Hash + +UploadResults = DictOf(str, str) + +class RIEncryptedUploadable(RemoteInterface): + __remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com" + + def get_size(): + return int + + def set_segment_size(segment_size=long): + return None + + def read_encrypted(offset=long, length=long): + return str + + def get_plaintext_hashtree_leaves(first=int, last=int): + return ListOf(Hash) + + def get_plaintext_hash(): + return Hash + + +class RIUploadHelper(RemoteInterface): + __remote_name__ = "RIUploadHelper.tahoe.allmydata.com" + + def upload(reader=RIEncryptedUploadable): + return UploadResults + + +class RIHelper(RemoteInterface): + __remote_name__ = "RIHelper.tahoe.allmydata.com" + + def upload(si=StorageIndex): + """See if a file with a given storage index needs uploading. The + helper will ask the appropriate storage servers to see if the file + has already been uploaded. If so, the helper will return a set of + 'upload results' that includes whatever hashes are needed to build + the read-cap, and perhaps a truncated sharemap. + + If the file has not yet been uploaded (or if it was only partially + uploaded), the helper will return an empty upload-results dictionary + and also an RIUploadHelper object that will take care of the upload + process. The client should call upload() on this object and pass it a + reference to an RIEncryptedUploadable object that will provide + ciphertext. When the upload is finished, the upload() method will + finish and return the upload results. + """ + return (UploadResults, ChoiceOf(RIUploadHelper, None)) + diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 70ee2185..a997a68f 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -551,6 +551,66 @@ class LiteralUploader: def close(self): pass +class AssistedUploader(FileUploader): + + def __init__(self, helper, options={}): + self._helper = helper + self._options = options + self._log_number = self._client.log("AssistedUploader starting") + + def set_params(self, encoding_parameters): + pass + + def start(self, uploadable): + uploadable = IUploadable(uploadable) + eu = IEncryptedUploadable(EncryptAnUploadable(uploadable)) + self._encuploadable = eu + d = eu.get_size() + d.addCallback(self._got_size) + # when we get the encryption key, that will also compute the storage + # index, so this only takes one pass. + # TODO: I'm not sure it's cool to switch back and forth between + # the Uploadable and the IEncryptedUploadable that wraps it. + d.addCallback(lambda res: u.get_encryption_key()) + d.addCallback(self._got_encryption_key) + d.addCallback(lambda res: eu.get_storage_index()) + d.addCallback(self._got_storage_index) + d.addCallback(self._contact_helper) + d.addCallback(self._build_readcap) + return d + + def _got_size(self, size): + self._size = size + + def _got_encryption_key(self, key): + self._key = key + + def _got_storage_index(self, storage_index): + self._storage_index = storage_index + + def _contact_helper(self, res): + d = self._helper.callRemote("upload", self._storage_index) + d.addCallback(self._contacted_helper) + return d + def _contacted_helper(self, upload_results, upload_helper): + if upload_helper: + # we need to upload the file + reu = RemoteEncryptedUploabable(self._encuploadable) + d = upload_helper.callRemote("upload", reu) + # this Deferred will fire with the upload results + return d + return upload_results + + def _build_readcap(self, upload_results): + ur = upload_results + u = uri.CHKFileURI(key=self._key, + uri_extension_hash=ur['uri_extension_hash'], + needed_shares=self._needed_shares, + total_shares=self._total_shares, + size=self._size, + ) + return u.to_string() + class ConvergentUploadMixin: # to use this, the class it is mixed in to must have a seekable @@ -636,6 +696,19 @@ class Uploader(service.MultiService): # 'total' is the total number of shares created by encoding. If everybody # has room then this is is how many we will upload. + def __init__(self, helper_furl=None): + self._helper_furl = helper_furl + self._helper = None + + def startService(self): + service.MultiService.startService(self) + if self._helper_furl: + self.parent.tub.connectTo(self._helper_furl, + self._got_helper) + + def _got_helper(self, helper): + self._helper = helper + def upload(self, uploadable, options={}, wait_for_numpeers=None): assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers # this returns the URI @@ -648,10 +721,14 @@ class Uploader(service.MultiService): uploadable = IUploadable(uploadable) d = uploadable.get_size() def _got_size(size): - uploader_class = self.uploader_class if size <= self.URI_LIT_SIZE_THRESHOLD: - uploader_class = LiteralUploader - uploader = uploader_class(self.parent, options, wait_for_numpeers) + uploader = LiteralUploader(self.parent, options, + wait_for_numpeers) + elif self._helper: + uploader = AssistedUploader(self.parent, options) + else: + uploader = self.uploader_class(self.parent, options, + wait_for_numpeers) uploader.set_params(self.parent.get_encoding_parameters() or self.DEFAULT_ENCODING_PARAMETERS) return uploader.start(uploadable)