1 # -*- coding: utf-8 -*-
2 from allmydata.test import common
3 from allmydata.monitor import Monitor
4 from allmydata import check_results
5 from allmydata.interfaces import NotEnoughSharesError
6 from allmydata.immutable import repairer, upload
7 from allmydata.util.consumer import download_to_data
8 from twisted.internet import defer
9 from twisted.trial import unittest
11 from allmydata.test.no_network import GridTestMixin
13 # We'll allow you to pass this test even if you trigger eighteen times as
14 # many disk reads and block fetches as would be optimal.
16 MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
18 timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree
20 class RepairTestMixin:
21 def failUnlessIsInstance(self, x, xtype):
22 self.failUnless(isinstance(x, xtype), x)
24 def _count_reads(self):
25 sum_of_read_counts = 0
26 for (i, ss, storedir) in self.iterate_servers():
27 counters = ss.stats_provider.get_stats()['counters']
28 sum_of_read_counts += counters.get('storage_server.read', 0)
29 return sum_of_read_counts
31 def _count_allocates(self):
32 sum_of_allocate_counts = 0
33 for (i, ss, storedir) in self.iterate_servers():
34 counters = ss.stats_provider.get_stats()['counters']
35 sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
36 return sum_of_allocate_counts
38 def _count_writes(self):
39 sum_of_write_counts = 0
40 for (i, ss, storedir) in self.iterate_servers():
41 counters = ss.stats_provider.get_stats()['counters']
42 sum_of_write_counts += counters.get('storage_server.write', 0)
43 return sum_of_write_counts
45 def _stash_counts(self):
46 self.before_repair_reads = self._count_reads()
47 self.before_repair_allocates = self._count_allocates()
48 self.before_repair_writes = self._count_writes()
50 def _get_delta_counts(self):
51 delta_reads = self._count_reads() - self.before_repair_reads
52 delta_allocates = self._count_allocates() - self.before_repair_allocates
53 delta_writes = self._count_writes() - self.before_repair_writes
54 return (delta_reads, delta_allocates, delta_writes)
56 def failIfBigger(self, x, y):
57 self.failIf(x > y, "%s > %s" % (x, y))
59 def upload_and_stash(self):
60 c0 = self.g.clients[0]
61 c1 = self.g.clients[1]
62 c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
63 d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
66 self.c0_filenode = c0.create_node_from_uri(ur.uri)
67 self.c1_filenode = c1.create_node_from_uri(ur.uri)
68 d.addCallback(_stash_uri)
71 class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
72 def test_check_without_verify(self):
73 """Check says the file is healthy when none of the shares have been
74 touched. It says that the file is unhealthy when all of them have
75 been removed. It doesn't use any reads.
77 self.basedir = "repairer/Verifier/check_without_verify"
78 self.set_up_grid(num_clients=2)
79 d = self.upload_and_stash()
80 d.addCallback(lambda ignored: self._stash_counts())
81 d.addCallback(lambda ignored:
82 self.c0_filenode.check(Monitor(), verify=False))
84 self.failUnless(cr.is_healthy())
85 delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
86 self.failIfBigger(delta_reads, 0)
89 def _remove_all(ignored):
90 for sh in self.find_shares(self.uri):
92 d.addCallback(_remove_all)
94 d.addCallback(lambda ignored: self._stash_counts())
95 d.addCallback(lambda ignored:
96 self.c0_filenode.check(Monitor(), verify=False))
98 self.failIf(cr.is_healthy())
99 delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
100 self.failIfBigger(delta_reads, 0)
101 d.addCallback(_check2)
104 def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
105 self.set_up_grid(num_clients=2)
106 d = self.upload_and_stash()
107 d.addCallback(lambda ignored: self._stash_counts())
109 d.addCallback(lambda ignored:
110 self.corrupt_shares_numbered(self.uri, [shnum],corruptor,debug=debug))
111 d.addCallback(lambda ignored:
112 self.c1_filenode.check(Monitor(), verify=True))
114 delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
115 self.failIfBigger(delta_reads, MAX_DELTA_READS)
118 except unittest.FailTest, e:
119 # FailTest just uses e.args[0] == str
120 new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
123 d.addCallback(_check)
126 def judge_no_problem(self, vr):
127 """ Verify says the file is healthy when none of the shares have been
128 touched in a way that matters. It doesn't use more than seven times
129 as many reads as it needs."""
130 self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
132 self.failUnless(data['count-shares-good'] == 10, data)
133 self.failUnless(len(data['sharemap']) == 10, data)
134 self.failUnless(data['count-shares-needed'] == 3, data)
135 self.failUnless(data['count-shares-expected'] == 10, data)
136 self.failUnless(data['count-good-share-hosts'] == 10, data)
137 self.failUnless(len(data['servers-responding']) == 10, data)
138 self.failUnless(len(data['list-corrupt-shares']) == 0, data)
140 def test_ok_no_corruption(self):
141 self.basedir = "repairer/Verifier/ok_no_corruption"
142 return self._help_test_verify(common._corrupt_nothing,
143 self.judge_no_problem)
145 def test_ok_filedata_size(self):
146 self.basedir = "repairer/Verifier/ok_filedatasize"
147 return self._help_test_verify(common._corrupt_size_of_file_data,
148 self.judge_no_problem)
150 def test_ok_sharedata_size(self):
151 self.basedir = "repairer/Verifier/ok_sharedata_size"
152 return self._help_test_verify(common._corrupt_size_of_sharedata,
153 self.judge_no_problem)
155 def test_ok_segment_size(self):
156 self.basedir = "repairer/Verifier/test_ok_segment_size"
157 return self._help_test_verify(common._corrupt_segment_size,
158 self.judge_no_problem)
160 def judge_visible_corruption(self, vr):
161 """Corruption which is detected by the server means that the server
162 will send you back a Failure in response to get_bucket instead of
163 giving you the share data. Test that verifier handles these answers
164 correctly. It doesn't use more than seven times as many reads as it
166 self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
168 self.failUnless(data['count-shares-good'] == 9, data)
169 self.failUnless(len(data['sharemap']) == 9, data)
170 self.failUnless(data['count-shares-needed'] == 3, data)
171 self.failUnless(data['count-shares-expected'] == 10, data)
172 self.failUnless(data['count-good-share-hosts'] == 9, data)
173 self.failUnless(len(data['servers-responding']) == 10, data)
174 self.failUnless(len(data['list-corrupt-shares']) == 0, data)
176 def test_corrupt_file_verno(self):
177 self.basedir = "repairer/Verifier/corrupt_file_verno"
178 return self._help_test_verify(common._corrupt_file_version_number,
179 self.judge_visible_corruption)
181 def judge_share_version_incompatibility(self, vr):
182 # corruption of the share version (inside the container, the 1/2
183 # value that determines whether we've got 4-byte offsets or 8-byte
184 # offsets) to something larger than 2 will trigger a
185 # ShareVersionIncompatible exception, which should be counted in
186 # list-incompatible-shares, rather than list-corrupt-shares.
187 self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
189 self.failUnlessEqual(data['count-shares-good'], 9)
190 self.failUnlessEqual(len(data['sharemap']), 9)
191 self.failUnlessEqual(data['count-shares-needed'], 3)
192 self.failUnlessEqual(data['count-shares-expected'], 10)
193 self.failUnlessEqual(data['count-good-share-hosts'], 9)
194 self.failUnlessEqual(len(data['servers-responding']), 10)
195 self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
196 self.failUnlessEqual(data['count-corrupt-shares'], 0)
197 self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
198 self.failUnlessEqual(data['count-incompatible-shares'], 1)
200 def test_corrupt_share_verno(self):
201 self.basedir = "repairer/Verifier/corrupt_share_verno"
202 return self._help_test_verify(common._corrupt_sharedata_version_number,
203 self.judge_share_version_incompatibility)
205 def judge_invisible_corruption(self, vr):
206 # corruption of fields that the server does not check (which is most
207 # of them), which will be detected by the client as it downloads
209 self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
211 self.failUnlessEqual(data['count-shares-good'], 9)
212 self.failUnlessEqual(data['count-shares-needed'], 3)
213 self.failUnlessEqual(data['count-shares-expected'], 10)
214 self.failUnlessEqual(data['count-good-share-hosts'], 9)
215 self.failUnlessEqual(data['count-corrupt-shares'], 1)
216 self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
217 self.failUnlessEqual(data['count-incompatible-shares'], 0)
218 self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
219 self.failUnlessEqual(len(data['servers-responding']), 10)
220 self.failUnlessEqual(len(data['sharemap']), 9)
222 def test_corrupt_sharedata_offset(self):
223 self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
224 return self._help_test_verify(common._corrupt_offset_of_sharedata,
225 self.judge_invisible_corruption)
227 def test_corrupt_ueb_offset(self):
228 self.basedir = "repairer/Verifier/corrupt_ueb_offset"
229 return self._help_test_verify(common._corrupt_offset_of_uri_extension,
230 self.judge_invisible_corruption)
232 def test_corrupt_ueb_offset_shortread(self):
233 self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
234 return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
235 self.judge_invisible_corruption)
237 def test_corrupt_sharedata(self):
238 self.basedir = "repairer/Verifier/corrupt_sharedata"
239 return self._help_test_verify(common._corrupt_share_data,
240 self.judge_invisible_corruption)
242 def test_corrupt_ueb_length(self):
243 self.basedir = "repairer/Verifier/corrupt_ueb_length"
244 return self._help_test_verify(common._corrupt_length_of_uri_extension,
245 self.judge_invisible_corruption)
247 def test_corrupt_ueb(self):
248 self.basedir = "repairer/Verifier/corrupt_ueb"
249 return self._help_test_verify(common._corrupt_uri_extension,
250 self.judge_invisible_corruption)
252 def test_truncate_crypttext_hashtree(self):
253 # change the start of the block hashtree, to truncate the preceding
255 self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
256 return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
257 self.judge_invisible_corruption)
259 def test_corrupt_block_hashtree_offset(self):
260 self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
261 return self._help_test_verify(common._corrupt_offset_of_block_hashes,
262 self.judge_invisible_corruption)
264 def test_wrong_share_verno(self):
265 self.basedir = "repairer/Verifier/wrong_share_verno"
266 return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
267 self.judge_invisible_corruption)
269 def test_corrupt_share_hashtree_offset(self):
270 self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
271 return self._help_test_verify(common._corrupt_offset_of_share_hashes,
272 self.judge_invisible_corruption)
274 def test_corrupt_crypttext_hashtree_offset(self):
275 self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
276 return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
277 self.judge_invisible_corruption)
279 def test_corrupt_crypttext_hashtree(self):
280 self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
281 return self._help_test_verify(common._corrupt_crypttext_hash_tree,
282 self.judge_invisible_corruption)
284 def test_corrupt_crypttext_hashtree_byte_x221(self):
285 self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
286 return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
287 self.judge_invisible_corruption, debug=True)
289 def test_corrupt_block_hashtree(self):
290 self.basedir = "repairer/Verifier/corrupt_block_hashtree"
291 return self._help_test_verify(common._corrupt_block_hashes,
292 self.judge_invisible_corruption)
294 def test_corrupt_share_hashtree(self):
295 self.basedir = "repairer/Verifier/corrupt_share_hashtree"
296 return self._help_test_verify(common._corrupt_share_hashes,
297 self.judge_invisible_corruption)
299 # TODO: the Verifier should decode to ciphertext and check it against the
300 # crypttext-hash-tree. Check this by constructing a bogus file, in which
301 # the crypttext-hash-tree is modified after encoding is done, but before
302 # the UEB is finalized. The Verifier should see a valid
303 # crypttext-hash-tree but then the ciphertext should show up as invalid.
304 # Normally this could only be triggered by a bug in FEC decode.
306 def OFF_test_each_byte(self):
307 # this test takes 140s to run on my laptop, and doesn't have any
308 # actual asserts, so it's commented out. It corrupts each byte of the
309 # share in sequence, and checks to see which ones the Verifier
310 # catches and which it misses. Ticket #819 contains details: there
311 # are several portions of the share that are unused, for which
312 # corruption is not supposed to be caught.
314 # If the test ran quickly, we could use the share size to compute the
315 # offsets of these unused portions and assert that everything outside
316 # of them was detected. We could then replace the rest of
317 # Verifier.test_* (which takes 16s to run on my laptop) with this
319 self.basedir = "repairer/Verifier/each_byte"
320 self.set_up_grid(num_clients=2)
321 d = self.upload_and_stash()
323 self.sh0_file = [sharefile
324 for (shnum, serverid, sharefile)
325 in self.find_shares(self.uri)
327 self.sh0_orig = open(self.sh0_file, "rb").read()
328 d.addCallback(_grab_sh0)
330 f = open(self.sh0_file, "wb")
331 f.write(self.sh0_orig)
333 def _corrupt(ign, which):
334 def _corruptor(s, debug=False):
335 return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
336 self.corrupt_shares_numbered(self.uri, [0], _corruptor)
338 def _did_check(vr, i):
339 #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
340 results[i] = vr.is_healthy()
342 d = defer.succeed(None)
343 for i in range(len(self.sh0_orig)):
344 d.addCallback(_corrupt, i)
345 d.addCallback(lambda ign:
346 self.c1_filenode.check(Monitor(), verify=True))
347 d.addCallback(_did_check, i)
348 d.addCallback(_fix_sh0)
350 d.addCallback(_start)
351 def _show_results(ign):
352 f = open("test_each_byte_output", "w")
353 for i in sorted(results.keys()):
354 print >>f, "%d: %s" % (i, results[i])
356 print "Please look in _trial_temp/test_each_byte_output for results"
357 d.addCallback(_show_results)
360 # We'll allow you to pass this test even if you trigger thirty-five times as
361 # many block sends and disk writes as would be optimal.
363 # Optimally, you could repair one of these (small) files in a single write.
364 DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
366 class DownUpConnector(unittest.TestCase):
367 def test_deferred_satisfaction(self):
368 duc = repairer.DownUpConnector()
369 duc.registerProducer(None, True) # just because you have to call registerProducer first
370 # case 1: total data in buf is < requested data at time of request
372 d = duc.read_encrypted(2, False)
374 self.failUnlessEqual(len(data), 2)
375 self.failUnlessEqual(data[0], '\x01')
376 self.failUnlessEqual(data[1], '\x02')
381 def test_extra(self):
382 duc = repairer.DownUpConnector()
383 duc.registerProducer(None, True) # just because you have to call registerProducer first
384 # case 1: total data in buf is < requested data at time of request
386 d = duc.read_encrypted(2, False)
388 self.failUnlessEqual(len(data), 2)
389 self.failUnlessEqual(data[0], '\x01')
390 self.failUnlessEqual(data[1], '\x02')
392 duc.write('\x02\0x03')
395 def test_short_reads_1(self):
396 # You don't get fewer bytes than you requested -- instead you get no callback at all.
397 duc = repairer.DownUpConnector()
398 duc.registerProducer(None, True) # just because you have to call registerProducer first
400 d = duc.read_encrypted(2, False)
404 self.fail("Shouldn't have gotten this callback res: %s" % (res,))
405 d.addCallback(_callb)
407 # Also in the other order of read-vs-write:
408 duc2 = repairer.DownUpConnector()
409 duc2.registerProducer(None, True) # just because you have to call registerProducer first
411 d = duc2.read_encrypted(2, False)
414 self.fail("Shouldn't have gotten this callback res: %s" % (res,))
415 d.addCallback(_callb2)
417 # But once the DUC is closed then you *do* get short reads.
418 duc3 = repairer.DownUpConnector()
419 duc3.registerProducer(None, True) # just because you have to call registerProducer first
421 d = duc3.read_encrypted(2, False)
425 self.failUnlessEqual(len(res), 1)
426 self.failUnlessEqual(res[0], '\x04')
427 d.addCallback(_callb3)
430 def test_short_reads_2(self):
431 # Also in the other order of read-vs-write.
432 duc = repairer.DownUpConnector()
433 duc.registerProducer(None, True) # just because you have to call registerProducer first
436 d = duc.read_encrypted(2, False)
440 self.failUnlessEqual(len(res), 1)
441 self.failUnlessEqual(res[0], '\x04')
442 d.addCallback(_callb)
445 def test_short_reads_3(self):
446 # Also if it is closed before the read.
447 duc = repairer.DownUpConnector()
448 duc.registerProducer(None, True) # just because you have to call registerProducer first
452 d = duc.read_encrypted(2, False)
454 self.failUnlessEqual(len(res), 1)
455 self.failUnlessEqual(res[0], '\x04')
456 d.addCallback(_callb)
459 class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
460 common.ShouldFailMixin):
462 def test_harness(self):
463 # This test is actually to make sure our test harness works, rather
464 # than testing anything about Tahoe code itself.
466 self.basedir = "repairer/Repairer/test_code"
467 self.set_up_grid(num_clients=2)
468 d = self.upload_and_stash()
470 d.addCallback(lambda ignored: self.find_shares(self.uri))
471 def _stash_shares(oldshares):
472 self.oldshares = oldshares
473 d.addCallback(_stash_shares)
474 d.addCallback(lambda ignored: self.find_shares(self.uri))
475 def _compare(newshares):
476 self.failUnlessEqual(newshares, self.oldshares)
477 d.addCallback(_compare)
479 def _delete_8(ignored):
480 shnum = self.oldshares[0][0]
481 self.delete_shares_numbered(self.uri, [shnum])
482 for sh in self.oldshares[1:8]:
483 self.delete_share(sh)
484 d.addCallback(_delete_8)
485 d.addCallback(lambda ignored: self.find_shares(self.uri))
486 d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))
488 d.addCallback(lambda ignored:
489 self.shouldFail(NotEnoughSharesError, "then_download",
491 download_to_data, self.c1_filenode))
493 d.addCallback(lambda ignored:
494 self.shouldFail(NotEnoughSharesError, "then_repair",
496 self.c1_filenode.check_and_repair,
497 Monitor(), verify=False))
499 # test share corruption
500 def _test_corrupt(ignored):
502 shares = self.find_shares(self.uri)
503 for (shnum, serverid, sharefile) in shares:
504 olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
506 self.corrupt_share(sh, common._corrupt_uri_extension)
507 for (shnum, serverid, sharefile) in shares:
508 newdata = open(sharefile, "rb").read()
509 self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
510 d.addCallback(_test_corrupt)
512 def _remove_all(ignored):
513 for sh in self.find_shares(self.uri):
514 self.delete_share(sh)
515 d.addCallback(_remove_all)
516 d.addCallback(lambda ignored: self.find_shares(self.uri))
517 d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
521 def test_repair_from_deletion_of_1(self):
522 """ Repair replaces a share that got deleted. """
523 self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
524 self.set_up_grid(num_clients=2)
525 d = self.upload_and_stash()
527 d.addCallback(lambda ignored:
528 self.delete_shares_numbered(self.uri, [2]))
529 d.addCallback(lambda ignored: self._stash_counts())
530 d.addCallback(lambda ignored:
531 self.c0_filenode.check_and_repair(Monitor(),
533 def _check_results(crr):
534 self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
535 pre = crr.get_pre_repair_results()
536 self.failUnlessIsInstance(pre, check_results.CheckResults)
537 post = crr.get_post_repair_results()
538 self.failUnlessIsInstance(post, check_results.CheckResults)
539 delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
540 self.failIfBigger(delta_reads, MAX_DELTA_READS)
541 self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
542 self.failIf(pre.is_healthy())
543 self.failUnless(post.is_healthy())
545 # Now we inspect the filesystem to make sure that it has 10
547 shares = self.find_shares(self.uri)
548 self.failIf(len(shares) < 10)
549 d.addCallback(_check_results)
551 d.addCallback(lambda ignored:
552 self.c0_filenode.check(Monitor(), verify=True))
553 d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
555 # Now we delete seven of the other shares, then try to download the
556 # file and assert that it succeeds at downloading and has the right
557 # contents. This can't work unless it has already repaired the
558 # previously-deleted share #2.
560 d.addCallback(lambda ignored:
561 self.delete_shares_numbered(self.uri, range(3, 10+1)))
562 d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
563 d.addCallback(lambda newdata:
564 self.failUnlessEqual(newdata, common.TEST_DATA))
567 def test_repair_from_deletion_of_7(self):
568 """ Repair replaces seven shares that got deleted. """
569 self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
570 self.set_up_grid(num_clients=2)
571 d = self.upload_and_stash()
572 d.addCallback(lambda ignored:
573 self.delete_shares_numbered(self.uri, range(7)))
574 d.addCallback(lambda ignored: self._stash_counts())
575 d.addCallback(lambda ignored:
576 self.c0_filenode.check_and_repair(Monitor(),
578 def _check_results(crr):
579 self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
580 pre = crr.get_pre_repair_results()
581 self.failUnlessIsInstance(pre, check_results.CheckResults)
582 post = crr.get_post_repair_results()
583 self.failUnlessIsInstance(post, check_results.CheckResults)
584 delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
586 self.failIfBigger(delta_reads, MAX_DELTA_READS)
587 self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
588 self.failIf(pre.is_healthy())
589 self.failUnless(post.is_healthy(), post.data)
591 # Make sure we really have 10 shares.
592 shares = self.find_shares(self.uri)
593 self.failIf(len(shares) < 10)
594 d.addCallback(_check_results)
596 d.addCallback(lambda ignored:
597 self.c0_filenode.check(Monitor(), verify=True))
598 d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
600 # Now we delete seven of the other shares, then try to download the
601 # file and assert that it succeeds at downloading and has the right
602 # contents. This can't work unless it has already repaired the
603 # previously-deleted share #2.
605 d.addCallback(lambda ignored:
606 self.delete_shares_numbered(self.uri, range(3, 10+1)))
607 d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
608 d.addCallback(lambda newdata:
609 self.failUnlessEqual(newdata, common.TEST_DATA))
612 # why is test_repair_from_corruption_of_1 disabled? Read on:
614 # As recently documented in NEWS for the 1.3.0 release, the current
615 # immutable repairer suffers from several limitations:
617 # * minimalistic verifier: it's just download without decryption, so we
618 # don't look for corruption in N-k shares, and for many fields (those
619 # which are the same in all shares) we only look for corruption in a
622 # * some kinds of corruption cause download to fail (when it ought to
623 # just switch to a different share), so repair will fail on these too
625 # * RIStorageServer doesn't offer a way to delete old corrupt immutable
626 # shares (the authority model is not at all clear), so the best the
627 # repairer can do is to put replacement shares on new servers,
628 # unfortunately leaving the corrupt shares in place
630 # This test is pretty strenuous: it asserts that the repairer does the
631 # ideal thing in 8 distinct situations, with randomized corruption in
632 # each. Because of the aforementioned limitations, it is highly unlikely
633 # to pass any of these. We're also concerned that the download-fails case
634 # can provoke a lost-progress bug (one was fixed, but there might be more
635 # lurking), which will cause the test to fail despite a ".todo" marker,
636 # and will probably cause subsequent unrelated tests to fail too (due to
637 # "unclean reactor" problems).
639 # In addition, I (warner) have recently refactored the rest of this class
640 # to use the much-faster no_network.GridTestMixin, so this tests needs to
641 # be updated before it will be able to run again.
643 # So we're turning this test off until we've done one or more of the
645 # * remove some of these limitations
646 # * break the test up into smaller, more functionally-oriented pieces
647 # * simplify the repairer enough to let us be confident that it is free
648 # of lost-progress bugs
650 def OFF_test_repair_from_corruption_of_1(self):
651 d = defer.succeed(None)
653 d.addCallback(self.find_shares)
658 d.addCallback(_stash_it)
659 def _put_it_all_back(ignored):
660 self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
663 def _repair_from_corruption(shnum, corruptor_func):
664 before_repair_reads = self._count_reads()
665 before_repair_allocates = self._count_writes()
667 d2 = self.filenode.check_and_repair(Monitor(), verify=True)
668 def _after_repair(checkandrepairresults):
669 prerepairres = checkandrepairresults.get_pre_repair_results()
670 postrepairres = checkandrepairresults.get_post_repair_results()
671 after_repair_reads = self._count_reads()
672 after_repair_allocates = self._count_writes()
674 # The "* 2" in reads is because you might read a whole share
675 # before figuring out that it is corrupted. It might be
676 # possible to make this delta reads number a little tighter.
677 self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
678 # The "* 2" in writes is because each server has two shares,
679 # and it is reasonable for repairer to conclude that there
680 # are two shares that it should upload, if the server fails
681 # to serve the first share.
682 self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
683 self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
684 self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))
686 # Now we inspect the filesystem to make sure that it has 10
688 shares = self.find_shares()
689 self.failIf(len(shares) < 10)
691 # Now we assert that the verifier reports the file as healthy.
692 d3 = self.filenode.check(Monitor(), verify=True)
693 def _after_verify(verifyresults):
694 self.failUnless(verifyresults.is_healthy())
695 d3.addCallback(_after_verify)
697 # Now we delete seven of the other shares, then try to
698 # download the file and assert that it succeeds at
699 # downloading and has the right contents. This can't work
700 # unless it has already repaired the previously-corrupted share.
701 def _then_delete_7_and_try_a_download(unused=None):
704 random.shuffle(shnums)
705 for sharenum in shnums[:7]:
706 self._delete_a_share(sharenum=sharenum)
708 return self._download_and_check_plaintext()
709 d3.addCallback(_then_delete_7_and_try_a_download)
712 d2.addCallback(_after_repair)
715 for corruptor_func in (
716 common._corrupt_file_version_number,
717 common._corrupt_sharedata_version_number,
718 common._corrupt_offset_of_sharedata,
719 common._corrupt_offset_of_uri_extension,
720 common._corrupt_offset_of_uri_extension_to_force_short_read,
721 common._corrupt_share_data,
722 common._corrupt_length_of_uri_extension,
723 common._corrupt_uri_extension,
725 # Now we corrupt a share...
726 d.addCallback(self._corrupt_a_random_share, corruptor_func)
728 d.addCallback(_repair_from_corruption, corruptor_func)
731 #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
734 # XXX extend these tests to show that the checker detects which specific
735 # share on which specific server is broken -- this is necessary so that the
736 # checker results can be passed to the repairer and the repairer can go ahead
737 # and upload fixes without first doing what is effectively a check (/verify)
740 # XXX extend these tests to show bad behavior of various kinds from servers:
741 # raising exception from each remove_foo() method, for example
743 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
745 # XXX test corruption that truncates other hash trees than just the crypttext
748 # XXX test the notify-someone-about-corruption feature (also implement that
751 # XXX test whether repairer (downloader) correctly downloads a file even if
752 # to do so it has to acquire shares from a server that has already tried to
753 # serve it a corrupted share. (I don't think the current downloader would
754 # pass this test, depending on the kind of corruption.)