# src/allmydata/test/test_immutable.py
# Tests for the immutable-file checker and verifier.
1 from allmydata.test.common import SystemTestMixin, ShareManglingMixin
2 from allmydata.monitor import Monitor
3 from allmydata.interfaces import IURI, NotEnoughSharesError
4 from allmydata.immutable import upload
5 from allmydata.util import log
6 from twisted.internet import defer
7 from twisted.trial import unittest
8 import random, struct
9 import common_util as testutil
10
# One byte larger than the LIT-URI threshold, so the upload produces real CHK shares.
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
12
def corrupt_field(data, offset, size, debug=False):
    """ Return a copy of data with the field at [offset:offset+size]
    corrupted: with equal probability either a single bit of the field is
    flipped, or the whole field is replaced with random bytes. """
    flip_one_bit = random.random() < 0.5
    if flip_one_bit:
        newdata = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
        return newdata
    newval = testutil.insecurerandstr(size)
    if debug:
        log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
    return data[:offset] + newval + data[offset+size:]
24
25 def _corrupt_nothing(data):
26     """ Leave the data pristine. """
27     return data
28
def _corrupt_file_version_number(data):
    """ Scramble the share file -- the share file version number (the first
    four bytes) will have one bit flipped or else be changed to a random
    value. """
    offset, size = 0x00, 4
    return corrupt_field(data, offset, size)
33
def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data
    within the file will have one bit flipped or else be changed to a random
    value.  (It is not merely decremented by one, despite what older comments
    said: corrupt_field flips a bit or randomizes the field.) """
    return corrupt_field(data, 0x04, 4)
38
39 def _corrupt_sharedata_version_number(data):
40     """ Scramble the file data -- the share data version number will have one bit flipped or
41     else will be changed to a random value, but not 1 or 2."""
42     return corrupt_field(data, 0x0c, 4)
43     sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
44     assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
45     newsharevernum = sharevernum
46     while newsharevernum in (1, 2):
47         newsharevernum = random.randrange(0, 2**32)
48     newsharevernumbytes = struct.pack(">l", newsharevernum)
49     return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
50
51 def _corrupt_sharedata_version_number_to_plausible_version(data):
52     """ Scramble the file data -- the share data version number will
53     be changed to 2 if it is 1 or else to 1 if it is 2."""
54     sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
55     assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
56     if sharevernum == 1:
57         newsharevernum = 2
58     else:
59         newsharevernum = 1
60     newsharevernumbytes = struct.pack(">l", newsharevernum)
61     return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
62
def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # v1 shares use 4-byte fields, v2 shares use 8-byte fields.
    fieldsize = 4 if sharevernum == 1 else 8
    return corrupt_field(data, 0x0c+0x04, fieldsize, debug=False)
72
def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data
    within the share data will have one bit flipped or else be changed to a
    random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Field position/size differs between the v1 and v2 share layouts.
    if sharevernum == 1:
        offset, size = 0x0c+0x08, 4
    else:
        offset, size = 0x0c+0x0c, 8
    return corrupt_field(data, offset, size)
82
def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data
    within the share data will have one bit flipped or else be changed to a
    random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Field position/size differs between the v1 and v2 share layouts.
    if sharevernum == 1:
        offset, size = 0x0c+0x0c, 4
    else:
        offset, size = 0x0c+0x14, 8
    return corrupt_field(data, offset, size)
92
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the
    ciphertext hash tree within the share data will have one bit flipped or
    else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Field position/size differs between the v1 and v2 share layouts.
    if sharevernum == 1:
        offset, size = 0x0c+0x14, 4
    else:
        offset, size = 0x0c+0x24, 8
    return corrupt_field(data, offset, size, debug=False)
103
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block
    hash tree within the share data will have one bit flipped or else be
    changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Field position/size differs between the v1 and v2 share layouts.
    if sharevernum == 1:
        offset, size = 0x0c+0x18, 4
    else:
        offset, size = 0x0c+0x2c, 8
    return corrupt_field(data, offset, size)
113
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share
    hash tree within the share data will have one bit flipped or else be
    changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Field position/size differs between the v1 and v2 share layouts.
    if sharevernum == 1:
        offset, size = 0x0c+0x1c, 4
    else:
        offset, size = 0x0c+0x34, 8
    return corrupt_field(data, offset, size)
123
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri
    extension will have one bit flipped or else be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Field position/size differs between the v1 and v2 share layouts.
    if sharevernum == 1:
        offset, size = 0x0c+0x20, 4
    else:
        offset, size = 0x0c+0x3c, 8
    return corrupt_field(data, offset, size)
133
134 def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
135     """ Scramble the file data -- the field showing the offset of the uri extension will be set
136     to the size of the file minus 3.  This means when the client tries to read the length field
137     from that location it will get a short read -- the result string will be only 3 bytes long,
138     not the 4 or 8 bytes necessary to do a successful struct.unpack."""
139     sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
140     assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
141     # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
142     if sharevernum == 1:
143         if debug:
144             log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
145         return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
146     else:
147         if debug:
148             log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
149         return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]
150
def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]

        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # Bug fix: the v2 data-size field is 8 bytes at 0x0c+0x0c (the same
        # spot _corrupt_size_of_sharedata corrupts).  The original slice
        # [0x0c+0x08:0x0c+0x0c+8] was 12 bytes long, which struct.unpack(">Q")
        # rejects with struct.error.
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]

        return corrupt_field(data, 0x0c+0x44, sharedatasize)
164
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash tree
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # Bug fix: the stored offsets are relative to the client-visible share
    # (they do not include the 0x0c-byte server-side header of the file we
    # hold), so add 0x0c or we corrupt the wrong bytes.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
179
def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    # Bug fix: stored offsets are relative to the client-visible share, not to
    # the file (which has a 0x0c-byte server-side header) -- add 0x0c so the
    # corruption actually lands in the block hash tree.
    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)
194
def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    # Bug fix: stored offsets are relative to the client-visible share, not to
    # the file (which has a 0x0c-byte server-side header) -- add 0x0c so the
    # corruption actually lands in the share hash chain.
    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)
209
def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri
    extension will have one bit flipped or else be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Bug fix: stored offsets are relative to the client-visible share, not to
    # the file (which has a 0x0c-byte server-side header) -- add 0x0c so the
    # corruption actually lands on the UEB length field.
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)
221
def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will
    have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    # Bug fix: the length is (correctly) read at 0x0c+uriextoffset above, but
    # the corruption was applied at the raw uriextoffset, missing the UEB by
    # the 0x0c-byte server-side header -- add 0x0c here too.
    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)
235
236 class Test(ShareManglingMixin, unittest.TestCase):
    def setUp(self):
        """ Build a test grid, upload TEST_DATA from client 0, and stash
        self.uri and self.filenode for the tests to work with. """
        # Set self.basedir to a temp dir which has the name of the current test method in its
        # name.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            # Empty convergence secret: deduplication is irrelevant to these tests.
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d
258
259     def _download_and_check_plaintext(self, unused=None):
260         self.downloader = self.clients[1].getServiceNamed("downloader")
261         d = self.downloader.download_to_data(self.uri)
262
263         def _after_download(result):
264             self.failUnlessEqual(result, TEST_DATA)
265         d.addCallback(_after_download)
266         return d
267
268     def _delete_a_share(self, unused=None, sharenum=None):
269         """ Delete one share. """
270
271         shares = self.find_shares()
272         ks = shares.keys()
273         if sharenum is not None:
274             k = [ key for key in shares.keys() if key[1] == sharenum ][0]
275         else:
276             k = random.choice(ks)
277         del shares[k]
278         self.replace_shares(shares, storage_index=self.uri.storage_index)
279
280         return unused
281
282     def test_test_code(self):
283         # The following process of stashing the shares, running
284         # replace_shares, and asserting that the new set of shares equals the
285         # old is more to test this test code than to test the Tahoe code...
286         d = defer.succeed(None)
287         d.addCallback(self.find_shares)
288         stash = [None]
289         def _stash_it(res):
290             stash[0] = res
291             return res
292         d.addCallback(_stash_it)
293         d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
294
295         def _compare(res):
296             oldshares = stash[0]
297             self.failUnless(isinstance(oldshares, dict), oldshares)
298             self.failUnlessEqual(oldshares, res)
299
300         d.addCallback(self.find_shares)
301         d.addCallback(_compare)
302
303         d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
304         d.addCallback(self.find_shares)
305         d.addCallback(lambda x: self.failUnlessEqual(x, {}))
306
307         # The following process of deleting 8 of the shares and asserting that you can't
308         # download it is more to test this test code than to test the Tahoe code...
309         def _then_delete_8(unused=None):
310             self.replace_shares(stash[0], storage_index=self.uri.storage_index)
311             for i in range(8):
312                 self._delete_a_share()
313         d.addCallback(_then_delete_8)
314
315         def _then_download(unused=None):
316             self.downloader = self.clients[1].getServiceNamed("downloader")
317             d = self.downloader.download_to_data(self.uri)
318
319             def _after_download_callb(result):
320                 self.fail() # should have gotten an errback instead
321                 return result
322             def _after_download_errb(failure):
323                 failure.trap(NotEnoughSharesError)
324                 return None # success!
325             d.addCallbacks(_after_download_callb, _after_download_errb)
326         d.addCallback(_then_download)
327
328         # The following process of leaving 8 of the shares deleted and asserting that you can't
329         # repair it is more to test this test code than to test the Tahoe code...
330         #TODO def _then_repair(unused=None):
331         #TODO     d2 = self.filenode.check_and_repair(Monitor(), verify=False)
332         #TODO     def _after_repair(checkandrepairresults):
333         #TODO         prerepairres = checkandrepairresults.get_pre_repair_results()
334         #TODO         postrepairres = checkandrepairresults.get_post_repair_results()
335         #TODO         self.failIf(prerepairres.is_healthy())
336         #TODO         self.failIf(postrepairres.is_healthy())
337         #TODO     d2.addCallback(_after_repair)
338         #TODO     return d2
339         #TODO d.addCallback(_then_repair)
340         return d
341
342     def _count_reads(self):
343         sum_of_read_counts = 0
344         for client in self.clients:
345             counters = client.stats_provider.get_stats()['counters']
346             sum_of_read_counts += counters.get('storage_server.read', 0)
347         return sum_of_read_counts
348
349     def _count_allocates(self):
350         sum_of_allocate_counts = 0
351         for client in self.clients:
352             counters = client.stats_provider.get_stats()['counters']
353             sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
354         return sum_of_allocate_counts
355
356     def _corrupt_a_share(self, unused, corruptor_func, sharenum):
357         shares = self.find_shares()
358         ks = [ key for key in shares.keys() if key[1] == sharenum ]
359         assert ks, (shares.keys(), sharenum)
360         k = ks[0]
361         shares[k] = corruptor_func(shares[k])
362         self.replace_shares(shares, storage_index=self.uri.storage_index)
363         return corruptor_func
364
365     def _corrupt_all_shares(self, unused, corruptor_func):
366         """ All shares on disk will be corrupted by corruptor_func. """
367         shares = self.find_shares()
368         for k in shares.keys():
369             self._corrupt_a_share(unused, corruptor_func, k[1])
370         return corruptor_func
371
372     def _corrupt_a_random_share(self, unused, corruptor_func):
373         """ Exactly one share on disk will be corrupted by corruptor_func. """
374         shares = self.find_shares()
375         ks = shares.keys()
376         k = random.choice(ks)
377         self._corrupt_a_share(unused, corruptor_func, k[1])
378         return corruptor_func
379
    def test_download(self):
        """ Basic download.  (This functionality is more or less already tested by test code in
        other modules, but this module is also going to test some more specific things about
        immutable download.)
        """
        d = defer.succeed(None)
        # Snapshot the read counters before downloading so we can measure the
        # cost of the download itself.
        before_download_reads = self._count_reads()
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads total: 3 to
            # get the headers from each share, 3 to get the share hash trees and uebs from each
            # share, 1 to get the crypttext hashes, and 3 to get the block data from each share.
            # NOTE(review): the assertion below actually permits up to 12 reads,
            # not the 10 described above -- confirm which bound is intended.
            self.failIf(after_download_reads-before_download_reads > 12, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d
396
    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been removed. """
        d = defer.succeed(None)
        def _then_delete_7(unused=None):
            # Leave exactly 3 shares -- the minimum needed (k=3).
            for i in range(7):
                self._delete_a_share()
        # Snapshot read counters before any deletion/download happens.
        before_download_reads = self._count_reads()
        d.addCallback(_then_delete_7)
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d
413
    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough shares out
        there."""
        d = defer.succeed(None)
        def _then_delete_8(unused=None):
            # Leave only 2 shares, fewer than the 3 needed to reconstruct.
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        # Snapshot read counters before the download attempt.
        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before actually trying to read any
            # share data.
            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d
444
    def test_download_abort_if_too_many_corrupted_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
        shares out there. It should be able to tell because the corruption occurs in the
        sharedata version number, which it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            # Corrupt 8 randomly-chosen shares, leaving only 2 good ones
            # (fewer than the 3 needed).
            shnums = range(10)
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        # Snapshot read counters before the download attempt.
        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before reading all of the share
            # data.  Actually, we could give up sooner than 45 reads, but currently our download
            # code does 45 reads.  This test then serves as a "performance regression detector"
            # -- if you change download code so that it takes *more* reads, then this test will
            # fail.
            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d
481
    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
        """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            # A non-verifying check of the intact file must be read-free and healthy.
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        # Remove every share, then check again.
        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            # With all shares gone, the check must still be read-free but unhealthy.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d
515
    def _help_test_verify(self, corruptor_funcs, judgement_func):
        """ For each corruptor in corruptor_funcs: corrupt one randomly-chosen
        share, run a verifying check on the file, pass the check results to
        judgement_func, then restore the pristine shares for the next round.
        Also asserts each check stays within a budget of DELTA_READS reads. """
        LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * LEEWAY # N = 10
        d = defer.succeed(None)

        # Stash the pristine shares so each corruption round can be undone.
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        def _verify_after_corruption(corruptor_func):
            # corruptor_func arrives here because _corrupt_a_random_share
            # returns it down the Deferred chain.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                try:
                    return judgement_func(checkresults)
                except Exception, le:
                    # Tag the failure with which corruptor produced it, to aid debugging.
                    le.args = tuple(le.args + ("corruptor_func: " + corruptor_func.__name__,))
                    raise

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in corruptor_funcs:
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_verify_after_corruption)
            d.addCallback(_put_it_all_back)

        return d
552
    def test_verify_no_problem(self):
        """ Verify says the file is healthy when none of the shares have been touched in a way
        that matters. It doesn't use more than seven times as many reads as it needs."""
        def judge(checkresults):
            # Expect a fully-healthy 3-of-10 file spread over 5 servers.
            self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
            data = checkresults.get_data()
            self.failUnless(data['count-shares-good'] == 10, data)
            self.failUnless(len(data['sharemap']) == 10, data)
            self.failUnless(data['count-shares-needed'] == 3, data)
            self.failUnless(data['count-shares-expected'] == 10, data)
            self.failUnless(data['count-good-share-hosts'] == 5, data)
            self.failUnless(len(data['servers-responding']) == 5, data)
            self.failUnless(len(data['list-corrupt-shares']) == 0, data)
        return self._help_test_verify([
            _corrupt_nothing,
            _corrupt_size_of_file_data,
            _corrupt_size_of_sharedata,
            _corrupt_segment_size, ], judge)
