"""
+class NotEnoughPeersError(Exception):
+    """Raised when so many shareholders are lost during the upload that
+    fewer than shares_of_happiness of them remain."""
+
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
class Encoder(object):
implements(IEncoder)
NEEDED_SHARES = 25
+ SHARES_OF_HAPPINESS = 75
TOTAL_SHARES = 100
MAX_SEGMENT_SIZE = 2*MiB
object.__init__(self)
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
self.MAX_SEGMENT_SIZE)
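+        # e.g. options={"needed_and_happy_and_total_shares": (3, 7, 10)}
+        # places 10 shares, needs any 3 of them to decode, and declares
+        # the upload a failure if fewer than 7 shareholders survive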
- k,n = options.get("needed_and_total_shares",
- (self.NEEDED_SHARES, self.TOTAL_SHARES))
+ k,happy,n = options.get("needed_and_happy_and_total_shares",
+ (self.NEEDED_SHARES,
+ self.SHARES_OF_HAPPINESS,
+ self.TOTAL_SHARES))
self.NEEDED_SHARES = k
+ self.SHARES_OF_HAPPINESS = happy
self.TOTAL_SHARES = n
self.thingA_data = {}
self.num_shares = self.TOTAL_SHARES
self.required_shares = self.NEEDED_SHARES
+ self.shares_of_happiness = self.SHARES_OF_HAPPINESS
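+        # required_shares (k) is the decoding threshold and num_shares
+        # (n) is how many shares we place; shares_of_happiness is the
+        # survival threshold: losing shareholders is tolerated until
+        # fewer than this many remain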
self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size)
# this must be a multiple of self.required_shares
dl.append(d)
subshare_hash = block_hash(subshare)
self.subshare_hashes[shareid].append(subshare_hash)
- dl = defer.DeferredList(dl)
+ dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
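+        # fireOnOneErrback=True lets a NotEnoughPeersError raised in
+        # _remove_shareholder abort the whole segment at once;
+        # consumeErrors=True keeps the remaining, already-handled
+        # failures from being logged as unhandled errors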
def _logit(res):
            log.msg("%s uploaded %s / %s bytes of your file." %
                    (self, self.segment_size*(segnum+1),
                     self.segment_size*self.num_segments))
return res
if shareid not in self.landlords:
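+            # dropped by _remove_shareholder after an earlier failure:
+            # just skip this shareholder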
return defer.succeed(None)
sh = self.landlords[shareid]
- return sh.callRemote("put_block", segment_num, subshare)
+ d = sh.callRemote("put_block", segment_num, subshare)
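+        # on failure (e.g. the peer disconnected), drop this shareholder
+        # instead of failing the whole upload; _remove_shareholder decides
+        # whether enough peers remain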
+ d.addErrback(self._remove_shareholder, shareid,
+ "segnum=%d" % segment_num)
+ return d
+
+ def _remove_shareholder(self, why, shareid, where):
+ log.msg("error while sending %s to shareholder=%d: %s" %
+ (where, shareid, why)) # UNUSUAL
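+        # this runs as an errback: returning normally converts the
+        # failure into success, i.e. the lost peer is tolerated; raising
+        # NotEnoughPeersError below propagates instead, and
+        # fireOnOneErrback turns it into an upload-wide failure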
+ del self.landlords[shareid]
+ if len(self.landlords) < self.shares_of_happiness:
+ msg = "lost too many shareholders during upload"
+ raise NotEnoughPeersError(msg)
def send_all_subshare_hash_trees(self):
log.msg("%s sending subshare hash trees" % self)
# hashes is a list of the hashes of all subshares that were sent
# to shareholder[shareid].
dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
- return defer.DeferredList(dl)
+ return defer.DeferredList(dl, fireOnOneErrback=True,
+ consumeErrors=True)
def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
t = HashTree(subshare_hashes)
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- return sh.callRemote("put_block_hashes", all_hashes)
+ d = sh.callRemote("put_block_hashes", all_hashes)
+ d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
+ return d
def send_all_share_hash_trees(self):
# each bucket gets a set of share hash tree nodes that are needed to
needed_hash_indices = t.needed_hashes(i, include_leaf=True)
hashes = [(hi, t[hi]) for hi in needed_hash_indices]
dl.append(self.send_one_share_hash_tree(i, hashes))
- return defer.DeferredList(dl)
+ return defer.DeferredList(dl, fireOnOneErrback=True,
+ consumeErrors=True)
def send_one_share_hash_tree(self, shareid, needed_hashes):
if shareid not in self.landlords:
return defer.succeed(None)
sh = self.landlords[shareid]
- return sh.callRemote("put_share_hashes", needed_hashes)
+ d = sh.callRemote("put_share_hashes", needed_hashes)
+ d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
+ return d
def send_thingA_to_all_shareholders(self):
log.msg("%s: sending thingA" % self)
thingA = bencode.bencode(self.thingA_data)
self.thingA_hash = thingA_hash(thingA)
dl = []
- for sh in self.landlords.values():
- dl.append(self.send_thingA(sh, thingA))
- return defer.DeferredList(dl)
+ for shareid in self.landlords.keys():
+ dl.append(self.send_thingA(shareid, thingA))
+ return defer.DeferredList(dl, fireOnOneErrback=True,
+ consumeErrors=True)
- def send_thingA(self, sh, thingA):
- return sh.callRemote("put_thingA", thingA)
+ def send_thingA(self, shareid, thingA):
+ sh = self.landlords[shareid]
+ d = sh.callRemote("put_thingA", thingA)
+ d.addErrback(self._remove_shareholder, shareid, "put_thingA")
+ return d
def close_all_shareholders(self):
log.msg("%s: closing shareholders" % self)
dl = []
-        for shareid in self.landlords:
-            dl.append(self.landlords[shareid].callRemote("close"))
-        return defer.DeferredList(dl)
+        # iterate over a copy of the keys: the "close" errback can fire
+        # synchronously and delete entries from self.landlords mid-loop
+        for shareid in self.landlords.keys():
+            d = self.landlords[shareid].callRemote("close")
+            d.addErrback(self._remove_shareholder, shareid, "close")
+            dl.append(d)
+ return defer.DeferredList(dl, fireOnOneErrback=True,
+ consumeErrors=True)
def done(self):
log.msg("%s: upload done" % self)
return self.thingA_hash
def err(self, f):
- log.msg("%s: upload failed: %s" % (self, f))
+ log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
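+        # a DeferredList created with fireOnOneErrback=True wraps the
+        # real failure in defer.FirstError; unwrap it so our caller sees
+        # the original exception (e.g. NotEnoughPeersError)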
+ if f.check(defer.FirstError):
+ return f.value.subFailure
return f
else:
return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
+class LostPeerError(Exception):
+ pass
+
class FakeBucketWriter:
# these are used for both reading and writing
def __init__(self, mode="good"):
def put_block(self, segmentnum, data):
assert not self.closed
assert segmentnum not in self.blocks
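+        # "lost" mode accepts block 0, then simulates a peer that has
+        # disconnected: every later put_block errbacks with LostPeerError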
+ if self.mode == "lost" and segmentnum >= 1:
+ raise LostPeerError("I'm going away now")
self.blocks[segmentnum] = data
-
+
def put_block_hashes(self, blockhashes):
assert not self.closed
assert self.block_hashes is None
return self.do_encode(25, 101, 100, 5, 15, 8)
class Roundtrip(unittest.TestCase):
- def send_and_recover(self, k_and_n=(25,100),
+ def send_and_recover(self, k_and_happy_and_n=(25,75,100),
AVAILABLE_SHARES=None,
datalen=76,
max_segment_size=25,
- bucket_modes={}):
- NUM_SHARES = k_and_n[1]
+ bucket_modes={},
+ ):
+ NUM_SHARES = k_and_happy_and_n[2]
if AVAILABLE_SHARES is None:
AVAILABLE_SHARES = NUM_SHARES
data = make_data(datalen)
# force use of multiple segments
options = {"max_segment_size": max_segment_size,
- "needed_and_total_shares": k_and_n}
+ "needed_and_happy_and_total_shares": k_and_happy_and_n}
e = encode.Encoder(options)
nonkey = "\x00" * 16
e.setup(StringIO(data), nonkey)
fd._got_thingA(thingA_data)
for shnum in range(AVAILABLE_SHARES):
bucket = all_shareholders[shnum]
- fd.add_share_bucket(shnum, bucket)
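+            # a bucket is closed only after its share was fully uploaded;
+            # peers that were lost mid-upload never close theirs, so
+            # those buckets are excluded from the download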
+ if bucket.closed:
+ fd.add_share_bucket(shnum, bucket)
fd._got_all_shareholders(None)
fd._create_validated_buckets(None)
d2 = fd._download_all_segments(None)
return d
def test_not_enough_shares(self):
- d = self.send_and_recover((4,10), AVAILABLE_SHARES=2)
+ d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
def _done(res):
self.failUnless(isinstance(res, Failure))
self.failUnless(res.check(download.NotEnoughPeersError))
for i in range(6)]
+ [(i, "good")
for i in range(6, 10)])
- return self.send_and_recover((4,10), bucket_modes=modemap)
+ return self.send_and_recover((4,8,10), bucket_modes=modemap)
def test_bad_blocks_failure(self):
# the first 7 servers have bad blocks, which will be caught by the
for i in range(7)]
+ [(i, "good")
for i in range(7, 10)])
- d = self.send_and_recover((4,10), bucket_modes=modemap)
+ d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
self.failUnless(res.check(download.NotEnoughPeersError))
for i in range(6)]
+ [(i, "good")
for i in range(6, 10)])
- return self.send_and_recover((4,10), bucket_modes=modemap)
+ return self.send_and_recover((4,8,10), bucket_modes=modemap)
def test_bad_blockhashes_failure(self):
# the first 7 servers have bad block hashes, so the blockhash tree
for i in range(7)]
+ [(i, "good")
for i in range(7, 10)])
- d = self.send_and_recover((4,10), bucket_modes=modemap)
+ d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
self.failUnless(res.check(download.NotEnoughPeersError))
for i in range(6)]
+ [(i, "good")
for i in range(6, 10)])
- return self.send_and_recover((4,10), bucket_modes=modemap)
+ return self.send_and_recover((4,8,10), bucket_modes=modemap)
def test_bad_sharehashes_failure(self):
        # the first 7 servers have bad share hashes, so the sharehash tree
for i in range(7)]
+ [(i, "good")
for i in range(7, 10)])
- d = self.send_and_recover((4,10), bucket_modes=modemap)
+ d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
self.failUnless(res.check(download.NotEnoughPeersError))
for i in range(6)]
+ [(i, "good")
for i in range(6, 10)])
- return self.send_and_recover((4,10), bucket_modes=modemap)
+ return self.send_and_recover((4,8,10), bucket_modes=modemap)
def test_missing_sharehashes_failure(self):
# the first 7 servers are missing their sharehashes, so the
for i in range(7)]
+ [(i, "good")
for i in range(7, 10)])
- d = self.send_and_recover((4,10), bucket_modes=modemap)
+ d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
self.failUnless(res.check(download.NotEnoughPeersError))
d.addBoth(_done)
return d
+ def test_lost_one_shareholder(self):
+        # we have enough shareholders when we start, but one segment into
+        # the upload we lose one of them. The upload should still succeed,
+        # as long as we
+ # still have 'shares_of_happiness' peers left.
+ modemap = dict([(i, "good") for i in range(9)] +
+ [(i, "lost") for i in range(9, 10)])
+ return self.send_and_recover((4,8,10), bucket_modes=modemap)
+
+ def test_lost_many_shareholders(self):
+        # we have enough shareholders when we start, but one segment into
+        # the upload we lose all but one of them. The upload should fail.
+ modemap = dict([(i, "good") for i in range(1)] +
+ [(i, "lost") for i in range(1, 10)])
+ d = self.send_and_recover((4,8,10), bucket_modes=modemap)
+ def _done(res):
+ self.failUnless(isinstance(res, Failure))
+ self.failUnless(res.check(encode.NotEnoughPeersError))
+ d.addBoth(_done)
+ return d
+
+ def test_lost_all_shareholders(self):
+        # we have enough shareholders when we start, but one segment into
+        # the upload we lose all of them. The upload should fail.
+ modemap = dict([(i, "lost") for i in range(10)])
+ d = self.send_and_recover((4,8,10), bucket_modes=modemap)
+ def _done(res):
+ self.failUnless(isinstance(res, Failure))
+ self.failUnless(res.check(encode.NotEnoughPeersError))
+ d.addBoth(_done)
+ return d
+