From: Zooko O'Whielacronx Date: Mon, 1 Feb 2010 06:16:10 +0000 (-0800) Subject: immutable: downloader accepts notifications of buckets even if those notifications... X-Git-Tag: allmydata-tahoe-1.6.0~11 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/simplejson/frontends/?a=commitdiff_plain;h=3e4342ecb362589956c03867bb97a1f368976b08;p=tahoe-lafs%2Ftahoe-lafs.git immutable: downloader accepts notifications of buckets even if those notifications arrive after he has begun downloading shares. This can be useful if one of the ones that he has already begun downloading fails. See #287 for discussion. This fixes part of #287 which part was a regression caused by #928, namely this fixes fail-over in case a share is corrupted (or the server returns an error or disconnects). This does not fix the related issue mentioned in #287 if a server hangs and doesn't reply to requests for blocks. --- diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index b410fc59..ba59afd7 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -789,7 +789,10 @@ class CiphertextDownloader(log.PrefixingLogMixin): self.active_buckets = {} # k: shnum, v: bucket self._share_buckets = {} # k: sharenum, v: list of buckets - self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets + + # _download_all_segments() will set this to: + # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets + self._share_vbuckets = None self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, } @@ -809,6 +812,10 @@ class CiphertextDownloader(log.PrefixingLogMixin): # self._responses_received = 0 # self._queries_failed = 0 + # This is solely for the use of unit tests. It will be triggered when + # we start downloading shares. + self._stage_4_d = defer.Deferred() + def pauseProducing(self): if self._paused: return @@ -938,6 +945,10 @@ class CiphertextDownloader(log.PrefixingLogMixin): reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True) self._wait_for_enough_buckets_d = None + if self._share_vbuckets is not None: + vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) + self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) + if self._results: if peerid not in self._results.servermap: self._results.servermap[peerid] = set() @@ -1088,6 +1099,13 @@ class CiphertextDownloader(log.PrefixingLogMixin): def _download_all_segments(self, res): + # From now on if new buckets are received then I will notice that + # self._share_vbuckets is not None and generate a vbucket for that new + # bucket and add it in to _share_vbuckets. (We had to wait because we + # didn't have self._vup and self._share_hash_tree earlier. We didn't + # need validated buckets until now -- now that we are ready to download + # shares.) + self._share_vbuckets = {} for sharenum, buckets in self._share_buckets.iteritems(): for bucket in buckets: vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) @@ -1109,6 +1127,8 @@ class CiphertextDownloader(log.PrefixingLogMixin): # this pause, at the end of write, prevents pre-fetch from # happening until the consumer is ready for more data. d.addCallback(self._check_for_pause) + + self._stage_4_d.callback(None) return d def _check_for_pause(self, res): diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py index 714653f8..1487576f 100644 --- a/src/allmydata/test/no_network.py +++ b/src/allmydata/test/no_network.py @@ -252,12 +252,21 @@ class NoNetworkGrid(service.MultiService): def break_server(self, serverid): # mark the given server as broken, so it will throw exceptions when - # asked to hold a share + # asked to hold a share or serve a share self.servers_by_id[serverid].broken = True - def hang_server(self, serverid, until=defer.Deferred()): - # hang the given server until 'until' fires - self.servers_by_id[serverid].hung_until = until + def hang_server(self, serverid): + # hang the given server + ss = self.servers_by_id[serverid] + assert ss.hung_until is None + ss.hung_until = defer.Deferred() + + def unhang_server(self, serverid): + # unhang the given server + ss = self.servers_by_id[serverid] + assert ss.hung_until is not None + ss.hung_until.callback(None) + ss.hung_until = None class GridTestMixin: diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index d0ba9956..56b96d94 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -1,14 +1,14 @@ import os, shutil from twisted.trial import unittest -from twisted.internet import defer, reactor +from twisted.internet import defer from allmydata import uri from allmydata.util.consumer import download_to_data from allmydata.immutable import upload from allmydata.mutable.common import UnrecoverableFileError from allmydata.storage.common import storage_index_to_dir from allmydata.test.no_network import GridTestMixin -from allmydata.test.common import ShouldFailMixin +from allmydata.test.common import ShouldFailMixin, _corrupt_share_data from allmydata.interfaces import NotEnoughSharesError immutable_plaintext = "data" * 10000 @@ -25,12 +25,22 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): for (id, ss) in servers: self.g.hang_server(id, **kwargs) + def _unhang(self, servers, **kwargs): + for (id, ss) in servers: + self.g.unhang_server(id, **kwargs) + def _delete_all_shares_from(self, servers): serverids = [id for (id, ss) in servers] for (i_shnum, i_serverid, i_sharefile) in self.shares: if i_serverid in serverids: os.unlink(i_sharefile) + def _corrupt_all_shares_in(self, servers, corruptor_func): + serverids = [id for (id, ss) in servers] + for (i_shnum, i_serverid, i_sharefile) in self.shares: + if i_serverid in serverids: + self._corrupt_share((i_shnum, i_sharefile), corruptor_func) + def _copy_all_shares_from(self, from_servers, to_server): serverids = [id for (id, ss) in from_servers] for (i_shnum, i_serverid, i_sharefile) in self.shares: @@ -52,6 +62,15 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile) in self.shares) + def _corrupt_share(self, share, corruptor_func): + (sharenum, sharefile) = share + data = open(sharefile, "rb").read() + newdata = corruptor_func(data) + os.unlink(sharefile) + wf = open(sharefile, "wb") + wf.write(newdata) + wf.close() + def _set_up(self, mutable, testdir, num_clients=1, num_servers=10): self.mutable = mutable if mutable: @@ -80,35 +99,50 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): d.addCallback(_uploaded_immutable) return d - def _check_download(self): + def _start_download(self): n = self.c0.create_node_from_uri(self.uri) if self.mutable: d = n.download_best_version() - expected_plaintext = mutable_plaintext + stage_4_d = None # currently we aren't doing any tests which require this for mutable files + else: + d = download_to_data(n) + stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME + return (d, stage_4_d,) + + def _wait_for_data(self, n): + if self.mutable: + d = n.download_best_version() else: d = download_to_data(n) - expected_plaintext = immutable_plaintext - def _got_data(data): - self.failUnlessEqual(data, expected_plaintext) - d.addCallback(_got_data) + return d + + def _check(self, resultingdata): + if self.mutable: + self.failUnlessEqual(resultingdata, mutable_plaintext) + else: + self.failUnlessEqual(resultingdata, immutable_plaintext) + + def _download_and_check(self): + d, stage4d = self._start_download() + d.addCallback(self._check) return d def _should_fail_download(self): if self.mutable: return self.shouldFail(UnrecoverableFileError, self.basedir, "no recoverable versions", - self._check_download) + self._download_and_check) else: return self.shouldFail(NotEnoughSharesError, self.basedir, "Failed to get enough shareholders", - self._check_download) + self._download_and_check) def test_10_good_sanity_check(self): d = defer.succeed(None) for mutable in [False, True]: d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_sanity_check")) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._download_and_check()) return d def test_10_good_copied_share(self): @@ -116,7 +150,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): for mutable in [False, True]: d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_copied_share")) d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0])) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._download_and_check()) return d def test_3_good_7_noshares(self): @@ -124,7 +158,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): for mutable in [False, True]: d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_noshares")) d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:])) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._download_and_check()) return d def test_2_good_8_broken_fail(self): @@ -149,7 +183,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_broken_copied_share")) d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0])) d.addCallback(lambda ign: self._break(self.servers[2:])) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._download_and_check()) return d def test_2_good_8_broken_duplicate_share_fail(self): @@ -168,28 +202,43 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase): for mutable in [False]: d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung")) d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._download_and_check()) return d def test_2_good_8_hung_then_1_recovers(self): d = defer.succeed(None) for mutable in [False]: - recovered = defer.Deferred() d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers")) - d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered)) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None)) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) return d def test_2_good_8_hung_then_1_recovers_with_2_shares(self): d = defer.succeed(None) for mutable in [False]: - recovered = defer.Deferred() d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares")) d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2])) - d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered)) + d.addCallback(lambda ign: self._hang(self.servers[2:3])) d.addCallback(lambda ign: self._hang(self.servers[3:])) - d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None)) - d.addCallback(lambda ign: self._check_download()) + d.addCallback(lambda ign: self._unhang(self.servers[2:3])) + d.addCallback(lambda ign: self._download_and_check()) + return d + + def test_failover_during_stage_4(self): + # See #287 + d = defer.succeed(None) + for mutable in [False]: + d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4")) + d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data)) + d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4")) + d.addCallback(lambda ign: self._hang(self.servers[3:])) + d.addCallback(lambda ign: self._start_download()) + def _after_starting_download((doned, started4d)): + started4d.addCallback(lambda ign: self._unhang(self.servers[3:4])) + doned.addCallback(self._check) + return doned + d.addCallback(_after_starting_download) + return d