git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add unit tests and fix bugs in upload
author     Zooko O'Whielacronx <zooko@zooko.com>
           Fri, 30 Mar 2007 21:54:33 +0000 (14:54 -0700)
committer  Zooko O'Whielacronx <zooko@zooko.com>
           Fri, 30 Mar 2007 21:54:33 +0000 (14:54 -0700)
src/allmydata/test/test_upload.py
src/allmydata/upload.py

src/allmydata/test/test_upload.py
index 0328ae9e0f4a9746e968334401c34860bb2bbb53..fbf6327a9b22ae4e824a56495796b56cd20d3162 100644 (file)
@@ -1,21 +1,44 @@
 
 from twisted.trial import unittest
+from twisted.python import log
+from twisted.python.failure import Failure
 from twisted.internet import defer
 from cStringIO import StringIO
 
+from foolscap import eventual
+
 from allmydata import upload
 from allmydata.uri import unpack_uri
 
+from test_encode import FakePeer
+
 class FakeStorageServer:
-    pass
+    def __init__(self, mode):
+        self.mode = mode
+    def callRemote(self, methname, *args, **kwargs):
+        def _call():
+            meth = getattr(self, methname)
+            return meth(*args, **kwargs)
+        d = eventual.fireEventually()
+        d.addCallback(lambda res: _call())
+        return d
+    def allocate_buckets(self, verifierid, sharenums, sharesize, blocksize, canary):
+        if self.mode == "full":
+            return (set(), {},)
+        elif self.mode == "already got them":
+            return (set(sharenums), {},)
+        else:
+            return (set(), dict([(shnum, FakePeer(),) for shnum in sharenums]),)
 
 class FakeClient:
+    def __init__(self, mode="good"):
+        self.mode = mode
     def get_permuted_peers(self, verifierid):
-        return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ]
+        return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(self.mode),) for fakeid in range(50) ]
 
-class Uploader(unittest.TestCase):
+class GoodServer(unittest.TestCase):
     def setUp(self):
-        self.node = FakeClient()
+        self.node = FakeClient(mode="good")
         self.u = upload.Uploader()
         self.u.running = True
         self.u.parent = self.node
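
FakeStorageServer.callRemote() answers through foolscap's eventual-send queue rather than synchronously, so these tests see the same "reply arrives in a later reactor turn" behaviour as a real remote call. A minimal sketch of that pattern on its own (FakeAdder and its add method are invented for illustration; only foolscap.eventual, imported above, is assumed):

    from twisted.trial import unittest
    from foolscap import eventual

    class FakeAdder:
        def callRemote(self, methname, *args, **kwargs):
            # resolve the method locally, but fire the Deferred in a later
            # reactor turn, the way a real foolscap RemoteReference would
            d = eventual.fireEventually()
            d.addCallback(lambda res: getattr(self, methname)(*args, **kwargs))
            return d
        def add(self, a, b):
            return a + b

    class EventualSendExample(unittest.TestCase):
        def testAdd(self):
            d = FakeAdder().callRemote("add", 1, 2)
            d.addCallback(self.failUnlessEqual, 3)
            # returning the Deferred lets trial spin the reactor until the
            # eventual-send queue has been serviced
            return d
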
@@ -51,3 +74,20 @@ class Uploader(unittest.TestCase):
         d = self.u.upload_filename(fn)
         d.addCallback(self._check)
         return d
+
+class FullServer(unittest.TestCase):
+    def setUp(self):
+        self.node = FakeClient(mode="full")
+        self.u = upload.Uploader()
+        self.u.running = True
+        self.u.parent = self.node
+
+    def _should_fail(self, f):
+        self.failUnless(isinstance(f, Failure) and f.check(upload.NotEnoughPeersError))
+
+    def testData(self):
+        data = "This is some data to upload"
+        d = self.u.upload_data(data)
+        d.addBoth(self._should_fail)
+        return d
+
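
FakeStorageServer also understands an "already got them" mode, in which a server reports every requested share as already present and allocates nothing, but no test class in this patch exercises it yet. A sketch of what such a test might look like, following the pattern of the classes above (the AlreadyGotThemServer name is invented here, and the sketch assumes the rest of the upload pipeline tolerates a round in which no new buckets are allocated):

    class AlreadyGotThemServer(unittest.TestCase):
        def setUp(self):
            self.node = FakeClient(mode="already got them")
            self.u = upload.Uploader()
            self.u.running = True
            self.u.parent = self.node

        def testData(self):
            # every peer claims it already holds whatever shares it is asked
            # about, so peer selection should finish without raising
            # NotEnoughPeersError
            data = "This is some data to upload"
            d = self.u.upload_data(data)
            return d
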
src/allmydata/upload.py
index 5a7964a5ca94cba3c79d901eb3c2d8e238220b79..7559319d5d47aba1bb6961948113ddd6850fa951 100644 (file)
@@ -4,7 +4,8 @@ from twisted.internet import defer
 from twisted.application import service
 from foolscap import Referenceable
 
-from allmydata.util import idlib, mathutil
+from allmydata.util import idlib
+from allmydata.util.assertutil import _assert
 from allmydata import encode_new
 from allmydata.uri import pack_uri
 from allmydata.interfaces import IUploadable, IUploader
@@ -24,8 +25,9 @@ class TooFullError(Exception):
     pass
 
 class PeerTracker:
-    def __init__(self, peerid, connection, sharesize, blocksize, verifierid):
+    def __init__(self, peerid, permutedid, connection, sharesize, blocksize, verifierid):
         self.peerid = peerid
+        self.permutedid = permutedid
         self.connection = connection
         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
         self.sharesize = sharesize
@@ -33,13 +35,14 @@ class PeerTracker:
         self.verifierid = verifierid
 
     def query(self, sharenums):
-        d = self.connection.callRemote("allocate_buckets", self._verifierid,
+        d = self.connection.callRemote("allocate_buckets", self.verifierid,
                                        sharenums, self.sharesize,
                                        self.blocksize, canary=Referenceable())
         d.addCallback(self._got_reply)
         return d
         
     def _got_reply(self, (alreadygot, buckets)):
+        log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
         self.buckets.update(buckets)
         return (alreadygot, set(buckets.keys()))
 
@@ -65,7 +68,6 @@ class FileUploader:
         assert len(vid) == 20
         self._verifierid = vid
 
-
     def start(self):
         """Start uploading the file.
 
@@ -90,17 +92,11 @@ class FileUploader:
         # responsible for handling the data and sending out the shares.
         peers = self._client.get_permuted_peers(self._verifierid)
         assert peers
-        trackers = [ (permutedid, PeerTracker(peerid, conn, share_size, block_size, self._verifierid),)
+        trackers = [ PeerTracker(peerid, permutedid, conn, share_size, block_size, self._verifierid)
                      for permutedid, peerid, conn in peers ]
-        ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
-        ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
-        shares = [ (i * 2**160 / self.total_shares, 0, i) for i in range(self.total_shares) ]
-        ring_things.extend(shares)
-        ring_things.sort()
-        self.ring_things = collections.deque(ring_things)
-        self.usable_peers = set([peer for permutedid, peer in trackers])
-        self.used_peers = set()
-        self.unallocated_sharenums = set(shares)
+        self.usable_peers = set(trackers) # this set shrinks over time
+        self.used_peers = set() # while this set grows
+        self.unallocated_sharenums = set(range(self.total_shares)) # this one shrinks
 
         d = self._locate_all_shareholders()
         d.addCallback(self._send_shares)
@@ -112,54 +108,80 @@ class FileUploader:
         @return: a set of PeerTracker instances that have agreed to hold some
             shares for us
         """
+        return self._locate_more_shareholders()
+
+    def _locate_more_shareholders(self):
         d = self._query_peers()
-        def _done(res):
-            if not self.unallocated_sharenums:
-                return self._used_peers
-            if not self.usable_peers:
-                if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness):
-                    # close enough
-                    return self._used_peers
-                raise NotEnoughPeersError
-            return self._query_peers()
-        d.addCallback(_done)
+        d.addCallback(self._located_some_shareholders)
         return d
 
+    def _located_some_shareholders(self, res):
+        log.msg("_located_some_shareholders")
+        log.msg(" still need homes for %d shares, still have %d usable peers" % (len(self.unallocated_sharenums), len(self.usable_peers)))
+        if not self.unallocated_sharenums:
+            # Finished allocating places for all shares.
+            log.msg("%s._locate_all_shareholders() Finished allocating places for all shares.")
+            log.msg("used_peers is %s" % (self.used_peers,))
+            return self.used_peers
+        if not self.usable_peers:
+            # Ran out of peers who have space.
+            log.msg("%s._locate_all_shareholders() Ran out of peers who have space.")
+            if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness):
+                # But we allocated places for enough shares.
+                log.msg("%s._locate_all_shareholders() But we allocated places for enough shares.")
+                return self.used_peers
+            raise NotEnoughPeersError
+        # we need to keep trying
+        return self._locate_more_shareholders()
+
+    def _create_ring_of_things(self):
+        PEER = 1 # must sort later than SHARE, for consistency with download
+        SHARE = 0
+        ring_of_things = [] # a list of (position_in_ring, whatami, x) where whatami is SHARE if x is a sharenum or else PEER if x is a PeerTracker instance
+        ring_of_things.extend([ (peer.permutedid, PEER, peer,)
+                                for peer in self.usable_peers ])
+        shares = [ (i * 2**160 / self.total_shares, SHARE, i)
+                   for i in self.unallocated_sharenums]
+        ring_of_things.extend(shares)
+        ring_of_things.sort()
+        ring_of_things = collections.deque(ring_of_things)
+        return ring_of_things
+        
     def _query_peers(self):
         """
         @return: a deferred that fires when all queries have resolved
         """
+        PEER = 1
+        SHARE = 0
+        ring = self._create_ring_of_things()
+
         # Choose a random starting point, talk to that peer.
-        self.ring_things.rotate(random.randrange(0, len(self.ring_things)))
+        ring.rotate(random.randrange(0, len(ring)))
 
         # Walk backwards to find a peer.  We know that we'll eventually find
         # one because we earlier asserted that there was at least one.
-        while self.ring_things[0][1] != 1:
-            self.ring_things.rotate(-1)
-        startingpoint = self.ring_things[0]
-        peer = startingpoint[2]
+        while ring[0][1] != PEER:
+            ring.rotate(-1)
+        peer = ring[0][2]
         assert isinstance(peer, PeerTracker), peer
-        self.ring_things.rotate(-1)
+        ring.rotate(-1)
 
         # loop invariant: at the top of the loop, we are always one step to
         # the left of a peer, which is stored in the peer variable.
         outstanding_queries = []
-        while self.ring_things[0] != startingpoint:
-            # Walk backwards to find the previous peer (could be the same one).
-            # Accumulate all shares that we find along the way.
-            sharenums_to_query = set()
-            while self.ring_things[0][1] != 1:
-                sharenums_to_query.add(self.ring_things[0][2])
-                self.ring_things.rotate(-1)
-
-            d = peer.query(sharenums_to_query)
-            d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
-            outstanding_queries.append(d)
-
-            peer = self.ring_things[0][2]
-            assert isinstance(peer, PeerTracker), peer
-            self.ring_things.rotate(-1)
-
+        sharenums_to_query = set()
+        for i in range(len(ring)):
+            if ring[0][1] == SHARE:
+                sharenums_to_query.add(ring[0][2])
+            else:
+                d = peer.query(sharenums_to_query)
+                d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
+                outstanding_queries.append(d)
+                d.addErrback(log.err)
+                peer = ring[0][2]
+                sharenums_to_query = set()
+            ring.rotate(-1)
+        
         return defer.DeferredList(outstanding_queries)
 
     def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
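
The rewritten _query_peers is a small consistent-hashing scheme: shares sit at evenly spaced positions around the ring, peers sit at their permuted ids, and one sweep of the ring offers each share to the peer it follows, so every peer is asked for the shares lying between its position and the next peer's. A standalone toy illustration of the same walk (using a ring of 16 positions instead of 2**160, with made-up peer names and positions):

    import collections, random

    SHARE, PEER = 0, 1          # PEER sorts later than SHARE at equal positions
    total_shares = 4
    peer_positions = {"peerA": 3, "peerB": 9, "peerC": 14}

    ring = [ (i * 16 / total_shares, SHARE, i) for i in range(total_shares) ]
    ring.extend([ (pos, PEER, name) for name, pos in peer_positions.items() ])
    ring.sort()
    ring = collections.deque(ring)

    # same walk as _query_peers: rotate to a random spot, back up to a peer,
    # then sweep the whole ring once, flushing accumulated shares to the most
    # recently seen peer
    ring.rotate(random.randrange(0, len(ring)))
    while ring[0][1] != PEER:
        ring.rotate(-1)
    peer = ring[0][2]
    ring.rotate(-1)

    assignments = {}            # peer name -> sharenums it would be asked for
    pending = set()
    for i in range(len(ring)):
        if ring[0][1] == SHARE:
            pending.add(ring[0][2])
        else:
            assignments.setdefault(peer, set()).update(pending)
            peer = ring[0][2]
            pending = set()
        ring.rotate(-1)

    # regardless of the random starting point this prints
    # {'peerA': set([1, 2]), 'peerB': set([3]), 'peerC': set([0])}
    print assignments
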
@@ -167,20 +189,29 @@ class FileUploader:
         @type alreadygot: a set of sharenums
         @type allocated: a set of sharenums
         """
+        log.msg("%s._got_response(%s, %s, %s): self.unallocated_sharenums: %s, unhandled: %s" % (self, (alreadygot, allocated), peer, shares_we_requested, self.unallocated_sharenums, shares_we_requested - alreadygot - allocated))
         self.unallocated_sharenums -= alreadygot
         self.unallocated_sharenums -= allocated
 
         if allocated:
-            self.used_peers.add(peer)
+            self.usable_peers.add(peer)
 
         if shares_we_requested - alreadygot - allocated:
+            log.msg("%s._got_response(%s, %s, %s): self.unallocated_sharenums: %s, unhandled: %s HE'S FULL" % (self, (alreadygot, allocated), peer, shares_we_requested, self.unallocated_sharenums, shares_we_requested - alreadygot - allocated))
             # Then he didn't accept some of the shares, so he's full.
             self.usable_peers.remove(peer)
 
     def _got_error(self, f, peer):
-        self.usable_peers -= peer
+        log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
+        self.usable_peers.remove(peer)
 
     def _send_shares(self, used_peers):
+        """
+        @param used_peers: a sequence of PeerTracker objects
+        """
+        log.msg("_send_shares, used_peers is %s" % (used_peers,))
+        for peer in used_peers:
+            assert isinstance(peer, PeerTracker)
         buckets = {}
         for peer in used_peers:
             buckets.update(peer.buckets)
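
The closing lines fold every tracker's bucket writers into one dict keyed by sharenum. Since each still-unallocated share is only offered to a single peer per round, no two trackers should claim the same sharenum; a stricter version of that merge, purely for illustration and assuming servers only allocate sharenums they were actually asked for, might read:

    def merge_buckets(used_peers):
        # collapse each PeerTracker's {sharenum: IRemoteBucketWriter} map
        # into a single dict, refusing to let one sharenum be claimed twice
        buckets = {}
        for peer in used_peers:
            for sharenum, writer in peer.buckets.items():
                assert sharenum not in buckets, "share %d allocated twice" % sharenum
                buckets[sharenum] = writer
        return buckets
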