# src/allmydata/test/test_immutable.py
# (scraped from gitweb; originating commit: "Refactor StorageFarmBroker
# handling of servers")
1 from allmydata.test import common
2 from allmydata.interfaces import NotEnoughSharesError
3 from allmydata.util.consumer import download_to_data
4 from allmydata import uri
5 from twisted.internet import defer
6 from twisted.trial import unittest
7 import random
8
9 from foolscap.api import eventually
10 from allmydata.util import log
11
12 from allmydata.immutable.downloader import finder
13
14 import mock
15
class MockNode(object):
    """Stand-in for a download node, used to watch how ShareFinder
    feeds it shares.

    check_reneging -- when true, fail the test if a share arrives after
        no_more_shares() was announced.
    check_fetch_failed -- when true, fail the test if the segment
        fetcher reports a download failure.
    """
    def __init__(self, check_reneging, check_fetch_failed):
        self.check_reneging = check_reneging
        self.check_fetch_failed = check_fetch_failed
        # counters / completion signal observed by the test
        self.got = 0
        self.finished_d = defer.Deferred()
        # fixed geometry the finder will query
        self.segment_size = 78
        self.guessed_segment_size = 78
        self._no_more_shares = False
        self._si_prefix = 'aa'
        self.have_UEB = True
        # pretend we never need additional share hashes
        self.share_hash_tree = mock.Mock()
        self.share_hash_tree.needed_hashes.return_value = False
        self.on_want_more_shares = None

    def when_finished(self):
        return self.finished_d

    def get_num_segments(self):
        return (5, True)

    def _calculate_sizes(self, guessed_segment_size):
        return {'block_size': 4, 'num_segments': 5}

    def no_more_shares(self):
        # the finder promised it will never deliver another share
        self._no_more_shares = True

    def got_shares(self, shares):
        # A share arriving after no_more_shares() is the reneging bug
        # (ticket #1191) that this mock exists to detect.
        if self.check_reneging and self._no_more_shares:
            self.finished_d.errback(unittest.FailTest("The node was told by the share finder that it is destined to remain hungry, then was given another share."))
            return
        self.got += len(shares)
        log.msg("yyy 3 %s.got_shares(%s) got: %s" % (self, shares, self.got))
        if self.got == 3:
            self.finished_d.callback(True)

    def get_desired_ciphertext_hashes(self, *args, **kwargs):
        return iter([])

    def fetch_failed(self, *args, **kwargs):
        if self.check_fetch_failed and self.finished_d:
            self.finished_d.errback(unittest.FailTest("The node was told by the segment fetcher that the download failed."))
            self.finished_d = None

    def want_more_shares(self):
        if self.on_want_more_shares:
            self.on_want_more_shares()

    def process_blocks(self, *args, **kwargs):
        if self.finished_d:
            self.finished_d.callback(None)
61
class TestShareFinder(unittest.TestCase):
    """Unit test for immutable.downloader.finder.ShareFinder using mocked
    servers, exercising the "reneging" ordering bug of ticket #1191."""

    def test_no_reneging_on_no_more_shares_ever(self):
        # ticket #1191

        # Suppose that K=3 and you send two DYHB requests, the first
        # response offers two shares, and then the last offers one
        # share. If you tell your share consumer "no more shares,
        # ever", and then immediately tell them "oh, and here's
        # another share", then you lose.

        rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
        vcap = rcap.get_verify_cap()

        class MockServer(object):
            # Simulates a storage server's remote reference: every
            # callRemote answers (eventually) with the fixed bucket dict
            # given at construction time.
            def __init__(self, buckets):
                self.version = {
                    'http://allmydata.org/tahoe/protocols/storage/v1': {
                        "tolerates-immutable-read-overrun": True
                        }
                    }
                self.buckets = buckets
                self.d = defer.Deferred()
                self.s = None  # the ShareFinder; assigned by the test below
            def callRemote(self, methname, *args, **kwargs):
                d = defer.Deferred()

                # Even after the 3rd answer we're still hungry because
                # we're interested in finding a share on a 3rd server
                # so we don't have to download more than one share
                # from the first server. This is actually necessary to
                # trigger the bug.
                def _give_buckets_and_hunger_again():
                    d.callback(self.buckets)
                    self.s.hungry()
                # eventually() defers to a later reactor turn, which is
                # what interleaves the answer with the re-hungry call and
                # reproduces the problematic ordering.
                eventually(_give_buckets_and_hunger_again)
                return d
        class MockIServer(object):
            # Minimal IServer: just enough for ShareFinder to identify
            # the server and obtain its remote reference.
            def __init__(self, serverid, rref):
                self.serverid = serverid
                self.rref = rref
            def get_serverid(self):
                return self.serverid
            def get_rref(self):
                return self.rref

        # server 1 offers two shares, server 2 none, server 3 one share:
        # exactly the K=3 split described in the comment above.
        mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
        mockserver2 = MockServer({})
        mockserver3 = MockServer({3: mock.Mock()})
        mockstoragebroker = mock.Mock()
        servers = [ MockIServer("ms1", mockserver1),
                    MockIServer("ms2", mockserver2),
                    MockIServer("ms3", mockserver3), ]
        mockstoragebroker.get_servers_for_psi.return_value = servers
        mockdownloadstatus = mock.Mock()
        # MockNode (above) errbacks finished_d if the finder reneges.
        mocknode = MockNode(check_reneging=True, check_fetch_failed=True)

        s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)

        # give each mock server a handle to the finder so callRemote can
        # re-trigger hungry() after answering
        mockserver1.s = s
        mockserver2.s = s
        mockserver3.s = s

        s.hungry()

        return mocknode.when_finished()
