from allmydata.test.common import SystemTestMixin, ShareManglingMixin
from allmydata.monitor import Monitor
from allmydata.interfaces import IURI, NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util import log
from twisted.internet import defer
from twisted.trial import unittest
import random, struct
import common_util as testutil

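# One byte more than the LIT-URI threshold, so that the upload in setUp()
# produces real shares on the storage servers rather than packing the whole
# file into a literal URI, which would leave no share files to mangle.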
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)

def corrupt_field(data, offset, size, debug=False):
    """ Return a copy of data in which the field at [offset:offset+size] has
    either one bit flipped or is replaced by random bytes, with equal
    probability. """
    if random.random() < 0.5:
        newdata = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
        return newdata
    else:
        newval = testutil.insecurerandstr(size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
        return data[:offset]+newval+data[offset+size:]

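# For reference, here is the share-file layout implied by the hard-coded
# offsets in the corruptors below (absolute file offsets; field sizes in
# bytes).  The first 0x0c bytes are a server-side header that clients never
# see, so the offsets *stored inside* a share are relative to 0x0c -- hence
# the "0x0c+..." arithmetic throughout.  The field at 0x08 and the offset
# field between "offset of sharedata" and "offset of ciphertext hash tree"
# are untouched by these corruptors (presumably the lease count and the
# offset of the plaintext hash tree, respectively).
#
#                                       v1          v2
#   share file version number           0x00 (4)    0x00 (4)
#   size of share data                  0x04 (4)    0x04 (4)
#   (untouched)                         0x08 (4)    0x08 (4)
#   share data version number           0x0c (4)    0x0c (4)
#   segment size                        0x10 (4)    0x10 (8)
#   size of sharedata                   0x14 (4)    0x18 (8)
#   offset of sharedata                 0x18 (4)    0x20 (8)
#   (untouched offset field)            0x1c (4)    0x28 (8)
#   offset of ciphertext hash tree      0x20 (4)    0x30 (8)
#   offset of block hashes              0x24 (4)    0x38 (8)
#   offset of share hashes              0x28 (4)    0x40 (8)
#   offset of uri extension             0x2c (4)    0x48 (8)
#   share data begins                   0x30        0x50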
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number will have one bit
    flipped or else will be changed to a random value."""
    return corrupt_field(data, 0x00, 4)

def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data within
    the file will have one bit flipped or else will be changed to a random value. """
    return corrupt_field(data, 0x04, 4)

def _corrupt_sharedata_version_number(data):
    """ Scramble the file data -- the share data version number will be changed to
    a random value other than 1 or 2."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    newsharevernum = sharevernum
    while newsharevernum in (1, 2):
        newsharevernum = random.randrange(0, 2**32)
    # Pack unsigned (">L"): randrange(0, 2**32) can yield values >= 2**31,
    # which the signed ">l" format rejects.
    newsharevernumbytes = struct.pack(">L", newsharevernum)
    return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]

def _corrupt_sharedata_version_number_to_known_version(data):
    """ Scramble the file data -- the share data version number will
    be changed to 2 if it is 1 or else to 1 if it is 2."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        newsharevernum = 2
    else:
        newsharevernum = 1
    newsharevernumbytes = struct.pack(">l", newsharevernum)
    return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]

def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment will have one
    bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x04, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x04, 8, debug=False)

def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data within the share
    data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x08, 4)
    else:
        return corrupt_field(data, 0x0c+0x0c, 8)

def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data within the share
    data will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x0c, 4)
    else:
        return corrupt_field(data, 0x0c+0x14, 8)

def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the ciphertext hash tree
    within the share data will have one bit flipped or else be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x14, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x24, 8, debug=False)

def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block hash tree within
    the share data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x18, 4)
    else:
        return corrupt_field(data, 0x0c+0x2c, 8)

def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share hash tree within
    the share data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x1c, 4)
    else:
        return corrupt_field(data, 0x0c+0x34, 8)

def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x20, 4)
    else:
        return corrupt_field(data, 0x0c+0x3c, 8)

def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
    """ Scramble the file data -- the field showing the offset of the uri extension will be set
    to the size of the file minus 3.  This means when the client tries to read the length field
    from that location it will get a short read -- the result string will be only 3 bytes long,
    not the 4 or 8 bytes necessary to do a successful struct.unpack."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
    if sharevernum == 1:
        if debug:
            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
        return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
    else:
        if debug:
            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
        return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]

def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself will have one
    bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # The v2 size-of-sharedata field is the 8 bytes at 0x0c+0x0c.
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        return corrupt_field(data, 0x0c+0x44, sharedatasize)

def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash tree will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # The stored offsets are relative to the start of the share data, so skip
    # the 0x0c-byte server-side header to hit the field itself.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)

def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree will have one bit
    flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)

def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)

def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)

def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will have one bit
    flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)

class Test(ShareManglingMixin, unittest.TestCase):
    def setUp(self):
        # Set self.basedir to a temp dir whose name includes the name of the
        # current test method.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d

    def _download_and_check_plaintext(self, unused=None):
        self.downloader = self.clients[1].getServiceNamed("downloader")
        d = self.downloader.download_to_data(self.uri)

        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
        d.addCallback(_after_download)
        return d

    def _delete_a_share(self, unused=None, sharenum=None):
        """ Delete one share. """

        shares = self.find_shares()
        ks = shares.keys()
        if sharenum is not None:
            k = [ key for key in shares.keys() if key[1] == sharenum ][0]
        else:
            k = random.choice(ks)
        del shares[k]
        self.replace_shares(shares, storage_index=self.uri.storage_index)

        return unused

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting
        # that you can't download the file is more to test this test code
        # than to test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d.addCallbacks(_after_download_callb, _after_download_errb)
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and
        # asserting that you can't repair the file is more to test this test
        # code than to test the Tahoe code...
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                self.failIf(prerepairres.is_healthy())
                self.failIf(postrepairres.is_healthy())
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_then_repair)
        return d

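    # The storage servers increment the 'storage_server.read' and
    # 'storage_server.allocate' counters in their stats providers as they
    # serve reads and allocate buckets for new shares; summing those counters
    # across all clients gives the tests below a deterministic way to bound
    # how much server work a download, check, or repair operation costs.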
    def _count_reads(self):
        sum_of_read_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _corrupt_a_share(self, unused, corruptor_func, sharenum):
        shares = self.find_shares()
        ks = [ key for key in shares.keys() if key[1] == sharenum ]
        assert ks, (shares.keys(), sharenum)
        k = ks[0]
        shares[k] = corruptor_func(shares[k])
        self.replace_shares(shares, storage_index=self.uri.storage_index)

    def _corrupt_all_shares(self, unused, corruptor_func):
        """ All shares on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        for k in shares.keys():
            self._corrupt_a_share(unused, corruptor_func, k[1])

    def _corrupt_a_random_share(self, unused, corruptor_func):
        """ Exactly one share on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        ks = shares.keys()
        k = random.choice(ks)
        return self._corrupt_a_share(unused, corruptor_func, k[1])

    def test_download(self):
        """ Basic download.  (This functionality is more or less already tested by test code in
        other modules, but this module is also going to test some more specific things about
        immutable download.)
        """
        d = defer.succeed(None)
        before_download_reads = self._count_reads()
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # Optimally, downloading the file takes 10 reads total: 3 to get the
            # headers from each share, 3 to get the share hash trees and uebs from
            # each share, 1 to get the crypttext hashes, and 3 to get the block
            # data from each share.  The bound below allows two reads of slack.
            self.failIf(after_download_reads-before_download_reads > 12, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been removed. """
        d = defer.succeed(None)
        def _then_delete_7(unused=None):
            for i in range(7):
                self._delete_a_share()
        before_download_reads = self._count_reads()
        d.addCallback(_then_delete_7)
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough shares out
        there."""
        d = defer.succeed(None)
        def _then_delete_8(unused=None):
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before actually trying to read any
            # share data.
            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
        shares out there. It should be able to tell because the corruption occurs in the
        sharedata version number, which it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            shnums = range(10)
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before reading all of the share
            # data.  Actually, we could give up sooner than 45 reads, but currently our download
            # code does 45 reads.  This test then serves as a "performance regression detector"
            # -- if you change download code so that it takes *more* reads, then this test will
            # fail.
            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
        """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d

    def test_check_with_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy if any field of any share has been corrupted.  It doesn't use
        more than a small constant multiple (LEEWAY) of the optimal number of reads. """
        LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and block sends as would be optimal.
        DELTA_READS = 10 * LEEWAY # N = 10
        d = defer.succeed(self.filenode)
        def _check_pristine(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS))
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check_pristine)

        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)

        def _check_after_feckless_corruption(ignored, corruptor_func):
            # Corruption which has no effect -- bits of the share file that are unused.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 10, data)
                self.failUnless(len(data['sharemap']) == 10, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        for corruptor_func in (
            _corrupt_size_of_file_data,
            _corrupt_size_of_sharedata,
            _corrupt_segment_size,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_visible_corruption(ignored, corruptor_func):
            # Corruption which is detected by the server means that the server will send you
            # back a Failure in response to get_bucket instead of giving you the share data.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                # The server might fail to serve up its other share as well as the corrupted
                # one, so count-shares-good could be 8 or 9.
                self.failUnless(data['count-shares-good'] in (8, 9), data)
                self.failUnless(len(data['sharemap']) in (8, 9,), data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                # The server may have served up the non-corrupted share, or it may not have, so
                # the checker could have detected either 4 or 5 good servers.
                self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
                self.failUnless(len(data['servers-responding']) in (4, 5), data)
                # If the server served up the other share, then the checker should consider it
                # good, else it should not.
                self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_share_incompatibility(ignored, corruptor_func):
            # Corruption which means the share is indistinguishable from a share of an
            # incompatible version.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(len(data['sharemap']) == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 1, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_invisible_corruption(ignored, corruptor_func):
            # Corruption which is not detected by the server means that the server will send you
            # back the share data, but you will detect that it is wrong.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # print "delta was ", after_check_reads - before_check_reads
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func))
                self.failUnless(len(data['list-corrupt-shares']) == 1, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 0, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['sharemap']) == 9, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_offset_of_uri_extension_to_force_short_read,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)
        return d
    test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."

    def test_repair(self):
        """ Repair replaces a share that got deleted. """
        # N == 10.  7 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger seven times as many disk reads and block sends as would be optimal.
        DELTA_READS = 10 * 7
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents.  This can't
                # work unless it has already repaired the previously-deleted share #2.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...
        def _repair_from_deletion_of_7(ignored):
            # (The result of the preceding download callback is not the
            # filenode, so we use the stashed self.filenode here.)
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        def _repair_from_corruption(ignored):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_sharedata_version_number,
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption)

        return d
    test_repair.todo = "We haven't implemented a repairer yet."


# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run

# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit