encode: tolerate lost peers, as long as we still get enough shares out. Closes #17.
author Brian Warner <warner@allmydata.com>
Wed, 6 Jun 2007 17:32:40 +0000 (10:32 -0700)
committer Brian Warner <warner@allmydata.com>
Wed, 6 Jun 2007 17:32:40 +0000 (10:32 -0700)
src/allmydata/encode.py
src/allmydata/test/test_encode.py

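diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py
index 5581b5fc12c7d30b8e561cdf481f9399f8caa2c8..57b2c3e965994707bbe1ac420b4d534d6a15c44c 100644
--- a/src/allmydata/encode.py
+++ b/src/allmydata/encode.py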
@@ -58,6 +58,9 @@ hash tree is put into the URI.
 
 """
 
+class NotEnoughPeersError(Exception):
+    pass
+
 KiB=1024
 MiB=1024*KiB
 GiB=1024*MiB
@@ -67,6 +70,7 @@ PiB=1024*TiB
 class Encoder(object):
     implements(IEncoder)
     NEEDED_SHARES = 25
+    SHARES_OF_HAPPINESS = 75
     TOTAL_SHARES = 100
     MAX_SEGMENT_SIZE = 2*MiB
 
@@ -74,9 +78,12 @@ class Encoder(object):
         object.__init__(self)
         self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
                                             self.MAX_SEGMENT_SIZE)
-        k,n = options.get("needed_and_total_shares",
-                          (self.NEEDED_SHARES, self.TOTAL_SHARES))
+        k,happy,n = options.get("needed_and_happy_and_total_shares",
+                                (self.NEEDED_SHARES,
+                                 self.SHARES_OF_HAPPINESS,
+                                 self.TOTAL_SHARES))
         self.NEEDED_SHARES = k
+        self.SHARES_OF_HAPPINESS = happy
         self.TOTAL_SHARES = n
         self.thingA_data = {}
 
@@ -91,6 +98,7 @@ class Encoder(object):
 
         self.num_shares = self.TOTAL_SHARES
         self.required_shares = self.NEEDED_SHARES
+        self.shares_of_happiness = self.SHARES_OF_HAPPINESS
 
         self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size)
         # this must be a multiple of self.required_shares
@@ -246,7 +254,7 @@ class Encoder(object):
             dl.append(d)
             subshare_hash = block_hash(subshare)
             self.subshare_hashes[shareid].append(subshare_hash)
-        dl = defer.DeferredList(dl)
+        dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
         def _logit(res):
             log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
             return res
@@ -257,7 +265,18 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        return sh.callRemote("put_block", segment_num, subshare)
+        d = sh.callRemote("put_block", segment_num, subshare)
+        d.addErrback(self._remove_shareholder, shareid,
+                     "segnum=%d" % segment_num)
+        return d
+
+    def _remove_shareholder(self, why, shareid, where):
+        log.msg("error while sending %s to shareholder=%d: %s" %
+                (where, shareid, why)) # UNUSUAL
+        del self.landlords[shareid]
+        if len(self.landlords) < self.shares_of_happiness:
+            msg = "lost too many shareholders during upload"
+            raise NotEnoughPeersError(msg)
 
     def send_all_subshare_hash_trees(self):
         log.msg("%s sending subshare hash trees" % self)
@@ -266,7 +285,8 @@ class Encoder(object):
             # hashes is a list of the hashes of all subshares that were sent
             # to shareholder[shareid].
             dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
-        return defer.DeferredList(dl)
+        return defer.DeferredList(dl, fireOnOneErrback=True,
+                                  consumeErrors=True)
 
     def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
         t = HashTree(subshare_hashes)
@@ -278,7 +298,9 @@ class Encoder(object):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        return sh.callRemote("put_block_hashes", all_hashes)
+        d = sh.callRemote("put_block_hashes", all_hashes)
+        d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
+        return d
 
     def send_all_share_hash_trees(self):
         # each bucket gets a set of share hash tree nodes that are needed to
@@ -300,37 +322,49 @@ class Encoder(object):
             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
             dl.append(self.send_one_share_hash_tree(i, hashes))
-        return defer.DeferredList(dl)
+        return defer.DeferredList(dl, fireOnOneErrback=True,
+                                  consumeErrors=True)
 
     def send_one_share_hash_tree(self, shareid, needed_hashes):
         if shareid not in self.landlords:
             return defer.succeed(None)
         sh = self.landlords[shareid]
-        return sh.callRemote("put_share_hashes", needed_hashes)
+        d = sh.callRemote("put_share_hashes", needed_hashes)
+        d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
+        return d
 
     def send_thingA_to_all_shareholders(self):
         log.msg("%s: sending thingA" % self)
         thingA = bencode.bencode(self.thingA_data)
         self.thingA_hash = thingA_hash(thingA)
         dl = []
-        for sh in self.landlords.values():
-            dl.append(self.send_thingA(sh, thingA))
-        return defer.DeferredList(dl)
+        for shareid in self.landlords.keys():
+            dl.append(self.send_thingA(shareid, thingA))
+        return defer.DeferredList(dl, fireOnOneErrback=True,
+                                  consumeErrors=True)
 
-    def send_thingA(self, sh, thingA):
-        return sh.callRemote("put_thingA", thingA)
+    def send_thingA(self, shareid, thingA):
+        sh = self.landlords[shareid]
+        d = sh.callRemote("put_thingA", thingA)
+        d.addErrback(self._remove_shareholder, shareid, "put_thingA")
+        return d
 
     def close_all_shareholders(self):
         log.msg("%s: closing shareholders" % self)
         dl = []
         for shareid in self.landlords:
-            dl.append(self.landlords[shareid].callRemote("close"))
-        return defer.DeferredList(dl)
+            d = self.landlords[shareid].callRemote("close")
+            d.addErrback(self._remove_shareholder, shareid, "close")
+            dl.append(d)
+        return defer.DeferredList(dl, fireOnOneErrback=True,
+                                  consumeErrors=True)
 
     def done(self):
         log.msg("%s: upload done" % self)
         return self.thingA_hash
 
     def err(self, f):
-        log.msg("%s: upload failed: %s" % (self, f))
+        log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
+        if f.check(defer.FirstError):
+            return f.value.subFailure
         return f
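diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py
index 7c69d135ca7531e0b7354be32fac481cf4296cff..d8f58087b4f85fd22deec6d1bffb13071e5c9fcb 100644
--- a/src/allmydata/test/test_encode.py
+++ b/src/allmydata/test/test_encode.py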
@@ -41,6 +41,9 @@ class FakeStorageServer:
         else:
             return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
 
+class LostPeerError(Exception):
+    pass
+
 class FakeBucketWriter:
     # these are used for both reading and writing
     def __init__(self, mode="good"):
@@ -59,8 +62,10 @@ class FakeBucketWriter:
     def put_block(self, segmentnum, data):
         assert not self.closed
         assert segmentnum not in self.blocks
+        if self.mode == "lost" and segmentnum >= 1:
+            raise LostPeerError("I'm going away now")
         self.blocks[segmentnum] = data
-    
+
     def put_block_hashes(self, blockhashes):
         assert not self.closed
         assert self.block_hashes is None
@@ -215,18 +220,19 @@ class Encode(unittest.TestCase):
         return self.do_encode(25, 101, 100, 5, 15, 8)
 
 class Roundtrip(unittest.TestCase):
-    def send_and_recover(self, k_and_n=(25,100),
+    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
                          AVAILABLE_SHARES=None,
                          datalen=76,
                          max_segment_size=25,
-                         bucket_modes={}):
-        NUM_SHARES = k_and_n[1]
+                         bucket_modes={},
+                         ):
+        NUM_SHARES = k_and_happy_and_n[2]
         if AVAILABLE_SHARES is None:
             AVAILABLE_SHARES = NUM_SHARES
         data = make_data(datalen)
         # force use of multiple segments
         options = {"max_segment_size": max_segment_size,
-                   "needed_and_total_shares": k_and_n}
+                   "needed_and_happy_and_total_shares": k_and_happy_and_n}
         e = encode.Encoder(options)
         nonkey = "\x00" * 16
         e.setup(StringIO(data), nonkey)
@@ -275,7 +281,8 @@ class Roundtrip(unittest.TestCase):
             fd._got_thingA(thingA_data)
             for shnum in range(AVAILABLE_SHARES):
                 bucket = all_shareholders[shnum]
-                fd.add_share_bucket(shnum, bucket)
+                if bucket.closed:
+                    fd.add_share_bucket(shnum, bucket)
             fd._got_all_shareholders(None)
             fd._create_validated_buckets(None)
             d2 = fd._download_all_segments(None)
@@ -289,7 +296,7 @@ class Roundtrip(unittest.TestCase):
         return d
 
     def test_not_enough_shares(self):
-        d = self.send_and_recover((4,10), AVAILABLE_SHARES=2)
+        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
             self.failUnless(res.check(download.NotEnoughPeersError))
@@ -329,7 +336,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(6)]
                        + [(i, "good")
                           for i in range(6, 10)])
-        return self.send_and_recover((4,10), bucket_modes=modemap)
+        return self.send_and_recover((4,8,10), bucket_modes=modemap)
 
     def test_bad_blocks_failure(self):
         # the first 7 servers have bad blocks, which will be caught by the
@@ -338,7 +345,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(7)]
                        + [(i, "good")
                           for i in range(7, 10)])
-        d = self.send_and_recover((4,10), bucket_modes=modemap)
+        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
             self.failUnless(res.check(download.NotEnoughPeersError))
@@ -352,7 +359,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(6)]
                        + [(i, "good")
                           for i in range(6, 10)])
-        return self.send_and_recover((4,10), bucket_modes=modemap)
+        return self.send_and_recover((4,8,10), bucket_modes=modemap)
 
     def test_bad_blockhashes_failure(self):
         # the first 7 servers have bad block hashes, so the blockhash tree
@@ -361,7 +368,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(7)]
                        + [(i, "good")
                           for i in range(7, 10)])
-        d = self.send_and_recover((4,10), bucket_modes=modemap)
+        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
             self.failUnless(res.check(download.NotEnoughPeersError))
@@ -375,7 +382,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(6)]
                        + [(i, "good")
                           for i in range(6, 10)])
-        return self.send_and_recover((4,10), bucket_modes=modemap)
+        return self.send_and_recover((4,8,10), bucket_modes=modemap)
 
     def test_bad_sharehashes_failure(self):
         # the first 7 servers have bad block hashes, so the sharehash tree
@@ -384,7 +391,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(7)]
                        + [(i, "good")
                           for i in range(7, 10)])
-        d = self.send_and_recover((4,10), bucket_modes=modemap)
+        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
             self.failUnless(res.check(download.NotEnoughPeersError))
@@ -398,7 +405,7 @@ class Roundtrip(unittest.TestCase):
                         for i in range(6)]
                        + [(i, "good")
                           for i in range(6, 10)])
-        return self.send_and_recover((4,10), bucket_modes=modemap)
+        return self.send_and_recover((4,8,10), bucket_modes=modemap)
 
     def test_missing_sharehashes_failure(self):
         # the first 7 servers are missing their sharehashes, so the
@@ -407,10 +414,41 @@ class Roundtrip(unittest.TestCase):
                         for i in range(7)]
                        + [(i, "good")
                           for i in range(7, 10)])
-        d = self.send_and_recover((4,10), bucket_modes=modemap)
+        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
         def _done(res):
             self.failUnless(isinstance(res, Failure))
             self.failUnless(res.check(download.NotEnoughPeersError))
         d.addBoth(_done)
         return d
 
+    def test_lost_one_shareholder(self):
+        # we have enough shareholders when we start, but one segment in we
+        # lose one of them. The upload should still succeed, as long as we
+        # still have 'shares_of_happiness' peers left.
+        modemap = dict([(i, "good") for i in range(9)] +
+                       [(i, "lost") for i in range(9, 10)])
+        return self.send_and_recover((4,8,10), bucket_modes=modemap)
+
+    def test_lost_many_shareholders(self):
+        # we have enough shareholders when we start, but one segment in we
+        # lose all but one of them. The upload should fail.
+        modemap = dict([(i, "good") for i in range(1)] +
+                       [(i, "lost") for i in range(1, 10)])
+        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
+        def _done(res):
+            self.failUnless(isinstance(res, Failure))
+            self.failUnless(res.check(encode.NotEnoughPeersError))
+        d.addBoth(_done)
+        return d
+
+    def test_lost_all_shareholders(self):
+        # we have enough shareholders when we start, but one segment in we
+        # lose all of them. The upload should fail.
+        modemap = dict([(i, "lost") for i in range(10)])
+        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
+        def _done(res):
+            self.failUnless(isinstance(res, Failure))
+            self.failUnless(res.check(encode.NotEnoughPeersError))
+        d.addBoth(_done)
+        return d
+