]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_immutable.py
immutable: whoops, it actually takes up to 39 reads sometimes to download a corrupted...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_immutable.py
1
2 from allmydata.test.common import SystemTestMixin, ShareManglingMixin
3 from allmydata.monitor import Monitor
4 from allmydata.interfaces import IURI, NotEnoughSharesError
5 from allmydata.immutable import upload
6 from allmydata.util import log
7 from twisted.internet import defer
8 from twisted.trial import unittest
9 import random, struct
10 import common_util as testutil
11
# One byte larger than the LIT (literal-URI) threshold, so uploading it
# produces real CHK shares on the storage servers instead of an inline URI.
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
13
def corrupt_field(data, offset, size, debug=False):
    """ Return a copy of `data` with the `size`-byte field at `offset`
    corrupted: half of the time a single bit within the field is flipped,
    otherwise the whole field is replaced with random bytes. """
    if random.random() < 0.5:
        flipped = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], flipped[offset:offset+size]))
        return flipped
    randomized = testutil.insecurerandstr(size)
    if debug:
        log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], randomized))
    return data[:offset] + randomized + data[offset+size:]
25
def _corrupt_file_version_number(data):
    """ Scramble the share file's container-version field (the first 4
    bytes): one bit is flipped or the whole field is randomized. """
    return corrupt_field(data, 0x00, 4)
30
def _corrupt_size_of_file_data(data):
    """ Scramble the share file's data-size field (4 bytes at offset 0x04):
    one bit is flipped or the whole field is randomized. """
    return corrupt_field(data, 0x04, 4)
35
def _corrupt_sharedata_version_number(data):
    """ Scramble the share-data version number (4 bytes at offset 0x0c): one
    bit is flipped or the field is replaced with random bytes.  A one-bit
    flip of a valid version (1 or 2) can never yield the other valid
    version, and a random 4-byte value collides with 1 or 2 only with
    negligible probability, so the result is (almost certainly) an unknown
    version number. """
    # Bug fix: the original body contained an unreachable, older
    # implementation (one that deterministically avoided producing 1 or 2)
    # after this return statement; the dead code has been removed.
    return corrupt_field(data, 0x0c, 4)
47
def _corrupt_sharedata_version_number_to_known_version(data):
    """ Deterministically corrupt the share-data version number (4 bytes at
    offset 0x0c): a version-1 share becomes version 2 and vice versa. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Swap 1 <-> 2 via a lookup instead of an if/else chain.
    newsharevernum = {1: 2, 2: 1}[sharevernum]
    return data[:0x0c] + struct.pack(">l", newsharevernum) + data[0x0c+4:]
59
def _corrupt_segment_size(data):
    """ Scramble the segment-size field: one bit is flipped or the field is
    randomized.  The field is 4 bytes wide in v1 shares and 8 bytes in v2. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    fieldsize = {1: 4, 2: 8}[sharevernum]
    return corrupt_field(data, 0x0c+0x04, fieldsize, debug=True)
69
def _corrupt_size_of_sharedata(data):
    """ Scramble the field giving the size of the share data within the
    share file: one bit is flipped or the field is randomized. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        fieldoffset, fieldsize = 0x0c+0x08, 4
    else:
        fieldoffset, fieldsize = 0x0c+0x0c, 8
    return corrupt_field(data, fieldoffset, fieldsize)
79
def _corrupt_offset_of_sharedata(data):
    """ Scramble the field giving the offset of the share data within the
    share file: one bit is flipped or the field is randomized. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    fieldoffset, fieldsize = {1: (0x0c+0x0c, 4), 2: (0x0c+0x14, 8)}[sharevernum]
    return corrupt_field(data, fieldoffset, fieldsize)
89
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the field giving the offset of the ciphertext hash tree
    within the share file: one bit is flipped or the field is randomized. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    fieldoffset, fieldsize = {1: (0x0c+0x14, 4), 2: (0x0c+0x24, 8)}[sharevernum]
    return corrupt_field(data, fieldoffset, fieldsize, debug=True)
100
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the field giving the offset of the block hash tree within
    the share file: one bit is flipped or the field is randomized. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        fieldoffset, fieldsize = 0x0c+0x18, 4
    else:
        fieldoffset, fieldsize = 0x0c+0x2c, 8
    return corrupt_field(data, fieldoffset, fieldsize)
110
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the field giving the offset of the share hash chain within
    the share file: one bit is flipped or the field is randomized. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    fieldoffset, fieldsize = {1: (0x0c+0x1c, 4), 2: (0x0c+0x34, 8)}[sharevernum]
    return corrupt_field(data, fieldoffset, fieldsize)
120
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the field giving the offset of the URI extension within the
    share file: one bit is flipped or the field is randomized. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        fieldoffset, fieldsize = 0x0c+0x20, 4
    else:
        fieldoffset, fieldsize = 0x0c+0x3c, 8
    return corrupt_field(data, fieldoffset, fieldsize)
130
def _corrupt_share_data(data):
    """ Scramble the share data itself: one bit is flipped or the whole
    field is randomized.

    Bug fix: the v2 branch unpacked the share-data size with
    struct.unpack(">Q", data[0x0c+0x08:0x0c+0x0c+8]) -- a 12-byte slice for
    an 8-byte format, which raises struct.error -- and it pointed at the
    wrong field besides.  In a v2 share the size-of-sharedata field is 8
    bytes at 0x0c+0x0c (see _corrupt_size_of_sharedata), so read it from
    there. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        # v1: 4-byte size field at 0x0c+0x08; share data begins at 0x0c+0x24.
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # v2: 8-byte size field at 0x0c+0x0c; share data begins at 0x0c+0x44.
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        return corrupt_field(data, 0x0c+0x44, sharedatasize)
144
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the crypttext (ciphertext) hash tree: one bit is flipped or
    the whole field is randomized.

    Bug fix: offsets stored in the share's offset table are relative to the
    share contents, which begin after the 0x0c-byte container header (this
    is why _corrupt_uri_extension reads the extension length at
    data[0x0c+uriextoffset:...]).  The old code indexed the raw file with
    the bare offset and so corrupted the wrong bytes; add the 0x0c base. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # The tree occupies [crypttexthashtreeoffset, blockhashesoffset) within
    # the share contents; shift by the container header to index the file.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
159
def _corrupt_block_hashes(data):
    """ Scramble the block hash tree: one bit is flipped or the whole field
    is randomized.

    Bug fix: stored offsets are relative to the share contents after the
    0x0c-byte container header (see the length read in
    _corrupt_uri_extension), so indexing the raw file requires adding the
    0x0c base, which the old code omitted. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    # The tree occupies [blockhashesoffset, sharehashesoffset) within the
    # share contents; shift by the container header to index the file.
    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)
174
def _corrupt_share_hashes(data):
    """ Scramble the share hash chain: one bit is flipped or the whole field
    is randomized.

    Bug fix: stored offsets are relative to the share contents after the
    0x0c-byte container header (see the length read in
    _corrupt_uri_extension), so indexing the raw file requires adding the
    0x0c base, which the old code omitted. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    # The chain occupies [sharehashesoffset, uriextoffset) within the share
    # contents; shift by the container header to index the file.
    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)
189
def _corrupt_length_of_uri_extension(data):
    """ Scramble the length field at the front of the URI extension: one bit
    is flipped or the field is randomized.

    Bug fix: the URI-extension offset is relative to the share contents
    after the 0x0c-byte container header (this is how _corrupt_uri_extension
    reads the very same length field, at data[0x0c+uriextoffset:...]), so
    corrupting it at the bare offset hit the wrong bytes; add the 0x0c
    base. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)
201
def _corrupt_uri_extension(data):
    """ Scramble the URI extension block: one bit is flipped or the whole
    field is randomized.

    Bug fix: this function already (correctly) read the extension length at
    data[0x0c+uriextoffset:...], demonstrating that stored offsets are
    relative to the share contents after the 0x0c-byte container header --
    yet it applied the corruption at the bare offset.  Use the same 0x0c
    base when corrupting. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)
215
216 class Test(ShareManglingMixin, unittest.TestCase):
217     def setUp(self):
218         # Set self.basedir to a temp dir which has the name of the current test method in its
219         # name.
220         self.basedir = self.mktemp()
221
222         d = defer.maybeDeferred(SystemTestMixin.setUp, self)
223         d.addCallback(lambda x: self.set_up_nodes())
224
225         def _upload_a_file(ignored):
226             d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
227             def _after_upload(u):
228                 self.uri = IURI(u.uri)
229                 return self.clients[0].create_node_from_uri(self.uri)
230             d2.addCallback(_after_upload)
231             return d2
232         d.addCallback(_upload_a_file)
233
234         def _stash_it(filenode):
235             self.filenode = filenode
236         d.addCallback(_stash_it)
237         return d
238
239     def _download_and_check_plaintext(self, unused=None):
240         self.downloader = self.clients[1].getServiceNamed("downloader")
241         d = self.downloader.download_to_data(self.uri)
242
243         def _after_download(result):
244             self.failUnlessEqual(result, TEST_DATA)
245         d.addCallback(_after_download)
246         return d
247
248     def _delete_a_share(self, unused=None, sharenum=None):
249         """ Delete one share. """
250
251         shares = self.find_shares()
252         ks = shares.keys()
253         if sharenum is not None:
254             k = [ key for key in shares.keys() if key[1] == sharenum ][0]
255         else:
256             k = random.choice(ks)
257         del shares[k]
258         self.replace_shares(shares, storage_index=self.uri.storage_index)
259
260         return unused
261
    def test_test_code(self):
        """ Sanity-check the test harness itself: find_shares/replace_shares
        round-trip, wiping all shares empties the share map, and with only 2
        of 10 shares left neither download nor repair can succeed. """
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        # Mutable one-element list so inner callbacks can stash the share map.
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        # Wiping all shares must leave find_shares() returning an empty map.
        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting that you can't
        # download it is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            # Restore the full share set first, then delete 8 at random,
            # leaving 2 -- fewer than the 3 needed to reconstruct the file.
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            # NOTE(review): the inner Deferred `d` (which shadows the outer
            # chain's `d`) is never returned, so the outer chain does not
            # wait for this download attempt to complete -- probably
            # unintentional; confirm before relying on the ordering.
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d.addCallbacks(_after_download_callb, _after_download_errb)
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and asserting that you can't
        # repair it is more to test this test code than to test the Tahoe code...
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                # With only 2 shares the file is unhealthy both before and
                # after the repair attempt (repair cannot reconstruct it).
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                self.failIf(prerepairres.is_healthy())
                self.failIf(postrepairres.is_healthy())
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_then_repair)
        return d
321
322     def _count_reads(self):
323         sum_of_read_counts = 0
324         for client in self.clients:
325             counters = client.stats_provider.get_stats()['counters']
326             sum_of_read_counts += counters.get('storage_server.read', 0)
327         return sum_of_read_counts
328
329     def _count_allocates(self):
330         sum_of_allocate_counts = 0
331         for client in self.clients:
332             counters = client.stats_provider.get_stats()['counters']
333             sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
334         return sum_of_allocate_counts
335
336     def _corrupt_a_share(self, unused, corruptor_func, sharenum):
337         shares = self.find_shares()
338         ks = [ key for key in shares.keys() if key[1] == sharenum ]
339         assert ks, (shares.keys(), sharenum)
340         k = ks[0]
341         shares[k] = corruptor_func(shares[k])
342         self.replace_shares(shares, storage_index=self.uri.storage_index)
343
344     def _corrupt_a_random_share(self, unused, corruptor_func):
345         """ Exactly one share on disk will be corrupted by corruptor_func. """
346         shares = self.find_shares()
347         ks = shares.keys()
348         k = random.choice(ks)
349         return self._corrupt_a_share(unused, corruptor_func, k)
350
351     def test_download(self):
352         """ Basic download.  (This functionality is more or less already tested by test code in
353         other modules, but this module is also going to test some more specific things about
354         immutable download.)
355         """
356         d = defer.succeed(None)
357         before_download_reads = self._count_reads()
358         def _after_download(unused=None):
359             after_download_reads = self._count_reads()
360             # To pass this test, you have to download the file using only 10 reads to get the
361             # UEB (in parallel from all shares), plus one read for each of the 3 shares.
362             self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
363         d.addCallback(self._download_and_check_plaintext)
364         d.addCallback(_after_download)
365         return d
366
367     def test_download_from_only_3_remaining_shares(self):
368         """ Test download after 7 random shares (of the 10) have been removed. """
369         d = defer.succeed(None)
370         def _then_delete_7(unused=None):
371             for i in range(7):
372                 self._delete_a_share()
373         before_download_reads = self._count_reads()
374         d.addCallback(_then_delete_7)
375         def _after_download(unused=None):
376             after_download_reads = self._count_reads()
377             # To pass this test, you have to download the file using only 10 reads to get the
378             # UEB (in parallel from all shares), plus one read for each of the 3 shares.
379             self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
380         d.addCallback(self._download_and_check_plaintext)
381         d.addCallback(_after_download)
382         return d
383
384     def test_download_abort_if_too_many_missing_shares(self):
385         """ Test that download gives up quickly when it realizes there aren't enough shares out
386         there."""
387         d = defer.succeed(None)
388         def _then_delete_8(unused=None):
389             for i in range(8):
390                 self._delete_a_share()
391         d.addCallback(_then_delete_8)
392
393         before_download_reads = self._count_reads()
394         def _attempt_to_download(unused=None):
395             downloader = self.clients[1].getServiceNamed("downloader")
396             d = downloader.download_to_data(self.uri)
397
398             def _callb(res):
399                 self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
400             def _errb(f):
401                 self.failUnless(f.check(NotEnoughSharesError))
402             d.addCallbacks(_callb, _errb)
403             return d
404
405         d.addCallback(_attempt_to_download)
406
407         def _after_attempt(unused=None):
408             after_download_reads = self._count_reads()
409             # To pass this test, you are required to give up before actually trying to read any
410             # share data.
411             self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
412         d.addCallback(_after_attempt)
413         return d
414
415     def test_download_abort_if_too_many_corrupted_shares(self):
416         """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
417         shares out there. It should be able to tell because the corruption occurs in the
418         sharedata version number, which it checks first."""
419         d = defer.succeed(None)
420         def _then_corrupt_8(unused=None):
421             shnums = range(10)
422             random.shuffle(shnums)
423             for shnum in shnums[:8]:
424                 self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
425         d.addCallback(_then_corrupt_8)
426
427         before_download_reads = self._count_reads()
428         def _attempt_to_download(unused=None):
429             downloader = self.clients[1].getServiceNamed("downloader")
430             d = downloader.download_to_data(self.uri)
431
432             def _callb(res):
433                 self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
434             def _errb(f):
435                 self.failUnless(f.check(NotEnoughSharesError))
436             d.addCallbacks(_callb, _errb)
437             return d
438
439         d.addCallback(_attempt_to_download)
440
441         def _after_attempt(unused=None):
442             after_download_reads = self._count_reads()
443             # To pass this test, you are required to give up before reading all of the share
444             # data.  Actually, we could give up sooner than 39 reads, but currently our download
445             # code does 39 reads.  This test then serves as a "performance regression detector"
446             # -- if you change download code so that it takes *more* reads, then this test will
447             # fail.
448             self.failIf(after_download_reads-before_download_reads > 39, (after_download_reads, before_download_reads))
449         d.addCallback(_after_attempt)
450         return d
451
    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
        """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            # Snapshot the read counter so we can prove check() did no reads.
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # verify=False must not read any share data.
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        # Now remove every share and check again: still no reads, but the
        # file must be reported unhealthy.
        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d
