4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from foolscap.api import eventually
8 from allmydata.test import common
9 from allmydata.test.no_network import GridTestMixin
10 from allmydata.test.common import TEST_DATA
11 from allmydata import uri
12 from allmydata.util import log
13 from allmydata.util.consumer import download_to_data
15 from allmydata.interfaces import NotEnoughSharesError
16 from allmydata.immutable.upload import Data
17 from allmydata.immutable.downloader import finder
class MockShareHashTree(object):
    """Minimal stand-in for the share hash tree consulted by MockNode."""
    def needed_hashes(self):
        """NOTE(review): the body of this method is missing from this view
        of the file; this docstring only keeps the definition
        syntactically valid.  TODO: confirm the real return value against
        the complete source."""
class MockNode(object):
    """Stand-in for an immutable download node handed to the ShareFinder.

    Records what the finder tells it and converts finder misbehaviour
    into test failures via ``finished_d``.

    NOTE(review): several lines of this class are missing from this view
    of the file (e.g. the initialisation of ``self.got``, the bodies of
    ``get_num_segments`` and ``get_desired_ciphertext_hashes``, and a few
    guard lines); hedged comments below mark each visible gap.
    """
    def __init__(self, check_reneging, check_fetch_failed):
        # Fires when the scenario completes; errbacks with FailTest on
        # detected misbehaviour.
        self.finished_d = defer.Deferred()
        self.segment_size = 78
        self.guessed_segment_size = 78
        # Set True once the finder declares "no more shares, ever".
        self._no_more_shares = False
        # Which misbehaviours this mock should turn into test failures.
        self.check_reneging = check_reneging
        self.check_fetch_failed = check_fetch_failed
        # NOTE(review): the line initialising ``self.got`` (read and
        # incremented by got_shares below) is not visible in this chunk
        # -- TODO confirm against the complete source.
        self.share_hash_tree = MockShareHashTree()
        # Optional hook invoked from want_more_shares().
        self.on_want_more_shares = None

    def when_finished(self):
        # Deferred the test waits on for the verdict.
        return self.finished_d
    def get_num_segments(self):
        """NOTE(review): body missing from this view of the file."""
    def _calculate_sizes(self, guessed_segment_size):
        # Fixed fake geometry; the exact values are arbitrary here.
        return {'block_size': 4, 'num_segments': 5}
    def no_more_shares(self):
        # The share finder promises it will deliver nothing further.
        self._no_more_shares = True
    def got_shares(self, shares):
        # Receiving a share *after* no_more_shares() is the "reneging"
        # bug this mock is designed to catch.
        if self.check_reneging:
            if self._no_more_shares:
                self.finished_d.errback(unittest.FailTest("The node was told by the share finder that it is destined to remain hungry, then was given another share."))
        self.got += len(shares)
        log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
        # NOTE(review): the guard line above the next statement
        # (presumably ``if self.got == 3:``) is missing from this view of
        # the file -- hence the extra indentation.  TODO confirm.
            self.finished_d.callback(True)
    def get_desired_ciphertext_hashes(self, *args, **kwargs):
        """NOTE(review): body missing from this view of the file."""
    def fetch_failed(self, *args, **kwargs):
        # A failed fetch is itself a test failure when requested.
        if self.check_fetch_failed:
            # NOTE(review): a guard line (probably ``if self.finished_d:``)
            # appears to be missing from this view of the file -- hence
            # the extra indentation of the next two lines.  TODO confirm.
                self.finished_d.errback(unittest.FailTest("The node was told by the segment fetcher that the download failed."))
                self.finished_d = None
    def want_more_shares(self):
        # Delegate to the test-supplied hook, if any.
        if self.on_want_more_shares:
            self.on_want_more_shares()
    def process_blocks(self, *args, **kwargs):
        # NOTE(review): a preceding line (likely a guard such as
        # ``if self.finished_d:``) is missing from this view of the file.
            self.finished_d.callback(None)
