self.active_buckets = {} # k: shnum, v: bucket
self._share_buckets = {} # k: sharenum, v: list of buckets
- self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
+
+ # _download_all_segments() will set this to:
+ # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
+ self._share_vbuckets = None
self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
# self._responses_received = 0
# self._queries_failed = 0
+ # This is solely for the use of unit tests. It will be triggered when
+ # we start downloading shares.
+ self._stage_4_d = defer.Deferred()
+
def pauseProducing(self):
if self._paused:
return
reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
self._wait_for_enough_buckets_d = None
+ if self._share_vbuckets is not None:
+ vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+ self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
+
if self._results:
if peerid not in self._results.servermap:
self._results.servermap[peerid] = set()
def _download_all_segments(self, res):
+ # From now on if new buckets are received then I will notice that
+ # self._share_vbuckets is not None and generate a vbucket for that new
+ # bucket and add it in to _share_vbuckets. (We had to wait because we
+ # didn't have self._vup and self._share_hash_tree earlier. We didn't
+ # need validated buckets until now -- now that we are ready to download
+ # shares.)
+ self._share_vbuckets = {}
for sharenum, buckets in self._share_buckets.iteritems():
for bucket in buckets:
vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
# this pause, at the end of write, prevents pre-fetch from
# happening until the consumer is ready for more data.
d.addCallback(self._check_for_pause)
+
+ self._stage_4_d.callback(None)
return d
def _check_for_pause(self, res):
import os, shutil
from twisted.trial import unittest
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from allmydata import uri
from allmydata.util.consumer import download_to_data
from allmydata.immutable import upload
from allmydata.mutable.common import UnrecoverableFileError
from allmydata.storage.common import storage_index_to_dir
from allmydata.test.no_network import GridTestMixin
-from allmydata.test.common import ShouldFailMixin
+from allmydata.test.common import ShouldFailMixin, _corrupt_share_data
from allmydata.interfaces import NotEnoughSharesError
immutable_plaintext = "data" * 10000
for (id, ss) in servers:
self.g.hang_server(id, **kwargs)
+ # Counterpart of _hang() above: tell the no-network grid to resume
+ # service on each of the given (serverid, server) pairs, passing
+ # **kwargs through to unhang_server().
+ def _unhang(self, servers, **kwargs):
+ for (id, ss) in servers:
+ self.g.unhang_server(id, **kwargs)
+
def _delete_all_shares_from(self, servers):
serverids = [id for (id, ss) in servers]
for (i_shnum, i_serverid, i_sharefile) in self.shares:
if i_serverid in serverids:
os.unlink(i_sharefile)
+ # Apply corruptor_func to every share held by the given servers.
+ # self.shares is a list of (shnum, serverid, sharefile) tuples; only the
+ # entries whose serverid belongs to `servers` are corrupted.
+ def _corrupt_all_shares_in(self, servers, corruptor_func):
+ serverids = [id for (id, ss) in servers]
+ for (i_shnum, i_serverid, i_sharefile) in self.shares:
+ if i_serverid in serverids:
+ self._corrupt_share((i_shnum, i_sharefile), corruptor_func)
+
def _copy_all_shares_from(self, from_servers, to_server):
serverids = [id for (id, ss) in from_servers]
for (i_shnum, i_serverid, i_sharefile) in self.shares:
self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile)
in self.shares)
+ # Rewrite one share file in place with corruptor_func(data).
+ # `share` is a (sharenum, sharefile-path) pair; the old file is unlinked
+ # first so the corrupted bytes land in a freshly created file.
+ def _corrupt_share(self, share, corruptor_func):
+ (sharenum, sharefile) = share
+ data = open(sharefile, "rb").read()
+ newdata = corruptor_func(data)
+ os.unlink(sharefile)
+ wf = open(sharefile, "wb")
+ wf.write(newdata)
+ wf.close()
+
def _set_up(self, mutable, testdir, num_clients=1, num_servers=10):
self.mutable = mutable
if mutable:
d.addCallback(_uploaded_immutable)
return d
- def _check_download(self):
+ def _start_download(self):
n = self.c0.create_node_from_uri(self.uri)
if self.mutable:
d = n.download_best_version()
- expected_plaintext = mutable_plaintext
+ stage_4_d = None # currently we aren't doing any tests which require this for mutable files
+ else:
+ d = download_to_data(n)
+ stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME
+ return (d, stage_4_d,)
+
+ def _wait_for_data(self, n):
+ if self.mutable:
+ d = n.download_best_version()
else:
d = download_to_data(n)
- expected_plaintext = immutable_plaintext
- def _got_data(data):
- self.failUnlessEqual(data, expected_plaintext)
- d.addCallback(_got_data)
+ return d
+
+ # Assert that the downloaded bytes equal the plaintext we uploaded
+ # (mutable_plaintext or immutable_plaintext, per self.mutable).
+ def _check(self, resultingdata):
+ if self.mutable:
+ self.failUnlessEqual(resultingdata, mutable_plaintext)
+ else:
+ self.failUnlessEqual(resultingdata, immutable_plaintext)
+
+ # Start a download and verify the resulting data; returns a Deferred
+ # that fires once _check has run. The stage-4 Deferred returned by
+ # _start_download() is not needed here and is ignored.
+ def _download_and_check(self):
+ d, stage4d = self._start_download()
+ d.addCallback(self._check)
return d
def _should_fail_download(self):
if self.mutable:
return self.shouldFail(UnrecoverableFileError, self.basedir,
"no recoverable versions",
- self._check_download)
+ self._download_and_check)
else:
return self.shouldFail(NotEnoughSharesError, self.basedir,
"Failed to get enough shareholders",
- self._check_download)
+ self._download_and_check)
def test_10_good_sanity_check(self):
d = defer.succeed(None)
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_sanity_check"))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._download_and_check())
return d
def test_10_good_copied_share(self):
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_copied_share"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._download_and_check())
return d
def test_3_good_7_noshares(self):
for mutable in [False, True]:
d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_noshares"))
d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_broken_fail(self):
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_broken_copied_share"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
d.addCallback(lambda ign: self._break(self.servers[2:]))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_broken_duplicate_share_fail(self):
for mutable in [False]:
d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung"))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_hung_then_1_recovers(self):
d = defer.succeed(None)
for mutable in [False]:
- recovered = defer.Deferred()
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers"))
- d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
+ d.addCallback(lambda ign: self._hang(self.servers[2:3]))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
- d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
+ d.addCallback(lambda ign: self._download_and_check())
return d
def test_2_good_8_hung_then_1_recovers_with_2_shares(self):
d = defer.succeed(None)
for mutable in [False]:
- recovered = defer.Deferred()
d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
- d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
+ d.addCallback(lambda ign: self._hang(self.servers[2:3]))
d.addCallback(lambda ign: self._hang(self.servers[3:]))
- d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
- d.addCallback(lambda ign: self._check_download())
+ d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
+ d.addCallback(lambda ign: self._download_and_check())
+ return d
+
+
+ def test_failover_during_stage_4(self):
+ # See #287
+ # Set up with one server's shares corrupted and servers[3:] hung, then
+ # start a download; once share download (stage 4) begins, unhang one
+ # more server and require that the download still yields correct data.
+ d = defer.succeed(None)
+ for mutable in [False]:
+ d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
+ d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data))
+ # NOTE(review): _set_up is invoked a second time here, which may
+ # re-create the grid and re-upload, discarding the corruption applied
+ # on the previous line -- confirm the duplicate call is intentional.
+ d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
+ d.addCallback(lambda ign: self._hang(self.servers[3:]))
+ d.addCallback(lambda ign: self._start_download())
+ def _after_starting_download((doned, started4d)):
+ # Once stage 4 has started, revive exactly one of the hung servers
+ # so the downloader must fail over to it.
+ started4d.addCallback(lambda ign: self._unhang(self.servers[3:4]))
+ doned.addCallback(self._check)
+ return doned
+ d.addCallback(_after_starting_download)
+
return d