485
486     def test_check_with_verify(self):
487         """ Check says the file is healthy when none of the shares have been touched.  It says
488         that the file is unhealthy if any field of any share has been corrupted.  It doesn't use
489         more than twice as many reads as it needs. """
490         LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal.
491         DELTA_READS = 10 * LEEWAY # N = 10
492         d = defer.succeed(self.filenode)
493         def _check_pristine(filenode):
494             before_check_reads = self._count_reads()
495
496             d2 = filenode.check(Monitor(), verify=True)
497             def _after_check(checkresults):
498                 after_check_reads = self._count_reads()
499                 self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS))
500                 self.failUnless(checkresults.is_healthy())
501
502             d2.addCallback(_after_check)
503             return d2
504         d.addCallback(_check_pristine)
505
506         d.addCallback(self.find_shares)
507         stash = [None]
508         def _stash_it(res):
509             stash[0] = res
510             return res
511         d.addCallback(_stash_it)
512
513         def _check_after_feckless_corruption(ignored, corruptor_func):
514             # Corruption which has no effect -- bits of the share file that are unused.
515             before_check_reads = self._count_reads()
516             d2 = self.filenode.check(Monitor(), verify=True)
517
518             def _after_check(checkresults):
519                 after_check_reads = self._count_reads()
520                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
521                 self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
522                 data = checkresults.get_data()
523                 self.failUnless(data['count-shares-good'] == 10, data)
524                 self.failUnless(len(data['sharemap']) == 10, data)
525                 self.failUnless(data['count-shares-needed'] == 3, data)
526                 self.failUnless(data['count-shares-expected'] == 10, data)
527                 self.failUnless(data['count-good-share-hosts'] == 5, data)
528                 self.failUnless(len(data['servers-responding']) == 5, data)
529                 self.failUnless(len(data['list-corrupt-shares']) == 0, data)
530
531             d2.addCallback(_after_check)
532             return d2
533
534         def _put_it_all_back(ignored):
535             self.replace_shares(stash[0], storage_index=self.uri.storage_index)
536             return ignored
537
538         for corruptor_func in (
539             _corrupt_size_of_file_data,
540             _corrupt_size_of_sharedata,
541             _corrupt_segment_size,
542             ):
543             d.addCallback(self._corrupt_a_random_share, corruptor_func)
544             d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func)
545             d.addCallback(_put_it_all_back)
546
547         def _check_after_server_visible_corruption(ignored, corruptor_func):
548             # Corruption which is detected by the server means that the server will send you
549             # back a Failure in response to get_bucket instead of giving you the share data.
550             before_check_reads = self._count_reads()
551             d2 = self.filenode.check(Monitor(), verify=True)
552
553             def _after_check(checkresults):
554                 after_check_reads = self._count_reads()
555                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
556                 self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
557                 data = checkresults.get_data()
558                 # The server might fail to serve up its other share as well as the corrupted
559                 # one, so count-shares-good could be 8 or 9.
560                 self.failUnless(data['count-shares-good'] in (8, 9), data)
561                 self.failUnless(len(data['sharemap']) in (8, 9,), data)
562                 self.failUnless(data['count-shares-needed'] == 3, data)
563                 self.failUnless(data['count-shares-expected'] == 10, data)
564                 # The server may have served up the non-corrupted share, or it may not have, so
565                 # the checker could have detected either 4 or 5 good servers.
566                 self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
567                 self.failUnless(len(data['servers-responding']) in (4, 5), data)
568                 # If the server served up the other share, then the checker should consider it good, else it should 
569                 # not.
570                 self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
571                 self.failUnless(len(data['list-corrupt-shares']) == 0, data)
572
573             d2.addCallback(_after_check)
574             return d2
575
576         for corruptor_func in (
577             _corrupt_file_version_number,
578             ):
579             d.addCallback(self._corrupt_a_random_share, corruptor_func)
580             d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func)
581             d.addCallback(_put_it_all_back)
582
583         def _check_after_share_incompatibility(ignored, corruptor_func):
584             # Corruption which means the share is indistinguishable from a share of an
585             # incompatible version.
586             before_check_reads = self._count_reads()
587             d2 = self.filenode.check(Monitor(), verify=True)
588
589             def _after_check(checkresults):
590                 after_check_reads = self._count_reads()
591                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
592                 self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
593                 data = checkresults.get_data()
594                 self.failUnless(data['count-shares-good'] == 9, data)
595                 self.failUnless(len(data['sharemap']) == 9, data)
596                 self.failUnless(data['count-shares-needed'] == 3, data)
597                 self.failUnless(data['count-shares-expected'] == 10, data)
598                 self.failUnless(data['count-good-share-hosts'] == 5, data)
599                 self.failUnless(len(data['servers-responding']) == 5, data)
600                 self.failUnless(len(data['list-corrupt-shares']) == 0, data)
601                 self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
602                 self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
603                 self.failUnless(len(data['list-incompatible-shares']) == 1, data)
604
605             d2.addCallback(_after_check)
606             return d2
607
608         for corruptor_func in (
609             _corrupt_sharedata_version_number,
610             ):
611             d.addCallback(self._corrupt_a_random_share, corruptor_func)
612             d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func)
613             d.addCallback(_put_it_all_back)
614
615         def _check_after_server_invisible_corruption(ignored, corruptor_func):
616             # Corruption which is not detected by the server means that the server will send you
617             # back the share data, but you will detect that it is wrong.
618             before_check_reads = self._count_reads()
619             d2 = self.filenode.check(Monitor(), verify=True)
620
621             def _after_check(checkresults):
622                 after_check_reads = self._count_reads()
623                 # print "delta was ", after_check_reads - before_check_reads
624                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
625                 self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
626                 data = checkresults.get_data()
627                 self.failUnless(data['count-shares-good'] == 9, data)
628                 self.failUnless(data['count-shares-needed'] == 3, data)
629                 self.failUnless(data['count-shares-expected'] == 10, data)
630                 self.failUnless(data['count-good-share-hosts'] == 5, data)
631                 self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func))
632                 self.failUnless(len(data['list-corrupt-shares']) == 1, data)
633                 self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
634                 self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
635                 self.failUnless(len(data['list-incompatible-shares']) == 0, data)
636                 self.failUnless(len(data['servers-responding']) == 5, data)
637                 self.failUnless(len(data['sharemap']) == 9, data)
638
639             d2.addCallback(_after_check)
640             return d2
641
642         for corruptor_func in (
643             _corrupt_sharedata_version_number_to_known_version,
644             _corrupt_offset_of_sharedata,
645             _corrupt_offset_of_ciphertext_hash_tree,
646             _corrupt_offset_of_block_hashes,
647             _corrupt_offset_of_share_hashes,
648             _corrupt_offset_of_uri_extension,
649             _corrupt_share_data,
650             _corrupt_crypttext_hash_tree,
651             _corrupt_block_hashes,
652             _corrupt_share_hashes,
653             _corrupt_length_of_uri_extension,
654             _corrupt_uri_extension,
655             ):
656             d.addCallback(self._corrupt_a_random_share, corruptor_func)
657             d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func)
658             d.addCallback(_put_it_all_back)
659         return d
660     test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."
661
662     def test_repair(self):
663         """ Repair replaces a share that got deleted. """
664         # N == 10.  7 is the "efficiency leeway" -- we'll allow you to pass this test even if
665         # you trigger seven times as many disk reads and blocks sends as would be optimal.
666         DELTA_READS = 10 * 7
667         # We'll allow you to pass this test only if you repair the missing share using only a
668         # single allocate.
669         DELTA_ALLOCATES = 1
670
671         d = defer.succeed(self.filenode)
672         d.addCallback(self._delete_a_share, sharenum=2)
673
674         def _repair_from_deletion_of_1(filenode):
675             before_repair_reads = self._count_reads()
676             before_repair_allocates = self._count_allocates()
677
678             d2 = filenode.check_and_repair(Monitor(), verify=False)
679             def _after_repair(checkandrepairresults):
680                 prerepairres = checkandrepairresults.get_pre_repair_results()
681                 postrepairres = checkandrepairresults.get_post_repair_results()
682                 after_repair_reads = self._count_reads()
683                 after_repair_allocates = self._count_allocates()
684
685                 # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
686                 self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
687                 self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
688                 self.failIf(prerepairres.is_healthy())
689                 self.failUnless(postrepairres.is_healthy())
690
691                 # Now we inspect the filesystem to make sure that it has 10 shares.
692                 shares = self.find_shares()
693                 self.failIf(len(shares) < 10)
694
695                 # Now we delete seven of the other shares, then try to download the file and
696                 # assert that it succeeds at downloading and has the right contents.  This can't
697                 # work unless it has already repaired the previously-deleted share #2.
698                 for sharenum in range(3, 10):
699                     self._delete_a_share(sharenum=sharenum)
700
701                 return self._download_and_check_plaintext()
702
703             d2.addCallback(_after_repair)
704             return d2
705         d.addCallback(_repair_from_deletion_of_1)
706
707         # Now we repair again to get all of those 7 back...
708         def _repair_from_deletion_of_7(filenode):
709             before_repair_reads = self._count_reads()
710             before_repair_allocates = self._count_allocates()
711
712             d2 = filenode.check_and_repair(Monitor(), verify=False)
713             def _after_repair(checkandrepairresults):
714                 prerepairres = checkandrepairresults.get_pre_repair_results()
715                 postrepairres = checkandrepairresults.get_post_repair_results()
716                 after_repair_reads = self._count_reads()
717                 after_repair_allocates = self._count_allocates()
718
719                 # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
720                 self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
721                 self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
722                 self.failIf(prerepairres.is_healthy())
723                 self.failUnless(postrepairres.is_healthy())
724
725                 # Now we inspect the filesystem to make sure that it has 10 shares.
726                 shares = self.find_shares()
727                 self.failIf(len(shares) < 10)
728
729                 return self._download_and_check_plaintext()
730
731             d2.addCallback(_after_repair)
732             return d2
733         d.addCallback(_repair_from_deletion_of_7)
734
735         def _repair_from_corruption(filenode):
736             before_repair_reads = self._count_reads()
737             before_repair_allocates = self._count_allocates()
738
739             d2 = filenode.check_and_repair(Monitor(), verify=False)
740             def _after_repair(checkandrepairresults):
741                 prerepairres = checkandrepairresults.get_pre_repair_results()
742                 postrepairres = checkandrepairresults.get_post_repair_results()
743                 after_repair_reads = self._count_reads()
744                 after_repair_allocates = self._count_allocates()
745
746                 # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
747                 self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
748                 self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
749                 self.failIf(prerepairres.is_healthy())
750                 self.failUnless(postrepairres.is_healthy())
751
752                 return self._download_and_check_plaintext()
753
754             d2.addCallback(_after_repair)
755             return d2
756
757         for corruptor_func in (
758             _corrupt_file_version_number,
759             _corrupt_sharedata_version_number,
760             _corrupt_sharedata_version_number_to_known_version,
761             _corrupt_offset_of_sharedata,
762             _corrupt_offset_of_ciphertext_hash_tree,
763             _corrupt_offset_of_block_hashes,
764             _corrupt_offset_of_share_hashes,
765             _corrupt_offset_of_uri_extension,
766             _corrupt_share_data,
767             _corrupt_crypttext_hash_tree,
768             _corrupt_block_hashes,
769             _corrupt_share_hashes,
770             _corrupt_length_of_uri_extension,
771             _corrupt_uri_extension,
772             ):
773             # Now we corrupt a share...
774             d.addCallback(self._corrupt_a_random_share, corruptor_func)
775             # And repair...
776             d.addCallback(_repair_from_corruption)
777
778         return d
779     test_repair.todo = "We haven't implemented a repairer yet."
780
781
782 # XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run
783
784 # XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
785
786 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit