# -*- coding: utf-8 -*-
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import repairer, upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10

timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)

    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.uri
            self.c0_filenode = c0.create_node_from_uri(ur.uri)
            self.c1_filenode = c1.create_node_from_uri(ur.uri)
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum], corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than READ_LEEWAY
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 10, data)
        self.failUnless(len(data['sharemap']) == 10, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 10, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that the verifier handles these
        answers correctly. It doesn't use more than READ_LEEWAY times as
        many reads as it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 9, data)
        self.failUnless(len(data['sharemap']) == 9, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 9, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(len(data['sharemap']), 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
        self.failUnlessEqual(data['count-corrupt-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(data['count-corrupt-shares'], 1)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['sharemap']), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's disabled. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print >>f, "%d: %s" % (i, results[i])
            f.close()
            print "Please look in _trial_temp/test_each_byte_output for results"
        d.addCallback(_show_results)
        return d
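
    # The helper below is NOT part of the original test; it is a minimal,
    # hypothetical sketch of the assertion the comment above describes: given
    # a list of (start, end) byte ranges that ticket #819 says are unused
    # (the ranges would have to be computed from the real share layout; none
    # are hardcoded here), every corrupted offset recorded in `results` that
    # falls outside those ranges should have been judged unhealthy.
    def _assert_unused_ranges_sketch(self, results, unused_ranges):
        def _is_unused(offset):
            for (start, end) in unused_ranges:
                if start <= offset < end:
                    return True
            return False
        for offset in sorted(results.keys()):
            if not _is_unused(offset):
                # corruption outside the unused regions must make the
                # verifier report the file as unhealthy
                self.failIf(results[offset],
                            "corruption at offset %d went undetected" % offset)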

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY

class DownUpConnector(unittest.TestCase):
    def test_deferred_satisfaction(self):
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first
        # case 1: total data in buf is < requested data at time of request
        duc.write('\x01')
        d = duc.read_encrypted(2, False)
        def _then(data):
            self.failUnlessEqual(len(data), 2)
            self.failUnlessEqual(data[0], '\x01')
            self.failUnlessEqual(data[1], '\x02')
        d.addCallback(_then)
        duc.write('\x02')
        return d

    def test_extra(self):
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first
        # case 1: total data in buf is < requested data at time of request
        duc.write('\x01')
        d = duc.read_encrypted(2, False)
        def _then(data):
            self.failUnlessEqual(len(data), 2)
            self.failUnlessEqual(data[0], '\x01')
            self.failUnlessEqual(data[1], '\x02')
        d.addCallback(_then)
        duc.write('\x02\x03')
        return d

    def test_short_reads_1(self):
        # You don't get fewer bytes than you requested -- instead you get no callback at all.
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first

        d = duc.read_encrypted(2, False)
        duc.write('\x04')

        def _callb(res):
            self.fail("Shouldn't have gotten this callback res: %s" % (res,))
        d.addCallback(_callb)

        # Also in the other order of read-vs-write:
        duc2 = repairer.DownUpConnector()
        duc2.registerProducer(None, True) # just because you have to call registerProducer first
        duc2.write('\x04')
        d = duc2.read_encrypted(2, False)

        def _callb2(res):
            self.fail("Shouldn't have gotten this callback res: %s" % (res,))
        d.addCallback(_callb2)

        # But once the DUC is closed then you *do* get short reads.
        duc3 = repairer.DownUpConnector()
        duc3.registerProducer(None, True) # just because you have to call registerProducer first

        d = duc3.read_encrypted(2, False)
        duc3.write('\x04')
        duc3.close()
        def _callb3(res):
            self.failUnlessEqual(len(res), 1)
            self.failUnlessEqual(res[0], '\x04')
        d.addCallback(_callb3)
        return d

    def test_short_reads_2(self):
        # Also in the other order of read-vs-write.
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first

        duc.write('\x04')
        d = duc.read_encrypted(2, False)
        duc.close()

        def _callb(res):
            self.failUnlessEqual(len(res), 1)
            self.failUnlessEqual(res[0], '\x04')
        d.addCallback(_callb)
        return d

    def test_short_reads_3(self):
        # Also if it is closed before the read.
        duc = repairer.DownUpConnector()
        duc.registerProducer(None, True) # just because you have to call registerProducer first

        duc.write('\x04')
        duc.close()
        d = duc.read_encrypted(2, False)
        def _callb(res):
            self.failUnlessEqual(len(res), 1)
            self.failUnlessEqual(res[0], '\x04')
        d.addCallback(_callb)
        return d

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d

    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.data)

            # Make sure we really have 10 shares.
            shares = self.find_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted shares (in particular #0, #1, and #2).

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:
    #
    # As recently documented in NEWS for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:
    #
    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share
    #
    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too
    #
    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place
    #
    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).
    #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
    #
    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."


# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)