class TestShareFinder(unittest.TestCase):
    """Tests for allmydata.immutable.downloader.finder.ShareFinder.

    NOTE(review): a number of lines are missing from this view of the
    file (several mock-class bodies, dict-literal braces, and the tail of
    the test method); hedged comments below mark each visible gap.
    """
    def test_no_reneging_on_no_more_shares_ever(self):
        # Suppose that K=3 and you send two DYHB requests, the first
        # response offers two shares, and then the last offers one
        # share. If you tell your share consumer "no more shares,
        # ever", and then immediately tell them "oh, and here's
        # another share", then you lose.

        # A readcap/verifycap pair with k=3, N=99 (values from the CHK URI).
        rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
        vcap = rcap.get_verify_cap()

        class MockBuckets(object):
            """NOTE(review): body missing from this view of the file
            (presumably just ``pass``)."""

        class MockServer(object):
            def __init__(self, buckets):
                # NOTE(review): the line opening this version dict
                # (presumably ``self.version = {``) and its closing braces
                # are missing from this view of the file.
                'http://allmydata.org/tahoe/protocols/storage/v1': {
                    "tolerates-immutable-read-overrun": True
                self.buckets = buckets
                self.d = defer.Deferred()
            def callRemote(self, methname, *args, **kwargs):
                # NOTE(review): lines creating the local Deferred ``d``
                # (used below) are missing from this view of the file.
                # Even after the 3rd answer we're still hungry because
                # we're interested in finding a share on a 3rd server
                # so we don't have to download more than one share
                # from the first server. This is actually necessary to
                # NOTE(review): the rest of this comment, and possibly
                # code, is missing from this view of the file.
                def _give_buckets_and_hunger_again():
                    d.callback(self.buckets)
                    # NOTE(review): a following line (presumably
                    # re-declaring hunger) is missing from this view.
                eventually(_give_buckets_and_hunger_again)
                # NOTE(review): the tail of this method (presumably
                # ``return d``) is not visible in this chunk.

        class MockIServer(object):
            def __init__(self, serverid, rref):
                self.serverid = serverid
                # NOTE(review): the assignment of ``self.rref`` is not
                # visible here, but get_version() below reads it.
            def get_serverid(self):
                """NOTE(review): body missing from this view of the file
                (presumably returns ``self.serverid``)."""
            # NOTE(review): lines between here and the next statement are
            # missing -- the orphaned ``return`` below most likely belongs
            # to a missing ``def get_name(self):`` accessor.
                return "name-%s" % self.serverid
            def get_version(self):
                return self.rref.version

        class MockStorageBroker(object):
            def __init__(self, servers):
                self.servers = servers
            def get_servers_for_psi(self, si):
                """NOTE(review): body missing from this view of the file
                (presumably returns ``self.servers``)."""

        class MockDownloadStatus(object):
            def add_dyhb_request(self, server, when):
                return MockDYHBEvent()

        class MockDYHBEvent(object):
            def finished(self, shnums, when):
                """NOTE(review): body missing from this view of the file
                (presumably just ``pass``)."""

        # Three servers: two shares on the first, none on the second,
        # one on the third -- the shape described in the comment above.
        mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
        mockserver2 = MockServer({})
        mockserver3 = MockServer({3: MockBuckets()})
        servers = [ MockIServer("ms1", mockserver1),
                    MockIServer("ms2", mockserver2),
                    MockIServer("ms3", mockserver3), ]
        mockstoragebroker = MockStorageBroker(servers)
        mockdownloadstatus = MockDownloadStatus()
        mocknode = MockNode(check_reneging=True, check_fetch_failed=True)

        s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)
        # NOTE(review): several lines between the ShareFinder construction
        # and the return are missing from this view of the file
        # (presumably wiring the finder to the mock servers and starting
        # the search).
        return mocknode.when_finished()
