From c17a8db7324ddc4fab99bce5034a69c193b2bec9 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 1 Dec 2006 02:54:28 -0700 Subject: [PATCH] implement upload peer selection --- allmydata/client.py | 7 +- allmydata/test/test_upload.py | 124 ++++++++++++++++++++++++++++++++++ allmydata/upload.py | 110 ++++++++++++++++++++++++++++++ 3 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 allmydata/test/test_upload.py create mode 100644 allmydata/upload.py diff --git a/allmydata/client.py b/allmydata/client.py index 4b055b30..1a25606f 100644 --- a/allmydata/client.py +++ b/allmydata/client.py @@ -6,7 +6,10 @@ from twisted.application import service from twisted.python import log from allmydata.util.iputil import get_local_ip_for -from twisted.internet import reactor +from twisted.internet import defer, reactor +# this BlockingResolver is because otherwise unit tests must sometimes deal +# with a leftover DNS lookup thread. I'd prefer to not do this, and use the +# default ThreadedResolver from twisted.internet.base import BlockingResolver reactor.installResolver(BlockingResolver()) @@ -108,7 +111,7 @@ class Client(service.MultiService, Referenceable): def get_remote_service(self, nodeid, servicename): if nodeid not in self.connections: - raise IndexError("no connection to that peer") + return defer.fail(IndexError("no connection to that peer")) d = self.connections[nodeid].callRemote("get_service", name=servicename) return d diff --git a/allmydata/test/test_upload.py b/allmydata/test/test_upload.py new file mode 100644 index 00000000..e0f522af --- /dev/null +++ b/allmydata/test/test_upload.py @@ -0,0 +1,124 @@ + +from twisted.trial import unittest +from twisted.internet import defer + +from allmydata import upload + +class FakePeer: + def __init__(self, peerid, response): + self.peerid = peerid + self.response = response + + def callRemote(self, methname, *args, **kwargs): + assert not args + return defer.maybeDeferred(self._callRemote, methname, **kwargs) + + def _callRemote(self, methname, **kwargs): + assert methname == "allocate_bucket" + assert kwargs["size"] == 100 + assert kwargs["leaser"] == "fakeclient" + if self.response == "good": + return self + raise upload.TooFullError() + +class FakeClient: + nodeid = "fakeclient" + def __init__(self, responses): + self.peers = [] + for peerid,r in enumerate(responses): + if r == "disconnected": + self.peers.append(None) + else: + self.peers.append(FakePeer(peerid, r)) + + def permute_peerids(self, key, max_peers): + assert max_peers == None + return range(len(self.peers)) + def get_remote_service(self, peerid, name): + peer = self.peers[peerid] + if not peer: + return defer.fail(IndexError("no connection to that peer")) + return defer.succeed(peer) + +class NextPeerUploader(upload.Uploader): + def _got_all_peers(self, res): + return res + +class NextPeer(unittest.TestCase): + responses = ["good", # 0 + "full", # 1 + "full", # 2 + "disconnected", # 3 + "good", # 4 + ] + + def test_0(self): + c = FakeClient([]) + u = NextPeerUploader(c) + u._verifierid = "verifierid" + u._shares = 2 + u._share_size = 100 + d = u.start() + def _check(f): + f.trap(upload.NotEnoughPeersError) + d.addCallbacks(lambda res: self.fail("this was supposed to fail"), + _check) + return d + + def test_1(self): + c = FakeClient(self.responses) + u = NextPeerUploader(c) + u._verifierid = "verifierid" + u._shares = 2 + u._share_size = 100 + d = u.start() + def _check(res): + self.failUnlessEqual(u.goodness_points, 2) + self.failUnlessEqual(u.landlords, + [(0, 0, c.peers[0]), + (4, 1, c.peers[4]), + ]) + d.addCallback(_check) + return d + + def test_2(self): + c = FakeClient(self.responses) + u = NextPeerUploader(c) + u._verifierid = "verifierid" + u._shares = 3 + u._share_size = 100 + d = u.start() + def _check(res): + self.failUnlessEqual(u.goodness_points, 3) + self.failUnlessEqual(u.landlords, + [(0, 0, c.peers[0]), + (4, 1, c.peers[4]), + (0, 2, c.peers[0]), + ]) + d.addCallback(_check) + return d + + responses2 = ["good", # 0 + "full", # 1 + "full", # 2 + "good", # 3 + "full", # 4 + ] + + def test_3(self): + c = FakeClient(self.responses2) + u = NextPeerUploader(c) + u._verifierid = "verifierid" + u._shares = 3 + u._share_size = 100 + d = u.start() + def _check(res): + self.failUnlessEqual(u.goodness_points, 3) + self.failUnlessEqual(u.landlords, + [(0, 0, c.peers[0]), + (3, 1, c.peers[3]), + (0, 2, c.peers[0]), + ]) + d.addCallback(_check) + return d + diff --git a/allmydata/upload.py b/allmydata/upload.py new file mode 100644 index 00000000..6b69ee03 --- /dev/null +++ b/allmydata/upload.py @@ -0,0 +1,110 @@ + +from twisted.python import failure +from twisted.internet import defer + +class NotEnoughPeersError(Exception): + pass + +class HaveAllPeersError(Exception): + # we use this to jump out of the loop + pass + +# this wants to live in storage, not here +class TooFullError(Exception): + pass + + +class Uploader: + debug = False + + def __init__(self, peer): + self._peer = peer + + def set_verifierid(self, vid): + assert isinstance(vid, str) + self._verifierid = vid + + def set_filesize(self, size): + self._size = size + + def _calculate_parameters(self): + self._shares = 100 + self._share_size = self._size / 25 + + + def start(self): + # who should we upload to? + + # maybe limit max_peers to 2*len(self.shares), to reduce memory + # footprint + max_peers = None + + self.permuted = self._peer.permute_peerids(self._verifierid, max_peers) + # we will shrink self.permuted as we give up on peers + self.peer_index = 0 + self.goodness_points = 0 + self.target_goodness = self._shares + self.landlords = [] # list of (peerid, bucket_num, remotebucket) + + d = defer.maybeDeferred(self._check_next_peer) + d.addCallback(self._got_all_peers) + return d + + def _check_next_peer(self): + if len(self.permuted) == 0: + # there are no more to check + raise NotEnoughPeersError + if self.peer_index >= len(self.permuted): + self.peer_index = 0 + + peerid = self.permuted[self.peer_index] + + d = self._peer.get_remote_service(peerid, "storageserver") + def _got_peer(service): + bucket_num = len(self.landlords) + if self.debug: print "asking %s" % peerid + d2 = service.callRemote("allocate_bucket", + verifierid=self._verifierid, + bucket_num=bucket_num, + size=self._share_size, + leaser=self._peer.nodeid) + def _allocate_response(bucket): + if self.debug: + print " peerid %s will grant us a lease" % peerid + self.landlords.append( (peerid, bucket_num, bucket) ) + self.goodness_points += 1 + if self.goodness_points >= self.target_goodness: + if self.debug: print " we're done!" + raise HaveAllPeersError() + # otherwise we fall through to allocate more peers + d2.addCallback(_allocate_response) + return d2 + d.addCallback(_got_peer) + def _done_with_peer(res): + if self.debug: print "done with peer %s:" % peerid + if isinstance(res, failure.Failure): + if res.check(HaveAllPeersError): + if self.debug: print " all done" + # we're done! + return + if res.check(TooFullError): + if self.debug: print " too full" + elif res.check(IndexError): + if self.debug: print " no connection" + else: + if self.debug: print " other error:", res + self.permuted.remove(peerid) # this peer was unusable + else: + if self.debug: print " they gave us a lease" + # we get here for either good peers (when we still need + # more), or after checking a bad peer (and thus still need + # more). So now we need to grab a new peer. + self.peer_index += 1 + return self._check_next_peer() + d.addBoth(_done_with_peer) + return d + + def _got_all_peers(self, res): + d = self._encoder.do_upload(self.landlords) + return d + -- 2.45.2