import random
import struct

from twisted.internet import defer
from twisted.trial import unittest

from allmydata.test.common import SystemTestMixin, ShareManglingMixin
from allmydata.monitor import Monitor
from allmydata.interfaces import IURI, NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util import log

import common_util as testutil
# One byte more than the LIT threshold, so uploads produce real CHK shares.
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
def corrupt_field(data, offset, size, debug=False):
    """ Return a copy of data with the size-byte field at offset corrupted:
    half the time one bit of the field is flipped, otherwise the whole field
    is replaced with random bytes. """
    if random.random() < 0.5:
        newdata = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
        return newdata
    else:
        newval = testutil.insecurerandstr(size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
        return data[:offset]+newval+data[offset+size:]
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number will have one bit flipped or
    else will be changed to a random value."""
    return corrupt_field(data, 0x00, 4)
def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data within the file
    will have one bit flipped or else will be changed to a random value. """
    return corrupt_field(data, 0x04, 4)
36 def _corrupt_sharedata_version_number(data):
37 """ Scramble the file data -- the share data version number will have one bit flipped or
38 else will be changed to a random value, but not 1 or 2."""
39 return corrupt_field(data, 0x0c, 4)
40 sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
41 assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
42 newsharevernum = sharevernum
43 while newsharevernum in (1, 2):
44 newsharevernum = random.randrange(0, 2**32)
45 newsharevernumbytes = struct.pack(">l", newsharevernum)
46 return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
48 def _corrupt_sharedata_version_number_to_known_version(data):
49 """ Scramble the file data -- the share data version number will
50 be changed to 2 if it is 1 or else to 1 if it is 2."""
51 sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
52 assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
57 newsharevernumbytes = struct.pack(">l", newsharevernum)
58 return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment will have one
    bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        # v1 shares use 4-byte size/offset fields.
        return corrupt_field(data, 0x0c+0x04, 4, debug=False)
    else:
        # v2 shares use 8-byte size/offset fields.
        return corrupt_field(data, 0x0c+0x04, 8, debug=False)
def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data within the share
    data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x08, 4)
    else:
        return corrupt_field(data, 0x0c+0x0c, 8)
def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data within the share
    data will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x0c, 4)
    else:
        return corrupt_field(data, 0x0c+0x14, 8)
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the ciphertext hash tree
    within the share data will have one bit flipped or else be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x14, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x24, 8, debug=False)
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block hash tree within
    the share data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x18, 4)
    else:
        return corrupt_field(data, 0x0c+0x2c, 8)
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share hash tree within
    the share data will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x1c, 4)
    else:
        return corrupt_field(data, 0x0c+0x34, 8)
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x20, 4)
    else:
        return corrupt_field(data, 0x0c+0x3c, 8)
131 def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
132 """ Scramble the file data -- the field showing the offset of the uri extension will be set
133 to the size of the file minus 3. This means when the client tries to read the length field
134 from that location it will get a short read -- the result string will be only 3 bytes long,
135 not the 4 or 8 bytes necessary to do a successful struct.unpack."""
136 sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
137 assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
138 # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
141 log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
142 return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
145 log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
146 return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]
def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself will have one
    bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # The v2 data-size field lives at 0x0c+0x0c and is 8 bytes wide (it
        # must be sliced to exactly 8 bytes or unpack(">Q", ...) raises).
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        return corrupt_field(data, 0x0c+0x44, sharedatasize)
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash tree will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    return corrupt_field(data, crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree will have one bit
    flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    return corrupt_field(data, blockhashesoffset, sharehashesoffset-blockhashesoffset)
def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    return corrupt_field(data, sharehashesoffset, uriextoffset-sharehashesoffset)
def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        # NOTE(review): the stored offset is relative to the post-header data, so this
        # may need a +0x0c adjustment to hit the length field -- confirm against the
        # share-file layout.
        return corrupt_field(data, uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, uriextoffset, 8)
def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will have one bit
    flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    # NOTE(review): the length field is read at 0x0c+uriextoffset but the corruption is
    # applied at uriextoffset -- the missing header adjustment looks inconsistent; confirm.
    return corrupt_field(data, uriextoffset, uriextlen)
class Test(ShareManglingMixin, unittest.TestCase):
    def setUp(self):
        # Set self.basedir to a temp dir which has the name of the current test method in its
        # name.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d

    def _download_and_check_plaintext(self, unused=None):
        self.downloader = self.clients[1].getServiceNamed("downloader")
        d = self.downloader.download_to_data(self.uri)

        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
        d.addCallback(_after_download)
        return d

    def _delete_a_share(self, unused=None, sharenum=None):
        """ Delete one share. """
        shares = self.find_shares()
        ks = shares.keys()
        if sharenum is not None:
            k = [ key for key in ks if key[1] == sharenum ][0]
        else:
            k = random.choice(ks)
        del shares[k]
        self.replace_shares(shares, storage_index=self.uri.storage_index)

        return unused

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting that you can't
        # download it is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d.addCallbacks(_after_download_callb, _after_download_errb)
            return d
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and asserting that you can't
        # repair it is more to test this test code than to test the Tahoe code...
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                self.failIf(prerepairres.is_healthy())
                self.failIf(postrepairres.is_healthy())
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_then_repair)
        return d

    def _count_reads(self):
        # Sum the 'storage_server.read' counter across all clients' stats.
        sum_of_read_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        # Sum the 'storage_server.allocate' counter across all clients' stats.
        sum_of_allocate_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _corrupt_a_share(self, unused, corruptor_func, sharenum):
        """ Replace the share numbered sharenum with corruptor_func(share). """
        shares = self.find_shares()
        ks = [ key for key in shares.keys() if key[1] == sharenum ]
        assert ks, (shares.keys(), sharenum)
        k = ks[0]
        shares[k] = corruptor_func(shares[k])
        self.replace_shares(shares, storage_index=self.uri.storage_index)

    def _corrupt_all_shares(self, unused, corruptor_func):
        """ All shares on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        for k in shares.keys():
            self._corrupt_a_share(unused, corruptor_func, k[1])

    def _corrupt_a_random_share(self, unused, corruptor_func):
        """ Exactly one share on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        ks = shares.keys()
        k = random.choice(ks)
        return self._corrupt_a_share(unused, corruptor_func, k[1])

    def test_download(self):
        """ Basic download. (This functionality is more or less already tested by test code in
        other modules, but this module is also going to test some more specific things about
        immutable download.)
        """
        d = defer.succeed(None)
        before_download_reads = self._count_reads()
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been removed. """
        d = defer.succeed(None)
        def _then_delete_7(unused=None):
            for i in range(7):
                self._delete_a_share()
        before_download_reads = self._count_reads()
        d.addCallback(_then_delete_7)
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d
    test_download_from_only_3_remaining_shares.todo = "I think this test is failing due to the downloader code not knowing how to handle URI corruption and keeping going. I'm going to commit new downloader code soon, and then see if this test starts passing."

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough shares out
        there."""
        d = defer.succeed(None)
        def _then_delete_8(unused=None):
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before actually trying to read any
            # share data.
            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
        shares out there. It should be able to tell because the corruption occurs in the
        sharedata version number, which it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            shnums = range(10)
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before reading all of the share
            # data. Actually, we could give up sooner than 45 reads, but currently our download
            # code does 45 reads. This test then serves as a "performance regression detector"
            # -- if you change download code so that it takes *more* reads, then this test will
            # fail.
            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been touched. It says
        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
        """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d

    def test_check_with_verify(self):
        """ Check says the file is healthy when none of the shares have been touched. It says
        that the file is unhealthy if any field of any share has been corrupted. It doesn't use
        more than twice as many reads as it needs. """
        LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * LEEWAY # N = 10
        d = defer.succeed(self.filenode)
        def _check_pristine(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS))
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check_pristine)

        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)

        def _check_after_feckless_corruption(ignored, corruptor_func):
            # Corruption which has no effect -- bits of the share file that are unused.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 10, data)
                self.failUnless(len(data['sharemap']) == 10, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        for corruptor_func in (
            _corrupt_size_of_file_data,
            _corrupt_size_of_sharedata,
            _corrupt_segment_size,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_visible_corruption(ignored, corruptor_func):
            # Corruption which is detected by the server means that the server will send you
            # back a Failure in response to get_bucket instead of giving you the share data.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                # The server might fail to serve up its other share as well as the corrupted
                # one, so count-shares-good could be 8 or 9.
                self.failUnless(data['count-shares-good'] in (8, 9), data)
                self.failUnless(len(data['sharemap']) in (8, 9,), data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                # The server may have served up the non-corrupted share, or it may not have, so
                # the checker could have detected either 4 or 5 good servers.
                self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
                self.failUnless(len(data['servers-responding']) in (4, 5), data)
                # If the server served up the other share, then the checker should consider it good, else it should
                # not have counted it.
                self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_share_incompatibility(ignored, corruptor_func):
            # Corruption which means the share is indistinguishable from a share of an
            # incompatible version.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(len(data['sharemap']) == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 1, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_invisible_corruption(ignored, corruptor_func):
            # Corruption which is not detected by the server means that the server will send you
            # back the share data, but you will detect that it is wrong.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # print "delta was ", after_check_reads - before_check_reads
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func))
                self.failUnless(len(data['list-corrupt-shares']) == 1, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 0, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['sharemap']) == 9, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_offset_of_uri_extension_to_force_short_read,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)
        return d
    test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."

    def test_repair(self):
        """ Repair replaces a share that got deleted. """
        # N == 10. 7 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * 7 # N = 10
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents. This can't
                # work unless it has already repaired the previously-deleted share #2.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...
        def _repair_from_deletion_of_7(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        def _repair_from_corruption(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_sharedata_version_number,
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # ... and repair it.
            d.addCallback(_repair_from_corruption)

        return d
    test_repair.todo = "We haven't implemented a repairer yet."
# XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run

# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit