# -*- coding: utf-8 -*-
from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from allmydata.test.no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
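# With N = 10 shares per file, a verify or repair pass may therefore trigger
# up to 10 * 18 = 180 extra storage-server reads before these tests complain.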

timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin:
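    """Shared helpers for the Verifier and Repairer tests below: count the
    read/allocate/write operations recorded by every storage server's stats
    provider (so tests can bound how much work a check or repair triggered),
    and upload a small test file, stashing its URI and per-client filenodes
    on self.
    """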
    def failUnlessIsInstance(self, x, xtype):
        self.failUnless(isinstance(x, xtype), x)

    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
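        # Use a tiny segment size, presumably so that even the small
        # common.TEST_DATA file spans several segments and its shares carry
        # non-trivial block/crypttext hash trees for the corruption tests to
        # exercise.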
        c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
        def _stash_uri(ur):
            self.uri = ur.get_uri()
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self.c1_filenode = c1.create_node_from_uri(ur.get_uri())
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum], corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest, e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.as_dict())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

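    # Each test below hands _help_test_verify a "corruptor": a function that
    # takes the raw bytes of one share file and returns a (possibly modified)
    # copy. The real corruptors live in allmydata.test.common; the sketch
    # below is purely illustrative (the byte offset 42 is arbitrary) and is
    # not used by any test.
    @staticmethod
    def _example_corruptor(sharedata, debug=False):
        which = 42  # hypothetical offset, chosen only for illustration
        flipped = chr(ord(sharedata[which]) ^ 0x01)  # flip the low bit
        return sharedata[:which] + flipped + sharedata[which+1:]
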
    def judge_no_problem(self, vr):
        """Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than READ_LEEWAY
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 10)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 10)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that the verifier handles these
        answers correctly. It doesn't use more than READ_LEEWAY times as many
        reads as it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 1)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 0)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata_last_byte(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_last_byte"
        return self._help_test_verify(common._corrupt_share_data_last_byte,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print "corrupt %d: healthy=%s" % (i, vr.is_healthy())
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print >>f, "%d: %s" % (i, results[i])
            f.close()
            print "Please look in _trial_temp/test_each_byte_output for results"
        d.addCallback(_show_results)
        return d

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
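# The repair tests below scale this per-share bound by the number of shares
# they expect the repairer to re-create, e.g. repairing seven deleted shares
# may allocate at most 7 * 35 = 245 times before the test fails.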

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_uri_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d

    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(7)))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.as_dict())

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted shares (#0 through #6).

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, range(3, 10+1)))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in xrange(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
            self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:
    #
    # As recently documented in NEWS.rst for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:
    #
    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share
    #
    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too
    #
    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place
    #
    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).
    #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
    #
    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = range(10)
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."

    def test_tiny_reads(self):
        # ticket #1223 points out three problems:
        #   repairer reads beyond end of input file
        #   new-downloader does not tolerate overreads
        #   uploader does lots of tiny reads, inefficient
        self.basedir = "repairer/Repairer/test_tiny_reads"
        self.set_up_grid()
        c0 = self.g.clients[0]
        DATA = "a"*135
        c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
        c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
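        # With k=22, the per-share input_chunk_size for this 135-byte file is
        # roughly 135/22 = 7 bytes, so an uploader that reads one chunk at a
        # time ends up issuing hundreds of tiny reads (the problem above).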
        d = c0.upload(upload.Data(DATA, convergence=""))
        def _then(ur):
            self.uri = ur.get_uri()
            self.delete_shares_numbered(self.uri, [0])
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self._stash_counts()
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(ign):
            (r,a,w) = self._get_delta_counts()
            # when the uploader (driven by the repairer) does full-segment
            # reads, this makes 44 server read calls (2*k). Before, when it
            # was doing input_chunk_size reads (7 bytes), it was doing over
            # 400.
            self.failIf(r > 100, "too many reads: %d>100" % r)
        d.addCallback(_check)
        return d

    def test_servers_responding(self):
        self.basedir = "repairer/Repairer/servers_responding"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        # now cause one of the servers to not respond during the pre-repair
        # filecheck, but then *do* respond to the post-repair filecheck
        def _then(ign):
            ss = self.g.servers_by_number[0]
            self.g.break_server(ss.my_nodeid, count=1)
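            # count=1 presumably means only this first request fails; the
            # server answers normally afterwards, so the repair itself and
            # the post-repair filecheck can still reach it.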
            self.delete_shares_numbered(self.uri, [9])
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(rr):
            # this exercises a bug in which the servers-responding list did
            # not include servers that responded to the Repair, but which did
            # not respond to the pre-repair filecheck
            prr = rr.get_post_repair_results()
            expected = set(self.g.get_all_serverids())
            self.failUnlessEqual(expected,
                                 set([s.get_serverid()
                                      for s in prr.get_servers_responding()]))
        d.addCallback(_check)
        return d

# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)