# src/allmydata/test/test_immutable.py
# (tahoe-lafs; snapshot from commit "Eliminate mock dependency.")
1
import random

from twisted.trial import unittest
from twisted.internet import defer
from foolscap.api import eventually

from allmydata import uri
from allmydata.immutable.downloader import finder
from allmydata.immutable.upload import Data
from allmydata.interfaces import NotEnoughSharesError
from allmydata.test import common
from allmydata.test.common import TEST_DATA
from allmydata.test.no_network import GridTestMixin
from allmydata.util import log
from allmydata.util.consumer import download_to_data
18
19
class MockShareHashTree(object):
    """Stub share hash tree: always claims that no hashes are needed."""
    def needed_hashes(self):
        # The downloader asks which share-hash-tree nodes it still must
        # fetch before a share can be validated; pretend there are none.
        return False
class MockNode(object):
    """Stand-in for a download node, used to observe ShareFinder behavior.

    check_reneging: if true, fail the test when a share arrives after
        no_more_shares() was announced (the "reneging" bug, ticket #1191).
    check_fetch_failed: if true, fail the test when the segment fetcher
        reports that the download failed.
    """
    def __init__(self, check_reneging, check_fetch_failed):
        self.check_reneging = check_reneging
        self.check_fetch_failed = check_fetch_failed
        self.got = 0
        self.finished_d = defer.Deferred()
        self.segment_size = 78
        self.guessed_segment_size = 78
        self._no_more_shares = False
        self._si_prefix = 'aa'
        self.have_UEB = True
        self.share_hash_tree = MockShareHashTree()
        self.on_want_more_shares = None

    def when_finished(self):
        """Return a Deferred that fires when the scenario completes."""
        return self.finished_d

    def get_num_segments(self):
        # (num_segments, authoritative)
        return (5, True)

    def _calculate_sizes(self, guessed_segment_size):
        return {'block_size': 4, 'num_segments': 5}

    def no_more_shares(self):
        # The share finder has promised that no further shares will arrive.
        self._no_more_shares = True

    def got_shares(self, shares):
        if self.check_reneging and self._no_more_shares:
            # A share arrived after the "no more shares, ever" promise.
            self.finished_d.errback(unittest.FailTest("The node was told by the share finder that it is destined to remain hungry, then was given another share."))
            return
        self.got += len(shares)
        log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
        if self.got == 3:
            self.finished_d.callback(True)

    def get_desired_ciphertext_hashes(self, *args, **kwargs):
        return iter([])

    def fetch_failed(self, *args, **kwargs):
        if self.check_fetch_failed and self.finished_d:
            self.finished_d.errback(unittest.FailTest("The node was told by the segment fetcher that the download failed."))
            self.finished_d = None

    def want_more_shares(self):
        if self.on_want_more_shares:
            self.on_want_more_shares()

    def process_blocks(self, *args, **kwargs):
        if self.finished_d:
            self.finished_d.callback(None)
