# -*- coding: utf-8 -*-
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import repairer, upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from allmydata.test.no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
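# The tests below use the grid's default 3-of-10 encoding (the judge_*
# methods assert count-shares-needed == 3 and count-shares-expected == 10),
# so MAX_DELTA_READS allows up to 10 * 18 = 180 extra reads per verify pass.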

timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
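    # Helpers shared by the Verifier and Repairer tests: they total the
    # 'storage_server.read', 'storage_server.allocate', and
    # 'storage_server.write' counters across all servers (via each server's
    # stats_provider) so the tests can bound how much I/O a check, verify,
    # or repair actually performed.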
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)

    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
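        # A tiny max_segment_size forces even the small common.TEST_DATA to
        # be split across several segments, giving the block and crypttext
        # hash trees some depth for the corruption tests below to exercise.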
        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.uri
            self.c0_filenode = c0.create_node_from_uri(ur.uri)
            self.c1_filenode = c1.create_node_from_uri(ur.uri)
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
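        # 'corruptor' is applied to share number 'shnum' of a freshly
        # uploaded file, then a verify=True check is run from the second
        # client and 'judgement' is called with the resulting CheckResults;
        # it should raise FailTest if the verifier misclassified the damage.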
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum], corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """ Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than READ_LEEWAY
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 10, data)
        self.failUnless(len(data['sharemap']) == 10, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 10, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that the verifier handles these
        answers correctly. It doesn't use more than READ_LEEWAY times as
        many reads as it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 9, data)
        self.failUnless(len(data['sharemap']) == 9, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 9, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(len(data['sharemap']), 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
        self.failUnlessEqual(data['count-corrupt-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # Corruption of fields that the server does not check (which is most
        # of them) will be detected by the client as it downloads those
        # shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(data['count-corrupt-shares'], 1)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['sharemap']), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
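# Repairing n missing shares is therefore allowed at most
# n * DELTA_WRITES_PER_SHARE allocates/writes; the deletion tests below
# check that bound for n=1 and n=7.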

class DownUpConnector(unittest.TestCase):
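    # repairer.DownUpConnector is the glue the repairer uses to feed the
    # bytes it downloads back into a fresh upload: the download side calls
    # write() on it and the upload side pulls data out via read_encrypted().
    # These tests exercise its buffering and short-read behavior directly.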
    def test_deferred_satisfaction(self):
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first
        # case 1: total data in buf is < requested data at time of request
        duc.write('\x01')
        d = duc.read_encrypted(2, False)
        def _then(data):
            self.failUnlessEqual(len(data), 2)
            self.failUnlessEqual(data[0], '\x01')
            self.failUnlessEqual(data[1], '\x02')
        d.addCallback(_then)
        duc.write('\x02')
        return d

    def test_extra(self):
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first
        # case 1: total data in buf is < requested data at time of request
        duc.write('\x01')
        d = duc.read_encrypted(2, False)
        def _then(data):
            self.failUnlessEqual(len(data), 2)
            self.failUnlessEqual(data[0], '\x01')
            self.failUnlessEqual(data[1], '\x02')
        d.addCallback(_then)
        duc.write('\x02\x03')
        return d

    def test_short_reads_1(self):
        # You don't get fewer bytes than you requested -- instead you get no callback at all.
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first

        d = duc.read_encrypted(2, False)
        duc.write('\x04')

        def _callb(res):
            self.fail("Shouldn't have gotten this callback res: %s" % (res,))
        d.addCallback(_callb)

        # Also in the other order of read-vs-write:
        duc2 = repairer.DownUpConnector()
        duc2.registerProducer(None, True) # just because you have to call registerProducer first
        duc2.write('\x04')
        d = duc2.read_encrypted(2, False)

        def _callb2(res):
            self.fail("Shouldn't have gotten this callback res: %s" % (res,))
        d.addCallback(_callb2)

        # But once the DUC is closed then you *do* get short reads.
        duc3 = repairer.DownUpConnector()
        duc3.registerProducer(None, True) # just because you have to call registerProducer first

        d = duc3.read_encrypted(2, False)
        duc3.write('\x04')
        duc3.close()
        def _callb3(res):
            self.failUnlessEqual(len(res), 1)
            self.failUnlessEqual(res[0], '\x04')
        d.addCallback(_callb3)
        return d

    def test_short_reads_2(self):
        # Also in the other order of read-vs-write.
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first

        duc.write('\x04')
        d = duc.read_encrypted(2, False)
        duc.close()

        def _callb(res):
            self.failUnlessEqual(len(res), 1)
            self.failUnlessEqual(res[0], '\x04')
        d.addCallback(_callb)
        return d

    def test_short_reads_3(self):
        # Also if it is closed before the read.
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first

        duc.write('\x04')
        duc.close()
        d = duc.read_encrypted(2, False)
        def _callb(res):
            self.failUnlessEqual(len(res), 1)
            self.failUnlessEqual(res[0], '\x04')
        d.addCallback(_callb)
        return d

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d

    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.data)

            # Make sure we really have 10 shares.
            shares = self.find_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted shares.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:
    #
    # As recently documented in NEWS for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:
    #
    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share
    #
    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too
    #
    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place
    #
    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).
    #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
    #
    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
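        # The OFF_ prefix keeps trial from collecting this method as a test
        # case; see the comment block above for why it is disabled.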
        d = defer.succeed(None)

        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."


# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising an exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)