3 from twisted.trial import unittest
4 from twisted.internet import defer
6 from foolscap.api import eventually
8 from allmydata.test import common
9 from allmydata.test.no_network import GridTestMixin
10 from allmydata.test.common import TEST_DATA
11 from allmydata import uri
12 from allmydata.util import log
13 from allmydata.util.consumer import download_to_data
15 from allmydata.interfaces import NotEnoughSharesError
16 from allmydata.immutable.upload import Data
17 from allmydata.immutable.downloader import finder
19 class MockNode(object):
20 def __init__(self, check_reneging, check_fetch_failed):
22 self.finished_d = defer.Deferred()
23 self.segment_size = 78
24 self.guessed_segment_size = 78
25 self._no_more_shares = False
26 self.check_reneging = check_reneging
27 self.check_fetch_failed = check_fetch_failed
30 self.share_hash_tree = mock.Mock()
31 self.share_hash_tree.needed_hashes.return_value = False
32 self.on_want_more_shares = None
34 def when_finished(self):
35 return self.finished_d
36 def get_num_segments(self):
38 def _calculate_sizes(self, guessed_segment_size):
39 return {'block_size': 4, 'num_segments': 5}
40 def no_more_shares(self):
41 self._no_more_shares = True
42 def got_shares(self, shares):
43 if self.check_reneging:
44 if self._no_more_shares:
45 self.finished_d.errback(unittest.FailTest("The node was told by the share finder that it is destined to remain hungry, then was given another share."))
47 self.got += len(shares)
48 log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
50 self.finished_d.callback(True)
51 def get_desired_ciphertext_hashes(self, *args, **kwargs):
53 def fetch_failed(self, *args, **kwargs):
54 if self.check_fetch_failed:
56 self.finished_d.errback(unittest.FailTest("The node was told by the segment fetcher that the download failed."))
57 self.finished_d = None
58 def want_more_shares(self):
59 if self.on_want_more_shares:
60 self.on_want_more_shares()
61 def process_blocks(self, *args, **kwargs):
63 self.finished_d.callback(None)
65 class TestShareFinder(unittest.TestCase):
66 def test_no_reneging_on_no_more_shares_ever(self):
69 # Suppose that K=3 and you send two DYHB requests, the first
70 # response offers two shares, and then the last offers one
71 # share. If you tell your share consumer "no more shares,
72 # ever", and then immediately tell them "oh, and here's
73 # another share", then you lose.
75 rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
76 vcap = rcap.get_verify_cap()
78 class MockServer(object):
79 def __init__(self, buckets):
81 'http://allmydata.org/tahoe/protocols/storage/v1': {
82 "tolerates-immutable-read-overrun": True
85 self.buckets = buckets
86 self.d = defer.Deferred()
88 def callRemote(self, methname, *args, **kwargs):
91 # Even after the 3rd answer we're still hungry because
92 # we're interested in finding a share on a 3rd server
93 # so we don't have to download more than one share
94 # from the first server. This is actually necessary to
96 def _give_buckets_and_hunger_again():
97 d.callback(self.buckets)
99 eventually(_give_buckets_and_hunger_again)
101 class MockIServer(object):
102 def __init__(self, serverid, rref):
103 self.serverid = serverid
105 def get_serverid(self):
110 return "name-%s" % self.serverid
111 def get_version(self):
112 return self.rref.version
114 mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
115 mockserver2 = MockServer({})
116 mockserver3 = MockServer({3: mock.Mock()})
117 mockstoragebroker = mock.Mock()
118 servers = [ MockIServer("ms1", mockserver1),
119 MockIServer("ms2", mockserver2),
120 MockIServer("ms3", mockserver3), ]
121 mockstoragebroker.get_servers_for_psi.return_value = servers
122 mockdownloadstatus = mock.Mock()
123 mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
125 s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)
133 return mocknode.when_finished()
136 class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
137 def startup(self, basedir):
138 self.basedir = basedir
139 self.set_up_grid(num_clients=2, num_servers=5)
140 c1 = self.g.clients[1]
141 # We need multiple segments to test crypttext hash trees that are
142 # non-trivial (i.e. they have more than just one hash in them).
143 c1.encoding_params['max_segment_size'] = 12
144 # Tests that need to test servers of happiness using this should
145 # set their own value for happy -- the default (7) breaks stuff.
146 c1.encoding_params['happy'] = 1
147 d = c1.upload(Data(TEST_DATA, convergence=""))
148 def _after_upload(ur):
149 self.uri = ur.get_uri()
150 self.filenode = self.g.clients[0].create_node_from_uri(ur.get_uri())
152 d.addCallback(_after_upload)
155 def _stash_shares(self, shares):
158 def _download_and_check_plaintext(self, ign=None):
159 num_reads = self._count_reads()
160 d = download_to_data(self.filenode)
161 def _after_download(result):
162 self.failUnlessEqual(result, TEST_DATA)
163 return self._count_reads() - num_reads
164 d.addCallback(_after_download)
167 def _shuffled(self, num_shnums):
169 random.shuffle(shnums)
170 return shnums[:num_shnums]
172 def _count_reads(self):
173 return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.read', 0)
174 for s in self.g.servers_by_number.values()])
177 def _count_allocates(self):
178 return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.allocate', 0)
179 for s in self.g.servers_by_number.values()])
181 def _count_writes(self):
182 return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.write', 0)
183 for s in self.g.servers_by_number.values()])
185 def test_test_code(self):
186 # The following process of stashing the shares, running
187 # replace_shares, and asserting that the new set of shares equals the
188 # old is more to test this test code than to test the Tahoe code...
189 d = self.startup("immutable/Test/code")
190 d.addCallback(self.copy_shares)
191 d.addCallback(self._stash_shares)
192 d.addCallback(self._download_and_check_plaintext)
194 # The following process of deleting 8 of the shares and asserting
195 # that you can't download it is more to test this test code than to
196 # test the Tahoe code...
197 def _then_delete_8(ign):
198 self.restore_all_shares(self.shares)
199 self.delete_shares_numbered(self.uri, range(8))
200 d.addCallback(_then_delete_8)
201 d.addCallback(lambda ign:
202 self.shouldFail(NotEnoughSharesError, "download-2",
204 download_to_data, self.filenode))
207 def test_download(self):
208 """ Basic download. (This functionality is more or less already
209 tested by test code in other modules, but this module is also going
210 to test some more specific things about immutable download.)
212 d = self.startup("immutable/Test/download")
213 d.addCallback(self._download_and_check_plaintext)
214 def _after_download(ign):
215 num_reads = self._count_reads()
217 self.failIf(num_reads > 41, num_reads)
218 d.addCallback(_after_download)
221 def test_download_from_only_3_remaining_shares(self):
222 """ Test download after 7 random shares (of the 10) have been
224 d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
225 d.addCallback(lambda ign:
226 self.delete_shares_numbered(self.uri, range(7)))
227 d.addCallback(self._download_and_check_plaintext)
228 def _after_download(num_reads):
230 self.failIf(num_reads > 41, num_reads)
231 d.addCallback(_after_download)
234 def test_download_from_only_3_shares_with_good_crypttext_hash(self):
235 """ Test download after 7 random shares (of the 10) have had their
236 crypttext hash tree corrupted."""
237 d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
239 c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
240 self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
241 d.addCallback(_corrupt_7)
242 d.addCallback(self._download_and_check_plaintext)
245 def test_download_abort_if_too_many_missing_shares(self):
246 """ Test that download gives up quickly when it realizes there aren't
247 enough shares out there."""
248 d = self.startup("download_abort_if_too_many_missing_shares")
249 d.addCallback(lambda ign:
250 self.delete_shares_numbered(self.uri, range(8)))
251 d.addCallback(lambda ign:
252 self.shouldFail(NotEnoughSharesError, "delete 8",
253 "Last failure: None",
254 download_to_data, self.filenode))
255 # the new downloader pipelines a bunch of read requests in parallel,
256 # so don't bother asserting anything about the number of reads
259 def test_download_abort_if_too_many_corrupted_shares(self):
260 """Test that download gives up quickly when it realizes there aren't
261 enough uncorrupted shares out there. It should be able to tell
262 because the corruption occurs in the sharedata version number, which
264 d = self.startup("download_abort_if_too_many_corrupted_shares")
266 c = common._corrupt_sharedata_version_number
267 self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
268 d.addCallback(_corrupt_8)
269 def _try_download(ign):
270 start_reads = self._count_reads()
271 d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
273 download_to_data, self.filenode)
274 def _check_numreads(ign):
275 num_reads = self._count_reads() - start_reads
278 # To pass this test, you are required to give up before
279 # reading all of the share data. Actually, we could give up
280 # sooner than 45 reads, but currently our download code does
281 # 45 reads. This test then serves as a "performance
282 # regression detector" -- if you change download code so that
283 # it takes *more* reads, then this test will fail.
284 self.failIf(num_reads > 45, num_reads)
285 d2.addCallback(_check_numreads)
287 d.addCallback(_try_download)
290 def test_download_to_data(self):
291 d = self.startup("download_to_data")
292 d.addCallback(lambda ign: self.filenode.download_to_data())
293 d.addCallback(lambda data:
294 self.failUnlessEqual(data, common.TEST_DATA))
298 def test_download_best_version(self):
299 d = self.startup("download_best_version")
300 d.addCallback(lambda ign: self.filenode.download_best_version())
301 d.addCallback(lambda data:
302 self.failUnlessEqual(data, common.TEST_DATA))
306 def test_get_best_readable_version(self):
307 d = self.startup("get_best_readable_version")
308 d.addCallback(lambda ign: self.filenode.get_best_readable_version())
309 d.addCallback(lambda n2:
310 self.failUnlessEqual(n2, self.filenode))
313 def test_get_size_of_best_version(self):
314 d = self.startup("get_size_of_best_version")
315 d.addCallback(lambda ign: self.filenode.get_size_of_best_version())
316 d.addCallback(lambda size:
317 self.failUnlessEqual(size, len(common.TEST_DATA)))
321 # XXX extend these tests to show bad behavior of various kinds from servers:
322 # raising exception from each remove_foo() method, for example
324 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
326 # TODO: delete this whole file