2 from allmydata.test.common import SystemTestMixin, ShareManglingMixin
3 from allmydata.monitor import Monitor
4 from allmydata.interfaces import IURI, NotEnoughSharesError
5 from allmydata.immutable import upload
6 from allmydata.util import log
7 from twisted.internet import defer
8 from twisted.trial import unittest
10 import common_util as testutil
# One byte longer than the LIT-URI threshold read from upload.Uploader, so the payload cannot
# be stored as a literal URI -- presumably this forces a real share-based upload (TODO confirm).
TEST_DATA = "\x02" * (upload.Uploader.URI_LIT_SIZE_THRESHOLD + 1)
def corrupt_field(data, offset, size, debug=False):
    """ Return a copy of data in which the size-byte field starting at offset is corrupted.

    With probability one half the field is handed to testutil.flip_one_bit(); otherwise it is
    replaced by size bytes from testutil.insecurerandstr(). If debug is true, the change is
    reported through log.msg(). """
    if random.random() < 0.5:
        newdata = testutil.flip_one_bit(data, offset, size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d flipping one bit orig: %r, newdata: %r" % (offset, size, data[offset:offset+size], newdata[offset:offset+size]))
        return newdata
    else:
        newval = testutil.insecurerandstr(size)
        if debug:
            log.msg("testing: corrupting offset %d, size %d randomizing field, orig: %r, newval: %r" % (offset, size, data[offset:offset+size], newval))
        return data[:offset]+newval+data[offset+size:]
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number (the first four bytes of the
    file) will have one bit flipped or else will be changed to a random value. """
    return corrupt_field(data, offset=0x00, size=4)
def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the four-byte field at offset 0x04, showing the size of the
    share data within the file, will have one bit flipped or else will be changed to a random
    value. """
    return corrupt_field(data, offset=0x04, size=4)
def _corrupt_sharedata_version_number(data):
    """ Scramble the file data -- the share data version number will have one bit flipped or
    else will be changed to a random value, but not 1 or 2."""
    # Retry until the corrupted version number is not a known one (1 or 2), so the result
    # can never be mistaken for a valid share of the other version.
    while True:
        corrupted = corrupt_field(data, 0x0c, 4)
        newsharevernum = struct.unpack(">L", corrupted[0x0c:0x0c+4])[0]
        if newsharevernum not in (1, 2):
            return corrupted
def _corrupt_sharedata_version_number_to_known_version(data):
    """ Scramble the file data -- the share data version number will
    be changed to 2 if it is 1 or else to 1 if it is 2."""
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        newsharevernum = 2
    else:
        newsharevernum = 1
    newsharevernumbytes = struct.pack(">l", newsharevernum)
    return data[:0x0c] + newsharevernumbytes + data[0x0c+4:]
def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment will have one
    bit flipped or else be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x04, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x04, 8, debug=False)
def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data within the share
    data will have one bit flipped or else will be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x08, 4)
    else:
        return corrupt_field(data, 0x0c+0x0c, 8)
def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data within the share
    data will have one bit flipped or else be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x0c, 4)
    else:
        return corrupt_field(data, 0x0c+0x14, 8)
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the ciphertext hash tree
    within the share data will have one bit flipped or else be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x14, 4, debug=False)
    else:
        return corrupt_field(data, 0x0c+0x24, 8, debug=False)
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block hash tree within
    the share data will have one bit flipped or else will be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x18, 4)
    else:
        return corrupt_field(data, 0x0c+0x2c, 8)
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share hash tree within
    the share data will have one bit flipped or else will be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x1c, 4)
    else:
        return corrupt_field(data, 0x0c+0x34, 8)
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri extension will
    have one bit flipped or else will be changed to a random value.

    The field is 4 bytes wide in a v1 share and 8 bytes wide in a v2 share. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x20, 4)
    else:
        return corrupt_field(data, 0x0c+0x3c, 8)
