# Tahoe-LAFS: src/allmydata/test/test_immutable.py
# From commit: "immutable: tests: verifier doesn't always catch corrupted share hashes"
1 from allmydata.test.common import SystemTestMixin, ShareManglingMixin
2 from allmydata.monitor import Monitor
3 from allmydata.interfaces import IURI, NotEnoughSharesError
4 from allmydata.immutable import upload
5 from allmydata.util import log
6 from twisted.internet import defer
7 from twisted.trial import unittest
8 import random, struct
9 import common_util as testutil
10
# One byte longer than the LIT (inline-literal) threshold, so uploading this
# produces real CHK shares on the storage servers instead of a literal URI.
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
12
def corrupt_field(data, offset, size, debug=False):
    """ Return a copy of data with the size-byte field at offset corrupted:
    with equal probability, either a single bit of the field is flipped or
    the whole field is replaced with random bytes. """
    if random.random() < 0.5:
        flipped = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], flipped[offset:offset+size]))
        return flipped
    replacement = testutil.insecurerandstr(size)
    if debug:
        log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], replacement))
    return data[:offset] + replacement + data[offset+size:]
24
25 def _corrupt_nothing(data):
26     """ Leave the data pristine. """
27     return data
28
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number (the 4-byte
    server-side container version at offset 0x00) will have one bit flipped or
    else will be changed to a random value."""
    return corrupt_field(data, 0x00, 4)
33
def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data within
    the file (4 bytes at offset 0x04) will have one bit flipped or else will be
    changed to a random value. """
    # NOTE(review): an earlier docstring claimed the field is "set to one
    # smaller", but corrupt_field flips a bit or randomizes it.
    return corrupt_field(data, 0x04, 4)
38
39 def _corrupt_sharedata_version_number(data):
40     """ Scramble the file data -- the share data version number will have one bit flipped or
41     else will be changed to a random value, but not 1 or 2."""
42     return corrupt_field(data, 0x0c, 4)
43     sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
44     assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
45     newsharevernum = sharevernum
46     while newsharevernum in (1, 2):
47         newsharevernum = random.randrange(0, 2**32)
48     newsharevernumbytes = struct.pack(">l", newsharevernum)
49     return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
50
51 def _corrupt_sharedata_version_number_to_plausible_version(data):
52     """ Scramble the file data -- the share data version number will
53     be changed to 2 if it is 1 or else to 1 if it is 2."""
54     sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
55     assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
56     if sharevernum == 1:
57         newsharevernum = 2
58     else:
59         newsharevernum = 1
60     newsharevernumbytes = struct.pack(">l", newsharevernum)
61     return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
62
def _corrupt_segment_size(data):
    """ Scramble the segment-size field (one bit flipped or replaced with a
    random value). It is 4 bytes in a v1 share, 8 bytes in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x04, 8, debug=False)
    return corrupt_field(data, 0x0c+0x04, 4, debug=False)
72
def _corrupt_size_of_sharedata(data):
    """ Scramble the data-size field inside the share (one bit flipped or
    replaced with a random value): 4 bytes at 0x08 for v1, 8 bytes at 0x0c
    for v2 (both relative to the 0x0c container header). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x0c, 8)
    return corrupt_field(data, 0x0c+0x08, 4)
82
def _corrupt_offset_of_sharedata(data):
    """ Scramble the offset-table entry that points at the share data
    (one bit flipped or replaced with a random value). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x14, 8)
    return corrupt_field(data, 0x0c+0x0c, 4)
92
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the offset-table entry that points at the ciphertext hash
    tree (one bit flipped or replaced with a random value). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x24, 8, debug=False)
    return corrupt_field(data, 0x0c+0x14, 4, debug=False)
103
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the offset-table entry that points at the block hash tree
    (one bit flipped or replaced with a random value). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x2c, 8)
    return corrupt_field(data, 0x0c+0x18, 4)
113
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the offset-table entry that points at the share hash chain
    (one bit flipped or replaced with a random value). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x34, 8)
    return corrupt_field(data, 0x0c+0x1c, 4)
123
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the offset-table entry that points at the URI extension
    (one bit flipped or replaced with a random value). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 2:
        return corrupt_field(data, 0x0c+0x3c, 8)
    return corrupt_field(data, 0x0c+0x20, 4)
133
134 def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
135     """ Scramble the file data -- the field showing the offset of the uri extension will be set
136     to the size of the file minus 3.  This means when the client tries to read the length field
137     from that location it will get a short read -- the result string will be only 3 bytes long,
138     not the 4 or 8 bytes necessary to do a successful struct.unpack."""
139     sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
140     assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
141     # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
142     if sharevernum == 1:
143         if debug:
144             log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
145         return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
146     else:
147         if debug:
148             log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
149         return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]
150
def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself will have one
    bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        # v1: 4-byte data-size field at 0x08 within the share; the share data
        # itself begins at 0x24.
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # Bug fix: the v2 8-byte data-size field lives at 0x0c within the
        # share (matching _corrupt_size_of_sharedata). The old slice
        # [0x0c+0x08 : 0x0c+0x0c+8] was 12 bytes long, so the ">Q" unpack
        # always raised struct.error.
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        return corrupt_field(data, 0x0c+0x44, sharedatasize)
164
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash tree will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # Bug fix: the offsets stored in the share are relative to the start of
    # the client-visible share (they exclude the 0x0c server-side container
    # header, as the unpacks above show), so add 0x0c when indexing into the
    # raw file. Previously this corrupted bytes 0x0c too early, so the hash
    # tree itself was sometimes left intact.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
179
def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree will have one bit
    flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    # Bug fix: stored offsets exclude the 0x0c container header, so add 0x0c
    # when indexing the raw file; previously the wrong bytes were corrupted.
    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)
194
def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    # Bug fix: stored offsets exclude the 0x0c container header, so add 0x0c
    # when indexing the raw file. Without it the share hash chain was not
    # always hit, which is why the verifier didn't always catch this.
    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)
209
def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Bug fix: the stored offset excludes the 0x0c container header (compare
    # _corrupt_uri_extension, which adds 0x0c when *reading* the length), so
    # add 0x0c when corrupting; previously the wrong bytes were corrupted.
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)
221
def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will have one bit
    flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    # Bug fix: the length reads above already add the 0x0c container header
    # to the stored (client-relative) offset; the corruption must do the same
    # or it hits bytes 0x0c before the URI extension.
    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)
235
# Integration-style tests that mangle stored immutable shares on disk and
# exercise download, check, and verify behavior against the mangled grid.
class Test(ShareManglingMixin, unittest.TestCase):
237     def setUp(self):
238         # Set self.basedir to a temp dir which has the name of the current test method in its
239         # name.
240         self.basedir = self.mktemp()
241
242         d = defer.maybeDeferred(SystemTestMixin.setUp, self)
243         d.addCallback(lambda x: self.set_up_nodes())
244
245         def _upload_a_file(ignored):
246             d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
247             def _after_upload(u):
248                 self.uri = IURI(u.uri)
249                 return self.clients[0].create_node_from_uri(self.uri)
250             d2.addCallback(_after_upload)
251             return d2
252         d.addCallback(_upload_a_file)
253
254         def _stash_it(filenode):
255             self.filenode = filenode
256         d.addCallback(_stash_it)
257         return d
258
259     def _download_and_check_plaintext(self, unused=None):
260         self.downloader = self.clients[1].getServiceNamed("downloader")
261         d = self.downloader.download_to_data(self.uri)
262
263         def _after_download(result):
264             self.failUnlessEqual(result, TEST_DATA)
265         d.addCallback(_after_download)
266         return d
267
268     def _delete_a_share(self, unused=None, sharenum=None):
269         """ Delete one share. """
270
271         shares = self.find_shares()
272         ks = shares.keys()
273         if sharenum is not None:
274             k = [ key for key in shares.keys() if key[1] == sharenum ][0]
275         else:
276             k = random.choice(ks)
277         del shares[k]
278         self.replace_shares(shares, storage_index=self.uri.storage_index)
279
280         return unused
281
    def test_test_code(self):
        """ Sanity-check the share-mangling helpers themselves:
        find_shares/replace_shares round-trips, emptying the share set, and
        deleting enough shares makes download fail. """
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        # One-slot mutable cell so the closures below can store/read the
        # pristine share set.
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        # Replacing with an empty dict should leave no shares on disk.
        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting that you can't
        # download it is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            # Restore the full set first, then delete 8 of the 10 shares,
            # leaving fewer than the needed 3.
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d.addCallbacks(_after_download_callb, _after_download_errb)
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and asserting that you can't
        # repair it is more to test this test code than to test the Tahoe code...
        #TODO def _then_repair(unused=None):
        #TODO     d2 = self.filenode.check_and_repair(Monitor(), verify=False)
        #TODO     def _after_repair(checkandrepairresults):
        #TODO         prerepairres = checkandrepairresults.get_pre_repair_results()
        #TODO         postrepairres = checkandrepairresults.get_post_repair_results()
        #TODO         self.failIf(prerepairres.is_healthy())
        #TODO         self.failIf(postrepairres.is_healthy())
        #TODO     d2.addCallback(_after_repair)
        #TODO     return d2
        #TODO d.addCallback(_then_repair)
        return d
341
342     def _count_reads(self):
343         sum_of_read_counts = 0
344         for client in self.clients:
345             counters = client.stats_provider.get_stats()['counters']
346             sum_of_read_counts += counters.get('storage_server.read', 0)
347         return sum_of_read_counts
348
349     def _count_allocates(self):
350         sum_of_allocate_counts = 0
351         for client in self.clients:
352             counters = client.stats_provider.get_stats()['counters']
353             sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
354         return sum_of_allocate_counts
355
356     def _corrupt_a_share(self, unused, corruptor_func, sharenum):
357         shares = self.find_shares()
358         ks = [ key for key in shares.keys() if key[1] == sharenum ]
359         assert ks, (shares.keys(), sharenum)
360         k = ks[0]
361         shares[k] = corruptor_func(shares[k])
362         self.replace_shares(shares, storage_index=self.uri.storage_index)
363         return corruptor_func
364
365     def _corrupt_all_shares(self, unused, corruptor_func):
366         """ All shares on disk will be corrupted by corruptor_func. """
367         shares = self.find_shares()
368         for k in shares.keys():
369             self._corrupt_a_share(unused, corruptor_func, k[1])
370         return corruptor_func
371
372     def _corrupt_a_random_share(self, unused, corruptor_func):
373         """ Exactly one share on disk will be corrupted by corruptor_func. """
374         shares = self.find_shares()
375         ks = shares.keys()
376         k = random.choice(ks)
377         self._corrupt_a_share(unused, corruptor_func, k[1])
378         return corruptor_func
379
380     def test_download(self):
381         """ Basic download.  (This functionality is more or less already tested by test code in
382         other modules, but this module is also going to test some more specific things about
383         immutable download.)
384         """
385         d = defer.succeed(None)
386         before_download_reads = self._count_reads()
387         def _after_download(unused=None):
388             after_download_reads = self._count_reads()
389             # To pass this test, you have to download the file using only 10 reads total: 3 to
390             # get the headers from each share, 3 to get the share hash trees and uebs from each
391             # share, 1 to get the crypttext hashes, and 3 to get the block data from each share.
392             self.failIf(after_download_reads-before_download_reads > 12, (after_download_reads, before_download_reads))
393         d.addCallback(self._download_and_check_plaintext)
394         d.addCallback(_after_download)
395         return d
396
397     def test_download_from_only_3_remaining_shares(self):
398         """ Test download after 7 random shares (of the 10) have been removed. """
399         d = defer.succeed(None)
400         def _then_delete_7(unused=None):
401             for i in range(7):
402                 self._delete_a_share()
403         before_download_reads = self._count_reads()
404         d.addCallback(_then_delete_7)
405         def _after_download(unused=None):
406             after_download_reads = self._count_reads()
407             # To pass this test, you have to download the file using only 10 reads to get the
408             # UEB (in parallel from all shares), plus one read for each of the 3 shares.
409             self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
410         d.addCallback(self._download_and_check_plaintext)
411         d.addCallback(_after_download)
412         return d
413
    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough shares out
        there."""
        d = defer.succeed(None)
        def _then_delete_8(unused=None):
            # Delete 8 of the 10 shares, leaving 2 -- fewer than the 3 needed.
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        # NOTE(review): the baseline is sampled at chain-construction time,
        # before _then_delete_8 runs; fine here since deletion does no
        # storage-server reads.
        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                # Expect failure with NotEnoughSharesError, not success.
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before actually trying to read any
            # share data.
            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d
444
    def test_download_abort_if_too_many_corrupted_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
        shares out there. It should be able to tell because the corruption occurs in the
        sharedata version number, which it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            # Corrupt 8 randomly-chosen shares out of the 10, leaving only 2
            # usable -- fewer than the 3 needed.
            shnums = range(10)
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        # NOTE(review): baseline sampled at chain-construction time, before
        # _then_corrupt_8 runs; corruption happens on disk, not via reads.
        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                # Expect failure with NotEnoughSharesError, not success.
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before reading all of the share
            # data.  Actually, we could give up sooner than 45 reads, but currently our download
            # code does 45 reads.  This test then serves as a "performance regression detector"
            # -- if you change download code so that it takes *more* reads, then this test will
            # fail.
            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d
481
482     def test_check_without_verify(self):
483         """ Check says the file is healthy when none of the shares have been touched.  It says
484         that the file is unhealthy when all of them have been removed. It doesn't use any reads.
485         """
486         d = defer.succeed(self.filenode)
487         def _check1(filenode):
488             before_check_reads = self._count_reads()
489
490             d2 = filenode.check(Monitor(), verify=False)
491             def _after_check(checkresults):
492                 after_check_reads = self._count_reads()
493                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
494                 self.failUnless(checkresults.is_healthy())
495
496             d2.addCallback(_after_check)
497             return d2
498         d.addCallback(_check1)
499
500         d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
501         def _check2(ignored):
502             before_check_reads = self._count_reads()
503             d2 = self.filenode.check(Monitor(), verify=False)
504
505             def _after_check(checkresults):
506                 after_check_reads = self._count_reads()
507                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
508                 self.failIf(checkresults.is_healthy())
509
510             d2.addCallback(_after_check)
511             return d2
512         d.addCallback(_check2)
513
514         return d
515
    def _help_test_verify(self, corruptor_funcs, judgement_func):
        """ For each corruptor in corruptor_funcs: corrupt one randomly-chosen
        share, run a verifying check on the file, hand the check results to
        judgement_func, then restore the pristine shares. Also enforces a
        read budget on each verify pass. """
        LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * LEEWAY # N = 10
        d = defer.succeed(None)

        d.addCallback(self.find_shares)
        # One-slot mutable cell holding the pristine share set.
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        def _verify_after_corruption(corruptor_func):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                try:
                    return judgement_func(checkresults)
                except Exception, le:
                    # Tag the assertion failure with which corruptor was in
                    # effect, since all corruptors share one judgement_func.
                    le.args = tuple(le.args + ("corruptor_func: " + corruptor_func.__name__,))
                    raise

            d2.addCallback(_after_check)
            return d2

        # _corrupt_a_random_share returns corruptor_func, which flows into
        # _verify_after_corruption as its argument.
        for corruptor_func in corruptor_funcs:
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_verify_after_corruption)
            d.addCallback(_put_it_all_back)

        return d