571
    def test_verify_server_visible_corruption(self):
        """ Corruption which is detected by the server means that the server will send you back
        a Failure in response to get_bucket instead of giving you the share data.  Test that
        verifier handles these answers correctly. It doesn't use more than seven times as many
        reads as it needs."""
        def judge(checkresults):
            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
            data = checkresults.get_data()
            # The server might fail to serve up its other share as well as the corrupted
            # one, so count-shares-good could be 8 or 9.
            self.failUnless(data['count-shares-good'] in (8, 9), data)
            self.failUnless(len(data['sharemap']) in (8, 9,), data)
            self.failUnless(data['count-shares-needed'] == 3, data)
            self.failUnless(data['count-shares-expected'] == 10, data)
            # The server may have served up the non-corrupted share, or it may not have, so
            # the checker could have detected either 4 or 5 good servers.
            self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
            self.failUnless(len(data['servers-responding']) in (4, 5), data)
            # If the server served up the other share, then the checker should consider it good, else it should
            # not.
            self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
            # Server-visible corruption is reported as a failure, not as a corrupt share.
            self.failUnless(len(data['list-corrupt-shares']) == 0, data)
        return self._help_test_verify([
            _corrupt_file_version_number,
            ], judge)
597
    def test_verify_share_incompatibility(self):
        """ A share whose share-data version number is neither 1 nor 2 should
        be reported by the verifier as one corrupt share, with the other nine
        still good. """
        def judge(checkresults):
            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
            data = checkresults.get_data()
            self.failUnless(data['count-shares-good'] == 9, data)
            self.failUnless(len(data['sharemap']) == 9, data)
            self.failUnless(data['count-shares-needed'] == 3, data)
            self.failUnless(data['count-shares-expected'] == 10, data)
            self.failUnless(data['count-good-share-hosts'] == 5, data)
            self.failUnless(len(data['servers-responding']) == 5, data)
            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
            # Corrupt/incompatible counts must agree with their lists.
            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
        return self._help_test_verify([
            _corrupt_sharedata_version_number,
            ], judge)
615
616     def test_verify_server_invisible_corruption(self):
617         def judge(checkresults):
618             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
619             data = checkresults.get_data()
620             self.failUnless(data['count-shares-good'] == 9, data)
621             self.failUnless(data['count-shares-needed'] == 3, data)
622             self.failUnless(data['count-shares-expected'] == 10, data)
623             self.failUnless(data['count-good-share-hosts'] == 5, data)
624             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
625             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
626             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
627             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
628             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
629             self.failUnless(len(data['servers-responding']) == 5, data)
630             self.failUnless(len(data['sharemap']) == 9, data)
631         return self._help_test_verify([
632             _corrupt_offset_of_sharedata,
633             _corrupt_offset_of_uri_extension,
634             _corrupt_offset_of_uri_extension_to_force_short_read,
635             _corrupt_share_data,
636             _corrupt_share_hashes,
637             _corrupt_length_of_uri_extension,
638             _corrupt_uri_extension,
639             ], judge)
640
641     def test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO(self):
642         def judge(checkresults):
643             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
644             data = checkresults.get_data()
645             self.failUnless(data['count-shares-good'] == 9, data)
646             self.failUnless(data['count-shares-needed'] == 3, data)
647             self.failUnless(data['count-shares-expected'] == 10, data)
648             self.failUnless(data['count-good-share-hosts'] == 5, data)
649             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
650             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
651             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
652             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
653             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
654             self.failUnless(len(data['servers-responding']) == 5, data)
655             self.failUnless(len(data['sharemap']) == 9, data)
656         return self._help_test_verify([
657             _corrupt_offset_of_block_hashes,
658             ], judge)
659     test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
660
661     def test_verify_server_invisible_corruption_sharedata_plausible_version(self):
662         def judge(checkresults):
663             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
664             data = checkresults.get_data()
665             self.failUnless(data['count-shares-good'] == 9, data)
666             self.failUnless(data['count-shares-needed'] == 3, data)
667             self.failUnless(data['count-shares-expected'] == 10, data)
668             self.failUnless(data['count-good-share-hosts'] == 5, data)
669             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
670             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
671             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
672             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
673             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
674             self.failUnless(len(data['servers-responding']) == 5, data)
675             self.failUnless(len(data['sharemap']) == 9, data)
676         return self._help_test_verify([
677             _corrupt_sharedata_version_number_to_plausible_version,
678             ], judge)
679
680     def test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO(self):
681         def judge(checkresults):
682             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
683             data = checkresults.get_data()
684             self.failUnless(data['count-shares-good'] == 9, data)
685             self.failUnless(data['count-shares-needed'] == 3, data)
686             self.failUnless(data['count-shares-expected'] == 10, data)
687             self.failUnless(data['count-good-share-hosts'] == 5, data)
688             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
689             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
690             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
691             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
692             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
693             self.failUnless(len(data['servers-responding']) == 5, data)
694             self.failUnless(len(data['sharemap']) == 9, data)
695         return self._help_test_verify([
696             _corrupt_offset_of_share_hashes,
697             ], judge)
698     test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
699
700     def test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO(self):
701         def judge(checkresults):
702             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
703             data = checkresults.get_data()
704             self.failUnless(data['count-shares-good'] == 9, data)
705             self.failUnless(data['count-shares-needed'] == 3, data)
706             self.failUnless(data['count-shares-expected'] == 10, data)
707             self.failUnless(data['count-good-share-hosts'] == 5, data)
708             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
709             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
710             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
711             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
712             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
713             self.failUnless(len(data['servers-responding']) == 5, data)
714             self.failUnless(len(data['sharemap']) == 9, data)
715         return self._help_test_verify([
716             _corrupt_offset_of_ciphertext_hash_tree,
717             ], judge)
718     test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
719
720     def test_verify_server_invisible_corruption_cryptext_hash_tree_TODO(self):
721         def judge(checkresults):
722             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
723             data = checkresults.get_data()
724             self.failUnless(data['count-shares-good'] == 9, data)
725             self.failUnless(data['count-shares-needed'] == 3, data)
726             self.failUnless(data['count-shares-expected'] == 10, data)
727             self.failUnless(data['count-good-share-hosts'] == 5, data)
728             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
729             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
730             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
731             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
732             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
733             self.failUnless(len(data['servers-responding']) == 5, data)
734             self.failUnless(len(data['sharemap']) == 9, data)
735         return self._help_test_verify([
736             _corrupt_crypttext_hash_tree,
737             ], judge)
738     test_verify_server_invisible_corruption_cryptext_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
739
740     def test_verify_server_invisible_corruption_block_hash_tree_TODO(self):
741         def judge(checkresults):
742             self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
743             data = checkresults.get_data()
744             self.failUnless(data['count-shares-good'] == 9, data)
745             self.failUnless(data['count-shares-needed'] == 3, data)
746             self.failUnless(data['count-shares-expected'] == 10, data)
747             self.failUnless(data['count-good-share-hosts'] == 5, data)
748             self.failUnless(data['count-corrupt-shares'] == 1, (data,))
749             self.failUnless(len(data['list-corrupt-shares']) == 1, data)
750             self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
751             self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
752             self.failUnless(len(data['list-incompatible-shares']) == 0, data)
753             self.failUnless(len(data['servers-responding']) == 5, data)
754             self.failUnless(len(data['sharemap']) == 9, data)
755         return self._help_test_verify([
756             _corrupt_block_hashes,
757             ], judge)
758     test_verify_server_invisible_corruption_block_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
759
    def test_repair(self):
        """ Repair replaces a share that got deleted.

        Three stages, chained on one Deferred: repair after deleting a single
        share, repair after deleting seven shares, then repair after each of
        fourteen different kinds of share corruption.  Each stage also bounds
        the number of extra reads and allocates the repairer is allowed to
        trigger (counted via the ShareManglingMixin counters).
        """
        # N == 10.  7 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * 7
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        # Delete share #2 so the file starts out unhealthy (9 of 10 shares).
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            # Snapshot the read/allocate counters so we can measure only the
            # work done by check_and_repair itself.
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                # Repairing one missing share must stay within the efficiency
                # budget: at most DELTA_READS reads and one new allocate.
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                # Unhealthy before the repair, healthy after it.
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents.  This can't
                # work unless it has already repaired the previously-deleted share #2.
                # NOTE(review): called directly here (not as a Deferred callback),
                # so presumably _delete_a_share works without a filenode argument
                # -- confirm against ShareManglingMixin.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...
        # NOTE(review): this callback receives the result of the previous
        # stage's _download_and_check_plaintext(), not the filenode -- the
        # parameter name looks wrong; verify before relying on this stage
        # (the whole test is marked .todo below).
        def _repair_from_deletion_of_7(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                # Seven shares are missing this time, so allow up to seven
                # allocates (one per re-created share).
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        def _repair_from_corruption(filenode):
            # Same pattern as above, but the damage is corruption rather than
            # deletion, so no filesystem share-count check afterwards -- just
            # verify the plaintext still downloads correctly.
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2

        # Queue one corrupt-then-repair round per corruptor, reusing the same
        # Deferred chain; each round must restore downloadability.
        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_sharedata_version_number,
            _corrupt_sharedata_version_number_to_plausible_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption)

        return d
    test_repair.todo = "We haven't implemented a repairer yet."
878
879
880 # XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run
881
882 # XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
883
884 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit