from twisted.trial import unittest
+from twisted.python import log
+from twisted.python.failure import Failure
from twisted.internet import defer
from cStringIO import StringIO
+from foolscap import eventual
+
from allmydata import upload
from allmydata.uri import unpack_uri
+from test_encode import FakePeer
+
class FakeStorageServer:
- pass
+ def __init__(self, mode):
+ self.mode = mode
+ def callRemote(self, methname, *args, **kwargs):
+ def _call():
+ meth = getattr(self, methname)
+ return meth(*args, **kwargs)
+ d = eventual.fireEventually()
+ d.addCallback(lambda res: _call())
+ return d
+ def allocate_buckets(self, verifierid, sharenums, shareize, blocksize, canary):
+ if self.mode == "full":
+ return (set(), {},)
+ elif self.mode == "already got them":
+ return (set(sharenums), {},)
+ else:
+ return (set(), dict([(shnum, FakePeer(),) for shnum in sharenums]),)
class FakeClient:
+ def __init__(self, mode="good"):
+ self.mode = mode
def get_permuted_peers(self, verifierid):
- return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(),) for fakeid in range(50) ]
+ return [ ("%20d"%fakeid, "%20d"%fakeid, FakeStorageServer(self.mode),) for fakeid in range(50) ]
-class Uploader(unittest.TestCase):
+class GoodServer(unittest.TestCase):
def setUp(self):
- self.node = FakeClient()
+ self.node = FakeClient(mode="good")
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
d = self.u.upload_filename(fn)
d.addCallback(self._check)
return d
+
+class FullServer(unittest.TestCase):
+ def setUp(self):
+ self.node = FakeClient(mode="full")
+ self.u = upload.Uploader()
+ self.u.running = True
+ self.u.parent = self.node
+
+ def _should_fail(self, f):
+ self.failUnless(isinstance(f, Failure) and f.check(upload.NotEnoughPeersError))
+
+ def testData(self):
+ data = "This is some data to upload"
+ d = self.u.upload_data(data)
+ d.addBoth(self._should_fail)
+ return d
+
from twisted.application import service
from foolscap import Referenceable
-from allmydata.util import idlib, mathutil
+from allmydata.util import idlib
+from allmydata.util.assertutil import _assert
from allmydata import encode_new
from allmydata.uri import pack_uri
from allmydata.interfaces import IUploadable, IUploader
pass
class PeerTracker:
- def __init__(self, peerid, connection, sharesize, blocksize, verifierid):
+ def __init__(self, peerid, permutedid, connection, sharesize, blocksize, verifierid):
self.peerid = peerid
+ self.permutedid = permutedid
self.connection = connection
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
self.verifierid = verifierid
def query(self, sharenums):
- d = self.connection.callRemote("allocate_buckets", self._verifierid,
+ d = self.connection.callRemote("allocate_buckets", self.verifierid,
sharenums, self.sharesize,
self.blocksize, canary=Referenceable())
d.addCallback(self._got_reply)
return d
def _got_reply(self, (alreadygot, buckets)):
+ log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
self.buckets.update(buckets)
return (alreadygot, set(buckets.keys()))
assert len(vid) == 20
self._verifierid = vid
-
def start(self):
"""Start uploading the file.
# responsible for handling the data and sending out the shares.
peers = self._client.get_permuted_peers(self._verifierid)
assert peers
- trackers = [ (permutedid, PeerTracker(peerid, conn, share_size, block_size, self._verifierid),)
+ trackers = [ PeerTracker(peerid, permutedid, conn, share_size, block_size, self._verifierid)
for permutedid, peerid, conn in peers ]
- ring_things = [] # a list of (position_in_ring, whatami, x) where whatami is 0 if x is a sharenum or else 1 if x is a PeerTracker instance
- ring_things.extend([ (permutedpeerid, 1, peer,) for permutedpeerid, peer in trackers ])
- shares = [ (i * 2**160 / self.total_shares, 0, i) for i in range(self.total_shares) ]
- ring_things.extend(shares)
- ring_things.sort()
- self.ring_things = collections.deque(ring_things)
- self.usable_peers = set([peer for permutedid, peer in trackers])
- self.used_peers = set()
- self.unallocated_sharenums = set(shares)
+ self.usable_peers = set(trackers) # this set shrinks over time
+ self.used_peers = set() # while this set grows
+ self.unallocated_sharenums = set(range(self.total_shares)) # this one shrinks
d = self._locate_all_shareholders()
d.addCallback(self._send_shares)
@return: a set of PeerTracker instances that have agreed to hold some
shares for us
"""
+ return self._locate_more_shareholders()
+
+ def _locate_more_shareholders(self):
d = self._query_peers()
- def _done(res):
- if not self.unallocated_sharenums:
- return self._used_peers
- if not self.usable_peers:
- if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness):
- # close enough
- return self._used_peers
- raise NotEnoughPeersError
- return self._query_peers()
- d.addCallback(_done)
+ d.addCallback(self._located_some_shareholders)
return d
+ def _located_some_shareholders(self, res):
+ log.msg("_located_some_shareholders")
+ log.msg(" still need homes for %d shares, still have %d usable peers" % (len(self.unallocated_sharenums), len(self.usable_peers)))
+ if not self.unallocated_sharenums:
+ # Finished allocating places for all shares.
+ log.msg("%s._locate_all_shareholders() Finished allocating places for all shares.")
+ log.msg("used_peers is %s" % (self.used_peers,))
+ return self.used_peers
+ if not self.usable_peers:
+ # Ran out of peers who have space.
+ log.msg("%s._locate_all_shareholders() Ran out of peers who have space.")
+ if len(self.unallocated_sharenums) < (self.total_shares - self.shares_of_happiness):
+ # But we allocated places for enough shares.
+ log.msg("%s._locate_all_shareholders() But we allocated places for enough shares.")
+ return self.used_peers
+ raise NotEnoughPeersError
+ # we need to keep trying
+ return self._locate_more_shareholders()
+
+ def _create_ring_of_things(self):
+ PEER = 1 # must sort later than SHARE, for consistency with download
+ SHARE = 0
+ ring_of_things = [] # a list of (position_in_ring, whatami, x) where whatami is SHARE if x is a sharenum or else PEER if x is a PeerTracker instance
+ ring_of_things.extend([ (peer.permutedid, PEER, peer,)
+ for peer in self.usable_peers ])
+ shares = [ (i * 2**160 / self.total_shares, SHARE, i)
+ for i in self.unallocated_sharenums]
+ ring_of_things.extend(shares)
+ ring_of_things.sort()
+ ring_of_things = collections.deque(ring_of_things)
+ return ring_of_things
+
def _query_peers(self):
"""
@return: a deferred that fires when all queries have resolved
"""
+ PEER = 1
+ SHARE = 0
+ ring = self._create_ring_of_things()
+
# Choose a random starting point, talk to that peer.
- self.ring_things.rotate(random.randrange(0, len(self.ring_things)))
+ ring.rotate(random.randrange(0, len(ring)))
# Walk backwards to find a peer. We know that we'll eventually find
# one because we earlier asserted that there was at least one.
- while self.ring_things[0][1] != 1:
- self.ring_things.rotate(-1)
- startingpoint = self.ring_things[0]
- peer = startingpoint[2]
+ while ring[0][1] != PEER:
+ ring.rotate(-1)
+ peer = ring[0][2]
assert isinstance(peer, PeerTracker), peer
- self.ring_things.rotate(-1)
+ ring.rotate(-1)
# loop invariant: at the top of the loop, we are always one step to
# the left of a peer, which is stored in the peer variable.
outstanding_queries = []
- while self.ring_things[0] != startingpoint:
- # Walk backwards to find the previous peer (could be the same one).
- # Accumulate all shares that we find along the way.
- sharenums_to_query = set()
- while self.ring_things[0][1] != 1:
- sharenums_to_query.add(self.ring_things[0][2])
- self.ring_things.rotate(-1)
-
- d = peer.query(sharenums_to_query)
- d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
- outstanding_queries.append(d)
-
- peer = self.ring_things[0][2]
- assert isinstance(peer, PeerTracker), peer
- self.ring_things.rotate(-1)
-
+ sharenums_to_query = set()
+ for i in range(len(ring)):
+ if ring[0][1] == SHARE:
+ sharenums_to_query.add(ring[0][2])
+ else:
+ d = peer.query(sharenums_to_query)
+ d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
+ outstanding_queries.append(d)
+ d.addErrback(log.err)
+ peer = ring[0][2]
+ sharenums_to_query = set()
+ ring.rotate(-1)
+
return defer.DeferredList(outstanding_queries)
def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
@type alreadygot: a set of sharenums
@type allocated: a set of sharenums
"""
+ log.msg("%s._got_response(%s, %s, %s): self.unallocated_sharenums: %s, unhandled: %s" % (self, (alreadygot, allocated), peer, shares_we_requested, self.unallocated_sharenums, shares_we_requested - alreadygot - allocated))
self.unallocated_sharenums -= alreadygot
self.unallocated_sharenums -= allocated
if allocated:
- self.used_peers.add(peer)
+ self.usable_peers.add(peer)
if shares_we_requested - alreadygot - allocated:
+ log.msg("%s._got_response(%s, %s, %s): self.unallocated_sharenums: %s, unhandled: %s HE'S FULL" % (self, (alreadygot, allocated), peer, shares_we_requested, self.unallocated_sharenums, shares_we_requested - alreadygot - allocated))
# Then he didn't accept some of the shares, so he's full.
self.usable_peers.remove(peer)
def _got_error(self, f, peer):
- self.usable_peers -= peer
+ log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
+ self.usable_peers.remove(peer)
def _send_shares(self, used_peers):
+ """
+ @param used_peers: a sequence of PeerTracker objects
+ """
+ log.msg("_send_shares, used_peers is %s" % (used_peers,))
+ for peer in used_peers:
+ assert isinstance(peer, PeerTracker)
buckets = {}
for peer in used_peers:
buckets.update(peer.buckets)