From: Zooko O'Whielacronx Date: Fri, 30 Mar 2007 21:54:33 +0000 (-0700) Subject: add unit tests and fix bugs in upload X-Git-Url: https://git.rkrishnan.org/vdrive//%22file:/%22?a=commitdiff_plain;h=3d694a90f372bf18912ce5a50e9b00cf413a91eb;p=tahoe-lafs%2Ftahoe-lafs.git add unit tests and fix bugs in upload --- diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 0328ae9e..fbf6327a 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -1,21 +1,44 @@ from twisted.trial import unittest +from twisted.python import log +from twisted.python.failure import Failure from twisted.internet import defer from cStringIO import StringIO +from foolscap import eventual + from allmydata import upload from allmydata.uri import unpack_uri +from test_encode import FakePeer + class FakeStorageServer: - pass + def __init__(self, mode): + self.mode = mode + def callRemote(self, methname, *args, **kwargs): + def _call(): + meth = getattr(self, methname) + return meth(*args, **kwargs) + d = eventual.fireEventually() + d.addCallback(lambda res: _call()) + return d + def allocate_buckets(self, verifierid, sharenums, shareize, blocksize, canary): + if self.mode == "full": + return (set(), {},) + elif self.mode == "already got them": + return (set(sharenums), {},) + else: + return (set(), dict([(shnum, FakePeer(),) for shnum in sharenums]),) class FakeClient: + def __init__(self, mode="good"): + self.mode = mode def get_permuted_peers(self, verifierid): - return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ] + return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(self.mode),) for fakeid in range(50) ] -class Uploader(unittest.TestCase): +class GoodServer(unittest.TestCase): def setUp(self): - self.node = FakeClient() + self.node = FakeClient(mode="good") self.u = upload.Uploader() self.u.running = True self.u.parent = self.node @@ -51,3 +74,20 @@ class Uploader(unittest.TestCase): d = self.u.upload_filename(fn) d.addCallback(self._check) return d + +class FullServer(unittest.TestCase): + def setUp(self): + self.node = FakeClient(mode="full") + self.u = upload.Uploader() + self.u.running = True + self.u.parent = self.node + + def _should_fail(self, f): + self.failUnless(isinstance(f, Failure) and f.check(upload.NotEnoughPeersError)) + + def testData(self): + data = "This is some data to upload" + d = self.u.upload_data(data) + d.addBoth(self._should_fail) + return d + diff --git a/src/allmydata/upload.py b/src/allmydata/upload.py index 5a7964a5..7559319d 100644 --- a/src/allmydata/upload.py +++ b/src/allmydata/upload.py @@ -4,7 +4,8 @@ from twisted.internet import defer from twisted.application import service from foolscap import Referenceable -from allmydata.util import idlib, mathutil +from allmydata.util import idlib +from allmydata.util.assertutil import _assert from allmydata import encode_new from allmydata.uri import pack_uri from allmydata.interfaces import IUploadable, IUploader @@ -24,8 +25,9 @@ class TooFullError(Exception): pass class PeerTracker: - def __init__(self, peerid, connection, sharesize, blocksize, verifierid): + def __init__(self, peerid, permutedid, connection, sharesize, blocksize, verifierid): self.peerid = peerid + self.permutedid = permutedid self.connection = connection self.buckets = {} # k: shareid, v: IRemoteBucketWriter self.sharesize = sharesize @@ -33,13 +35,14 @@ class PeerTracker: self.verifierid = verifierid def query(self, sharenums): - d = self.connection.callRemote("allocate_buckets", self._verifierid, + d = self.connection.callRemote("allocate_buckets", self.verifierid, sharenums, self.sharesize, self.blocksize, canary=Referenceable()) d.addCallback(self._got_reply) return d def _got_reply(self, (alreadygot, buckets)): + log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets))) self.buckets.update(buckets) return (alreadygot, set(buckets.keys())) @@ -65,7 +68,6 @@ class FileUploader: assert len(vid) == 20 self._verifierid = vid - def start(self): """Start uploading the file. @@ -90,17 +92,11 @@ class FileUploader: # responsible for handling the data and sending out the shares. peers = self._client.get_permuted_peers(self._verifierid) assert peers - trackers = [ (permutedid, PeerTracker(peerid, conn, share_size, block_size, self._verifierid),) + trackers = [ PeerTracker(peerid, permutedid, conn, share_size, block_size, self._verifierid) for permutedid, peerid, conn in peers ] - ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance - ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ]) - shares = [ (i * 2**160 / self.total_shares, 0, i) for i in range(self.total_shares) ] - ring_things.extend(shares) - ring_things.sort() - self.ring_things = collections.deque(ring_things) - self.usable_peers = set([peer for permutedid, peer in trackers]) - self.used_peers = set() - self.unallocated_sharenums = set(shares) + self.usable_peers = set(trackers) # this set shrinks over time + self.used_peers = set() # while this set grows + self.unallocated_sharenums = set(range(self.total_shares)) # this one shrinks d = self._locate_all_shareholders() d.addCallback(self._send_shares) @@ -112,54 +108,80 @@ class FileUploader: @return: a set of PeerTracker instances that have agreed to hold some shares for us """ + return self._locate_more_shareholders() + + def _locate_more_shareholders(self): d = self._query_peers() - def _done(res): - if not self.unallocated_sharenums: - return self._used_peers - if not self.usable_peers: - if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness): - # close enough - return self._used_peers - raise NotEnoughPeersError - return self._query_peers() - d.addCallback(_done) + d.addCallback(self._located_some_shareholders) return d + def _located_some_shareholders(self, res): + log.msg("_located_some_shareholders") + log.msg(" still need homes for %d shares, still have %d usable peers" % (len(self.unallocated_sharenums), len(self.usable_peers))) + if not self.unallocated_sharenums: + # Finished allocating places for all shares. + log.msg("%s._locate_all_shareholders() Finished allocating places for all shares.") + log.msg("used_peers is %s" % (self.used_peers,)) + return self.used_peers + if not self.usable_peers: + # Ran out of peers who have space. + log.msg("%s._locate_all_shareholders() Ran out of peers who have space.") + if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness): + # But we allocated places for enough shares. + log.msg("%s._locate_all_shareholders() But we allocated places for enough shares.") + return self.used_peers + raise NotEnoughPeersError + # we need to keep trying + return self._locate_more_shareholders() + + def _create_ring_of_things(self): + PEER = 1 # must sort later than SHARE, for consistency with download + SHARE = 0 + ring_of_things = [] # a list of (position_in_ring, whatami, x) where whatami is SHARE if x is a sharenum or else PEER if x is a PeerTracker instance + ring_of_things.extend([ (peer.permutedid, PEER, peer,) + for peer in self.usable_peers ]) + shares = [ (i * 2**160 / self.total_shares, SHARE, i) + for i in self.unallocated_sharenums] + ring_of_things.extend(shares) + ring_of_things.sort() + ring_of_things = collections.deque(ring_of_things) + return ring_of_things + def _query_peers(self): """ @return: a deferred that fires when all queries have resolved """ + PEER = 1 + SHARE = 0 + ring = self._create_ring_of_things() + # Choose a random starting point, talk to that peer. - self.ring_things.rotate(random.randrange(0, len(self.ring_things))) + ring.rotate(random.randrange(0, len(ring))) # Walk backwards to find a peer. We know that we'll eventually find # one because we earlier asserted that there was at least one. - while self.ring_things[0][1] != 1: - self.ring_things.rotate(-1) - startingpoint = self.ring_things[0] - peer = startingpoint[2] + while ring[0][1] != PEER: + ring.rotate(-1) + peer = ring[0][2] assert isinstance(peer, PeerTracker), peer - self.ring_things.rotate(-1) + ring.rotate(-1) # loop invariant: at the top of the loop, we are always one step to # the left of a peer, which is stored in the peer variable. outstanding_queries = [] - while self.ring_things[0] != startingpoint: - # Walk backwards to find the previous peer (could be the same one). - # Accumulate all shares that we find along the way. - sharenums_to_query = set() - while self.ring_things[0][1] != 1: - sharenums_to_query.add(self.ring_things[0][2]) - self.ring_things.rotate(-1) - - d = peer.query(sharenums_to_query) - d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,)) - outstanding_queries.append(d) - - peer = self.ring_things[0][2] - assert isinstance(peer, PeerTracker), peer - self.ring_things.rotate(-1) - + sharenums_to_query = set() + for i in range(len(ring)): + if ring[0][1] == SHARE: + sharenums_to_query.add(ring[0][2]) + else: + d = peer.query(sharenums_to_query) + d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,)) + outstanding_queries.append(d) + d.addErrback(log.err) + peer = ring[0][2] + sharenums_to_query = set() + ring.rotate(-1) + return defer.DeferredList(outstanding_queries) def _got_response(self, (alreadygot, allocated), peer, shares_we_requested): @@ -167,20 +189,29 @@ class FileUploader: @type alreadygot: a set of sharenums @type allocated: a set of sharenums """ + log.msg("%s._got_response(%s, %s, %s): self.unallocated_sharenums: %s, unhandled: %s" % (self, (alreadygot, allocated), peer, shares_we_requested, self.unallocated_sharenums, shares_we_requested - alreadygot - allocated)) self.unallocated_sharenums -= alreadygot self.unallocated_sharenums -= allocated if allocated: - self.used_peers.add(peer) + self.usable_peers.add(peer) if shares_we_requested - alreadygot - allocated: + log.msg("%s._got_response(%s, %s, %s): self.unallocated_sharenums: %s, unhandled: %s HE'S FULL" % (self, (alreadygot, allocated), peer, shares_we_requested, self.unallocated_sharenums, shares_we_requested - alreadygot - allocated)) # Then he didn't accept some of the shares, so he's full. self.usable_peers.remove(peer) def _got_error(self, f, peer): - self.usable_peers -= peer + log.msg("%s._got_error(%s, %s)" % (self, f, peer,)) + self.usable_peers.remove(peer) def _send_shares(self, used_peers): + """ + @param used_peers: a sequence of PeerTracker objects + """ + log.msg("_send_shares, used_peers is %s" % (used_peers,)) + for peer in used_peers: + assert isinstance(peer, PeerTracker) buckets = {} for peer in used_peers: buckets.update(peer.buckets)