]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_repairer.py
65d2b2c27669810f207598a10ed62e233d5003f3
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_repairer.py
1 # -*- coding: utf-8 -*-
2 from allmydata.test import common
3 from allmydata.monitor import Monitor
4 from allmydata import check_results
5 from allmydata.interfaces import NotEnoughSharesError
6 from allmydata.immutable import upload
7 from allmydata.util.consumer import download_to_data
8 from twisted.internet import defer
9 from twisted.trial import unittest
10 import random
11 from allmydata.test.no_network import GridTestMixin
12
# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
# Ceiling on how many 'storage_server.read' events one verify/repair pass
# may add across the whole grid; the tests below compare the measured read
# delta against it.
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10

# NOTE(review): presumably the per-test timeout, in seconds, that
# twisted.trial picks up from a module-level "timeout" name -- confirm.
timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree
19
class RepairTestMixin:
    """Shared helpers for the checker, verifier and repairer tests.

    The class it is mixed into must provide failUnless(), failIf(),
    iterate_servers() and a grid object at self.g.
    """

    def failUnlessIsInstance(self, x, xtype):
        """Fail (reporting x itself) unless x is an instance of xtype."""
        matches = isinstance(x, xtype)
        self.failUnless(matches, x)

    def _sum_server_counter(self, counter_name):
        """Total one named stats counter over every server yielded by
        iterate_servers(); a server without that counter contributes 0."""
        return sum(server.stats_provider.get_stats()['counters'].get(counter_name, 0)
                   for (_num, server, _storedir) in self.iterate_servers())

    def _count_reads(self):
        """Return the grid-wide total of 'storage_server.read'."""
        return self._sum_server_counter('storage_server.read')

    def _count_allocates(self):
        """Return the grid-wide total of 'storage_server.allocate'."""
        return self._sum_server_counter('storage_server.allocate')

    def _count_writes(self):
        """Return the grid-wide total of 'storage_server.write'."""
        return self._sum_server_counter('storage_server.write')

    def _stash_counts(self):
        """Remember the current read/allocate/write totals as the baseline
        that _get_delta_counts() measures against."""
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        """Return (reads, allocates, writes) accumulated since the last
        call to _stash_counts()."""
        reads_now = self._count_reads()
        delta_reads = reads_now - self.before_repair_reads
        allocates_now = self._count_allocates()
        delta_allocates = allocates_now - self.before_repair_allocates
        writes_now = self._count_writes()
        delta_writes = writes_now - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        """Fail with the message "x > y" when x exceeds y."""
        exceeded = x > y
        message = "%s > %s" % (x, y)
        self.failIf(exceeded, message)

    def upload_and_stash(self):
        """Upload common.TEST_DATA through client 0 and remember the result.

        Returns the upload Deferred. Once it fires, self.uri holds the
        file's URI and self.c0_filenode / self.c1_filenode hold a filenode
        for that URI created by client 0 and client 1 respectively.
        """
        uploader = self.g.clients[0]
        other_client = self.g.clients[1]
        # a tiny segment size, set on the uploading client before the upload
        uploader.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        uploadable = upload.Data(common.TEST_DATA, convergence="")
        d = uploader.upload(uploadable)

        def _remember(upload_results):
            self.uri = upload_results.get_uri()
            self.c0_filenode = uploader.create_node_from_uri(upload_results.get_uri())
            self.c1_filenode = other_client.create_node_from_uri(upload_results.get_uri())
        d.addCallback(_remember)
        return d
70
class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    """Checker/verifier tests.

    Most tests upload a file, corrupt share 0 with one of the corruptor
    functions from allmydata.test.common, run a verifying check through the
    second client, and hand the results to one of the judge_* methods.
    """

    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            # only the read delta matters for a non-verifying check
            delta_reads = self._get_delta_counts()[0]
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads = self._get_delta_counts()[0]
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        """Upload a file, corrupt one share, verify, and judge the results.

        corruptor -- function handed to corrupt_shares_numbered() to damage
                     the share
        judgement -- callable invoked with the verifier results; it is
                     expected to raise unittest.FailTest on a bad outcome
        shnum     -- number of the share to corrupt
        debug     -- passed through to corrupt_shares_numbered()

        The caller must set self.basedir first. Returns a Deferred that
        fires when the judgement has run. Independently of the judgement,
        the verify pass may not add more than MAX_DELTA_READS reads.
        """
        import sys

        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum],corruptor,debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads = self._get_delta_counts()[0]
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest:
                # Fetch the exception through sys.exc_info() rather than
                # "except X, e", so this parses on both Python 2 and 3.
                e = sys.exc_info()[1]
                # FailTest just uses e.args[0] == str; append the verifier
                # data so a failure report shows what was actually seen.
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.get_data())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """ Verify says the file is healthy when none of the shares have been
        touched in a way that matters. (The read budget is not judged here:
        _help_test_verify compares the read delta against MAX_DELTA_READS.)"""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 10, data)
        self.failUnless(len(data['sharemap']) == 10, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 10, data)
        self.failUnless(len(data['servers-responding']) == 10, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that verifier handles these answers
        correctly. (The read budget is not judged here: _help_test_verify
        compares the read delta against MAX_DELTA_READS.)"""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnless(data['count-shares-good'] == 9, data)
        self.failUnless(len(data['sharemap']) == 9, data)
        self.failUnless(data['count-shares-needed'] == 3, data)
        self.failUnless(data['count-shares-expected'] == 10, data)
        self.failUnless(data['count-good-share-hosts'] == 9, data)
        self.failUnless(len(data['servers-responding']) == 9, data)
        self.failUnless(len(data['list-corrupt-shares']) == 0, data)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(len(data['sharemap']), 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 0)
        self.failUnlessEqual(data['count-corrupt-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.get_data()))
        data = vr.get_data()
        self.failUnlessEqual(data['count-shares-good'], 9)
        self.failUnlessEqual(data['count-shares-needed'], 3)
        self.failUnlessEqual(data['count-shares-expected'], 10)
        self.failUnlessEqual(data['count-good-share-hosts'], 9)
        self.failUnlessEqual(data['count-corrupt-shares'], 1)
        self.failUnlessEqual(len(data['list-corrupt-shares']), 1)
        self.failUnlessEqual(data['count-incompatible-shares'], 0)
        self.failUnlessEqual(len(data['list-incompatible-shares']), 0)
        self.failUnlessEqual(len(data['servers-responding']), 10)
        self.failUnlessEqual(len(data['sharemap']), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata_last_byte(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_last_byte"
        return self._help_test_verify(common._corrupt_share_data_last_byte,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's disabled (the OFF_ prefix keeps the test
        # runner from collecting it). It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            # remember where share 0 lives and what its pristine bytes are
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            f = open(self.sh0_file, "rb")
            try:
                self.sh0_orig = f.read()
            finally:
                f.close()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            # put the pristine bytes back before the next round
            f = open(self.sh0_file, "wb")
            try:
                f.write(self.sh0_orig)
            finally:
                f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                # flip the low bit of the byte at offset 'which'
                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            # one corrupt / verify / record / restore round per byte offset
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            try:
                for i in sorted(results.keys()):
                    f.write("%d: %s\n" % (i, results[i]))
            finally:
                f.close()
            print("Please look in _trial_temp/test_each_byte_output for results")
        d.addCallback(_show_results)
        return d
364
# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
# The repair tests below multiply this per-share budget by the number of
# deleted shares and compare the 'storage_server.allocate' delta against it.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
370
371 class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
372                common.ShouldFailMixin):
373
374     def test_harness(self):
375         # This test is actually to make sure our test harness works, rather
376         # than testing anything about Tahoe code itself.
377
378         self.basedir = "repairer/Repairer/test_code"
379         self.set_up_grid(num_clients=2)
380         d = self.upload_and_stash()
381
382         d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
383         def _stash_shares(oldshares):
384             self.oldshares = oldshares
385         d.addCallback(_stash_shares)
386         d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
387         def _compare(newshares):
388             self.failUnlessEqual(newshares, self.oldshares)
389         d.addCallback(_compare)
390
391         def _delete_8(ignored):
392             shnum = self.oldshares[0][0]
393             self.delete_shares_numbered(self.uri, [shnum])
394             for sh in self.oldshares[1:8]:
395                 self.delete_share(sh)
396         d.addCallback(_delete_8)
397         d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
398         d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))
399
400         d.addCallback(lambda ignored:
401                       self.shouldFail(NotEnoughSharesError, "then_download",
402                                       None,
403                                       download_to_data, self.c1_filenode))
404
405         d.addCallback(lambda ignored:
406                       self.shouldFail(NotEnoughSharesError, "then_repair",
407                                       None,
408                                       self.c1_filenode.check_and_repair,
409                                       Monitor(), verify=False))
410
411         # test share corruption
412         def _test_corrupt(ignored):
413             olddata = {}
414             shares = self.find_uri_shares(self.uri)
415             for (shnum, serverid, sharefile) in shares:
416                 olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
417             for sh in shares:
418                 self.corrupt_share(sh, common._corrupt_uri_extension)
419             for (shnum, serverid, sharefile) in shares:
420                 newdata = open(sharefile, "rb").read()
421                 self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
422         d.addCallback(_test_corrupt)
423
424         def _remove_all(ignored):
425             for sh in self.find_uri_shares(self.uri):
426                 self.delete_share(sh)
427         d.addCallback(_remove_all)
428         d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
429         d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
430
431         return d
432
433     def test_repair_from_deletion_of_1(self):
434         """ Repair replaces a share that got deleted. """
435         self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
436         self.set_up_grid(num_clients=2)
437         d = self.upload_and_stash()
438
439         d.addCallback(lambda ignored:
440                       self.delete_shares_numbered(self.uri, [2]))
441         d.addCallback(lambda ignored: self._stash_counts())
442         d.addCallback(lambda ignored:
443                       self.c0_filenode.check_and_repair(Monitor(),
444                                                         verify=False))
445         def _check_results(crr):
446             self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
447             pre = crr.get_pre_repair_results()
448             self.failUnlessIsInstance(pre, check_results.CheckResults)
449             post = crr.get_post_repair_results()
450             self.failUnlessIsInstance(post, check_results.CheckResults)
451             delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
452             self.failIfBigger(delta_reads, MAX_DELTA_READS)
453             self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
454             self.failIf(pre.is_healthy())
455             self.failUnless(post.is_healthy())
456
457             # Now we inspect the filesystem to make sure that it has 10
458             # shares.
459             shares = self.find_uri_shares(self.uri)
460             self.failIf(len(shares) < 10)
461         d.addCallback(_check_results)
462
463         d.addCallback(lambda ignored:
464                       self.c0_filenode.check(Monitor(), verify=True))
465         d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
466
467         # Now we delete seven of the other shares, then try to download the
468         # file and assert that it succeeds at downloading and has the right
469         # contents. This can't work unless it has already repaired the
470         # previously-deleted share #2.
471
472         d.addCallback(lambda ignored:
473                       self.delete_shares_numbered(self.uri, range(3, 10+1)))
474         d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
475         d.addCallback(lambda newdata:
476                       self.failUnlessEqual(newdata, common.TEST_DATA))
477         return d
478
479     def test_repair_from_deletion_of_7(self):
480         """ Repair replaces seven shares that got deleted. """
481         self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
482         self.set_up_grid(num_clients=2)
483         d = self.upload_and_stash()
484         d.addCallback(lambda ignored:
485                       self.delete_shares_numbered(self.uri, range(7)))
486         d.addCallback(lambda ignored: self._stash_counts())
487         d.addCallback(lambda ignored:
488                       self.c0_filenode.check_and_repair(Monitor(),
489                                                         verify=False))
490         def _check_results(crr):
491             self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
492             pre = crr.get_pre_repair_results()
493             self.failUnlessIsInstance(pre, check_results.CheckResults)
494             post = crr.get_post_repair_results()
495             self.failUnlessIsInstance(post, check_results.CheckResults)
496             delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
497
498             self.failIfBigger(delta_reads, MAX_DELTA_READS)
499             self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
500             self.failIf(pre.is_healthy())
501             self.failUnless(post.is_healthy(), post.data)
502
503             # Make sure we really have 10 shares.
504             shares = self.find_uri_shares(self.uri)
505             self.failIf(len(shares) < 10)
506         d.addCallback(_check_results)
507
508         d.addCallback(lambda ignored:
509                       self.c0_filenode.check(Monitor(), verify=True))
510         d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
511
512         # Now we delete seven of the other shares, then try to download the
513         # file and assert that it succeeds at downloading and has the right
514         # contents. This can't work unless it has already repaired the
515         # previously-deleted share #2.
516
517         d.addCallback(lambda ignored:
518                       self.delete_shares_numbered(self.uri, range(3, 10+1)))
519         d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
520         d.addCallback(lambda newdata:
521                       self.failUnlessEqual(newdata, common.TEST_DATA))
522         return d
523
524     def test_repairer_servers_of_happiness(self):
525         # The repairer is supposed to generate and place as many of the
526         # missing shares as possible without caring about how they are
527         # distributed.
528         self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
529         self.set_up_grid(num_clients=2, num_servers=10)
530         d = self.upload_and_stash()
531         # Now delete some servers. We want to leave 3 servers, which
532         # will allow us to restore the file to a healthy state without
533         # distributing the shares widely enough to satisfy the default
534         # happiness setting.
535         def _delete_some_servers(ignored):
536             for i in xrange(7):
537                 self.g.remove_server(self.g.servers_by_number[i].my_nodeid)
538
539             assert len(self.g.servers_by_number) == 3
540
541         d.addCallback(_delete_some_servers)
542         # Now try to repair the file.
543         d.addCallback(lambda ignored:
544             self.c0_filenode.check_and_repair(Monitor(), verify=False))
545         def _check_results(crr):
546             self.failUnlessIsInstance(crr,
547                                       check_results.CheckAndRepairResults)
548             pre = crr.get_pre_repair_results()
549             post = crr.get_post_repair_results()
550             for p in (pre, post):
551                 self.failUnlessIsInstance(p, check_results.CheckResults)
552
553             self.failIf(pre.is_healthy())
554             self.failUnless(post.is_healthy())
555
556         d.addCallback(_check_results)
557         return d
558
559     # why is test_repair_from_corruption_of_1 disabled? Read on:
560     #
561     # As recently documented in NEWS.rst for the 1.3.0 release, the current
562     # immutable repairer suffers from several limitations:
563     #
564     #  * minimalistic verifier: it's just download without decryption, so we
565     #    don't look for corruption in N-k shares, and for many fields (those
566     #    which are the same in all shares) we only look for corruption in a
567     #    single share
568     #
569     #  * some kinds of corruption cause download to fail (when it ought to
570     #    just switch to a different share), so repair will fail on these too
571     #
572     #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
573     #    shares (the authority model is not at all clear), so the best the
574     #    repairer can do is to put replacement shares on new servers,
575     #    unfortunately leaving the corrupt shares in place
576     #
577     # This test is pretty strenuous: it asserts that the repairer does the
578     # ideal thing in 8 distinct situations, with randomized corruption in
579     # each. Because of the aforementioned limitations, it is highly unlikely
580     # to pass any of these. We're also concerned that the download-fails case
581     # can provoke a lost-progress bug (one was fixed, but there might be more
582     # lurking), which will cause the test to fail despite a ".todo" marker,
583     # and will probably cause subsequent unrelated tests to fail too (due to
584     # "unclean reactor" problems).
585     #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
589     #
590     # So we're turning this test off until we've done one or more of the
591     # following:
592     #  * remove some of these limitations
593     #  * break the test up into smaller, more functionally-oriented pieces
594     #  * simplify the repairer enough to let us be confident that it is free
595     #    of lost-progress bugs
596
597     def OFF_test_repair_from_corruption_of_1(self):
598         d = defer.succeed(None)
599
600         d.addCallback(self.find_all_shares)
601         stash = [None]
602         def _stash_it(res):
603             stash[0] = res
604             return res
605         d.addCallback(_stash_it)
606         def _put_it_all_back(ignored):
607             self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
608             return ignored
609
610         def _repair_from_corruption(shnum, corruptor_func):
611             before_repair_reads = self._count_reads()
612             before_repair_allocates = self._count_writes()
613
614             d2 = self.filenode.check_and_repair(Monitor(), verify=True)
615             def _after_repair(checkandrepairresults):
616                 prerepairres = checkandrepairresults.get_pre_repair_results()
617                 postrepairres = checkandrepairresults.get_post_repair_results()
618                 after_repair_reads = self._count_reads()
619                 after_repair_allocates = self._count_writes()
620
621                 # The "* 2" in reads is because you might read a whole share
622                 # before figuring out that it is corrupted. It might be
623                 # possible to make this delta reads number a little tighter.
624                 self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
625                 # The "* 2" in writes is because each server has two shares,
626                 # and it is reasonable for repairer to conclude that there
627                 # are two shares that it should upload, if the server fails
628                 # to serve the first share.
629                 self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
630                 self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
631                 self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))
632
633                 # Now we inspect the filesystem to make sure that it has 10
634                 # shares.
635                 shares = self.find_all_shares()
636                 self.failIf(len(shares) < 10)
637
638                 # Now we assert that the verifier reports the file as healthy.
639                 d3 = self.filenode.check(Monitor(), verify=True)
640                 def _after_verify(verifyresults):
641                     self.failUnless(verifyresults.is_healthy())
642                 d3.addCallback(_after_verify)
643
644                 # Now we delete seven of the other shares, then try to
645                 # download the file and assert that it succeeds at
646                 # downloading and has the right contents. This can't work
647                 # unless it has already repaired the previously-corrupted share.
648                 def _then_delete_7_and_try_a_download(unused=None):
649                     shnums = range(10)
650                     shnums.remove(shnum)
651                     random.shuffle(shnums)
652                     for sharenum in shnums[:7]:
653                         self._delete_a_share(sharenum=sharenum)
654
655                     return self._download_and_check_plaintext()
656                 d3.addCallback(_then_delete_7_and_try_a_download)
657                 return d3
658
659             d2.addCallback(_after_repair)
660             return d2
661
662         for corruptor_func in (
663             common._corrupt_file_version_number,
664             common._corrupt_sharedata_version_number,
665             common._corrupt_offset_of_sharedata,
666             common._corrupt_offset_of_uri_extension,
667             common._corrupt_offset_of_uri_extension_to_force_short_read,
668             common._corrupt_share_data,
669             common._corrupt_length_of_uri_extension,
670             common._corrupt_uri_extension,
671             ):
672             # Now we corrupt a share...
673             d.addCallback(self._corrupt_a_random_share, corruptor_func)
674             # And repair...
675             d.addCallback(_repair_from_corruption, corruptor_func)
676
677         return d
678     #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
679
680     def test_tiny_reads(self):
681         # ticket #1223 points out three problems:
682         #   repairer reads beyond end of input file
683         #   new-downloader does not tolerate overreads
684         #   uploader does lots of tiny reads, inefficient
685         self.basedir = "repairer/Repairer/test_tiny_reads"
686         self.set_up_grid()
687         c0 = self.g.clients[0]
688         DATA = "a"*135
689         c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
690         c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
691         d = c0.upload(upload.Data(DATA, convergence=""))
692         def _then(ur):
693             self.uri = ur.get_uri()
694             self.delete_shares_numbered(self.uri, [0])
695             self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
696             self._stash_counts()
697             return self.c0_filenode.check_and_repair(Monitor())
698         d.addCallback(_then)
699         def _check(ign):
700             (r,a,w) = self._get_delta_counts()
701             # when the uploader (driven by the repairer) does full-segment
702             # reads, this makes 44 server read calls (2*k). Before, when it
703             # was doing input_chunk_size reads (7 bytes), it was doing over
704             # 400.
705             self.failIf(r > 100, "too many reads: %d>100" % r)
706         d.addCallback(_check)
707         return d
708
709     def test_servers_responding(self):
710         self.basedir = "repairer/Repairer/servers_responding"
711         self.set_up_grid(num_clients=2)
712         d = self.upload_and_stash()
713         # now cause one of the servers to not respond during the pre-repair
714         # filecheck, but then *do* respond to the post-repair filecheck
715         def _then(ign):
716             ss = self.g.servers_by_number[0]
717             self.g.break_server(ss.my_nodeid, count=1)
718             self.delete_shares_numbered(self.uri, [9])
719             return self.c0_filenode.check_and_repair(Monitor())
720         d.addCallback(_then)
721         def _check(rr):
722             # this exercises a bug in which the servers-responding list did
723             # not include servers that responded to the Repair, but which did
724             # not respond to the pre-repair filecheck
725             prr = rr.get_post_repair_results()
726             expected = set(self.g.get_all_serverids())
727             self.failUnlessEqual(expected, set(prr.data["servers-responding"]))
728         d.addCallback(_check)
729         return d
730
731 # XXX extend these tests to show that the checker detects which specific
732 # share on which specific server is broken -- this is necessary so that the
733 # checker results can be passed to the repairer and the repairer can go ahead
734 # and upload fixes without first doing what is effectively a check (/verify)
735 # run
736
737 # XXX extend these tests to show bad behavior of various kinds from servers:
738 # raising exception from each remove_foo() method, for example
739
740 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
741
742 # XXX test corruption that truncates hash trees other than just the crypttext
743 # hash tree
744
745 # XXX test the notify-someone-about-corruption feature (also implement that
746 # feature)
747
748 # XXX test whether repairer (downloader) correctly downloads a file even if
749 # to do so it has to acquire shares from a server that has already tried to
750 # serve it a corrupted share. (I don't think the current downloader would
751 # pass this test, depending on the kind of corruption.)