from twisted.python import log
from allmydata.util.iputil import get_local_ip_for
-from twisted.internet import reactor
+from twisted.internet import defer, reactor
+# this BlockingResolver is because otherwise unit tests must sometimes deal
+# with a leftover DNS lookup thread. I'd prefer to not do this, and use the
+# default ThreadedResolver
from twisted.internet.base import BlockingResolver
reactor.installResolver(BlockingResolver())
def get_remote_service(self, nodeid, servicename):
if nodeid not in self.connections:
- raise IndexError("no connection to that peer")
+ return defer.fail(IndexError("no connection to that peer"))
d = self.connections[nodeid].callRemote("get_service",
name=servicename)
return d
--- /dev/null
+
+from twisted.trial import unittest
+from twisted.internet import defer
+
+from allmydata import upload
+
+class FakePeer:
+ def __init__(self, peerid, response):
+ self.peerid = peerid
+ self.response = response
+
+ def callRemote(self, methname, *args, **kwargs):
+ assert not args
+ return defer.maybeDeferred(self._callRemote, methname, **kwargs)
+
+ def _callRemote(self, methname, **kwargs):
+ assert methname == "allocate_bucket"
+ assert kwargs["size"] == 100
+ assert kwargs["leaser"] == "fakeclient"
+ if self.response == "good":
+ return self
+ raise upload.TooFullError()
+
+class FakeClient:
+ nodeid = "fakeclient"
+ def __init__(self, responses):
+ self.peers = []
+ for peerid,r in enumerate(responses):
+ if r == "disconnected":
+ self.peers.append(None)
+ else:
+ self.peers.append(FakePeer(peerid, r))
+
+ def permute_peerids(self, key, max_peers):
+ assert max_peers == None
+ return range(len(self.peers))
+ def get_remote_service(self, peerid, name):
+ peer = self.peers[peerid]
+ if not peer:
+ return defer.fail(IndexError("no connection to that peer"))
+ return defer.succeed(peer)
+
+class NextPeerUploader(upload.Uploader):
+ def _got_all_peers(self, res):
+ return res
+
+class NextPeer(unittest.TestCase):
+ responses = ["good", # 0
+ "full", # 1
+ "full", # 2
+ "disconnected", # 3
+ "good", # 4
+ ]
+
+ def test_0(self):
+ c = FakeClient([])
+ u = NextPeerUploader(c)
+ u._verifierid = "verifierid"
+ u._shares = 2
+ u._share_size = 100
+ d = u.start()
+ def _check(f):
+ f.trap(upload.NotEnoughPeersError)
+ d.addCallbacks(lambda res: self.fail("this was supposed to fail"),
+ _check)
+ return d
+
+ def test_1(self):
+ c = FakeClient(self.responses)
+ u = NextPeerUploader(c)
+ u._verifierid = "verifierid"
+ u._shares = 2
+ u._share_size = 100
+ d = u.start()
+ def _check(res):
+ self.failUnlessEqual(u.goodness_points, 2)
+ self.failUnlessEqual(u.landlords,
+ [(0, 0, c.peers[0]),
+ (4, 1, c.peers[4]),
+ ])
+ d.addCallback(_check)
+ return d
+
+ def test_2(self):
+ c = FakeClient(self.responses)
+ u = NextPeerUploader(c)
+ u._verifierid = "verifierid"
+ u._shares = 3
+ u._share_size = 100
+ d = u.start()
+ def _check(res):
+ self.failUnlessEqual(u.goodness_points, 3)
+ self.failUnlessEqual(u.landlords,
+ [(0, 0, c.peers[0]),
+ (4, 1, c.peers[4]),
+ (0, 2, c.peers[0]),
+ ])
+ d.addCallback(_check)
+ return d
+
+ responses2 = ["good", # 0
+ "full", # 1
+ "full", # 2
+ "good", # 3
+ "full", # 4
+ ]
+
+ def test_3(self):
+ c = FakeClient(self.responses2)
+ u = NextPeerUploader(c)
+ u._verifierid = "verifierid"
+ u._shares = 3
+ u._share_size = 100
+ d = u.start()
+ def _check(res):
+ self.failUnlessEqual(u.goodness_points, 3)
+ self.failUnlessEqual(u.landlords,
+ [(0, 0, c.peers[0]),
+ (3, 1, c.peers[3]),
+ (0, 2, c.peers[0]),
+ ])
+ d.addCallback(_check)
+ return d
+
--- /dev/null
+
+from twisted.python import failure
+from twisted.internet import defer
+
+class NotEnoughPeersError(Exception):
+ pass
+
+class HaveAllPeersError(Exception):
+ # we use this to jump out of the loop
+ pass
+
+# this wants to live in storage, not here
+class TooFullError(Exception):
+ pass
+
+
+class Uploader:
+ debug = False
+
+ def __init__(self, peer):
+ self._peer = peer
+
+ def set_verifierid(self, vid):
+ assert isinstance(vid, str)
+ self._verifierid = vid
+
+ def set_filesize(self, size):
+ self._size = size
+
+ def _calculate_parameters(self):
+ self._shares = 100
+ self._share_size = self._size / 25
+
+
+ def start(self):
+ # who should we upload to?
+
+ # maybe limit max_peers to 2*len(self.shares), to reduce memory
+ # footprint
+ max_peers = None
+
+ self.permuted = self._peer.permute_peerids(self._verifierid, max_peers)
+ # we will shrink self.permuted as we give up on peers
+ self.peer_index = 0
+ self.goodness_points = 0
+ self.target_goodness = self._shares
+ self.landlords = [] # list of (peerid, bucket_num, remotebucket)
+
+ d = defer.maybeDeferred(self._check_next_peer)
+ d.addCallback(self._got_all_peers)
+ return d
+
+ def _check_next_peer(self):
+ if len(self.permuted) == 0:
+ # there are no more to check
+ raise NotEnoughPeersError
+ if self.peer_index >= len(self.permuted):
+ self.peer_index = 0
+
+ peerid = self.permuted[self.peer_index]
+
+ d = self._peer.get_remote_service(peerid, "storageserver")
+ def _got_peer(service):
+ bucket_num = len(self.landlords)
+ if self.debug: print "asking %s" % peerid
+ d2 = service.callRemote("allocate_bucket",
+ verifierid=self._verifierid,
+ bucket_num=bucket_num,
+ size=self._share_size,
+ leaser=self._peer.nodeid)
+ def _allocate_response(bucket):
+ if self.debug:
+ print " peerid %s will grant us a lease" % peerid
+ self.landlords.append( (peerid, bucket_num, bucket) )
+ self.goodness_points += 1
+ if self.goodness_points >= self.target_goodness:
+ if self.debug: print " we're done!"
+ raise HaveAllPeersError()
+ # otherwise we fall through to allocate more peers
+ d2.addCallback(_allocate_response)
+ return d2
+ d.addCallback(_got_peer)
+ def _done_with_peer(res):
+ if self.debug: print "done with peer %s:" % peerid
+ if isinstance(res, failure.Failure):
+ if res.check(HaveAllPeersError):
+ if self.debug: print " all done"
+ # we're done!
+ return
+ if res.check(TooFullError):
+ if self.debug: print " too full"
+ elif res.check(IndexError):
+ if self.debug: print " no connection"
+ else:
+ if self.debug: print " other error:", res
+ self.permuted.remove(peerid) # this peer was unusable
+ else:
+ if self.debug: print " they gave us a lease"
+ # we get here for either good peers (when we still need
+ # more), or after checking a bad peer (and thus still need
+ # more). So now we need to grab a new peer.
+ self.peer_index += 1
+ return self._check_next_peer()
+ d.addBoth(_done_with_peer)
+ return d
+
+ def _got_all_peers(self, res):
+ d = self._encoder.do_upload(self.landlords)
+ return d
+