# src/allmydata/test/test_immutable.py (tahoe-lafs)
# commit e4b584595476f5aa8dd48d2d063c8a7f8a4810ba
1 import random
2
3 from twisted.trial import unittest
4 from twisted.internet import defer
5 import mock
6 from foolscap.api import eventually
7
8 from allmydata.test import common
9 from allmydata.test.no_network import GridTestMixin
10 from allmydata.test.common import TEST_DATA
11 from allmydata import uri
12 from allmydata.util import log
13 from allmydata.util.consumer import download_to_data
14
15 from allmydata.interfaces import NotEnoughSharesError
16 from allmydata.immutable.upload import Data
17 from allmydata.immutable.downloader import finder
18
class MockNode(object):
    """Stand-in for a download node, used to watch ShareFinder behaviour.

    ``finished_d`` fires with True once three shares have been delivered,
    or errbacks with a FailTest if the finder misbehaves: handing over a
    share after promising "no more shares" (when ``check_reneging`` is
    set), or reporting a failed fetch (when ``check_fetch_failed`` is set).
    """
    def __init__(self, check_reneging, check_fetch_failed):
        self.got = 0                        # number of shares received so far
        self.finished_d = defer.Deferred()  # fires when the scenario resolves
        self.segment_size = 78
        self.guessed_segment_size = 78
        self._no_more_shares = False        # set once the finder gives up
        self.check_reneging = check_reneging
        self.check_fetch_failed = check_fetch_failed
        self._si_prefix = 'aa'
        self.have_UEB = True
        self.share_hash_tree = mock.Mock()
        self.share_hash_tree.needed_hashes.return_value = False
        self.on_want_more_shares = None     # optional hook for tests

    def when_finished(self):
        """Return the Deferred that fires when the scenario completes."""
        return self.finished_d

    def get_num_segments(self):
        # (num_segments, authoritative)
        return (5, True)

    def _calculate_sizes(self, guessed_segment_size):
        return {'block_size': 4, 'num_segments': 5}

    def no_more_shares(self):
        # The share finder has declared that no further shares will arrive.
        self._no_more_shares = True

    def got_shares(self, shares):
        # A share arriving after no_more_shares() is the reneging bug.
        if self.check_reneging and self._no_more_shares:
            self.finished_d.errback(unittest.FailTest("The node was told by the share finder that it is destined to remain hungry, then was given another share."))
            return
        self.got += len(shares)
        log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
        if self.got == 3:
            self.finished_d.callback(True)

    def get_desired_ciphertext_hashes(self, *args, **kwargs):
        return iter([])

    def fetch_failed(self, *args, **kwargs):
        if not self.check_fetch_failed:
            return
        if self.finished_d:
            self.finished_d.errback(unittest.FailTest("The node was told by the segment fetcher that the download failed."))
            self.finished_d = None  # report the failure only once

    def want_more_shares(self):
        if self.on_want_more_shares:
            self.on_want_more_shares()

    def process_blocks(self, *args, **kwargs):
        if self.finished_d:
            self.finished_d.callback(None)
