git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
implement upload peer selection
author Brian Warner <warner@lothar.com>
Fri, 1 Dec 2006 09:54:28 +0000 (02:54 -0700)
committer Brian Warner <warner@lothar.com>
Fri, 1 Dec 2006 09:54:28 +0000 (02:54 -0700)
allmydata/client.py
allmydata/test/test_upload.py [new file with mode: 0644]
allmydata/upload.py [new file with mode: 0644]

diff --git a/allmydata/client.py b/allmydata/client.py
index 4b055b30a1f5c33f7de78d67bb173849db181e58..1a25606fd52760b2a18fb775234d701c0671e968 100644
@@ -6,7 +6,10 @@ from twisted.application import service
 from twisted.python import log
 from allmydata.util.iputil import get_local_ip_for
 
-from twisted.internet import reactor
+from twisted.internet import defer, reactor
+# this BlockingResolver is installed because otherwise unit tests must
+# sometimes deal with a leftover DNS lookup thread. I'd prefer to not do
+# this, and to use the default ThreadedResolver instead
 from twisted.internet.base import BlockingResolver
 reactor.installResolver(BlockingResolver())
 
@@ -108,7 +111,7 @@ class Client(service.MultiService, Referenceable):
 
     def get_remote_service(self, nodeid, servicename):
         if nodeid not in self.connections:
-            raise IndexError("no connection to that peer")
+            return defer.fail(IndexError("no connection to that peer"))
         d = self.connections[nodeid].callRemote("get_service",
                                                 name=servicename)
         return d
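
With this change, get_remote_service reports a missing peer asynchronously,
as an errback on the returned Deferred, instead of raising IndexError
synchronously; callers can then handle both outcomes through the same
Deferred chain. A minimal sketch of such a caller (use_service and its
fallback behavior are illustrative, not part of this commit):

    def use_service(client, nodeid):
        # get_remote_service now always returns a Deferred, so a missing
        # peer shows up here as a Failure instead of a synchronous raise
        d = client.get_remote_service(nodeid, "storageserver")
        def _no_connection(f):
            f.trap(IndexError)  # "no connection to that peer"
            return None         # treat an unreachable peer as no service
        d.addErrback(_no_connection)
        return d
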
diff --git a/allmydata/test/test_upload.py b/allmydata/test/test_upload.py
new file mode 100644
index 0000000..e0f522a
--- /dev/null
+++ b/allmydata/test/test_upload.py
@@ -0,0 +1,124 @@
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+from allmydata import upload
+
+class FakePeer:
+    def __init__(self, peerid, response):
+        self.peerid = peerid
+        self.response = response
+
+    def callRemote(self, methname, *args, **kwargs):
+        assert not args
+        return defer.maybeDeferred(self._callRemote, methname, **kwargs)
+
+    def _callRemote(self, methname, **kwargs):
+        assert methname == "allocate_bucket"
+        assert kwargs["size"] == 100
+        assert kwargs["leaser"] == "fakeclient"
+        if self.response == "good":
+            return self
+        raise upload.TooFullError()
+
+class FakeClient:
+    nodeid = "fakeclient"
+    def __init__(self, responses):
+        self.peers = []
+        for peerid,r in enumerate(responses):
+            if r == "disconnected":
+                self.peers.append(None)
+            else:
+                self.peers.append(FakePeer(peerid, r))
+
+    def permute_peerids(self, key, max_peers):
+        assert max_peers is None
+        return range(len(self.peers))
+    def get_remote_service(self, peerid, name):
+        peer = self.peers[peerid]
+        if not peer:
+            return defer.fail(IndexError("no connection to that peer"))
+        return defer.succeed(peer)
+
+class NextPeerUploader(upload.Uploader):
+    def _got_all_peers(self, res):
+        return res
+
+class NextPeer(unittest.TestCase):
+    responses = ["good", # 0
+                 "full", # 1
+                 "full", # 2
+                 "disconnected", # 3
+                 "good", # 4
+                 ]
+
+    def test_0(self):
+        c = FakeClient([])
+        u = NextPeerUploader(c)
+        u._verifierid = "verifierid"
+        u._shares = 2
+        u._share_size = 100
+        d = u.start()
+        def _check(f):
+            f.trap(upload.NotEnoughPeersError)
+        d.addCallbacks(lambda res: self.fail("this was supposed to fail"),
+                       _check)
+        return d
+
+    def test_1(self):
+        c = FakeClient(self.responses)
+        u = NextPeerUploader(c)
+        u._verifierid = "verifierid"
+        u._shares = 2
+        u._share_size = 100
+        d = u.start()
+        def _check(res):
+            self.failUnlessEqual(u.goodness_points, 2)
+            self.failUnlessEqual(u.landlords,
+                                 [(0, 0, c.peers[0]),
+                                  (4, 1, c.peers[4]),
+                                  ])
+        d.addCallback(_check)
+        return d
+
+    def test_2(self):
+        c = FakeClient(self.responses)
+        u = NextPeerUploader(c)
+        u._verifierid = "verifierid"
+        u._shares = 3
+        u._share_size = 100
+        d = u.start()
+        def _check(res):
+            self.failUnlessEqual(u.goodness_points, 3)
+            self.failUnlessEqual(u.landlords,
+                                 [(0, 0, c.peers[0]),
+                                  (4, 1, c.peers[4]),
+                                  (0, 2, c.peers[0]),
+                                  ])
+        d.addCallback(_check)
+        return d
+
+    responses2 = ["good", # 0
+                  "full", # 1
+                  "full", # 2
+                  "good", # 3
+                  "full", # 4
+                  ]
+
+    def test_3(self):
+        c = FakeClient(self.responses2)
+        u = NextPeerUploader(c)
+        u._verifierid = "verifierid"
+        u._shares = 3
+        u._share_size = 100
+        d = u.start()
+        def _check(res):
+            self.failUnlessEqual(u.goodness_points, 3)
+            self.failUnlessEqual(u.landlords,
+                                 [(0, 0, c.peers[0]),
+                                  (3, 1, c.peers[3]),
+                                  (0, 2, c.peers[0]),
+                                  ])
+        d.addCallback(_check)
+        return d
+
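test_2 and test_3 pin down the wrap-around behavior: once the permuted list
has been walked, selection restarts at index 0, so a peer that already
granted a lease (peer 0 above) is asked to hold a further bucket. Assuming a
normal Twisted install, the suite runs under the trial test runner:

    trial allmydata.test.test_upload
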
diff --git a/allmydata/upload.py b/allmydata/upload.py
new file mode 100644
index 0000000..6b69ee0
--- /dev/null
+++ b/allmydata/upload.py
@@ -0,0 +1,110 @@
+
+from twisted.python import failure
+from twisted.internet import defer
+
+class NotEnoughPeersError(Exception):
+    pass
+
+class HaveAllPeersError(Exception):
+    # we use this to jump out of the loop
+    pass
+
+# this wants to live in storage, not here
+class TooFullError(Exception):
+    pass
+
+
+class Uploader:
+    debug = False
+
+    def __init__(self, peer):
+        # 'peer' is our own Client node, through which the remote peers
+        # are reached
+        self._peer = peer
+
+    def set_verifierid(self, vid):
+        assert isinstance(vid, str)
+        self._verifierid = vid
+
+    def set_filesize(self, size):
+        self._size = size
+
+    def _calculate_parameters(self):
+        # note: start() does not call this yet; the unit tests assign
+        # _shares and _share_size directly
+        self._shares = 100
+        self._share_size = self._size / 25
+
+
+    def start(self):
+        # who should we upload to?
+
+        # maybe limit max_peers to 2*self._shares, to reduce memory
+        # footprint
+        max_peers = None
+
+        self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+        # we will shrink self.permuted as we give up on peers
+        self.peer_index = 0
+        self.goodness_points = 0
+        self.target_goodness = self._shares
+        self.landlords = [] # list of (peerid, bucket_num, remotebucket)
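+        # the same peerid may appear in landlords more than once, once per
+        # bucket it agrees to hold (test_2 above exercises this)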
+
+        d = defer.maybeDeferred(self._check_next_peer)
+        d.addCallback(self._got_all_peers)
+        return d
+
+    def _check_next_peer(self):
+        if len(self.permuted) == 0:
+            # there are no more to check
+            raise NotEnoughPeersError
+        if self.peer_index >= len(self.permuted):
+            self.peer_index = 0
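+            # wrapped around: peers that already granted a lease may be
+            # asked to hold additional buckets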
+
+        peerid = self.permuted[self.peer_index]
+
+        d = self._peer.get_remote_service(peerid, "storageserver")
+        def _got_peer(service):
+            bucket_num = len(self.landlords)
+            if self.debug: print "asking %s" % peerid
+            d2 = service.callRemote("allocate_bucket",
+                                    verifierid=self._verifierid,
+                                    bucket_num=bucket_num,
+                                    size=self._share_size,
+                                    leaser=self._peer.nodeid)
+            def _allocate_response(bucket):
+                if self.debug:
+                    print " peerid %s will grant us a lease" % peerid
+                self.landlords.append( (peerid, bucket_num, bucket) )
+                self.goodness_points += 1
+                if self.goodness_points >= self.target_goodness:
+                    if self.debug: print " we're done!"
+                    raise HaveAllPeersError()
+                # otherwise we fall through to allocate more peers
+            d2.addCallback(_allocate_response)
+            return d2
+        d.addCallback(_got_peer)
+        def _done_with_peer(res):
+            if self.debug: print "done with peer %s:" % peerid
+            if isinstance(res, failure.Failure):
+                if res.check(HaveAllPeersError):
+                    if self.debug: print " all done"
+                    # we're done!
+                    return
+                if res.check(TooFullError):
+                    if self.debug: print " too full"
+                elif res.check(IndexError):
+                    if self.debug: print " no connection"
+                else:
+                    if self.debug: print " other error:", res
+                self.permuted.remove(peerid) # this peer was unusable
+            else:
+                if self.debug: print " they gave us a lease"
+                # we get here for either good peers (when we still need
+                # more), or after checking a bad peer (and thus still need
+                # more). So now we need to grab a new peer.
+                self.peer_index += 1
+            return self._check_next_peer()
+        d.addBoth(_done_with_peer)
+        return d
+
+    def _got_all_peers(self, res):
+        # an encoder is expected to be attached as self._encoder by code
+        # outside this commit; the unit tests override this method entirely
+        d = self._encoder.do_upload(self.landlords)
+        return d
+
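
The Deferred plumbing aside, the selection policy itself is a simple
round-robin over a shrinking list. A minimal synchronous sketch of the same
idea (select_landlords and the response strings are modeled on the unit
tests above, not part of this commit):

    class NotEnoughPeersError(Exception):
        pass

    def select_landlords(peers, responses, shares):
        # walk a permuted peer list; drop peers that refuse, and wrap
        # around so a willing peer can be handed more than one bucket
        permuted = list(range(len(peers)))
        index = 0
        landlords = []  # (peerid, bucket_num, peer)
        while len(landlords) < shares:
            if not permuted:
                raise NotEnoughPeersError()
            if index >= len(permuted):
                index = 0  # wrap around
            peerid = permuted[index]
            if responses[peerid] == "good":
                landlords.append((peerid, len(landlords), peers[peerid]))
                index += 1
            else:
                permuted.remove(peerid)  # full or disconnected: give up
        return landlords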