# -*- coding: utf-8 -*-
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from allmydata.test.no_network import GridTestMixin
# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
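# (With N = 10 shares and a leeway factor of 18, a verify pass therefore has
# a budget of 180 share-read calls before failIfBigger() complains.)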

timeout = 240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)

    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
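        # A tiny max_segment_size splits even the small TEST_DATA into many
        # segments, so the block and crypttext hash trees get multiple
        # leaves and the corruption tests below have something to bite on.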
        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.uri
            self.c0_filenode = c0.create_node_from_uri(ur.uri)
            self.c1_filenode = c1.create_node_from_uri(ur.uri)
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum],
                                                   corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more reads than
        MAX_DELTA_READS allows."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 10, data)
        self.failUnless(len(data['sharemap']) == 10, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 10, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that the verifier handles these
        answers correctly. It doesn't use more reads than MAX_DELTA_READS
        allows."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 9, data)
        self.failUnless(len(data['sharemap']) == 9, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 9, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(len(data['sharemap']), 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
        self.failUnlessEqual(data['count-corrupt-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(data['count-corrupt-shares'], 1)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['sharemap']), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.
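    #
    # (A sketch, using hooks that don't exist yet: upload with an encoder
    # that overwrites one crypttext-hash-tree leaf after encoding but before
    # the UEB hash is computed, then run check(verify=True) and assert that
    # the share is reported as corrupt even though every hash tree node is
    # internally consistent.)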

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
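                # flip the low bit of the byte at offset `which`, leaving
                # the rest of the share untouched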
                return s[:which] + chr(ord(s[which]) ^ 0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print >>f, "%d: %s" % (i, results[i])
            f.close()
            print "Please look in _trial_temp/test_each_byte_output for results"
        d.addCallback(_show_results)
        return d

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
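# (So repairing one missing share may cost at most 35 allocate/write calls,
# and repairing seven shares at most 7 * 35; the Repairer tests below check
# exactly these budgets.)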

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_uri_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
        return d

    def test_repair_from_deletion_of_1(self):
        """Repair replaces a share that got deleted."""
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """Repair replaces seven shares that got deleted."""
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.data)

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted shares.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in xrange(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:
    #
    # As recently documented in NEWS.rst for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:
    #
    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share
    #
    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too
    #
    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place
    #
    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).
    #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
    #
    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2),
                            (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for the repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2),
                            (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # ...repair it...
            d.addCallback(_repair_from_corruption, corruptor_func)
            # ...and put the stashed shares back so the next corruptor
            # starts from a pristine file.
            d.addCallback(_put_it_all_back)
        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."

    def test_tiny_reads(self):
        # ticket #1223 points out three problems:
        #  repairer reads beyond end of input file
        #  new-downloader does not tolerate overreads
        #  uploader does lots of tiny reads, inefficient
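        # (With k=22 and a 135-byte file, each input chunk is only about 7
        # bytes -- 135/22 rounded up -- which is what made the old
        # per-chunk-read uploader so chatty.)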
        self.basedir = "repairer/Repairer/test_tiny_reads"
        self.set_up_grid()
        c0 = self.g.clients[0]
        DATA = "a"*135
        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
        d = c0.upload(upload.Data(DATA, convergence=""))
        def _then(ur):
            self.uri = ur.uri
            self.delete_shares_numbered(self.uri, [0])
            self.c0_filenode = c0.create_node_from_uri(ur.uri)
            self._stash_counts()
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(ign):
            (r, a, w) = self._get_delta_counts()
            # when the uploader (driven by the repairer) does full-segment
            # reads, this makes 44 server read calls (2*k). Before, when it
            # was doing input_chunk_size reads (7 bytes each), it was doing
            # over 400.
            self.failIf(r > 100, "too many reads: %d > 100" % r)
        d.addCallback(_check)
        return d

# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# pass

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)