68
class TestShareFinder(unittest.TestCase):
    """Unit test for immutable.downloader.finder.ShareFinder, driven
    entirely by in-process mocks (no grid, no network)."""

    def test_no_reneging_on_no_more_shares_ever(self):
        """Regression test for the "reneging" bug: the finder must never
        deliver a share to its consumer after having told it "no more
        shares, ever"."""
        # ticket #1191

        # Suppose that K=3 and you send two DYHB requests, the first
        # response offers two shares, and then the last offers one
        # share. If you tell your share consumer "no more shares,
        # ever", and then immediately tell them "oh, and here's
        # another share", then you lose.

        # A 3-of-99 CHK cap; only the verify cap is given to the finder.
        rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
        vcap = rcap.get_verify_cap()

        class MockBuckets(object):
            # Opaque placeholder for a remote bucket reader.
            pass

        class MockServer(object):
            """Fake storage server: answers every callRemote (eventually)
            with its fixed set of buckets, then re-declares the finder
            hungry to keep it asking."""
            def __init__(self, buckets):
                self.version = {
                    'http://allmydata.org/tahoe/protocols/storage/v1': {
                        "tolerates-immutable-read-overrun": True
                        }
                    }
                self.buckets = buckets
                self.d = defer.Deferred()
                # self.s is the ShareFinder; wired up by the test below.
                self.s = None
            def callRemote(self, methname, *args, **kwargs):
                d = defer.Deferred()

                # Even after the 3rd answer we're still hungry because
                # we're interested in finding a share on a 3rd server
                # so we don't have to download more than one share
                # from the first server. This is actually necessary to
                # trigger the bug.
                def _give_buckets_and_hunger_again():
                    d.callback(self.buckets)
                    self.s.hungry()
                # Deliver on a later reactor turn, as a real server would.
                eventually(_give_buckets_and_hunger_again)
                return d

        class MockIServer(object):
            """Minimal IServer wrapper around a MockServer rref."""
            def __init__(self, serverid, rref):
                self.serverid = serverid
                self.rref = rref
            def get_serverid(self):
                return self.serverid
            def get_rref(self):
                return self.rref
            def get_name(self):
                return "name-%s" % self.serverid
            def get_version(self):
                return self.rref.version

        class MockStorageBroker(object):
            # Always returns the same fixed server list, in order.
            def __init__(self, servers):
                self.servers = servers
            def get_servers_for_psi(self, si):
                return self.servers

        class MockDownloadStatus(object):
            # Status recorder stub: accepts DYHB events and discards them.
            def add_dyhb_request(self, server, when):
                return MockDYHBEvent()

        class MockDYHBEvent(object):
            def finished(self, shnums, when):
                pass

        # Server 1 offers two shares, server 2 none, server 3 one share:
        # exactly the K=3 split described in the comment above.
        mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
        mockserver2 = MockServer({})
        mockserver3 = MockServer({3: MockBuckets()})
        servers = [ MockIServer("ms1", mockserver1),
                    MockIServer("ms2", mockserver2),
                    MockIServer("ms3", mockserver3), ]
        mockstoragebroker = MockStorageBroker(servers)
        mockdownloadstatus = MockDownloadStatus()
        # check_reneging=True makes MockNode errback if a share arrives
        # after no_more_shares(); that is the assertion of this test.
        mocknode = MockNode(check_reneging=True, check_fetch_failed=True)

        s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)

        # Give the fake servers a back-reference so their answers can
        # immediately re-declare the finder hungry (see callRemote).
        mockserver1.s = s
        mockserver2.s = s
        mockserver3.s = s

        s.hungry()

        return mocknode.when_finished()
