From: Brian Warner Date: Thu, 10 Jan 2008 04:25:47 +0000 (-0700) Subject: offloaded: move interfaces to interfaces.py, start implementing backend X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/COPYING.TGPPL.html?a=commitdiff_plain;h=e825406fc2916e96effbd03236f536a3ea4e23c1;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: move interfaces to interfaces.py, start implementing backend --- diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 61b2f07a..ec29fdd9 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1256,3 +1256,51 @@ class RIControlClient(RemoteInterface): """ return DictOf(Nodeid, float) + +UploadResults = DictOf(str, str) + +class RIEncryptedUploadable(RemoteInterface): + __remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com" + + def get_size(): + return int + + def set_segment_size(segment_size=long): + return None + + def read_encrypted(offset=long, length=long): + return str + + def get_plaintext_hashtree_leaves(first=int, last=int): + return ListOf(Hash) + + def get_plaintext_hash(): + return Hash + + +class RIUploadHelper(RemoteInterface): + __remote_name__ = "RIUploadHelper.tahoe.allmydata.com" + + def upload(reader=RIEncryptedUploadable): + return UploadResults + + +class RIHelper(RemoteInterface): + __remote_name__ = "RIHelper.tahoe.allmydata.com" + + def upload(si=StorageIndex): + """See if a file with a given storage index needs uploading. The + helper will ask the appropriate storage servers to see if the file + has already been uploaded. If so, the helper will return a set of + 'upload results' that includes whatever hashes are needed to build + the read-cap, and perhaps a truncated sharemap. + + If the file has not yet been uploaded (or if it was only partially + uploaded), the helper will return an empty upload-results dictionary + and also an RIUploadHelper object that will take care of the upload + process. The client should call upload() on this object and pass it a + reference to an RIEncryptedUploadable object that will provide + ciphertext. When the upload is finished, the upload() method will + finish and return the upload results. + """ + return (UploadResults, ChoiceOf(RIUploadHelper, None)) diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 94d7354d..604f306f 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -1,66 +1,19 @@ from zope.interface import implements from twisted.application import service -from foolscap import RemoteInterface, Referenceable -from foolscap.schema import DictOf, ChoiceOf, ListOf -from allmydata.interfaces import StorageIndex, Hash +from twisted.internet import defer +from foolscap import Referenceable from allmydata.util import hashutil +from allmydata import upload, interfaces -UploadResults = DictOf(str, str) -class RIEncryptedUploadable(RemoteInterface): - __remote_name__ = "RIEncryptedUploadable.tahoe.allmydata.com" - def get_size(): - return int - - def set_segment_size(segment_size=long): - return None - - def read_encrypted(offset=long, length=long): - return str - - def get_plaintext_hashtree_leaves(first=int, last=int): - return ListOf(Hash) - - def get_plaintext_hash(): - return Hash - - -class RIUploadHelper(RemoteInterface): - __remote_name__ = "RIUploadHelper.tahoe.allmydata.com" - - def upload(reader=RIEncryptedUploadable): - return UploadResults - - -class RIHelper(RemoteInterface): - __remote_name__ = "RIHelper.tahoe.allmydata.com" - - def upload(si=StorageIndex): - """See if a file with a given storage index needs uploading. The - helper will ask the appropriate storage servers to see if the file - has already been uploaded. If so, the helper will return a set of - 'upload results' that includes whatever hashes are needed to build - the read-cap, and perhaps a truncated sharemap. - - If the file has not yet been uploaded (or if it was only partially - uploaded), the helper will return an empty upload-results dictionary - and also an RIUploadHelper object that will take care of the upload - process. The client should call upload() on this object and pass it a - reference to an RIEncryptedUploadable object that will provide - ciphertext. When the upload is finished, the upload() method will - finish and return the upload results. - """ - return (UploadResults, ChoiceOf(RIUploadHelper, None)) - - -class CHKUploadHelper(Referenceable): +class CHKUploadHelper(Referenceable, upload.CHKUploader): """I am the helper-server -side counterpart to AssistedUploader. I handle peer selection, encoding, and share pushing. I read ciphertext from the remote AssistedUploader. """ - implements(RIUploadHelper) + implements(interfaces.RIUploadHelper) def __init__(self, storage_index, helper): self._finished = False @@ -68,10 +21,11 @@ class CHKUploadHelper(Referenceable): self._helper = helper self._log_number = self._helper.log("CHKUploadHelper starting") - def log(self, msg, parent=None): - if parent is None: - parent = self._log_number - return self._client.log(msg, parent=parent) + self._client = helper.parent + self._wait_for_numpeers = None + self._options = {} + + self.set_params( (3,7,10) ) # GACK def start(self): # determine if we need to upload the file. If so, return ({},self) . @@ -81,15 +35,48 @@ class CHKUploadHelper(Referenceable): def remote_upload(self, reader): # reader is an RIEncryptedUploadable. I am specified to return an # UploadResults dictionary. - return {'uri_extension_hash': hashutil.uri_extension_hash("")} + + eu = CiphertextReader(reader, self._storage_index) + d = self.start_encrypted(eu) + def _done(res): + (uri_extension_hash, needed_shares, total_shares, size) = res + return {'uri_extension_hash': uri_extension_hash} + d.addCallback(_done) + return d def finished(self, res): self._finished = True self._helper.upload_finished(self._storage_index) +class CiphertextReader: + implements(interfaces.IEncryptedUploadable) + + def __init__(self, remote_reader, storage_index): + self.rr = remote_reader + self.storage_index = storage_index + + def get_size(self): + return self.rr.callRemote("get_size") + def get_storage_index(self): + return defer.succeed(self.storage_index) + def set_segment_size(self, segment_size): + return self.rr.callRemote("set_segment_size", segment_size) + def set_serialized_encoding_parameters(self, params): + pass # ?? + def read_encrypted(self, length): + return self.rr.callRemote("read_encrypted", length) + def get_plaintext_hashtree_leaves(self, first, last, num_segments): + return self.rr.callRemote("get_plaintext_hashtree_leaves", + first, last, num_segments) + def get_plaintext_hash(self): + return self.rr.callRemote("get_plaintext_hash") + def close(self): + # ?? + return self.rr.callRemote("close") + class Helper(Referenceable, service.MultiService): - implements(RIHelper) + implements(interfaces.RIHelper) # this is the non-distributed version. When we need to have multiple # helpers, this object will query the farm to see if anyone has the # storage_index of interest, and send the request off to them. @@ -111,7 +98,7 @@ class Helper(Referenceable, service.MultiService): if storage_index in self._active_uploads: uh = self._active_uploads[storage_index] else: - uh = CHKUploadHelper(storage_index, self) + uh = self.chk_upload_helper_class(storage_index, self) self._active_uploads[storage_index] = uh return uh.start() diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index ae6f213d..a5d50771 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -6,9 +6,11 @@ from foolscap import Tub, eventual from foolscap.logging import log from allmydata import upload, offloaded +from allmydata.util import hashutil class FakeCHKUploadHelper(offloaded.CHKUploadHelper): - pass + def remote_upload(self, reader): + return {'uri_extension_hash': hashutil.uri_extension_hash("")} class FakeHelper(offloaded.Helper): chk_upload_helper_class = FakeCHKUploadHelper diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 35fc6552..5c3a491d 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -14,8 +14,8 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \ from allmydata import encode, storage, hashtree, uri from allmydata.util import idlib, mathutil from allmydata.util.assertutil import precondition -from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable -from allmydata.offloaded import RIEncryptedUploadable +from allmydata.interfaces import IUploadable, IUploader, \ + IEncryptedUploadable, RIEncryptedUploadable from pycryptopp.cipher.aes import AES from cStringIO import StringIO @@ -429,10 +429,10 @@ class CHKUploader: def set_params(self, encoding_parameters): self._encoding_parameters = encoding_parameters - def log(self, msg, parent=None): + def log(self, msg, parent=None, **kwargs): if parent is None: parent = self._log_number - return self._client.log(msg, parent=parent) + return self._client.log(msg, parent=parent, **kwargs) def start(self, uploadable): """Start uploading the file.