class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
    """Immutable-file download tests run against a no-network test grid.

    NOTE(review): scattered lines are missing from this view of the file
    (some helper bodies, the ``return d`` tails of several tests, the
    ``def _corrupt_N(ign):`` headers, and parts of a few docstrings);
    hedged comments below mark each visible gap.
    """
    def startup(self, basedir):
        # Bring up a 2-client / 5-server grid and upload TEST_DATA with
        # client 1; downloads are then performed through client 0's node.
        self.basedir = basedir
        self.set_up_grid(num_clients=2, num_servers=5)
        c1 = self.g.clients[1]
        # We need multiple segments to test crypttext hash trees that are
        # non-trivial (i.e. they have more than just one hash in them).
        c1.encoding_params['max_segment_size'] = 12
        # Tests that need to test servers of happiness using this should
        # set their own value for happy -- the default (7) breaks stuff.
        c1.encoding_params['happy'] = 1
        d = c1.upload(Data(TEST_DATA, convergence=""))
        def _after_upload(ur):
            # Stash the cap and a filenode (on the *other* client).
            self.uri = ur.get_uri()
            self.filenode = self.g.clients[0].create_node_from_uri(ur.get_uri())
        d.addCallback(_after_upload)
        # NOTE(review): the tail of this method (presumably ``return d``)
        # is not visible in this chunk.

    def _stash_shares(self, shares):
        """NOTE(review): body missing from this view of the file
        (presumably stashes *shares* on ``self.shares``, which
        test_test_code later reads)."""

    def _download_and_check_plaintext(self, ign=None):
        # Download the file, verify the plaintext, and report how many
        # storage-server read() calls the download cost.
        num_reads = self._count_reads()
        d = download_to_data(self.filenode)
        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
            return self._count_reads() - num_reads
        d.addCallback(_after_download)
        # NOTE(review): ``return d`` is not visible in this chunk.

    def _shuffled(self, num_shnums):
        # Return num_shnums distinct share numbers in random order.
        # NOTE(review): the line initialising ``shnums`` (presumably
        # ``shnums = range(10)``) and the import of ``random`` are not
        # visible in this chunk -- TODO confirm.
        random.shuffle(shnums)
        return shnums[:num_shnums]

    def _count_reads(self):
        # Sum the 'read' counters across every storage server in the grid.
        return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.read', 0)
                    for s in self.g.servers_by_number.values()])

    def _count_allocates(self):
        # Sum the 'allocate' counters across every storage server.
        return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.allocate', 0)
                    for s in self.g.servers_by_number.values()])

    def _count_writes(self):
        # Sum the 'write' counters across every storage server.
        return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.write', 0)
                    for s in self.g.servers_by_number.values()])

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = self.startup("immutable/Test/code")
        d.addCallback(self.copy_shares)
        d.addCallback(self._stash_shares)
        d.addCallback(self._download_and_check_plaintext)

        # The following process of deleting 8 of the shares and asserting
        # that you can't download it is more to test this test code than to
        # test the Tahoe code...
        def _then_delete_8(ign):
            self.restore_all_shares(self.shares)
            self.delete_shares_numbered(self.uri, range(8))
        d.addCallback(_then_delete_8)
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "download-2",
                                      # NOTE(review): an argument line (the
                                      # expected-message substring) is
                                      # missing from this view of the file.
                                      download_to_data, self.filenode))
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download(self):
        """ Basic download. (This functionality is more or less already
        tested by test code in other modules, but this module is also going
        to test some more specific things about immutable download.)
        """
        d = self.startup("immutable/Test/download")
        d.addCallback(self._download_and_check_plaintext)
        def _after_download(ign):
            # Read-count ceiling acts as a coarse performance regression
            # detector (see the longer note in
            # test_download_abort_if_too_many_corrupted_shares).
            num_reads = self._count_reads()
            self.failIf(num_reads > 41, num_reads)
        d.addCallback(_after_download)
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been
        deleted."""
        d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
        d.addCallback(lambda ign:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(self._download_and_check_plaintext)
        def _after_download(num_reads):
            self.failIf(num_reads > 41, num_reads)
        d.addCallback(_after_download)
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download_from_only_3_shares_with_good_crypttext_hash(self):
        """ Test download after 7 random shares (of the 10) have had their
        crypttext hash tree corrupted."""
        d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
        # NOTE(review): the ``def _corrupt_7(ign):`` header is missing
        # from this view of the file; the next two lines are its body.
            c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
            self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
        d.addCallback(_corrupt_7)
        d.addCallback(self._download_and_check_plaintext)
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't
        enough shares out there."""
        d = self.startup("download_abort_if_too_many_missing_shares")
        d.addCallback(lambda ign:
                      self.delete_shares_numbered(self.uri, range(8)))
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "delete 8",
                                      "Last failure: None",
                                      download_to_data, self.filenode))
        # the new downloader pipelines a bunch of read requests in parallel,
        # so don't bother asserting anything about the number of reads
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download_abort_if_too_many_corrupted_shares(self):
        """Test that download gives up quickly when it realizes there aren't
        enough uncorrupted shares out there. It should be able to tell
        because the corruption occurs in the sharedata version number, which
        [NOTE(review): the remainder of this docstring is missing from
        this view of the file]"""
        d = self.startup("download_abort_if_too_many_corrupted_shares")
        # NOTE(review): the ``def _corrupt_8(ign):`` header is missing
        # from this view of the file; the next two lines are its body.
            c = common._corrupt_sharedata_version_number
            self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
        d.addCallback(_corrupt_8)
        def _try_download(ign):
            start_reads = self._count_reads()
            d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
                                 # NOTE(review): an argument line (the
                                 # expected-message substring) is missing
                                 # from this view of the file.
                                 download_to_data, self.filenode)
            def _check_numreads(ign):
                num_reads = self._count_reads() - start_reads
                # To pass this test, you are required to give up before
                # reading all of the share data. Actually, we could give up
                # sooner than 45 reads, but currently our download code does
                # 45 reads. This test then serves as a "performance
                # regression detector" -- if you change download code so that
                # it takes *more* reads, then this test will fail.
                self.failIf(num_reads > 45, num_reads)
            d2.addCallback(_check_numreads)
            # NOTE(review): ``return d2`` is not visible in this chunk.
        d.addCallback(_try_download)
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download_to_data(self):
        # filenode.download_to_data() should yield the original plaintext.
        d = self.startup("download_to_data")
        d.addCallback(lambda ign: self.filenode.download_to_data())
        d.addCallback(lambda data:
                      self.failUnlessEqual(data, common.TEST_DATA))
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_download_best_version(self):
        # For immutable files, download_best_version() is the whole file.
        d = self.startup("download_best_version")
        d.addCallback(lambda ign: self.filenode.download_best_version())
        d.addCallback(lambda data:
                      self.failUnlessEqual(data, common.TEST_DATA))
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_get_best_readable_version(self):
        # An immutable filenode is its own best readable version.
        d = self.startup("get_best_readable_version")
        d.addCallback(lambda ign: self.filenode.get_best_readable_version())
        d.addCallback(lambda n2:
                      self.failUnlessEqual(n2, self.filenode))
        # NOTE(review): ``return d`` is not visible in this chunk.

    def test_get_size_of_best_version(self):
        # Size of the best version must equal the uploaded data's length.
        d = self.startup("get_size_of_best_version")
        d.addCallback(lambda ign: self.filenode.get_size_of_best_version())
        d.addCallback(lambda size:
                      self.failUnlessEqual(size, len(common.TEST_DATA)))
        # NOTE(review): ``return d`` is not visible in this chunk.
# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remote_foo() method, for example
345 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
347 # TODO: delete this whole file