64
class TestShareFinder(unittest.TestCase):
    def test_no_reneging_on_no_more_shares_ever(self):
        # ticket #1191

        # Suppose that K=3 and you send two DYHB requests, the first
        # response offers two shares, and then the last offers one
        # share. If you tell your share consumer "no more shares,
        # ever", and then immediately tell them "oh, and here's
        # another share", then you lose.

        rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
        vcap = rcap.get_verify_cap()

        class MockServer(object):
            """Fake storage server whose DYHB answer also re-awakens the
            ShareFinder, keeping it hungry after every response."""
            def __init__(self, buckets):
                self.version = {
                    'http://allmydata.org/tahoe/protocols/storage/v1': {
                        "tolerates-immutable-read-overrun": True
                        }
                    }
                self.buckets = buckets
                self.d = defer.Deferred()
                self.s = None

            def callRemote(self, methname, *args, **kwargs):
                response = defer.Deferred()

                # Even after the 3rd answer we're still hungry because
                # we're interested in finding a share on a 3rd server
                # so we don't have to download more than one share
                # from the first server. This is actually necessary to
                # trigger the bug.
                def _give_buckets_and_hunger_again():
                    response.callback(self.buckets)
                    self.s.hungry()
                eventually(_give_buckets_and_hunger_again)
                return response

        class MockIServer(object):
            """Minimal IServer-style wrapper around a MockServer."""
            def __init__(self, serverid, rref):
                self.serverid = serverid
                self.rref = rref
            def get_serverid(self):
                return self.serverid
            def get_rref(self):
                return self.rref
            def get_name(self):
                return "name-%s" % self.serverid
            def get_version(self):
                return self.rref.version

        # Two shares from the first server, none from the second, one
        # (late) from the third.
        server1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
        server2 = MockServer({})
        server3 = MockServer({3: mock.Mock()})
        storage_broker = mock.Mock()
        storage_broker.get_servers_for_psi.return_value = [
            MockIServer("ms1", server1),
            MockIServer("ms2", server2),
            MockIServer("ms3", server3),
        ]
        download_status = mock.Mock()
        node = MockNode(check_reneging=True, check_fetch_failed=True)

        sharefinder = finder.ShareFinder(storage_broker, vcap, node,
                                         download_status)

        for srv in (server1, server2, server3):
            srv.s = sharefinder

        sharefinder.hungry()

        return node.when_finished()