127
class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase):
    """System-style immutable download tests: shares are deleted or
    corrupted on disk (via ShareManglingMixin) and the downloader's
    behavior -- success, failure mode, and read count -- is checked."""

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)

        # The following process of deleting 8 of the shares and asserting
        # that you can't download it is more to test this test code than to
        # test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            d2 = download_to_data(self.n)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d2.addCallbacks(_after_download_callb, _after_download_errb)
            return d2
        d.addCallback(_then_download)

        return d

    def test_download(self):
        """ Basic download. (This functionality is more or less already
        tested by test code in other modules, but this module is also going
        to test some more specific things about immutable download.)
        """
        d = defer.succeed(None)
        before_download_reads = self._count_reads()
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            #print before_download_reads, after_download_reads
            # performance-regression guard: a full download should take no
            # more than 41 reads
            self.failIf(after_download_reads-before_download_reads > 41,
                        (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been
        removed."""
        d = defer.succeed(None)
        def _then_delete_7(unused=None):
            for i in range(7):
                self._delete_a_share()
        before_download_reads = self._count_reads()
        d.addCallback(_then_delete_7)
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            #print before_download_reads, after_download_reads
            # performance-regression guard, same bound as test_download
            self.failIf(after_download_reads-before_download_reads > 41, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_shares_with_good_crypttext_hash(self):
        """ Test download after 7 random shares (of the 10) have had their
        crypttext hash tree corrupted."""
        d = defer.succeed(None)
        def _then_corrupt_7(unused=None):
            # FIX: random.shuffle needs a mutable sequence; bare range()
            # breaks under Python 3, so materialize it as a list first.
            shnums = list(range(10))
            random.shuffle(shnums)
            for i in shnums[:7]:
                self._corrupt_a_share(None, common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes, i)
        #before_download_reads = self._count_reads()
        d.addCallback(_then_corrupt_7)
        d.addCallback(self._download_and_check_plaintext)
        return d

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't
        enough shares out there."""
        for i in range(8):
            self._delete_a_share()
        d = self.shouldFail(NotEnoughSharesError, "delete 8", None,
                            download_to_data, self.n)
        # the new downloader pipelines a bunch of read requests in parallel,
        # so don't bother asserting anything about the number of reads
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """Test that download gives up quickly when it realizes there aren't
        enough uncorrupted shares out there. It should be able to tell
        because the corruption occurs in the sharedata version number, which
        it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            # FIX: materialize range() into a list so shuffle works on
            # Python 3 as well as Python 2.
            shnums = list(range(10))
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, common._corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            d2 = download_to_data(self.n)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d2.addCallbacks(_callb, _errb)
            return d2

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            #print before_download_reads, after_download_reads
            # To pass this test, you are required to give up before reading
            # all of the share data. Actually, we could give up sooner than
            # 45 reads, but currently our download code does 45 reads. This
            # test then serves as a "performance regression detector" -- if
            # you change download code so that it takes *more* reads, then
            # this test will fail.
            self.failIf(after_download_reads-before_download_reads > 45,
                        (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d
262
263
264 # XXX extend these tests to show bad behavior of various kinds from servers:
265 # raising exception from each remove_foo() method, for example
266
267 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
268
269 # TODO: delete this whole file