# -*- coding: utf-8 -*-
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from allmydata.test.no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY  # N = 10
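# (with the default N=10 shares, that is a budget of 180 reads per check)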

timeout = 240  # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
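    """Helpers shared by the Verifier and Repairer test cases: per-server
    read/allocate/write counters (pulled from each storage server's
    stats_provider), before/after counter stashing, and a standard fixture
    that uploads TEST_DATA and stashes filenodes for two clients."""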
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)

    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
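        """Upload TEST_DATA from client 0, using a small max_segment_size so
        the upload exercises multiple segments, and stash the resulting URI
        plus a filenode for each of the two clients."""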
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.uri
            self.c0_filenode = c0.create_node_from_uri(ur.uri)
            self.c1_filenode = c1.create_node_from_uri(ur.uri)
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
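        """Upload a file, apply `corruptor` to share number `shnum`, run a
        verifying check from the second client, enforce the MAX_DELTA_READS
        budget, and hand the check results to `judgement` for inspection."""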
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum], corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest stores its message as a plain string in e.args[0]
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than READ_LEEWAY
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 10, data)
        self.failUnless(len(data['sharemap']) == 10, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 10, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that the verifier handles these
        answers correctly. It doesn't use more than READ_LEEWAY times as
        many reads as it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 9, data)
        self.failUnless(len(data['sharemap']) == 9, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 9, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(len(data['sharemap']), 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
        self.failUnlessEqual(data['count-corrupt-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(data['count-corrupt-shares'], 1)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['sharemap']), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
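                # flip the low bit of the byte at offset `which`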
                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print >>f, "%d: %s" % (i, results[i])
            f.close()
            print "Please look in _trial_temp/test_each_byte_output for results"
        d.addCallback(_show_results)
        return d

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
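# (the repair tests below multiply this by however many shares they expect
# the repairer to re-create)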

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_uri_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d

    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.data)

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # shares that were previously deleted.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in xrange(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
            self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:
    #
    # As recently documented in NEWS.rst for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:
    #
    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share
    #
    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too
    #
    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place
    #
    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).
    #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
    #
    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."

    def test_tiny_reads(self):
        # ticket #1223 points out three problems:
        #   repairer reads beyond end of input file
        #   new-downloader does not tolerate overreads
        #   uploader does lots of tiny reads, inefficient
        self.basedir = "repairer/Repairer/test_tiny_reads"
        self.set_up_grid()
        c0 = self.g.clients[0]
        DATA = "a"*135
        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
        d = c0.upload(upload.Data(DATA, convergence=""))
        def _then(ur):
            self.uri = ur.uri
            self.delete_shares_numbered(self.uri, [0])
            self.c0_filenode = c0.create_node_from_uri(ur.uri)
            self._stash_counts()
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(ign):
            (r, a, w) = self._get_delta_counts()
            # when the uploader (driven by the repairer) does full-segment
            # reads, this makes 44 server read calls (2*k). Before, when it
            # was doing input_chunk_size reads (7 bytes), it was doing over
            # 400.
            self.failIf(r > 100, "too many reads: %d>100" % r)
        d.addCallback(_check)
        return d


# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)