From: Brian Warner Date: Mon, 28 Jan 2008 19:14:48 +0000 (-0700) Subject: encode.py: don't allow a shareholder which dies in start() to kill the whole upload X-Git-Tag: allmydata-tahoe-0.8.0~223 X-Git-Url: https://git.rkrishnan.org/uri/URI:DIR2:%5B%5E?a=commitdiff_plain;h=8f1212edac1d6aaf52840ff32da2785172d36f79;p=tahoe-lafs%2Ftahoe-lafs.git encode.py: don't allow a shareholder which dies in start() to kill the whole upload --- diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index a2878a41..a0b35c94 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -209,8 +209,7 @@ class Encoder(object): d = eventual.fireEventually() - for l in self.landlords.values(): - d.addCallback(lambda res, l=l: l.start()) + d.addCallback(lambda res: self.start_all_shareholders()) for i in range(self.num_segments-1): # note to self: this form doesn't work, because lambda only @@ -258,6 +257,16 @@ class Encoder(object): return eventual.fireEventually(res) + + def start_all_shareholders(self): + self.log("starting shareholders", level=log.NOISY) + dl = [] + for shareid in self.landlords: + d = self.landlords[shareid].start() + d.addErrback(self._remove_shareholder, shareid, "start") + dl.append(d) + return self._gather_responses(dl) + def _encode_segment(self, segnum): codec = self._codec diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py index 756034a1..5fec02ad 100644 --- a/src/allmydata/test/test_encode.py +++ b/src/allmydata/test/test_encode.py @@ -3,6 +3,7 @@ from zope.interface import implements from twisted.trial import unittest from twisted.internet import defer from twisted.python.failure import Failure +from foolscap import eventual from allmydata import encode, upload, download, hashtree, uri from allmydata.util import hashutil from allmydata.util.assertutil import _assert @@ -33,9 +34,15 @@ class FakeBucketWriterProxy: def startIfNecessary(self): return defer.succeed(self) def start(self): + if self.mode == "lost-early": + f = Failure(LostPeerError("I went away early")) + return eventual.fireEventually(f) return defer.succeed(self) def put_block(self, segmentnum, data): + if self.mode == "lost-early": + f = Failure(LostPeerError("I went away early")) + return eventual.fireEventually(f) def _try(): assert not self.closed assert segmentnum not in self.blocks @@ -618,6 +625,15 @@ class Roundtrip(unittest.TestCase): [(i, "lost") for i in range(9, 10)]) return self.send_and_recover((4,8,10), bucket_modes=modemap) + def test_lost_one_shareholder_early(self): + # we have enough shareholders when we choose peers, but just before + # we send the 'start' message, we lose one of them. The upload should + # still succeed, as long as we still have 'shares_of_happiness' peers + # left. + modemap = dict([(i, "good") for i in range(9)] + + [(i, "lost-early") for i in range(9, 10)]) + return self.send_and_recover((4,8,10), bucket_modes=modemap) + def test_lost_many_shareholders(self): # we have enough shareholders when we start, but one segment in we # lose all but one of them. The upload should fail.