155
156
class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
    """Immutable-file download tests run against a real in-process grid."""

    def startup(self, basedir):
        """Create a 2-client/5-server grid, upload TEST_DATA from client 1,
        and stash the upload URI and a filenode (created on client 0) for
        later download tests.

        Returns a Deferred that fires with the file's URI.
        """
        self.basedir = basedir
        self.set_up_grid(num_clients=2, num_servers=5)
        c1 = self.g.clients[1]
        # We need multiple segments to test crypttext hash trees that are
        # non-trivial (i.e. they have more than just one hash in them).
        c1.encoding_params['max_segment_size'] = 12
        # Tests that need to test servers of happiness using this should
        # set their own value for happy -- the default (7) breaks stuff.
        c1.encoding_params['happy'] = 1
        d = c1.upload(Data(TEST_DATA, convergence=""))
        def _after_upload(ur):
            self.uri = ur.get_uri()
            # Download through a different client than the uploader.
            self.filenode = self.g.clients[0].create_node_from_uri(ur.get_uri())
            return self.uri
        d.addCallback(_after_upload)
        return d

    def _stash_shares(self, shares):
        # Keep a copy of the share map so a test can restore it later.
        self.shares = shares

    def _download_and_check_plaintext(self, ign=None):
        """Download the stashed filenode, assert the plaintext matches
        TEST_DATA, and return (via Deferred) the number of storage-server
        reads the download cost."""
        num_reads = self._count_reads()
        d = download_to_data(self.filenode)
        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
            return self._count_reads() - num_reads
        d.addCallback(_after_download)
        return d

    def _shuffled(self, num_shnums):
        """Return num_shnums distinct share numbers chosen at random
        from 0..9."""
        # range() is a lazy, immutable sequence on Python 3, which
        # random.shuffle() cannot mutate in place; materialize a list first.
        shnums = list(range(10))
        random.shuffle(shnums)
        return shnums[:num_shnums]

    def _count_stat(self, name):
        # Sum one storage-server counter across every server in the grid.
        return sum([s.stats_provider.get_stats()['counters'].get(name, 0)
                    for s in self.g.servers_by_number.values()])

    def _count_reads(self):
        return self._count_stat('storage_server.read')

    def _count_allocates(self):
        return self._count_stat('storage_server.allocate')

    def _count_writes(self):
        return self._count_stat('storage_server.write')

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = self.startup("immutable/Test/code")
        d.addCallback(self.copy_shares)
        d.addCallback(self._stash_shares)
        d.addCallback(self._download_and_check_plaintext)

        # The following process of deleting 8 of the shares and asserting
        # that you can't download it is more to test this test code than to
        # test the Tahoe code...
        def _then_delete_8(ign):
            self.restore_all_shares(self.shares)
            # With 8 of 10 shares gone only 2 remain, below K=3.
            self.delete_shares_numbered(self.uri, range(8))
        d.addCallback(_then_delete_8)
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "download-2",
                                      "ran out of shares",
                                      download_to_data, self.filenode))
        return d

    def test_download(self):
        """ Basic download. (This functionality is more or less already
        tested by test code in other modules, but this module is also going
        to test some more specific things about immutable download.)
        """
        d = self.startup("immutable/Test/download")
        d.addCallback(self._download_and_check_plaintext)
        def _after_download(ign):
            num_reads = self._count_reads()
            # Performance guard: a healthy download should not need more
            # than 41 storage-server reads.
            self.failIf(num_reads > 41, num_reads)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been
        removed."""
        d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
        d.addCallback(lambda ign:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(self._download_and_check_plaintext)
        def _after_download(num_reads):
            # Same read-count guard as test_download: K shares suffice.
            self.failIf(num_reads > 41, num_reads)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_shares_with_good_crypttext_hash(self):
        """ Test download after 7 random shares (of the 10) have had their
        crypttext hash tree corrupted."""
        d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
        def _corrupt_7(ign):
            c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
            self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
        d.addCallback(_corrupt_7)
        # The 3 intact shares must still be enough to recover the plaintext.
        d.addCallback(self._download_and_check_plaintext)
        return d

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't
        enough shares out there."""
        d = self.startup("download_abort_if_too_many_missing_shares")
        d.addCallback(lambda ign:
                      self.delete_shares_numbered(self.uri, range(8)))
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "delete 8",
                                      "Last failure: None",
                                      download_to_data, self.filenode))
        # the new downloader pipelines a bunch of read requests in parallel,
        # so don't bother asserting anything about the number of reads
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """Test that download gives up quickly when it realizes there aren't
        enough uncorrupted shares out there. It should be able to tell
        because the corruption occurs in the sharedata version number, which
        it checks first."""
        d = self.startup("download_abort_if_too_many_corrupted_shares")
        def _corrupt_8(ign):
            c = common._corrupt_sharedata_version_number
            self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
        d.addCallback(_corrupt_8)
        def _try_download(ign):
            start_reads = self._count_reads()
            d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
                                 "LayoutInvalid",
                                 download_to_data, self.filenode)
            def _check_numreads(ign):
                num_reads = self._count_reads() - start_reads

                # To pass this test, you are required to give up before
                # reading all of the share data. Actually, we could give up
                # sooner than 45 reads, but currently our download code does
                # 45 reads. This test then serves as a "performance
                # regression detector" -- if you change download code so that
                # it takes *more* reads, then this test will fail.
                self.failIf(num_reads > 45, num_reads)
            d2.addCallback(_check_numreads)
            return d2
        d.addCallback(_try_download)
        return d

    def test_download_to_data(self):
        d = self.startup("download_to_data")
        d.addCallback(lambda ign: self.filenode.download_to_data())
        d.addCallback(lambda data:
            self.failUnlessEqual(data, common.TEST_DATA))
        return d

    def test_download_best_version(self):
        d = self.startup("download_best_version")
        d.addCallback(lambda ign: self.filenode.download_best_version())
        d.addCallback(lambda data:
            self.failUnlessEqual(data, common.TEST_DATA))
        return d

    def test_get_best_readable_version(self):
        # Immutable files have exactly one version: the node itself.
        d = self.startup("get_best_readable_version")
        d.addCallback(lambda ign: self.filenode.get_best_readable_version())
        d.addCallback(lambda n2:
            self.failUnlessEqual(n2, self.filenode))
        return d

    def test_get_size_of_best_version(self):
        d = self.startup("get_size_of_best_version")
        d.addCallback(lambda ign: self.filenode.get_size_of_best_version())
        d.addCallback(lambda size:
            self.failUnlessEqual(size, len(common.TEST_DATA)))
        return d
340
341
# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# TODO: delete this whole file