134
135
class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
    """Immutable-file download behavior, exercised on a no-network grid."""

    def startup(self, basedir):
        """Create a grid (2 clients, 5 servers), upload TEST_DATA from
        client 1, and stash the resulting URI plus a filenode owned by
        client 0 on self.

        Returns a Deferred that fires with the upload's URI.
        """
        self.basedir = basedir
        self.set_up_grid(num_clients=2, num_servers=5)
        c1 = self.g.clients[1]
        # We need multiple segments to test crypttext hash trees that are
        # non-trivial (i.e. they have more than just one hash in them).
        c1.encoding_params['max_segment_size'] = 12
        # Tests that need to test servers of happiness using this should
        # set their own value for happy -- the default (7) breaks stuff.
        c1.encoding_params['happy'] = 1
        d = c1.upload(Data(TEST_DATA, convergence=""))
        def _after_upload(ur):
            self.uri = ur.get_uri()
            self.filenode = self.g.clients[0].create_node_from_uri(ur.get_uri())
            return self.uri
        d.addCallback(_after_upload)
        return d

    def _stash_shares(self, shares):
        self.shares = shares

    def _download_and_check_plaintext(self, ign=None):
        """Download self.filenode, assert it equals TEST_DATA, and return
        (via the Deferred) how many storage-server reads the download took."""
        num_reads = self._count_reads()
        d = download_to_data(self.filenode)
        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
            return self._count_reads() - num_reads
        d.addCallback(_after_download)
        return d

    def _shuffled(self, num_shnums):
        """Return num_shnums distinct share numbers (from 0..9) in random
        order."""
        # range() must be materialized before shuffling: random.shuffle()
        # needs a mutable sequence (required on Python 3, harmless on 2).
        shnums = list(range(10))
        random.shuffle(shnums)
        return shnums[:num_shnums]

    def _count_stat(self, counter_name):
        # Sum the given storage-server counter across every server in the
        # grid; shared helper for the three _count_* methods below.
        return sum([s.stats_provider.get_stats()['counters'].get(counter_name, 0)
                    for s in self.g.servers_by_number.values()])

    def _count_reads(self):
        return self._count_stat('storage_server.read')

    def _count_allocates(self):
        return self._count_stat('storage_server.allocate')

    def _count_writes(self):
        return self._count_stat('storage_server.write')

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = self.startup("immutable/Test/code")
        d.addCallback(self.copy_shares)
        d.addCallback(self._stash_shares)
        d.addCallback(self._download_and_check_plaintext)

        # The following process of deleting 8 of the shares and asserting
        # that you can't download it is more to test this test code than to
        # test the Tahoe code...
        def _then_delete_8(ign):
            self.restore_all_shares(self.shares)
            self.delete_shares_numbered(self.uri, range(8))
        d.addCallback(_then_delete_8)
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "download-2",
                                      "ran out of shares",
                                      download_to_data, self.filenode))
        return d

    def test_download(self):
        """ Basic download. (This functionality is more or less already
        tested by test code in other modules, but this module is also going
        to test some more specific things about immutable download.)
        """
        d = self.startup("immutable/Test/download")
        d.addCallback(self._download_and_check_plaintext)
        def _after_download(ign):
            num_reads = self._count_reads()
            # read-count ceiling doubles as a performance-regression check
            self.failIf(num_reads > 41, num_reads)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been
        removed."""
        d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
        d.addCallback(lambda ign:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(self._download_and_check_plaintext)
        def _after_download(num_reads):
            self.failIf(num_reads > 41, num_reads)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_shares_with_good_crypttext_hash(self):
        """ Test download after 7 random shares (of the 10) have had their
        crypttext hash tree corrupted."""
        d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
        def _corrupt_7(ign):
            c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
            self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
        d.addCallback(_corrupt_7)
        d.addCallback(self._download_and_check_plaintext)
        return d

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't
        enough shares out there."""
        d = self.startup("download_abort_if_too_many_missing_shares")
        d.addCallback(lambda ign:
                      self.delete_shares_numbered(self.uri, range(8)))
        d.addCallback(lambda ign:
                      self.shouldFail(NotEnoughSharesError, "delete 8",
                                      "Last failure: None",
                                      download_to_data, self.filenode))
        # the new downloader pipelines a bunch of read requests in parallel,
        # so don't bother asserting anything about the number of reads
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """Test that download gives up quickly when it realizes there aren't
        enough uncorrupted shares out there. It should be able to tell
        because the corruption occurs in the sharedata version number, which
        it checks first."""
        d = self.startup("download_abort_if_too_many_corrupted_shares")
        def _corrupt_8(ign):
            c = common._corrupt_sharedata_version_number
            self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
        d.addCallback(_corrupt_8)
        def _try_download(ign):
            start_reads = self._count_reads()
            d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
                                 "LayoutInvalid",
                                 download_to_data, self.filenode)
            def _check_numreads(ign):
                num_reads = self._count_reads() - start_reads
                # To pass this test, you are required to give up before
                # reading all of the share data. Actually, we could give up
                # sooner than 45 reads, but currently our download code does
                # 45 reads. This test then serves as a "performance
                # regression detector" -- if you change download code so that
                # it takes *more* reads, then this test will fail.
                self.failIf(num_reads > 45, num_reads)
            d2.addCallback(_check_numreads)
            return d2
        d.addCallback(_try_download)
        return d

    def test_download_to_data(self):
        d = self.startup("download_to_data")
        d.addCallback(lambda ign: self.filenode.download_to_data())
        d.addCallback(lambda data:
            self.failUnlessEqual(data, common.TEST_DATA))
        return d

    def test_download_best_version(self):
        d = self.startup("download_best_version")
        d.addCallback(lambda ign: self.filenode.download_best_version())
        d.addCallback(lambda data:
            self.failUnlessEqual(data, common.TEST_DATA))
        return d

    def test_get_best_readable_version(self):
        # for an immutable file, the best readable version is the node itself
        d = self.startup("get_best_readable_version")
        d.addCallback(lambda ign: self.filenode.get_best_readable_version())
        d.addCallback(lambda n2:
            self.failUnlessEqual(n2, self.filenode))
        return d

    def test_get_size_of_best_version(self):
        d = self.startup("get_size_of_best_version")
        d.addCallback(lambda ign: self.filenode.get_size_of_best_version())
        d.addCallback(lambda size:
            self.failUnlessEqual(size, len(common.TEST_DATA)))
        return d
319
320
321 # XXX extend these tests to show bad behavior of various kinds from servers:
322 # raising exception from each remove_foo() method, for example
323
324 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
325
326 # TODO: delete this whole file