From f0430ccc483123f5d97a8bb4f3fe76084623ddb8 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Mon, 14 Jan 2008 22:20:03 -0700
Subject: [PATCH] offloaded: cleanup to handle multiple simultaneous uploaders
 gracefully

---
 src/allmydata/offloaded.py | 86 +++++++++++++++++++++++++-------------
 1 file changed, 56 insertions(+), 30 deletions(-)

diff --git a/src/allmydata/offloaded.py b/src/allmydata/offloaded.py
index 979204f9..62dc67f7 100644
--- a/src/allmydata/offloaded.py
+++ b/src/allmydata/offloaded.py
@@ -4,9 +4,12 @@ from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
 from allmydata import upload, interfaces
-from allmydata.util import idlib
+from allmydata.util import idlib, log, observer
 
 
+class NotEnoughWritersError(Exception):
+    pass
+
 
 class CHKUploadHelper(Referenceable, upload.CHKUploader):
     """I am the helper-server -side counterpart to AssistedUploader. I handle
@@ -16,7 +19,7 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
     implements(interfaces.RICHKUploadHelper)
 
     def __init__(self, storage_index, helper, log_number, options={}):
-        self._finished = False
+        self._started = False
         self._storage_index = storage_index
         self._helper = helper
         upload_id = idlib.b2a(storage_index)[:6]
@@ -26,7 +29,8 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
 
         self._client = helper.parent
         self._options = options
-        self._readers = []
+        self._reader = CiphertextReader(storage_index, self)
+        self._finished_observers = observer.OneShotObserverList()
 
         self.set_params( (3,7,10) ) # GACK
 
@@ -45,59 +49,81 @@ class CHKUploadHelper(Referenceable, upload.CHKUploader):
         # reader is an RIEncryptedUploadable. I am specified to return an
         # UploadResults dictionary.
 
-        self._readers.append(reader)
-        reader.notifyOnDisconnect(self._remove_reader, reader)
-        eu = CiphertextReader(reader, self._storage_index)
-        d = self.start_encrypted(eu)
-        def _done(res):
-            self.finished(self._storage_index)
-            (uri_extension_hash, needed_shares, total_shares, size) = res
-            return {'uri_extension_hash': uri_extension_hash}
-        d.addCallback(_done)
-        return d
+        self._reader.add_reader(reader)
 
-    def _remove_reader(self, reader):
-        # NEEDS MORE
-        self._readers.remove(reader)
-        if not self._readers:
-            if not self._finished:
-                self.finished(None)
+            # there is already an upload in progress, and a second uploader
+            # has joined in. We will notify the second client when the upload
+            # is complete, but we will not request any data from them unless
+            # the first one breaks. TODO: fetch data from both clients to
+            # speed the upload
 
-    def finished(self, res):
-        self._finished = True
+        if not self._started:
+            self._started = True
+            d = self.start_encrypted(self._reader)
+            d.addCallbacks(self._finished, self._failed)
+        return self._finished_observers.when_fired()
+
+    def _finished(self, res):
+        (uri_extension_hash, needed_shares, total_shares, size) = res
+        upload_results = {'uri_extension_hash': uri_extension_hash}
+        self._finished_observers.fire(upload_results)
+        self._helper.upload_finished(self._storage_index)
+
+    def _failed(self, f):
+        self._finished_observers.fire(f)
         self._helper.upload_finished(self._storage_index)
 
 class CiphertextReader:
     implements(interfaces.IEncryptedUploadable)
 
-    def __init__(self, remote_reader, storage_index):
-        self.rr = remote_reader
+    def __init__(self, storage_index, upload_helper):
+        self._readers = []
         self.storage_index = storage_index
         self._offset = 0
+        self._upload_helper = upload_helper
+
+    def add_reader(self, reader):
+        # for now, we stick to the first uploader
+        self._readers.append(reader)
+
+    def call(self, *args, **kwargs):
+        if not self._readers:
+            raise NotEnoughWritersError("ran out of assisted uploaders")
+        rr = self._readers[0]
+        d = rr.callRemote(*args, **kwargs)
+        def _err(f):
+            if rr in self._readers:
+                self._readers.remove(rr)
+            self._upload_helper.log("call to assisted uploader %s failed" % rr,
+                                    failure=f, level=log.UNUSUAL)
+            # we can try again with someone else who's left
+            return self.call(*args, **kwargs)
+        d.addErrback(_err)
+        return d
 
     def get_size(self):
-        return self.rr.callRemote("get_size")
+        return self.call("get_size")
     def get_storage_index(self):
         return defer.succeed(self.storage_index)
     def set_segment_size(self, segment_size):
-        return self.rr.callRemote("set_segment_size", segment_size)
+        return self.call("set_segment_size", segment_size)
     def set_serialized_encoding_parameters(self, params):
         pass # ??
     def read_encrypted(self, length):
-        d = self.rr.callRemote("read_encrypted", self._offset, length)
+        d = self.call("read_encrypted", self._offset, length)
         def _done(strings):
             self._offset += sum([len(data) for data in strings])
             return strings
         d.addCallback(_done)
         return d
     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
-        return self.rr.callRemote("get_plaintext_hashtree_leaves",
-                                  first, last, num_segments)
+        return self.call("get_plaintext_hashtree_leaves", first, last,
+                         num_segments)
     def get_plaintext_hash(self):
-        return self.rr.callRemote("get_plaintext_hash")
+        return self.call("get_plaintext_hash")
     def close(self):
         # ??
-        return self.rr.callRemote("close")
+        return self.call("close")
 
 
 class Helper(Referenceable, service.MultiService):
-- 
2.45.2