]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Teach StorageFarmBroker to fire a deferred when a connection threshold is reached...
authorDaira Hopwood <daira@jacaranda.org>
Mon, 28 Dec 2015 15:28:05 +0000 (15:28 +0000)
committerDaira Hopwood <daira@jacaranda.org>
Mon, 28 Dec 2015 21:48:55 +0000 (21:48 +0000)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/client.py
src/allmydata/frontends/drop_upload.py
src/allmydata/storage_client.py
src/allmydata/test/test_checker.py
src/allmydata/test/test_client.py
src/allmydata/test/test_helper.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_upload.py
src/allmydata/test/test_web.py

index 41840e85c6d613de7acbeb4c7328eee46c6a9d75..24432787786296bbc37f0be2bead578e97199c2b 100644 (file)
@@ -130,6 +130,7 @@ class Client(node.Node, pollmixin.PollMixin):
 
     def __init__(self, basedir="."):
         node.Node.__init__(self, basedir)
 
     def __init__(self, basedir="."):
         node.Node.__init__(self, basedir)
+        self.connected_enough_d = defer.Deferred()
         self.started_timestamp = time.time()
         self.logSource="Client"
         self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
         self.started_timestamp = time.time()
         self.logSource="Client"
         self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
@@ -346,7 +347,12 @@ class Client(node.Node, pollmixin.PollMixin):
         # (and everybody else who wants to use storage servers)
         ps = self.get_config("client", "peers.preferred", "").split(",")
         preferred_peers = tuple([p.strip() for p in ps if p != ""])
         # (and everybody else who wants to use storage servers)
         ps = self.get_config("client", "peers.preferred", "").split(",")
         preferred_peers = tuple([p.strip() for p in ps if p != ""])
-        sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
+
+        connection_threshold = min(self.encoding_params["k"],
+                                   self.encoding_params["happy"] + 1)
+
+        sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold,
+                                              self.connected_enough_d, preferred_peers=preferred_peers)
         self.storage_broker = sb
 
         # load static server specifications from tahoe.cfg, if any.
         self.storage_broker = sb
 
         # load static server specifications from tahoe.cfg, if any.
@@ -502,6 +508,9 @@ class Client(node.Node, pollmixin.PollMixin):
                 s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                 s.setServiceParent(self)
                 s.startService()
                 s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
                 s.setServiceParent(self)
                 s.startService()
+
+                # start processing the upload queue when we've connected to enough servers
+                self.connected_enough_d.addCallback(s.ready)
             except Exception, e:
                 self.log("couldn't start drop-uploader: %r", args=(e,))
 
             except Exception, e:
                 self.log("couldn't start drop-uploader: %r", args=(e,))
 
index 425007712d83fde810d34138c3d016c5def27c83..5814c1125c219d875c4663e19b37d1a9624f9b7f 100644 (file)
@@ -35,6 +35,8 @@ class DropUploader(service.MultiService):
         self._convergence = client.convergence
         self._local_path = FilePath(local_dir)
 
         self._convergence = client.convergence
         self._local_path = FilePath(local_dir)
 
+        self.is_upload_ready = False
+
         if inotify is None:
             from twisted.internet import inotify
         self._inotify = inotify
         if inotify is None:
             from twisted.internet import inotify
         self._inotify = inotify
@@ -68,6 +70,12 @@ class DropUploader(service.MultiService):
         self._stats_provider.count('drop_upload.dirs_monitored', 1)
         return d
 
         self._stats_provider.count('drop_upload.dirs_monitored', 1)
         return d
 
+    def upload_ready(self):
+        """upload_ready is used to signal us to start
+        processing the upload items...
+        """
+        self.is_upload_ready = True
+
     def _notify(self, opaque, path, events_mask):
         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
 
     def _notify(self, opaque, path, events_mask):
         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
 
index dd9780f23a8c4efd8eefccd1de47cd3d7f92d060..09148932d9271d5ee0fa4c2b4c309a930e69ac90 100644 (file)
@@ -62,10 +62,13 @@ class StorageFarmBroker:
     I'm also responsible for subscribing to the IntroducerClient to find out
     about new servers as they are announced by the Introducer.
     """
     I'm also responsible for subscribing to the IntroducerClient to find out
     about new servers as they are announced by the Introducer.
     """
-    def __init__(self, tub, permute_peers, preferred_peers=()):
+    def __init__(self, tub, permute_peers, connected_threshold, connected_d,
+                 preferred_peers=()):
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
         self.tub = tub
         assert permute_peers # False not implemented yet
         self.permute_peers = permute_peers
