From 6bb9debc166df7565363e9036ae7a4deac9733a2 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Wed, 6 Jun 2007 10:32:40 -0700 Subject: [PATCH] encode: tolerate lost peers, as long as we still get enough shares out. Closes #17. --- src/allmydata/encode.py | 66 ++++++++++++++++++++++-------- src/allmydata/test/test_encode.py | 68 ++++++++++++++++++++++++------- 2 files changed, 103 insertions(+), 31 deletions(-) diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index 5581b5fc..57b2c3e9 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -58,6 +58,9 @@ hash tree is put into the URI. """ +class NotEnoughPeersError(Exception): + pass + KiB=1024 MiB=1024*KiB GiB=1024*MiB @@ -67,6 +70,7 @@ PiB=1024*TiB class Encoder(object): implements(IEncoder) NEEDED_SHARES = 25 + SHARES_OF_HAPPINESS = 75 TOTAL_SHARES = 100 MAX_SEGMENT_SIZE = 2*MiB @@ -74,9 +78,12 @@ class Encoder(object): object.__init__(self) self.MAX_SEGMENT_SIZE = options.get("max_segment_size", self.MAX_SEGMENT_SIZE) - k,n = options.get("needed_and_total_shares", - (self.NEEDED_SHARES, self.TOTAL_SHARES)) + k,happy,n = options.get("needed_and_happy_and_total_shares", + (self.NEEDED_SHARES, + self.SHARES_OF_HAPPINESS, + self.TOTAL_SHARES)) self.NEEDED_SHARES = k + self.SHARES_OF_HAPPINESS = happy self.TOTAL_SHARES = n self.thingA_data = {} @@ -91,6 +98,7 @@ class Encoder(object): self.num_shares = self.TOTAL_SHARES self.required_shares = self.NEEDED_SHARES + self.shares_of_happiness = self.SHARES_OF_HAPPINESS self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size) # this must be a multiple of self.required_shares @@ -246,7 +254,7 @@ class Encoder(object): dl.append(d) subshare_hash = block_hash(subshare) self.subshare_hashes[shareid].append(subshare_hash) - dl = defer.DeferredList(dl) + dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True) def _logit(res): log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments)) return res @@ -257,7 +265,18 @@ class Encoder(object): if shareid not in self.landlords: return defer.succeed(None) sh = self.landlords[shareid] - return sh.callRemote("put_block", segment_num, subshare) + d = sh.callRemote("put_block", segment_num, subshare) + d.addErrback(self._remove_shareholder, shareid, + "segnum=%d" % segment_num) + return d + + def _remove_shareholder(self, why, shareid, where): + log.msg("error while sending %s to shareholder=%d: %s" % + (where, shareid, why)) # UNUSUAL + del self.landlords[shareid] + if len(self.landlords) < self.shares_of_happiness: + msg = "lost too many shareholders during upload" + raise NotEnoughPeersError(msg) def send_all_subshare_hash_trees(self): log.msg("%s sending subshare hash trees" % self) @@ -266,7 +285,8 @@ class Encoder(object): # hashes is a list of the hashes of all subshares that were sent # to shareholder[shareid]. dl.append(self.send_one_subshare_hash_tree(shareid, hashes)) - return defer.DeferredList(dl) + return defer.DeferredList(dl, fireOnOneErrback=True, + consumeErrors=True) def send_one_subshare_hash_tree(self, shareid, subshare_hashes): t = HashTree(subshare_hashes) @@ -278,7 +298,9 @@ class Encoder(object): if shareid not in self.landlords: return defer.succeed(None) sh = self.landlords[shareid] - return sh.callRemote("put_block_hashes", all_hashes) + d = sh.callRemote("put_block_hashes", all_hashes) + d.addErrback(self._remove_shareholder, shareid, "put_block_hashes") + return d def send_all_share_hash_trees(self): # each bucket gets a set of share hash tree nodes that are needed to @@ -300,37 +322,49 @@ class Encoder(object): needed_hash_indices = t.needed_hashes(i, include_leaf=True) hashes = [(hi, t[hi]) for hi in needed_hash_indices] dl.append(self.send_one_share_hash_tree(i, hashes)) - return defer.DeferredList(dl) + return defer.DeferredList(dl, fireOnOneErrback=True, + consumeErrors=True) def send_one_share_hash_tree(self, shareid, needed_hashes): if shareid not in self.landlords: return defer.succeed(None) sh = self.landlords[shareid] - return sh.callRemote("put_share_hashes", needed_hashes) + d = sh.callRemote("put_share_hashes", needed_hashes) + d.addErrback(self._remove_shareholder, shareid, "put_share_hashes") + return d def send_thingA_to_all_shareholders(self): log.msg("%s: sending thingA" % self) thingA = bencode.bencode(self.thingA_data) self.thingA_hash = thingA_hash(thingA) dl = [] - for sh in self.landlords.values(): - dl.append(self.send_thingA(sh, thingA)) - return defer.DeferredList(dl) + for shareid in self.landlords.keys(): + dl.append(self.send_thingA(shareid, thingA)) + return defer.DeferredList(dl, fireOnOneErrback=True, + consumeErrors=True) - def send_thingA(self, sh, thingA): - return sh.callRemote("put_thingA", thingA) + def send_thingA(self, shareid, thingA): + sh = self.landlords[shareid] + d = sh.callRemote("put_thingA", thingA) + d.addErrback(self._remove_shareholder, shareid, "put_thingA") + return d def close_all_shareholders(self): log.msg("%s: closing shareholders" % self) dl = [] for shareid in self.landlords: - dl.append(self.landlords[shareid].callRemote("close")) - return defer.DeferredList(dl) + d = self.landlords[shareid].callRemote("close") + d.addErrback(self._remove_shareholder, shareid, "close") + dl.append(d) + return defer.DeferredList(dl, fireOnOneErrback=True, + consumeErrors=True) def done(self): log.msg("%s: upload done" % self) return self.thingA_hash def err(self, f): - log.msg("%s: upload failed: %s" % (self, f)) + log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL + if f.check(defer.FirstError): + return f.value.subFailure return f diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 7c69d135..d8f58087 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -41,6 +41,9 @@ class FakeStorageServer: else: return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),) +class LostPeerError(Exception): + pass + class FakeBucketWriter: # these are used for both reading and writing def __init__(self, mode="good"): @@ -59,8 +62,10 @@ class FakeBucketWriter: def put_block(self, segmentnum, data): assert not self.closed assert segmentnum not in self.blocks + if self.mode == "lost" and segmentnum >= 1: + raise LostPeerError("I'm going away now") self.blocks[segmentnum] = data - + def put_block_hashes(self, blockhashes): assert not self.closed assert self.block_hashes is None @@ -215,18 +220,19 @@ class Encode(unittest.TestCase): return self.do_encode(25, 101, 100, 5, 15, 8) class Roundtrip(unittest.TestCase): - def send_and_recover(self, k_and_n=(25,100), + def send_and_recover(self, k_and_happy_and_n=(25,75,100), AVAILABLE_SHARES=None, datalen=76, max_segment_size=25, - bucket_modes={}): - NUM_SHARES = k_and_n[1] + bucket_modes={}, + ): + NUM_SHARES = k_and_happy_and_n[2] if AVAILABLE_SHARES is None: AVAILABLE_SHARES = NUM_SHARES data = make_data(datalen) # force use of multiple segments options = {"max_segment_size": max_segment_size, - "needed_and_total_shares": k_and_n} + "needed_and_happy_and_total_shares": k_and_happy_and_n} e = encode.Encoder(options) nonkey = "\x00" * 16 e.setup(StringIO(data), nonkey) @@ -275,7 +281,8 @@ class Roundtrip(unittest.TestCase): fd._got_thingA(thingA_data) for shnum in range(AVAILABLE_SHARES): bucket = all_shareholders[shnum] - fd.add_share_bucket(shnum, bucket) + if bucket.closed: + fd.add_share_bucket(shnum, bucket) fd._got_all_shareholders(None) fd._create_validated_buckets(None) d2 = fd._download_all_segments(None) @@ -289,7 +296,7 @@ class Roundtrip(unittest.TestCase): return d def test_not_enough_shares(self): - d = self.send_and_recover((4,10), AVAILABLE_SHARES=2) + d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2) def _done(res): self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(download.NotEnoughPeersError)) @@ -329,7 +336,7 @@ class Roundtrip(unittest.TestCase): for i in range(6)] + [(i, "good") for i in range(6, 10)]) - return self.send_and_recover((4,10), bucket_modes=modemap) + return self.send_and_recover((4,8,10), bucket_modes=modemap) def test_bad_blocks_failure(self): # the first 7 servers have bad blocks, which will be caught by the @@ -338,7 +345,7 @@ class Roundtrip(unittest.TestCase): for i in range(7)] + [(i, "good") for i in range(7, 10)]) - d = self.send_and_recover((4,10), bucket_modes=modemap) + d = self.send_and_recover((4,8,10), bucket_modes=modemap) def _done(res): self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(download.NotEnoughPeersError)) @@ -352,7 +359,7 @@ class Roundtrip(unittest.TestCase): for i in range(6)] + [(i, "good") for i in range(6, 10)]) - return self.send_and_recover((4,10), bucket_modes=modemap) + return self.send_and_recover((4,8,10), bucket_modes=modemap) def test_bad_blockhashes_failure(self): # the first 7 servers have bad block hashes, so the blockhash tree @@ -361,7 +368,7 @@ class Roundtrip(unittest.TestCase): for i in range(7)] + [(i, "good") for i in range(7, 10)]) - d = self.send_and_recover((4,10), bucket_modes=modemap) + d = self.send_and_recover((4,8,10), bucket_modes=modemap) def _done(res): self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(download.NotEnoughPeersError)) @@ -375,7 +382,7 @@ class Roundtrip(unittest.TestCase): for i in range(6)] + [(i, "good") for i in range(6, 10)]) - return self.send_and_recover((4,10), bucket_modes=modemap) + return self.send_and_recover((4,8,10), bucket_modes=modemap) def test_bad_sharehashes_failure(self): # the first 7 servers have bad block hashes, so the sharehash tree @@ -384,7 +391,7 @@ class Roundtrip(unittest.TestCase): for i in range(7)] + [(i, "good") for i in range(7, 10)]) - d = self.send_and_recover((4,10), bucket_modes=modemap) + d = self.send_and_recover((4,8,10), bucket_modes=modemap) def _done(res): self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(download.NotEnoughPeersError)) @@ -398,7 +405,7 @@ class Roundtrip(unittest.TestCase): for i in range(6)] + [(i, "good") for i in range(6, 10)]) - return self.send_and_recover((4,10), bucket_modes=modemap) + return self.send_and_recover((4,8,10), bucket_modes=modemap) def test_missing_sharehashes_failure(self): # the first 7 servers are missing their sharehashes, so the @@ -407,10 +414,41 @@ class Roundtrip(unittest.TestCase): for i in range(7)] + [(i, "good") for i in range(7, 10)]) - d = self.send_and_recover((4,10), bucket_modes=modemap) + d = self.send_and_recover((4,8,10), bucket_modes=modemap) def _done(res): self.failUnless(isinstance(res, Failure)) self.failUnless(res.check(download.NotEnoughPeersError)) d.addBoth(_done) return d + def test_lost_one_shareholder(self): + # we have enough shareholders when we start, but one segment in we + # lose one of them. The upload should still succeed, as long as we + # still have 'shares_of_happiness' peers left. + modemap = dict([(i, "good") for i in range(9)] + + [(i, "lost") for i in range(9, 10)]) + return self.send_and_recover((4,8,10), bucket_modes=modemap) + + def test_lost_many_shareholders(self): + # we have enough shareholders when we start, but one segment in we + # lose all but one of them. The upload should fail. + modemap = dict([(i, "good") for i in range(1)] + + [(i, "lost") for i in range(1, 10)]) + d = self.send_and_recover((4,8,10), bucket_modes=modemap) + def _done(res): + self.failUnless(isinstance(res, Failure)) + self.failUnless(res.check(encode.NotEnoughPeersError)) + d.addBoth(_done) + return d + + def test_lost_all_shareholders(self): + # we have enough shareholders when we start, but one segment in we + # lose all of them. The upload should fail. + modemap = dict([(i, "lost") for i in range(10)]) + d = self.send_and_recover((4,8,10), bucket_modes=modemap) + def _done(res): + self.failUnless(isinstance(res, Failure)) + self.failUnless(res.check(encode.NotEnoughPeersError)) + d.addBoth(_done) + return d + -- 2.45.2