
from allmydata.test.common import SystemTestMixin, ShareManglingMixin
from allmydata.monitor import Monitor
from allmydata.interfaces import IURI, NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util import log
from twisted.internet import defer
from twisted.trial import unittest
import random, struct
import common_util as testutil

TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)

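# A note on the share-file layout assumed by the corruptor functions below,
# inferred from the offsets they use rather than from an authoritative spec:
# each share file starts with a 0x0c-byte server-side container header,
# followed by the share itself.  The share in turn begins with a 4-byte
# version number (1 or 2); v1 shares use 4-byte big-endian size and offset
# fields, while v2 shares use 8-byte fields.  The offsets stored in the
# share's header are relative to the start of the share, i.e. they do not
# count the 0x0c-byte container header.
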
def corrupt_field(data, offset, size, debug=False):
    if random.random() < 0.5:
        newdata = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
        return newdata
    else:
        newval = testutil.insecurerandstr(size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
        return data[:offset]+newval+data[offset+size:]

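# For example (illustrative only, since corrupt_field is randomized):
#
#     corrupt_field("\x00\x00\x00\x01", 0x00, 4)
#
# returns the string with either one randomly-chosen bit of the 4-byte field
# flipped, or with the whole field replaced by 4 random bytes; bytes outside
# [offset, offset+size) are never touched.
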
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number will have one bit flipped or
    else will be changed to a random value."""
    return corrupt_field(data, 0x00, 4)

def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data within the file
    will have one bit flipped or else will be changed to a random value. """
    return corrupt_field(data, 0x04, 4)

def _corrupt_sharedata_version_number(data):
    """ Scramble the file data -- the share data version number will be changed to a random
    value, but not 1 or 2."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    newsharevernum = sharevernum
    while newsharevernum in (1, 2):
        newsharevernum = random.randrange(0, 2**32)
    newsharevernumbytes = struct.pack(">L", newsharevernum)
    return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]

def _corrupt_sharedata_version_number_to_known_version(data):
    """ Scramble the file data -- the share data version number will
    be changed to 2 if it is 1 or else to 1 if it is 2."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        newsharevernum = 2
    else:
        newsharevernum = 1
    newsharevernumbytes = struct.pack(">l", newsharevernum)
    return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]

def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment will have one
    bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x04, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x04, 8, debug=False)

def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data within the share
    data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x08, 4)
    else:
        return corrupt_field(data, 0x0c+0x0c, 8)

def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data within the share
    data will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x0c, 4)
    else:
        return corrupt_field(data, 0x0c+0x14, 8)

def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the ciphertext hash tree
    within the share data will have one bit flipped or else be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x14, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x24, 8, debug=False)

def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block hash tree within
    the share data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x18, 4)
    else:
        return corrupt_field(data, 0x0c+0x2c, 8)

def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share hash tree within
    the share data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x1c, 4)
    else:
        return corrupt_field(data, 0x0c+0x34, 8)

def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x20, 4)
    else:
        return corrupt_field(data, 0x0c+0x3c, 8)

def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
    """ Scramble the file data -- the field showing the offset of the uri extension will be set
    to the size of the file minus 3.  This means when the client tries to read the length field
    from that location it will get a short read -- the result string will be only 3 bytes long,
    not the 4 or 8 bytes necessary to do a successful struct.unpack."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
    if sharevernum == 1:
        if debug:
            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
        return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
    else:
        if debug:
            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
        return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]

def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself will have one
    bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        return corrupt_field(data, 0x0c+0x44, sharedatasize)

def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash tree will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # The stored offsets don't count the 0x0c-byte server-side header, so add it back in.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)

def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree will have one bit
    flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    # The stored offsets don't count the 0x0c-byte server-side header, so add it back in.
    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)

def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    # The stored offsets don't count the 0x0c-byte server-side header, so add it back in.
    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)

def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)

def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will have one bit
    flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    # As with the reads just above, the corruption target must skip the 0x0c-byte header.
    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)

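# The tests below assume the defaults in effect when this file was written: 3-of-10
# erasure coding (k == 3 shares needed out of N == 10 produced, so deleting or
# corrupting any 8 shares makes the file unrecoverable), and storage servers whose
# stats_provider exports 'storage_server.read' and 'storage_server.allocate'
# counters, which _count_reads() and _count_allocates() below rely on.
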
class Test(ShareManglingMixin, unittest.TestCase):
    def setUp(self):
        # Set self.basedir to a temp dir whose name includes the name of the
        # current test method.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d

    def _download_and_check_plaintext(self, unused=None):
        self.downloader = self.clients[1].getServiceNamed("downloader")
        d = self.downloader.download_to_data(self.uri)

        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
        d.addCallback(_after_download)
        return d

    def _delete_a_share(self, unused=None, sharenum=None):
        """ Delete one share. """

        shares = self.find_shares()
        ks = shares.keys()
        if sharenum is not None:
            k = [ key for key in shares.keys() if key[1] == sharenum ][0]
        else:
            k = random.choice(ks)
        del shares[k]
        self.replace_shares(shares, storage_index=self.uri.storage_index)

        return unused

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting that you can't
        # download the file is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d.addCallbacks(_after_download_callb, _after_download_errb)
            return d
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and asserting that you can't
        # repair the file is more to test this test code than to test the Tahoe code...
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                self.failIf(prerepairres.is_healthy())
                self.failIf(postrepairres.is_healthy())
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_then_repair)
        return d

    def _count_reads(self):
        sum_of_read_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _corrupt_a_share(self, unused, corruptor_func, sharenum):
        shares = self.find_shares()
        ks = [ key for key in shares.keys() if key[1] == sharenum ]
        assert ks, (shares.keys(), sharenum)
        k = ks[0]
        shares[k] = corruptor_func(shares[k])
        self.replace_shares(shares, storage_index=self.uri.storage_index)

    def _corrupt_all_shares(self, unused, corruptor_func):
        """ All shares on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        for k in shares.keys():
            self._corrupt_a_share(unused, corruptor_func, k[1])

    def _corrupt_a_random_share(self, unused, corruptor_func):
        """ Exactly one share on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        ks = shares.keys()
        k = random.choice(ks)
        return self._corrupt_a_share(unused, corruptor_func, k[1])

    def test_download(self):
        """ Basic download.  (This functionality is more or less already tested by test code in
        other modules, but this module is also going to test some more specific things about
        immutable download.)
        """
        d = defer.succeed(None)
        before_download_reads = self._count_reads()
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been removed. """
        d = defer.succeed(None)
        def _then_delete_7(unused=None):
            for i in range(7):
                self._delete_a_share()
        before_download_reads = self._count_reads()
        d.addCallback(_then_delete_7)
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough shares out
        there."""
        d = defer.succeed(None)
        def _then_delete_8(unused=None):
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before actually trying to read any
            # share data.
            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
        shares out there. It should be able to tell because the corruption occurs in the
        sharedata version number, which it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            shnums = range(10)
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before reading all of the share
            # data.  Actually, we could give up sooner than 45 reads, but currently our download
            # code does 45 reads.  This test then serves as a "performance regression detector"
            # -- if you change download code so that it takes *more* reads, then this test will
            # fail.
            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
        """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d

    def test_check_with_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy if any field of any share has been corrupted.  It doesn't use
        more than twice as many reads as it needs. """
        LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and block sends as would be optimal.
        DELTA_READS = 10 * LEEWAY # N = 10
        d = defer.succeed(self.filenode)
        def _check_pristine(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS))
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check_pristine)

        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)

        def _check_after_feckless_corruption(ignored, corruptor_func):
            # Corruption which has no effect -- bits of the share file that are unused.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 10, data)
                self.failUnless(len(data['sharemap']) == 10, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        for corruptor_func in (
            _corrupt_size_of_file_data,
            _corrupt_size_of_sharedata,
            _corrupt_segment_size,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_visible_corruption(ignored, corruptor_func):
            # Corruption which is detected by the server means that the server will send you
            # back a Failure in response to get_bucket instead of giving you the share data.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                # The server might fail to serve up its other share as well as the corrupted
                # one, so count-shares-good could be 8 or 9.
                self.failUnless(data['count-shares-good'] in (8, 9), data)
                self.failUnless(len(data['sharemap']) in (8, 9,), data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                # The server may have served up the non-corrupted share, or it may not have, so
                # the checker could have detected either 4 or 5 good servers.
                self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
                self.failUnless(len(data['servers-responding']) in (4, 5), data)
                # If the server served up the other share, then the checker should consider
                # it good, else it should not.
                self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_share_incompatibility(ignored, corruptor_func):
            # Corruption which means the share is indistinguishable from a share of an
            # incompatible version.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(len(data['sharemap']) == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 1, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_invisible_corruption(ignored, corruptor_func):
            # Corruption which is not detected by the server means that the server will send you
            # back the share data, but you will detect that it is wrong.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # print "delta was ", after_check_reads - before_check_reads
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func))
                self.failUnless(len(data['list-corrupt-shares']) == 1, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 0, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['sharemap']) == 9, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_offset_of_uri_extension_to_force_short_read,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)
        return d
    test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."

    def test_repair(self):
        """ Repair replaces a share that got deleted. """
        # N == 10.  7 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger seven times as many disk reads and block sends as would be optimal.
        DELTA_READS = 10 * 7
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents.  This can't
                # work unless it has already repaired the previously-deleted share #2.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...  (The previous callback
        # returned the result of _download_and_check_plaintext, not the filenode, so
        # use the filenode stashed by setUp.)
        def _repair_from_deletion_of_7(ignored):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        def _repair_from_corruption(ignored):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_sharedata_version_number,
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption)

        return d
    test_repair.todo = "We haven't implemented a repairer yet."


# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run

# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit