from twisted.internet import defer
from foolscap import Referenceable
from allmydata import upload, interfaces
-from allmydata.util import idlib
+from allmydata.util import idlib, log, observer
+class NotEnoughWritersError(Exception):
+ pass
+
class CHKUploadHelper(Referenceable, upload.CHKUploader):
"""I am the helper-server -side counterpart to AssistedUploader. I handle
implements(interfaces.RICHKUploadHelper)
def __init__(self, storage_index, helper, log_number, options={}):
- self._finished = False
+ self._started = False
self._storage_index = storage_index
self._helper = helper
upload_id = idlib.b2a(storage_index)[:6]
self._client = helper.parent
self._options = options
- self._readers = []
+ self._reader = CiphertextReader(storage_index, self)
+ self._finished_observers = observer.OneShotObserverList()
self.set_params( (3,7,10) ) # GACK
# reader is an RIEncryptedUploadable. I am specified to return an
# UploadResults dictionary.
- self._readers.append(reader)
- reader.notifyOnDisconnect(self._remove_reader, reader)
- eu = CiphertextReader(reader, self._storage_index)
- d = self.start_encrypted(eu)
- def _done(res):
- self.finished(self._storage_index)
- (uri_extension_hash, needed_shares, total_shares, size) = res
- return {'uri_extension_hash': uri_extension_hash}
- d.addCallback(_done)
- return d
+ self._reader.add_reader(reader)
- def _remove_reader(self, reader):
- # NEEDS MORE
- self._readers.remove(reader)
- if not self._readers:
- if not self._finished:
- self.finished(None)
+ # there is already an upload in progress, and a second uploader
+ # has joined in. We will notify the second client when the upload
+ # is complete, but we will not request any data from them unless
+ # the first one breaks. TODO: fetch data from both clients to
+ # speed the upload
- def finished(self, res):
- self._finished = True
+ if not self._started:
+ self._started = True
+ d = self.start_encrypted(self._reader)
+ d.addCallbacks(self._finished, self._failed)
+ return self._finished_observers.when_fired()
+
+ def _finished(self, res):
+ (uri_extension_hash, needed_shares, total_shares, size) = res
+ upload_results = {'uri_extension_hash': uri_extension_hash}
+ self._finished_observers.fire(upload_results)
+ self._helper.upload_finished(self._storage_index)
+
+ def _failed(self, f):
+ self._finished_observers.fire(f)
self._helper.upload_finished(self._storage_index)
class CiphertextReader:
implements(interfaces.IEncryptedUploadable)
- def __init__(self, remote_reader, storage_index):
- self.rr = remote_reader
+ def __init__(self, storage_index, upload_helper):
+ self._readers = []
self.storage_index = storage_index
self._offset = 0
+ self._upload_helper = upload_helper
+
+ def add_reader(self, reader):
+ # for now, we stick to the first uploader
+ self._readers.append(reader)
+
+ def call(self, *args, **kwargs):
+ if not self._readers:
+ raise NotEnoughWritersError("ran out of assisted uploaders")
+ rr = self._readers[0]
+ d = rr.callRemote(*args, **kwargs)
+ def _err(f):
+ if rr in self._readers:
+ self._readers.remove(rr)
+ self._upload_helper.log("call to assisted uploader %s failed" % rr,
+ failure=f, level=log.UNUSUAL)
+ # we can try again with someone else who's left
+ return self.call(*args, **kwargs)
+ d.addErrback(_err)
+ return d
def get_size(self):
- return self.rr.callRemote("get_size")
+ return self.call("get_size")
def get_storage_index(self):
return defer.succeed(self.storage_index)
def set_segment_size(self, segment_size):
- return self.rr.callRemote("set_segment_size", segment_size)
+ return self.call("set_segment_size", segment_size)
def set_serialized_encoding_parameters(self, params):
pass # ??
def read_encrypted(self, length):
- d = self.rr.callRemote("read_encrypted", self._offset, length)
+ d = self.call("read_encrypted", self._offset, length)
def _done(strings):
self._offset += sum([len(data) for data in strings])
return strings
d.addCallback(_done)
return d
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
- return self.rr.callRemote("get_plaintext_hashtree_leaves",
- first, last, num_segments)
+ return self.call("get_plaintext_hashtree_leaves", first, last,
+ num_segments)
def get_plaintext_hash(self):
- return self.rr.callRemote("get_plaintext_hash")
+ return self.call("get_plaintext_hash")
def close(self):
# ??
- return self.rr.callRemote("close")
+ return self.call("close")
class Helper(Referenceable, service.MultiService):