+        self.connected_threshold = connected_threshold
+        self.connected_d = connected_d
         self.preferred_peers = preferred_peers
         # self.servers maps serverid -> IServer, and keeps track of all the
         # storage servers that we've heard about. Each descriptor manages its
         self.preferred_peers = preferred_peers
         # self.servers maps serverid -> IServer, and keeps track of all the
         # storage servers that we've heard about. Each descriptor manages its
@@ -76,7 +79,7 @@ class StorageFarmBroker:
 
     # these two are used in unit tests
     def test_add_rref(self, serverid, rref, ann):
 
     # these two are used in unit tests
     def test_add_rref(self, serverid, rref, ann):
-        s = NativeStorageServer(serverid, ann.copy())
+        s = NativeStorageServer(serverid, ann.copy(), self)
         s.rref = rref
         s._is_connected = True
         self.servers[serverid] = s
         s.rref = rref
         s._is_connected = True
         self.servers[serverid] = s
@@ -93,7 +96,7 @@ class StorageFarmBroker:
             precondition(isinstance(key_s, str), key_s)
             precondition(key_s.startswith("v0-"), key_s)
         assert ann["service-name"] == "storage"
             precondition(isinstance(key_s, str), key_s)
             precondition(key_s.startswith("v0-"), key_s)
         assert ann["service-name"] == "storage"
-        s = NativeStorageServer(key_s, ann)
+        s = NativeStorageServer(key_s, ann, self)
         serverid = s.get_serverid()
         old = self.servers.get(serverid)
         if old:
         serverid = s.get_serverid()
         old = self.servers.get(serverid)
         if old:
@@ -119,6 +122,13 @@ class StorageFarmBroker:
         for dsc in self.servers.values():
             dsc.try_to_connect()
 
         for dsc in self.servers.values():
             dsc.try_to_connect()
 
+    def check_enough_connected(self):
+        if (self.connected_d is not None and
+            len(self.get_connected_servers()) >= self.connected_threshold):
+            d = self.connected_d
+            self.connected_d = None
+            d.callback(None)
+
     def get_servers_for_psi(self, peer_selection_index):
         # return a list of server objects (IServers)
         assert self.permute_peers == True
     def get_servers_for_psi(self, peer_selection_index):
         # return a list of server objects (IServers)
         assert self.permute_peers == True
@@ -191,9 +201,10 @@ class NativeStorageServer:
         "application-version": "unknown: no get_version()",
         }
 
         "application-version": "unknown: no get_version()",
         }
 
-    def __init__(self, key_s, ann):
+    def __init__(self, key_s, ann, broker):
         self.key_s = key_s
         self.announcement = ann
         self.key_s = key_s
         self.announcement = ann
+        self.broker = broker
 
         assert "anonymous-storage-FURL" in ann, ann
         furl = str(ann["anonymous-storage-FURL"])
 
         assert "anonymous-storage-FURL" in ann, ann
         furl = str(ann["anonymous-storage-FURL"])
@@ -294,6 +305,7 @@ class NativeStorageServer:
         default = self.VERSION_DEFAULTS
         d = add_version_to_remote_reference(rref, default)
         d.addCallback(self._got_versioned_service, lp)
         default = self.VERSION_DEFAULTS
         d = add_version_to_remote_reference(rref, default)
         d.addCallback(self._got_versioned_service, lp)
+        d.addCallback(lambda ign: self.broker.check_enough_connected())
         d.addErrback(log.err, format="storageclient._got_connection",
                      name=self.get_name(), umid="Sdq3pg")
 
         d.addErrback(log.err, format="storageclient._got_connection",
                      name=self.get_name(), umid="Sdq3pg")
 
index d030c12a40176b225a574008e09b31732be57728..0040a021af78d5bdd24feb6c49cb8f0e8922ddf1 100644 (file)
@@ -22,7 +22,7 @@ class FakeClient:
 class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
     def create_fake_client(self):
 class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
 
     def create_fake_client(self):
-        sb = StorageFarmBroker(None, True)
+        sb = StorageFarmBroker(None, True, 0, None)
         # s.get_name() (the "short description") will be "v0-00000000".
         # s.get_longname() will include the -long suffix.
         # s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."
         # s.get_name() (the "short description") will be "v0-00000000".
         # s.get_longname() will include the -long suffix.
         # s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."
@@ -41,7 +41,7 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
                     "my-version": "ver",
                     "oldest-supported": "oldest",
                     }
                     "my-version": "ver",
                     "oldest-supported": "oldest",
                     }
-            s = NativeStorageServer(key_s, ann)
+            s = NativeStorageServer(key_s, ann, sb)
             sb.test_add_server(peerid, s) # XXX: maybe use key_s?
         c = FakeClient()
         c.storage_broker = sb
             sb.test_add_server(peerid, s) # XXX: maybe use key_s?
         c = FakeClient()
         c.storage_broker = sb
index 1819fa4a57666be9db48a792c07fb20b38138e27..0e646edfe9c629d11a25baeec4e125270ea26361 100644 (file)
@@ -251,7 +251,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
         return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
 
     def test_permute(self):
         return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
 
     def test_permute(self):
-        sb = StorageFarmBroker(None, True)
+        sb = StorageFarmBroker(None, True, 0, None)
         for k in ["%d" % i for i in range(5)]:
             ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
                    "permutation-seed-base32": base32.b2a(k) }
         for k in ["%d" % i for i in range(5)]:
             ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
                    "permutation-seed-base32": base32.b2a(k) }
@@ -263,7 +263,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
         self.failUnlessReallyEqual(self._permute(sb, "one"), [])
 
     def test_permute_with_preferred(self):
         self.failUnlessReallyEqual(self._permute(sb, "one"), [])
 
     def test_permute_with_preferred(self):
-        sb = StorageFarmBroker(None, True, ['1','4'])
+        sb = StorageFarmBroker(None, True, 0, None, ['1','4'])
         for k in ["%d" % i for i in range(5)]:
             ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
                    "permutation-seed-base32": base32.b2a(k) }
         for k in ["%d" % i for i in range(5)]:
             ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
                    "permutation-seed-base32": base32.b2a(k) }
index e1fd54dbe853fde5e4bccfad0a49f7ea0f6dd7b4..7b6c53eee2c9a749fb03265629120eefaba99e35 100644 (file)
@@ -116,7 +116,7 @@ class AssistedUpload(unittest.TestCase):
     timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
     def setUp(self):
         self.s = FakeClient()
     timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
     def setUp(self):
         self.s = FakeClient()
-        self.s.storage_broker = StorageFarmBroker(None, True)
+        self.s.storage_broker = StorageFarmBroker(None, True, 0, None)
         self.s.secret_holder = client.SecretHolder("lease secret", "converge")
         self.s.startService()
 
         self.s.secret_holder = client.SecretHolder("lease secret", "converge")
         self.s.startService()
 
index c9711c6dc3ac5287a6406f746667ce0d5cc18bc5..0757182c88d1ec445e0240e74696bb6f0351cba6 100644 (file)
@@ -234,7 +234,7 @@ def make_storagebroker(s=None, num_peers=10):
         s = FakeStorage()
     peerids = [tagged_hash("peerid", "%d" % i)[:20]
                for i in range(num_peers)]
         s = FakeStorage()
     peerids = [tagged_hash("peerid", "%d" % i)[:20]
                for i in range(num_peers)]
-    storage_broker = StorageFarmBroker(None, True)
+    storage_broker = StorageFarmBroker(None, True, 0, None)
     for peerid in peerids:
         fss = FakeStorageServer(peerid, s)
         ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
     for peerid in peerids:
         fss = FakeStorageServer(peerid, s)
         ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
index 090fa3bbe554773ea118a21dc0fc3a0b5bbca81e..9c82077af0f4cc4d1860155d6586d9081b6b45d2 100644 (file)
@@ -198,7 +198,7 @@ class FakeClient:
             mode = dict([i,mode] for i in range(num_servers))
         servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                     for fakeid in range(self.num_servers) ]
             mode = dict([i,mode] for i in range(num_servers))
         servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
                     for fakeid in range(self.num_servers) ]
-        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
+        self.storage_broker = StorageFarmBroker(None, True, 0, None)
         for (serverid, rref) in servers:
             ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
                    "permutation-seed-base32": base32.b2a(serverid) }
         for (serverid, rref) in servers:
             ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
                    "permutation-seed-base32": base32.b2a(serverid) }
index 723ae7ac604e6f494762e9f10d95fcf539002e25..6368f5b9651fc1048998ad9a4e97c18cbc94a51e 100644 (file)
@@ -239,7 +239,7 @@ class FakeClient(Client):
         self._secret_holder = SecretHolder("lease secret", "convergence secret")
         self.helper = None
         self.convergence = "some random string"
         self._secret_holder = SecretHolder("lease secret", "convergence secret")
         self.helper = None
         self.convergence = "some random string"
-        self.storage_broker = StorageFarmBroker(None, permute_peers=True)
+        self.storage_broker = StorageFarmBroker(None, True, 0, None)
         # fake knowledge of another server
         self.storage_broker.test_add_server("other_nodeid",
                                             FakeDisplayableServer("other_nodeid", u"other_nickname \u263B"))
         # fake knowledge of another server
         self.storage_broker.test_add_server("other_nodeid",
                                             FakeDisplayableServer("other_nodeid", u"other_nickname \u263B"))