]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
encode.py: don't allow a shareholder which dies in start() to kill the whole upload
authorBrian Warner <warner@allmydata.com>
Mon, 28 Jan 2008 19:14:48 +0000 (12:14 -0700)
committerBrian Warner <warner@allmydata.com>
Mon, 28 Jan 2008 19:14:48 +0000 (12:14 -0700)
src/allmydata/encode.py
src/allmydata/test/test_encode.py

index a2878a41e80d38f3747903c44649f07adad2c6d9..a0b35c948957098ed2d22b32e75e0b3327213b31 100644 (file)
@@ -209,8 +209,7 @@ class Encoder(object):
 
         d = eventual.fireEventually()
 
-        for l in self.landlords.values():
-            d.addCallback(lambda res, l=l: l.start())
+        d.addCallback(lambda res: self.start_all_shareholders())
 
         for i in range(self.num_segments-1):
             # note to self: this form doesn't work, because lambda only
@@ -258,6 +257,16 @@ class Encoder(object):
 
         return eventual.fireEventually(res)
 
+
+    def start_all_shareholders(self):
+        self.log("starting shareholders", level=log.NOISY)
+        dl = []
+        for shareid in self.landlords:
+            d = self.landlords[shareid].start()
+            d.addErrback(self._remove_shareholder, shareid, "start")
+            dl.append(d)
+        return self._gather_responses(dl)
+
     def _encode_segment(self, segnum):
         codec = self._codec
 
index 756034a1895a790777e2c3a5ff8c4b33c211d692..5fec02ada3586272cfebd6f810d36ea1c9005331 100644 (file)
@@ -3,6 +3,7 @@ from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.python.failure import Failure
+from foolscap import eventual
 from allmydata import encode, upload, download, hashtree, uri
 from allmydata.util import hashutil
 from allmydata.util.assertutil import _assert
@@ -33,9 +34,15 @@ class FakeBucketWriterProxy:
     def startIfNecessary(self):
         return defer.succeed(self)
     def start(self):
+        if self.mode == "lost-early":
+            f = Failure(LostPeerError("I went away early"))
+            return eventual.fireEventually(f)
         return defer.succeed(self)
 
     def put_block(self, segmentnum, data):
+        if self.mode == "lost-early":
+            f = Failure(LostPeerError("I went away early"))
+            return eventual.fireEventually(f)
         def _try():
             assert not self.closed
             assert segmentnum not in self.blocks
@@ -618,6 +625,15 @@ class Roundtrip(unittest.TestCase):
                        [(i, "lost") for i in range(9, 10)])
         return self.send_and_recover((4,8,10), bucket_modes=modemap)
 
+    def test_lost_one_shareholder_early(self):
+        # we have enough shareholders when we choose peers, but just before
+        # we send the 'start' message, we lose one of them. The upload should
+        # still succeed, as long as we still have 'shares_of_happiness' peers
+        # left.
+        modemap = dict([(i, "good") for i in range(9)] +
+                       [(i, "lost-early") for i in range(9, 10)])
+        return self.send_and_recover((4,8,10), bucket_modes=modemap)
+
     def test_lost_many_shareholders(self):
         # we have enough shareholders when we start, but one segment in we
         # lose all but one of them. The upload should fail.