552
553     def test_verify_no_problem(self):
554         """ Verify says the file is healthy when none of the shares have been touched in a way
555         that matters. It doesn't use more than seven times as many reads as it needs."""
556         def judge(checkresults):
557             self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
558             data = checkresults.get_data()
559             self.failUnless(data['count-shares-good'] == 10, data)
560             self.failUnless(len(data['sharemap']) == 10, data)
561             self.failUnless(data['count-shares-needed'] == 3, data)
562             self.failUnless(data['count-shares-expected'] == 10, data)
563             self.failUnless(data['count-good-share-hosts'] == 5, data)
564             self.failUnless(len(data['servers-responding']) == 5, data)
565             self.failUnless(len(data['list-corrupt-shares']) == 0, data)
566         return self._help_test_verify([
567             _corrupt_nothing,
568             _corrupt_size_of_file_data,
569             _corrupt_size_of_sharedata,
570             _corrupt_segment_size, ], judge)
571
    def test_verify_server_visible_corruption(self):
        """ Corruption which is detected by the server means that the server will send you back
        a Failure in response to get_bucket instead of giving you the share data.  Test that
        verifier handles these answers correctly. It doesn't use more than seven times as many
        reads as it needs."""
        def judge(checkresults):
            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
            data = checkresults.get_data()
            # The server might fail to serve up its other share as well as the corrupted
            # one, so count-shares-good could be 8 or 9.
            self.failUnless(data['count-shares-good'] in (8, 9), data)
            self.failUnless(len(data['sharemap']) in (8, 9,), data)
            self.failUnless(data['count-shares-needed'] == 3, data)
            self.failUnless(data['count-shares-expected'] == 10, data)
            # The server may have served up the non-corrupted share, or it may not have, so
            # the checker could have detected either 4 or 5 good servers.
            self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
            self.failUnless(len(data['servers-responding']) in (4, 5), data)
            # If the server served up the other share, then the checker should consider it good, else it should
            # not.
            self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
            self.failUnless(len(data['list-corrupt-shares']) == 0, data)
        # Corrupting the container version number is visible to the server
        # itself, so it refuses to serve the share at all.
        return self._help_test_verify([
            _corrupt_file_version_number,
            ], judge)
