From: Daira Hopwood Date: Tue, 14 Apr 2015 17:10:51 +0000 (+0100) Subject: Teach StorageFarmBroker to fire a deferred when a connection threshold is reached... X-Git-Url: https://git.rkrishnan.org/FOOURL?a=commitdiff_plain;h=462bce9e914ec34c21049a17ed3107c17b85c87d;p=tahoe-lafs%2Ftahoe-lafs.git Teach StorageFarmBroker to fire a deferred when a connection threshold is reached. refs #1449 Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index bb6dce23..6b214f3f 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -130,6 +130,7 @@ class Client(node.Node, pollmixin.PollMixin): def __init__(self, basedir="."): node.Node.__init__(self, basedir) + self.upload_ready_d = defer.Deferred() self.started_timestamp = time.time() self.logSource="Client" self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy() @@ -344,7 +345,12 @@ class Client(node.Node, pollmixin.PollMixin): def init_client_storage_broker(self): # create a StorageFarmBroker object, for use by Uploader/Downloader # (and everybody else who wants to use storage servers) - sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True) + + connection_threshold = min(self.encoding_params["k"], + self.encoding_params["happy"] + 1) + + sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold, + self.upload_ready_d) self.storage_broker = sb # load static server specifications from tahoe.cfg, if any. @@ -500,6 +506,9 @@ class Client(node.Node, pollmixin.PollMixin): s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8) s.setServiceParent(self) s.startService() + + # start processing the upload queue when we've connected to enough servers + self.upload_ready_d.addCallback(s.upload_ready) except Exception, e: self.log("couldn't start drop-uploader: %r", args=(e,)) diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py index 0e2b48b4..108d2486 100644 --- a/src/allmydata/frontends/drop_upload.py +++ b/src/allmydata/frontends/drop_upload.py @@ -35,6 +35,8 @@ class DropUploader(service.MultiService): self._convergence = client.convergence self._local_path = FilePath(local_dir) + self.is_upload_ready = False + if inotify is None: from twisted.internet import inotify self._inotify = inotify @@ -68,6 +70,12 @@ class DropUploader(service.MultiService): self._stats_provider.count('drop_upload.dirs_monitored', 1) return d + def upload_ready(self): + """upload_ready is used to signal us to start + processing the upload items... + """ + self.is_upload_ready = True + def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index e532db1a..d34878d9 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -62,10 +62,12 @@ class StorageFarmBroker: I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ - def __init__(self, tub, permute_peers): + def __init__(self, tub, permute_peers, connected_threshold, connected_d): self.tub = tub assert permute_peers # False not implemented yet self.permute_peers = permute_peers + self.connected_threshold = connected_threshold + self.connected_d = connected_d # self.servers maps serverid -> IServer, and keeps track of all the # storage servers that we've heard about. Each descriptor manages its # own Reconnector, and will give us a RemoteReference when we ask @@ -75,7 +77,7 @@ class StorageFarmBroker: # these two are used in unit tests def test_add_rref(self, serverid, rref, ann): - s = NativeStorageServer(serverid, ann.copy()) + s = NativeStorageServer(serverid, ann.copy(), self) s.rref = rref s._is_connected = True self.servers[serverid] = s @@ -92,7 +94,7 @@ class StorageFarmBroker: precondition(isinstance(key_s, str), key_s) precondition(key_s.startswith("v0-"), key_s) assert ann["service-name"] == "storage" - s = NativeStorageServer(key_s, ann) + s = NativeStorageServer(key_s, ann, self) serverid = s.get_serverid() old = self.servers.get(serverid) if old: @@ -118,6 +120,13 @@ class StorageFarmBroker: for dsc in self.servers.values(): dsc.try_to_connect() + def check_enough_connected(self): + if (self.connected_d is not None and + len(self.get_connected_servers()) >= self.connected_threshold): + d = self.connected_d + self.connected_d = None + d.callback(None) + def get_servers_for_psi(self, peer_selection_index): # return a list of server objects (IServers) assert self.permute_peers == True @@ -187,9 +196,10 @@ class NativeStorageServer: "application-version": "unknown: no get_version()", } - def __init__(self, key_s, ann): + def __init__(self, key_s, ann, broker): self.key_s = key_s self.announcement = ann + self.broker = broker assert "anonymous-storage-FURL" in ann, ann furl = str(ann["anonymous-storage-FURL"]) @@ -290,6 +300,7 @@ class NativeStorageServer: default = self.VERSION_DEFAULTS d = add_version_to_remote_reference(rref, default) d.addCallback(self._got_versioned_service, lp) + d.addCallback(lambda ign: self.broker.check_enough_connected()) d.addErrback(log.err, format="storageclient._got_connection", name=self.get_name(), umid="Sdq3pg") diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index d030c12a..0040a021 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -22,7 +22,7 @@ class FakeClient: class WebResultsRendering(unittest.TestCase, WebRenderingMixin): def create_fake_client(self): - sb = StorageFarmBroker(None, True) + sb = StorageFarmBroker(None, True, 0, None) # s.get_name() (the "short description") will be "v0-00000000". # s.get_longname() will include the -long suffix. # s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.." @@ -41,7 +41,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): "my-version": "ver", "oldest-supported": "oldest", } - s = NativeStorageServer(key_s, ann) + s = NativeStorageServer(key_s, ann, sb) sb.test_add_server(peerid, s) # XXX: maybe use key_s? c = FakeClient() c.storage_broker = sb diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index d6cd7d0d..4b152e5a 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -246,7 +246,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): return [ s.get_longname() for s in sb.get_servers_for_psi(key) ] def test_permute(self): - sb = StorageFarmBroker(None, True) + sb = StorageFarmBroker(None, True, 0, None) for k in ["%d" % i for i in range(5)]: ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", "permutation-seed-base32": base32.b2a(k) } diff --git a/src/allmydata/test/test_drop_upload.py b/src/allmydata/test/test_drop_upload.py index 30ff8811..ef15a230 100644 --- a/src/allmydata/test/test_drop_upload.py +++ b/src/allmydata/test/test_drop_upload.py @@ -42,7 +42,10 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA self.upload_dircap = n.get_uri() self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'), inotify=self.inotify) - return self.uploader.startService() + self.uploader.setServiceParent(self.client) + self.uploader.startService() + self.uploader.upload_ready() + return None d.addCallback(_made_upload_dir) # Write something short enough for a LIT file. diff --git a/src/allmydata/test/test_helper.py b/src/allmydata/test/test_helper.py index e1fd54db..7b6c53ee 100644 --- a/src/allmydata/test/test_helper.py +++ b/src/allmydata/test/test_helper.py @@ -116,7 +116,7 @@ class AssistedUpload(unittest.TestCase): timeout = 240 # It takes longer than 120 seconds on Francois's arm box. def setUp(self): self.s = FakeClient() - self.s.storage_broker = StorageFarmBroker(None, True) + self.s.storage_broker = StorageFarmBroker(None, True, 0, None) self.s.secret_holder = client.SecretHolder("lease secret", "converge") self.s.startService() diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 890d294e..91e2bef6 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -234,7 +234,7 @@ def make_storagebroker(s=None, num_peers=10): s = FakeStorage() peerids = [tagged_hash("peerid", "%d" % i)[:20] for i in range(num_peers)] - storage_broker = StorageFarmBroker(None, True) + storage_broker = StorageFarmBroker(None, True, 0, None) for peerid in peerids: fss = FakeStorageServer(peerid, s) ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 090fa3bb..9c82077a 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -198,7 +198,7 @@ class FakeClient: mode = dict([i,mode] for i in range(num_servers)) servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid])) for fakeid in range(self.num_servers) ] - self.storage_broker = StorageFarmBroker(None, permute_peers=True) + self.storage_broker = StorageFarmBroker(None, True, 0, None) for (serverid, rref) in servers: ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid), "permutation-seed-base32": base32.b2a(serverid) } diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index 1f46cbf9..37193e37 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -239,7 +239,7 @@ class FakeClient(Client): self._secret_holder = SecretHolder("lease secret", "convergence secret") self.helper = None self.convergence = "some random string" - self.storage_broker = StorageFarmBroker(None, permute_peers=True) + self.storage_broker = StorageFarmBroker(None, True, 0, None) # fake knowledge of another server self.storage_broker.test_add_server("other_nodeid", FakeDisplayableServer("other_nodeid", u"other_nickname \u263B"))