# -*- coding: utf-8 -*-
import random

from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
from allmydata.test.no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
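# (i.e. with N = 10 shares, up to 10 * 18 = 180 extra read calls are
# tolerated before a test complains)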

timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)
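
    # These _count_* helpers sum one of the storage-server operation
    # counters across every server in the no_network grid, letting tests
    # measure how many read/allocate/write calls an operation triggered.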
    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)
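
    # Typical pattern: call _stash_counts() just before the operation under
    # test, then compare _get_delta_counts() afterwards against allowances
    # like MAX_DELTA_READS and DELTA_WRITES_PER_SHARE.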

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))
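
    # upload_and_stash uploads common.TEST_DATA with a deliberately tiny
    # max_segment_size, so the file spans several segments and the block
    # and crypttext hash trees are non-trivial. It stashes the resulting
    # URI and a filenode for each of the two clients.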
    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
        c0.encoding_params['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.get_uri()
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self.c1_filenode = c1.create_node_from_uri(ur.get_uri())
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check1(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check1)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d
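
    # _help_test_verify corrupts one share with a corruptor function from
    # allmydata.test.common, runs a verifying check from the second client,
    # and passes the CheckResults to a judge_* method. A corruptor maps the
    # raw share-file contents to a corrupted copy; a minimal sketch (the
    # offset 7 is arbitrary, for illustration only):
    #
    #   def _corrupt_one_byte(sharedata, debug=False):
    #       return (sharedata[:7] +
    #               chr(ord(sharedata[7]) ^ 0x01) +
    #               sharedata[8:])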
    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum],
                                                   corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.as_dict())
                raise unittest.FailTest(new_arg)
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """ Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than eighteen
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 10)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 10)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)
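
    # Note: these tests all use 3-of-10 encoding (k=3, N=10); that is what
    # the get_encoding_needed()/get_encoding_expected() assertions verify.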

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that verifier handles these answers
        correctly. It doesn't use more than eighteen times as many reads as
        it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 1)
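
    # (Unlike visible corruption, the server here still answers get_bucket,
    # so all 10 servers appear in get_servers_responding even though one
    # share is judged incompatible.)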

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 1)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 0)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata_last_byte(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_last_byte"
        return self._help_test_verify(common._corrupt_share_data_last_byte,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.

        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print >>f, "%d: %s" % (i, results[i])
            f.close()
            print "Please look in _trial_temp/test_each_byte_output for results"
        d.addCallback(_show_results)
        return d

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
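# (i.e. repairing a single lost share may cost up to 35 allocate/write
# calls before a test complains)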

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_uri_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d

    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.as_dict())

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted shares.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in xrange(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:

    # As recently documented in NEWS.rst for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:

    # * minimalistic verifier: it's just download without decryption, so we
    #   don't look for corruption in N-k shares, and for many fields (those
    #   which are the same in all shares) we only look for corruption in a
    #   single share

    # * some kinds of corruption cause download to fail (when it ought to
    #   just switch to a different share), so repair will fail on these too

    # * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #   shares (the authority model is not at all clear), so the best the
    #   repairer can do is to put replacement shares on new servers,
    #   unfortunately leaving the corrupt shares in place

    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).

    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.

    # So we're turning this test off until we've done one or more of the
    # following:
    # * remove some of these limitations
    # * break the test up into smaller, more functionally-oriented pieces
    # * simplify the repairer enough to let us be confident that it is free
    #   of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)

                return d3
            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_repair_from_corruption, corruptor_func)
            d.addCallback(_put_it_all_back)
        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."

    def test_tiny_reads(self):
        # ticket #1223 points out three problems:
        #  repairer reads beyond end of input file
        #  new-downloader does not tolerate overreads
        #  uploader does lots of tiny reads, inefficient
        self.basedir = "repairer/Repairer/test_tiny_reads"
        self.set_up_grid(num_clients=2)
        c0 = self.g.clients[0]
        DATA = "a"*135
        c0.encoding_params['k'] = 22
        c0.encoding_params['n'] = 66
        d = c0.upload(upload.Data(DATA, convergence=""))
        def _then(ur):
            self.uri = ur.get_uri()
            self.delete_shares_numbered(self.uri, [0])
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self._stash_counts()
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(ign):
            (r,a,w) = self._get_delta_counts()
            # when the uploader (driven by the repairer) does full-segment
            # reads, this makes 44 server read calls (2*k). Before, when it
            # was doing input_chunk_size reads (7 bytes), it was doing over
            # 1000.
            self.failIf(r > 100, "too many reads: %d>100" % r)
        d.addCallback(_check)
        return d

    def test_servers_responding(self):
        self.basedir = "repairer/Repairer/servers_responding"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        # now cause one of the servers to not respond during the pre-repair
        # filecheck, but then *do* respond to the post-repair filecheck
        def _then(ign):
            ss = self.g.servers_by_number[0]
            self.g.break_server(ss.my_nodeid, count=1)
            self.delete_shares_numbered(self.uri, [9])
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(rr):
            # this exercises a bug in which the servers-responding list did
            # not include servers that responded to the Repair, but which did
            # not respond to the pre-repair filecheck
            prr = rr.get_post_repair_results()
            expected = set(self.g.get_all_serverids())
            responding_set = frozenset([s.get_serverid() for s in prr.get_servers_responding()])
            self.failIf(expected - responding_set, expected - responding_set)
            self.failIf(responding_set - expected, responding_set - expected)
            self.failUnlessEqual(expected,
                                 set([s.get_serverid()
                                      for s in prr.get_servers_responding()]))
        d.addCallback(_check)
        return d

# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)