597
598     def test_verify_share_incompatibility(self):
599         def judge(checkresults):
600             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
601             data = checkresults.get_data()
602             self.failUnless(data['count-shares-good'] == 9, data)
603             self.failUnless(len(data['sharemap']) == 9, data)
604             self.failUnless(data['count-shares-needed'] == 3, data)
605             self.failUnless(data['count-shares-expected'] == 10, data)
606             self.failUnless(data['count-good-share-hosts'] == 5, data)
607             self.failUnless(len(data['servers-responding']) == 5, data)
608             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
609             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
610             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
611             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
612         return self._help_test_verify([
613             _corrupt_sharedata_version_number,
614             ], judge)
615
616     def test_verify_server_invisible_corruption(self):
617         def judge(checkresults):
618             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
619             data = checkresults.get_data()
620             self.failUnless(data['count-shares-good'] == 9, data)
621             self.failUnless(data['count-shares-needed'] == 3, data)
622             self.failUnless(data['count-shares-expected'] == 10, data)
623             self.failUnless(data['count-good-share-hosts'] == 5, data)
624             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
625             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
626             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
627             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
628             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
629             self.failUnless(len(data['servers-responding']) == 5, data)
630             self.failUnless(len(data['sharemap']) == 9, data)
631         return self._help_test_verify([
632             _corrupt_offset_of_sharedata,
633             _corrupt_offset_of_uri_extension,
634             _corrupt_offset_of_uri_extension_to_force_short_read,
635             _corrupt_share_data,
636             _corrupt_length_of_uri_extension,
637             _corrupt_uri_extension,
638             ], judge)
639
640     def test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO(self):
641         def judge(checkresults):
642             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
643             data = checkresults.get_data()
644             self.failUnless(data['count-shares-good'] == 9, data)
645             self.failUnless(data['count-shares-needed'] == 3, data)
646             self.failUnless(data['count-shares-expected'] == 10, data)
647             self.failUnless(data['count-good-share-hosts'] == 5, data)
648             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
649             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
650             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
651             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
652             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
653             self.failUnless(len(data['servers-responding']) == 5, data)
654             self.failUnless(len(data['sharemap']) == 9, data)
655         return self._help_test_verify([
656             _corrupt_offset_of_block_hashes,
657             ], judge)
658     test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
659
660     def test_verify_server_invisible_corruption_sharedata_plausible_version(self):
661         def judge(checkresults):
662             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
663             data = checkresults.get_data()
664             self.failUnless(data['count-shares-good'] == 9, data)
665             self.failUnless(data['count-shares-needed'] == 3, data)
666             self.failUnless(data['count-shares-expected'] == 10, data)
667             self.failUnless(data['count-good-share-hosts'] == 5, data)
668             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
669             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
670             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
671             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
672             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
673             self.failUnless(len(data['servers-responding']) == 5, data)
674             self.failUnless(len(data['sharemap']) == 9, data)
675         return self._help_test_verify([
676             _corrupt_sharedata_version_number_to_plausible_version,
677             ], judge)
678
679     def test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO(self):
680         def judge(checkresults):
681             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
682             data = checkresults.get_data()
683             self.failUnless(data['count-shares-good'] == 9, data)
684             self.failUnless(data['count-shares-needed'] == 3, data)
685             self.failUnless(data['count-shares-expected'] == 10, data)
686             self.failUnless(data['count-good-share-hosts'] == 5, data)
687             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
688             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
689             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
690             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
691             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
692             self.failUnless(len(data['servers-responding']) == 5, data)
693             self.failUnless(len(data['sharemap']) == 9, data)
694         return self._help_test_verify([
695             _corrupt_offset_of_share_hashes,
696             ], judge)
697     test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
698
699     def test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO(self):
700         def judge(checkresults):
701             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
702             data = checkresults.get_data()
703             self.failUnless(data['count-shares-good'] == 9, data)
704             self.failUnless(data['count-shares-needed'] == 3, data)
705             self.failUnless(data['count-shares-expected'] == 10, data)
706             self.failUnless(data['count-good-share-hosts'] == 5, data)
707             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
708             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
709             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
710             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
711             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
712             self.failUnless(len(data['servers-responding']) == 5, data)
713             self.failUnless(len(data['sharemap']) == 9, data)
714         return self._help_test_verify([
715             _corrupt_offset_of_ciphertext_hash_tree,
716             ], judge)
717     test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
718
719     def test_verify_server_invisible_corruption_cryptext_hash_tree_TODO(self):
720         def judge(checkresults):
721             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
722             data = checkresults.get_data()
723             self.failUnless(data['count-shares-good'] == 9, data)
724             self.failUnless(data['count-shares-needed'] == 3, data)
725             self.failUnless(data['count-shares-expected'] == 10, data)
726             self.failUnless(data['count-good-share-hosts'] == 5, data)
727             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
728             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
729             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
730             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
731             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
732             self.failUnless(len(data['servers-responding']) == 5, data)
733             self.failUnless(len(data['sharemap']) == 9, data)
734         return self._help_test_verify([
735             _corrupt_crypttext_hash_tree,
736             ], judge)
737     test_verify_server_invisible_corruption_cryptext_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
738
739     def test_verify_server_invisible_corruption_block_hash_tree_TODO(self):
740         def judge(checkresults):
741             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
742             data = checkresults.get_data()
743             self.failUnless(data['count-shares-good'] == 9, data)
744             self.failUnless(data['count-shares-needed'] == 3, data)
745             self.failUnless(data['count-shares-expected'] == 10, data)
746             self.failUnless(data['count-good-share-hosts'] == 5, data)
747             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
748             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
749             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
750             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
751             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
752             self.failUnless(len(data['servers-responding']) == 5, data)
753             self.failUnless(len(data['sharemap']) == 9, data)
754         return self._help_test_verify([
755             _corrupt_block_hashes,
756             ], judge)
757     test_verify_server_invisible_corruption_block_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
758
759     def test_verify_server_invisible_corruption_share_hash_tree_TODO(self):
760         def judge(checkresults):
761             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
762             data = checkresults.get_data()
763             self.failUnless(data['count-shares-good'] == 9, data)
764             self.failUnless(data['count-shares-needed'] == 3, data)
765             self.failUnless(data['count-shares-expected'] == 10, data)
766             self.failUnless(data['count-good-share-hosts'] == 5, data)
767             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
768             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
769             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
770             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
771             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
772             self.failUnless(len(data['servers-responding']) == 5, data)
773             self.failUnless(len(data['sharemap']) == 9, data)
774         return self._help_test_verify([
775             _corrupt_share_hashes,
776             ], judge)
777     test_verify_server_invisible_corruption_share_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
778
    def test_repair(self):
        """ Repair replaces a share that got deleted.

        Three stages chained on one Deferred: (1) delete share #2, repair,
        verify the file is whole again; (2) delete seven shares, repair;
        (3) corrupt one share per corruptor function, repair.  Each stage
        also bounds the number of reads/allocates the repairer may use.
        """
        # N == 10.  7 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * 7
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            # Stage 1: one share (#2) was just deleted; check-and-repair must
            # restore it within the read/allocate budgets above.
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                # Pre-repair results must show the damage; post-repair must not.
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents.  This can't
                # work unless it has already repaired the previously-deleted share #2.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...
        # NOTE(review): the previous stage's Deferred fires with the result of
        # _download_and_check_plaintext(), so that is what arrives here as
        # "filenode" -- presumably that helper returns something with a
        # check_and_repair() method, or this stage is broken; confirm before
        # removing the .todo marker below.
        def _repair_from_deletion_of_7(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                # Seven shares are missing this time, so allow seven allocates.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        # Stage 3 helper: repair after a single share has been corrupted (by
        # the loop below).  NOTE(review): as above, "filenode" here is the
        # result of _corrupt_a_random_share() -- verify it is the filenode.
        def _repair_from_corruption(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2

        # Run the corrupt-then-repair stage once per kind of corruption.
        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_sharedata_version_number,
            _corrupt_sharedata_version_number_to_plausible_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption)

        return d
    test_repair.todo = "We haven't implemented a repairer yet."
897
898
899 # XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run
900
901 # XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
902
903 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit