# -*- coding: utf-8 -*-
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from allmydata.test.no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
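# With READ_LEEWAY = 18 and N = 10 shares, that allows at most 180
# storage_server.read calls per verify/repair operation in the tests below.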
timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)

    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)
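    # Typical usage in the tests below: stash the counters, run one check,
    # verify, or repair operation, then bound the deltas. A minimal sketch:
    #   d.addCallback(lambda ign: self._stash_counts())
    #   d.addCallback(lambda ign: self.c0_filenode.check(Monitor(), verify=False))
    #   def _check(cr):
    #       delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
    #       self.failIfBigger(delta_reads, MAX_DELTA_READS)
    #   d.addCallback(_check)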
    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.get_uri()
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self.c1_filenode = c1.create_node_from_uri(ur.get_uri())
        d.addCallback(_stash_uri)
        return d
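# The no_network grid uses the default 3-of-10 encoding, which is what the
# judge_* methods below rely on (count-shares-needed == 3,
# count-shares-expected == 10).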
class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check1(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check1)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
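        # Shared driver for the corruption tests below: upload a small file,
        # apply `corruptor` to share `shnum`, run a verifying check from the
        # second client, and let `judgement` assert on the resulting check
        # results, while also bounding the number of reads performed.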
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum], corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d
    def judge_no_problem(self, vr):
        """ Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than seven times
        as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 10, data)
        self.failUnless(len(data['sharemap']) == 10, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 10, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)
    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)
    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that verifier handles these answers
        correctly. It doesn't use more than seven times as many reads as it
        needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 9, data)
        self.failUnless(len(data['sharemap']) == 9, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 9, data)
        self.failUnless(len(data['servers-responding']) == 9, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)
    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(len(data['sharemap']), 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
        self.failUnlessEqual(data['count-corrupt-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)
    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares or their hash trees
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(data['count-corrupt-shares'], 1)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['sharemap']), 9)
    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata_last_byte(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_last_byte"
        return self._help_test_verify(common._corrupt_share_data_last_byte,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)
    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)
    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.
    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.

        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print >>f, "%d: %s" % (i, results[i])
            f.close()
            print "Please look in _trial_temp/test_each_byte_output for results"
        d.addCallback(_show_results)
        return d
# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
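# With WRITE_LEEWAY = 35, repairing one missing share may therefore cost up
# to 35 allocate/write calls, and repairing seven shares up to 7 * 35 = 245
# (the bound used by test_repair_from_deletion_of_7 below).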
class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_uri_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d
    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d
    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.data)

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted shares.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d
    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in xrange(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d
    # why is test_repair_from_corruption_of_1 disabled? Read on:

    # As recently documented in NEWS.rst for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:

    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share

    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too

    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place

    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).

    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.

    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs
    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # ...and repair it.
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
    def test_tiny_reads(self):
        # ticket #1223 points out three problems:
        #  repairer reads beyond end of input file
        #  new-downloader does not tolerate overreads
        #  uploader does lots of tiny reads, inefficient
        self.basedir = "repairer/Repairer/test_tiny_reads"
        self.set_up_grid()
        c0 = self.g.clients[0]
        DATA = "a"*135
        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
        d = c0.upload(upload.Data(DATA, convergence=""))
        def _then(ur):
            self.uri = ur.get_uri()
            self.delete_shares_numbered(self.uri, [0])
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self._stash_counts()
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(ign):
            (r,a,w) = self._get_delta_counts()
            # when the uploader (driven by the repairer) does full-segment
            # reads, this makes 44 server read calls (2*k). Before, when it
            # was doing input_chunk_size reads (7 bytes), it was doing over
            # 400.
            self.failIf(r > 100, "too many reads: %d>100" % r)
        d.addCallback(_check)
        return d
    def test_servers_responding(self):
        self.basedir = "repairer/Repairer/servers_responding"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        # now cause one of the servers to not respond during the pre-repair
        # filecheck, but then *do* respond to the post-repair filecheck
        def _then(ign):
            ss = self.g.servers_by_number[0]
            self.g.break_server(ss.my_nodeid, count=1)
            self.delete_shares_numbered(self.uri, [9])
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(rr):
            # this exercises a bug in which the servers-responding list did
            # not include servers that responded to the Repair, but which did
            # not respond to the pre-repair filecheck
            prr = rr.get_post_repair_results()
            expected = set(self.g.get_all_serverids())
            self.failUnlessEqual(expected, set(prr.data["servers-responding"]))
        d.addCallback(_check)
        return d
# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)