def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
    """ Scramble the file data -- the field showing the offset of the uri extension will be set
    to the size of the file minus 3.  This means when the client tries to read the length field
    from that location it will get a short read -- the result string will be only 3 bytes long,
    not the 4 or 8 bytes necessary to do a successful struct.unpack.

    If debug is true, the old and new field values are reported through log.msg(). """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # The "-0x0c" in here is to skip the server-side header in the share file, which the client doesn't see when seeking and reading.
    if sharevernum == 1:
        # v1: the uri extension offset is the 4-byte field at 0x0c+0x20 == 0x2c.
        if debug:
            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x2c, 4, struct.unpack(">L", data[0x2c:0x2c+4])[0], len(data)-0x0c-3, len(data)))
        return data[:0x2c] + struct.pack(">L", len(data)-0x0c-3) + data[0x2c+4:]
    else:
        # v2: the uri extension offset is the 8-byte field at 0x0c+0x3c == 0x48.
        if debug:
            log.msg("testing: corrupting offset %d, size %d, changing %d to %d (len(data) == %d)" % (0x48, 8, struct.unpack(">Q", data[0x48:0x48+8])[0], len(data)-0x0c-3, len(data)))
        return data[:0x48] + struct.pack(">Q", len(data)-0x0c-3) + data[0x48+8:]
def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself will have one
    bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]

        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # In a v2 share the data-size field is the 8 bytes at 0x0c+0x0c (the same field that
        # _corrupt_size_of_sharedata targets); ">Q" needs exactly 8 bytes.
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]

        return corrupt_field(data, 0x0c+0x44, sharedatasize)
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash tree will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # The stored offsets are relative to the start of the share, which begins after the
    # 0x0c-byte server-side header, so add 0x0c to index into the share file.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree will have one bit
    flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    # The stored offsets are relative to the start of the share, which begins after the
    # 0x0c-byte server-side header, so add 0x0c to index into the share file.
    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)
def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain will have one
    bit flipped or else will be changed to a random value.
    """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    # The stored offsets are relative to the start of the share, which begins after the
    # 0x0c-byte server-side header, so add 0x0c to index into the share file.
    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)
def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # The stored offset is relative to the start of the share, which begins after the
    # 0x0c-byte server-side header, so add 0x0c to index into the share file.
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)
def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will have one bit
    flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    # The length above is read at 0x0c+uriextoffset, so corrupt at that same file position:
    # the stored offset does not include the 0x0c-byte server-side header.
    # NOTE(review): the corrupted span starts at the length field itself -- confirm intended.
    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)
class Test(ShareManglingMixin, unittest.TestCase):
    """ Download, check and repair tests run against an uploaded copy of TEST_DATA whose
    shares are deliberately deleted or corrupted on disk. """

    def setUp(self):
        # Set self.basedir to a temp dir which has the name of the current test method in its
        # name.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d

    def _download_and_check_plaintext(self, unused=None):
        """ Download self.uri through clients[1] and assert that the result equals TEST_DATA.
        Returns the Deferred of the download. """
        self.downloader = self.clients[1].getServiceNamed("downloader")
        d = self.downloader.download_to_data(self.uri)

        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
        d.addCallback(_after_download)
        return d

    def _delete_a_share(self, unused=None, sharenum=None):
        """ Delete one share: the one numbered sharenum if given, else a random one.
        Returns unused unchanged so that this can sit in a callback chain. """

        shares = self.find_shares()
        # list() so that random.choice() gets a real sequence.
        ks = list(shares.keys())
        if sharenum is not None:
            k = [ key for key in shares.keys() if key[1] == sharenum ][0]
        else:
            k = random.choice(ks)
        del shares[k]
        self.replace_shares(shares, storage_index=self.uri.storage_index)

        return unused

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting that you can't
        # download it is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d2 = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
                return result
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d2.addCallbacks(_after_download_callb, _after_download_errb)
            # Return the inner Deferred so the outer chain waits for the download attempt and
            # sees any failure raised by the two callbacks above.
            return d2
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and asserting that you can't
        # repair it is more to test this test code than to test the Tahoe code...
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                self.failIf(prerepairres.is_healthy())
                self.failIf(postrepairres.is_healthy())
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_then_repair)
        return d

    def _count_reads(self):
        """ Return the sum of the 'storage_server.read' counters over all clients. """
        sum_of_read_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        """ Return the sum of the 'storage_server.allocate' counters over all clients. """
        sum_of_allocate_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _corrupt_a_share(self, unused, corruptor_func, sharenum):
        """ Replace the first share found with number sharenum by corruptor_func(share). """
        shares = self.find_shares()
        ks = [ key for key in shares.keys() if key[1] == sharenum ]
        assert ks, (shares.keys(), sharenum)
        k = ks[0]
        shares[k] = corruptor_func(shares[k])
        self.replace_shares(shares, storage_index=self.uri.storage_index)

    def _corrupt_all_shares(self, unused, corruptor_func):
        """ All shares on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        for k in shares.keys():
            self._corrupt_a_share(unused, corruptor_func, k[1])

    def _corrupt_a_random_share(self, unused, corruptor_func):
        """ Exactly one share on disk will be corrupted by corruptor_func. """
        shares = self.find_shares()
        # list() so that random.choice() gets a real sequence.
        ks = list(shares.keys())
        k = random.choice(ks)
        return self._corrupt_a_share(unused, corruptor_func, k[1])

    def test_download(self):
        """ Basic download.  (This functionality is more or less already tested by test code in
        other modules, but this module is also going to test some more specific things about
        immutable download.)
        """
        d = defer.succeed(None)
        before_download_reads = self._count_reads()
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_from_only_3_remaining_shares(self):
        """ Test download after 7 random shares (of the 10) have been removed. """
        d = defer.succeed(None)
        def _then_delete_7(unused=None):
            for i in range(7):
                self._delete_a_share()
        before_download_reads = self._count_reads()
        d.addCallback(_then_delete_7)
        def _after_download(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you have to download the file using only 10 reads to get the
            # UEB (in parallel from all shares), plus one read for each of the 3 shares.
            self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
        d.addCallback(self._download_and_check_plaintext)
        d.addCallback(_after_download)
        return d

    def test_download_abort_if_too_many_missing_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough shares out
        there."""
        d = defer.succeed(None)
        def _then_delete_8(unused=None):
            for i in range(8):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before actually trying to read any
            # share data.
            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_download_abort_if_too_many_corrupted_shares(self):
        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
        shares out there. It should be able to tell because the corruption occurs in the
        sharedata version number, which it checks first."""
        d = defer.succeed(None)
        def _then_corrupt_8(unused=None):
            # list() so that random.shuffle() gets a mutable sequence.
            shnums = list(range(10))
            random.shuffle(shnums)
            for shnum in shnums[:8]:
                self._corrupt_a_share(None, _corrupt_sharedata_version_number, shnum)
        d.addCallback(_then_corrupt_8)

        before_download_reads = self._count_reads()
        def _attempt_to_download(unused=None):
            downloader = self.clients[1].getServiceNamed("downloader")
            d = downloader.download_to_data(self.uri)

            def _callb(res):
                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
            def _errb(f):
                self.failUnless(f.check(NotEnoughSharesError))
            d.addCallbacks(_callb, _errb)
            return d

        d.addCallback(_attempt_to_download)

        def _after_attempt(unused=None):
            after_download_reads = self._count_reads()
            # To pass this test, you are required to give up before reading all of the share
            # data.  Actually, we could give up sooner than 45 reads, but currently our download
            # code does 45 reads.  This test then serves as a "performance regression detector"
            # -- if you change download code so that it takes *more* reads, then this test will
            # fail.
            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
        d.addCallback(_after_attempt)
        return d

    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
        """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d

    def test_check_with_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy if any field of any share has been corrupted.  It doesn't use
        more than LEEWAY (seven) times as many reads as it needs. """
        LEEWAY = 7 # We'll allow you to pass this test even if you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * LEEWAY # N = 10
        d = defer.succeed(self.filenode)
        def _check_pristine(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS))
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check_pristine)

        # Remember the pristine shares so that each corruption below can be undone.
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)

        def _check_after_feckless_corruption(ignored, corruptor_func):
            # Corruption which has no effect -- bits of the share file that are unused.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 10, data)
                self.failUnless(len(data['sharemap']) == 10, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        for corruptor_func in (
            _corrupt_size_of_file_data,
            _corrupt_size_of_sharedata,
            _corrupt_segment_size,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_feckless_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_visible_corruption(ignored, corruptor_func):
            # Corruption which is detected by the server means that the server will send you
            # back a Failure in response to get_bucket instead of giving you the share data.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                # The server might fail to serve up its other share as well as the corrupted
                # one, so count-shares-good could be 8 or 9.
                self.failUnless(data['count-shares-good'] in (8, 9), data)
                self.failUnless(len(data['sharemap']) in (8, 9,), data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                # The server may have served up the non-corrupted share, or it may not have, so
                # the checker could have detected either 4 or 5 good servers.
                self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
                self.failUnless(len(data['servers-responding']) in (4, 5), data)
                # If the server served up the other share, then the checker should consider it good, else it should
                # not.
                self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_visible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_share_incompatibility(ignored, corruptor_func):
            # Corruption which means the share is indistinguishable from a share of an
            # incompatible version.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(len(data['sharemap']) == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['list-corrupt-shares']) == 0, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 1, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_share_incompatibility, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)

        def _check_after_server_invisible_corruption(ignored, corruptor_func):
            # Corruption which is not detected by the server means that the server will send you
            # back the share data, but you will detect that it is wrong.
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # print "delta was ", after_check_reads - before_check_reads
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data(), corruptor_func))
                data = checkresults.get_data()
                self.failUnless(data['count-shares-good'] == 9, data)
                self.failUnless(data['count-shares-needed'] == 3, data)
                self.failUnless(data['count-shares-expected'] == 10, data)
                self.failUnless(data['count-good-share-hosts'] == 5, data)
                self.failUnless(data['count-corrupt-shares'] == 1, (data, corruptor_func))
                self.failUnless(len(data['list-corrupt-shares']) == 1, data)
                self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
                self.failUnless(len(data['list-incompatible-shares']) == 0, data)
                self.failUnless(len(data['servers-responding']) == 5, data)
                self.failUnless(len(data['sharemap']) == 9, data)

            d2.addCallback(_after_check)
            return d2

        for corruptor_func in (
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_offset_of_uri_extension_to_force_short_read,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check_after_server_invisible_corruption, corruptor_func=corruptor_func)
            d.addCallback(_put_it_all_back)
        return d
    test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."

    def test_repair(self):
        """ Repair replaces a share that got deleted. """
        # N == 10.  7 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger seven times as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * 7
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents.  This can't
                # work unless it has already repaired the previously-deleted share #2.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...
        def _repair_from_deletion_of_7(unused):
            # The previous callback ends with the download check, whose result is not the
            # filenode, so use the filenode stashed by setUp().
            filenode = self.filenode
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        def _repair_from_corruption(unused):
            # The preceding _corrupt_a_random_share callback does not pass the filenode along,
            # so use the filenode stashed by setUp().
            filenode = self.filenode
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_sharedata_version_number,
            _corrupt_sharedata_version_number_to_known_version,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption)

        return d
    test_repair.todo = "We haven't implemented a repairer yet."
806 # XXX extend these tests to show that the checker detects which specific share on which specific server is broken -- this is necessary so that the checker results can be passed to the repairer and the repairer can go ahead and upload fixes without first doing what is effectively a check (/verify) run
808 # XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
810 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit