From: Brian Warner Date: Tue, 15 Jan 2008 05:20:03 +0000 (-0700) Subject: offloaded: cleanup to handle multiple simultaneous uploaders gracefully X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/css/flags?a=commitdiff_plain;h=f0430ccc483123f5d97a8bb4f3fe76084623ddb8;p=tahoe-lafs%2Ftahoe-lafs.git offloaded: cleanup to handle multiple simultaneous uploaders gracefully --- diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py index 979204f9..62dc67f7 100644 --- a/src/allmydata/offloaded.py +++ b/src/allmydata/offloaded.py @@ -4,9 +4,12 @@ from twisted.application import service from twisted.internet import defer from foolscap import Referenceable from allmydata import upload, interfaces -from allmydata.util import idlib +from allmydata.util import idlib, log, observer +class NotEnoughWritersError(Exception): + pass + class CHKUploadHelper(Referenceable, upload.CHKUploader): """I am the helper-server -side counterpart to AssistedUploader. I handle @@ -16,7 +19,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): implements(interfaces.RICHKUploadHelper) def __init__(self, storage_index, helper, log_number, options={}): - self._finished = False + self._started = False self._storage_index = storage_index self._helper = helper upload_id = idlib.b2a(storage_index)[:6] @@ -26,7 +29,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): self._client = helper.parent self._options = options - self._readers = [] + self._reader = CiphertextReader(storage_index, self) + self._finished_observers = observer.OneShotObserverList() self.set_params( (3,7,10) ) # GACK @@ -45,59 +49,81 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader): # reader is an RIEncryptedUploadable. I am specified to return an # UploadResults dictionary. - self._readers.append(reader) - reader.notifyOnDisconnect(self._remove_reader, reader) - eu = CiphertextReader(reader, self._storage_index) - d = self.start_encrypted(eu) - def _done(res): - self.finished(self._storage_index) - (uri_extension_hash, needed_shares, total_shares, size) = res - return {'uri_extension_hash': uri_extension_hash} - d.addCallback(_done) - return d + self._reader.add_reader(reader) - def _remove_reader(self, reader): - # NEEDS MORE - self._readers.remove(reader) - if not self._readers: - if not self._finished: - self.finished(None) + # there is already an upload in progress, and a second uploader + # has joined in. We will notify the second client when the upload + # is complete, but we will not request any data from them unless + # the first one breaks. TODO: fetch data from both clients to + # speed the upload - def finished(self, res): - self._finished = True + if not self._started: + self._started = True + d = self.start_encrypted(self._reader) + d.addCallbacks(self._finished, self._failed) + return self._finished_observers.when_fired() + + def _finished(self, res): + (uri_extension_hash, needed_shares, total_shares, size) = res + upload_results = {'uri_extension_hash': uri_extension_hash} + self._finished_observers.fire(upload_results) + self._helper.upload_finished(self._storage_index) + + def _failed(self, f): + self._finished_observers.fire(f) self._helper.upload_finished(self._storage_index) class CiphertextReader: implements(interfaces.IEncryptedUploadable) - def __init__(self, remote_reader, storage_index): - self.rr = remote_reader + def __init__(self, storage_index, upload_helper): + self._readers = [] self.storage_index = storage_index self._offset = 0 + self._upload_helper = upload_helper + + def add_reader(self, reader): + # for now, we stick to the first uploader + self._readers.append(reader) + + def call(self, *args, **kwargs): + if not self._readers: + raise NotEnoughWritersError("ran out of assisted uploaders") + rr = self._readers[0] + d = rr.callRemote(*args, **kwargs) + def _err(f): + if rr in self._readers: + self._readers.remove(rr) + self._upload_helper.log("call to assisted uploader %s failed" % rr, + failure=f, level=log.UNUSUAL) + # we can try again with someone else who's left + return self.call(*args, **kwargs) + d.addErrback(_err) + return d def get_size(self): - return self.rr.callRemote("get_size") + return self.call("get_size") def get_storage_index(self): return defer.succeed(self.storage_index) def set_segment_size(self, segment_size): - return self.rr.callRemote("set_segment_size", segment_size) + return self.call("set_segment_size", segment_size) def set_serialized_encoding_parameters(self, params): pass # ?? def read_encrypted(self, length): - d = self.rr.callRemote("read_encrypted", self._offset, length) + d = self.call("read_encrypted", self._offset, length) def _done(strings): self._offset += sum([len(data) for data in strings]) return strings d.addCallback(_done) return d def get_plaintext_hashtree_leaves(self, first, last, num_segments): - return self.rr.callRemote("get_plaintext_hashtree_leaves", - first, last, num_segments) + return self.call("get_plaintext_hashtree_leaves", first, last, + num_segments) def get_plaintext_hash(self): - return self.rr.callRemote("get_plaintext_hash") + return self.call("get_plaintext_hash") def close(self): # ?? - return self.rr.callRemote("close") + return self.call("close") class Helper(Referenceable, service.MultiService):