import random
import struct

from twisted.internet import defer
from twisted.trial import unittest

from allmydata.immutable import upload
from allmydata.test.common import SystemTestMixin, ShareManglingMixin
from allmydata.monitor import Monitor
from allmydata.interfaces import IURI, NotEnoughSharesError

import common_util as testutil
# Test plaintext sized one byte past the literal-URI threshold, so the upload
# is stored as real shares on disk (which these tests then mangle) rather
# than being inlined into a LIT URI.
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
def corrupt_field(data, offset, size):
    """Return a copy of data with the size-byte field at offset corrupted.

    Half of the time a single bit inside the field is flipped; the other
    half, the entire field is overwritten with random bytes.
    """
    flip_just_one_bit = random.random() < 0.5
    if flip_just_one_bit:
        return testutil.flip_one_bit(data, offset, size)
    return data[:offset] + testutil.insecurerandstr(size) + data[offset+size:]
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number will have one
    bit flipped or else will be changed to a random value."""
    # The container-file version number is the first 4 bytes of the file.
    field_offset = 0x00
    field_size = 4
    return corrupt_field(data, field_offset, field_size)
def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data
    within the file will have one bit flipped or else will be changed to a
    random value. """
    # The data-size field sits right after the 4-byte file version number.
    field_offset = 0x04
    field_size = 4
    return corrupt_field(data, field_offset, field_size)
def _corrupt_sharedata_version_number(data):
    """ Scramble the file data -- the share data version number will have one
    bit flipped or else will be changed to a random value."""
    # The share-data version number lives at 0x0c, after the container header.
    field_offset = 0x0c
    field_size = 4
    return corrupt_field(data, field_offset, field_size)
def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # The source was missing the v1/v2 branch: two consecutive returns made
    # the second unreachable. v1 shares use 4-byte fields, v2 use 8-byte.
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x04, 4)
    else:
        return corrupt_field(data, 0x0c+0x04, 8)
def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data
    within the share data will have one bit flipped or else will be changed
    to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch (the two returns were unconditionally sequential).
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x08, 4)
    else:
        return corrupt_field(data, 0x0c+0x0c, 8)
def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data
    within the share data will have one bit flipped or else be changed to a
    random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch: 4-byte fields in v1, 8-byte fields in v2.
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x0c, 4)
    else:
        return corrupt_field(data, 0x0c+0x14, 8)
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the
    ciphertext hash tree within the share data will have one bit flipped or
    else be changed to a random value.
    """
    # The docstring above was left unterminated in the source, which would
    # have swallowed the code; it is closed here.
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x14, 4)
    else:
        return corrupt_field(data, 0x0c+0x24, 8)
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block
    hash tree within the share data will have one bit flipped or else will be
    changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch: 4-byte fields in v1, 8-byte fields in v2.
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x18, 4)
    else:
        return corrupt_field(data, 0x0c+0x2c, 8)
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share
    hash tree within the share data will have one bit flipped or else will be
    changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch: 4-byte fields in v1, 8-byte fields in v2.
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x1c, 4)
    else:
        return corrupt_field(data, 0x0c+0x34, 8)
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri
    extension will have one bit flipped or else will be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch: 4-byte fields in v1, 8-byte fields in v2.
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x20, 4)
    else:
        return corrupt_field(data, 0x0c+0x3c, 8)
def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself
    will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # BUG FIX: the original sliced data[0x0c+0x08:0x0c+0x0c+8] -- a
        # 12-byte slice -- and ">Q" requires exactly 8 bytes, so unpack would
        # raise struct.error. The v2 share-data-size field lives at 0x0c+0x0c
        # (consistent with _corrupt_size_of_sharedata above).
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        return corrupt_field(data, 0x0c+0x44, sharedatasize)
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash
    tree will have one bit flipped or else will be changed to a random value.
    """
    # Closed the previously-unterminated docstring and restored the v1/v2
    # branch that selects 4-byte vs 8-byte offset fields.
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
    # Corrupt everything between the two recorded offsets.
    return corrupt_field(data, crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree
    will have one bit flipped or else will be changed to a random value.
    """
    # Closed the previously-unterminated docstring and restored the v1/v2
    # branch that selects 4-byte vs 8-byte offset fields.
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
    return corrupt_field(data, blockhashesoffset, sharehashesoffset-blockhashesoffset)
def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain
    will have one bit flipped or else will be changed to a random value.
    """
    # Closed the previously-unterminated docstring and restored the v1/v2
    # branch that selects 4-byte vs 8-byte offset fields.
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
    return corrupt_field(data, sharehashesoffset, uriextoffset-sharehashesoffset)
def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri
    extension will have one bit flipped or else will be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch: 4-byte fields in v1, 8-byte fields in v2.
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        return corrupt_field(data, uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, uriextoffset, 8)
def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    # Restored v1/v2 branch: 4-byte fields in v1, 8-byte fields in v2.
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]
    # NOTE(review): the length is read at 0x0c+uriextoffset but the
    # corruption starts at uriextoffset -- preserved from the original;
    # confirm whether stored offsets are file-relative or share-relative.
    return corrupt_field(data, uriextoffset, uriextlen)
class Test(ShareManglingMixin, unittest.TestCase):
    def setUp(self):
        # The `def setUp(self):` header was missing from the mangled source,
        # as were the `return d2`/`return d` statements that keep trial
        # waiting on the Deferred chain; both are restored here.
        # Set self.basedir to a temp dir which has the name of the current
        # test method in its name.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            # Upload TEST_DATA with no convergence secret, then hand back a
            # filenode for the resulting URI.
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d
212 def _download_and_check_plaintext(self, unused=None):
213 self.downloader = self.clients[1].getServiceNamed("downloader")
214 d = self.downloader.download_to_data(self.uri)
216 def _after_download(result):
217 self.failUnlessEqual(result, TEST_DATA)
218 d.addCallback(_after_download)
221 def _delete_a_share(self, unused=None, sharenum=None):
222 """ Delete one share. """
224 shares = self.find_shares()
226 if sharenum is not None:
227 k = [ key for key in shares.keys() if key[1] == sharenum ][0]
229 k = random.choice(ks)
231 self.replace_shares(shares, storage_index=self.uri.storage_index)
235 def test_test_code(self):
236 # The following process of stashing the shares, running
237 # replace_shares, and asserting that the new set of shares equals the
238 # old is more to test this test code than to test the Tahoe code...
239 d = defer.succeed(None)
240 d.addCallback(self.find_shares)
245 d.addCallback(_stash_it)
246 d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
250 self.failUnless(isinstance(oldshares, dict), oldshares)
251 self.failUnlessEqual(oldshares, res)
253 d.addCallback(self.find_shares)
254 d.addCallback(_compare)
256 d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
257 d.addCallback(self.find_shares)
258 d.addCallback(lambda x: self.failUnlessEqual(x, {}))
260 # The following process of deleting 8 of the shares and asserting that you can't
261 # download it is more to test this test code than to test the Tahoe code...
262 def _then_delete_8(unused=None):
263 self.replace_shares(stash[0], storage_index=self.uri.storage_index)
264 for sharenum in range(2, 10):
265 self._delete_a_share()
266 d.addCallback(_then_delete_8)
268 def _then_download(unused=None):
269 self.downloader = self.clients[1].getServiceNamed("downloader")
270 d = self.downloader.download_to_data(self.uri)
272 def _after_download_callb(result):
273 self.fail() # should have gotten an errback instead
275 def _after_download_errb(failure):
276 failure.trap(NotEnoughSharesError)
277 return None # success!
278 d.addCallbacks(_after_download_callb, _after_download_errb)
279 d.addCallback(_then_download)
281 # The following process of leaving 8 of the shares deleted and asserting that you can't
282 # repair it is more to test this test code than to test the Tahoe code...
283 def _then_repair(unused=None):
284 d2 = self.filenode.check_and_repair(Monitor(), verify=False)
285 def _after_repair(checkandrepairresults):
286 prerepairres = checkandrepairresults.get_pre_repair_results()
287 postrepairres = checkandrepairresults.get_post_repair_results()
288 self.failIf(prerepairres.is_healthy())
289 self.failIf(postrepairres.is_healthy())
290 d2.addCallback(_after_repair)
292 d.addCallback(_then_repair)
295 def _count_reads(self):
296 sum_of_read_counts = 0
297 for client in self.clients:
298 counters = client.stats_provider.get_stats()['counters']
299 sum_of_read_counts += counters.get('storage_server.read', 0)
300 return sum_of_read_counts
302 def _count_allocates(self):
303 sum_of_allocate_counts = 0
304 for client in self.clients:
305 counters = client.stats_provider.get_stats()['counters']
306 sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
307 return sum_of_allocate_counts
309 def _corrupt_a_random_share(self, unused, corruptor_func):
310 """ Exactly one share on disk will be corrupted by corruptor_func. """
311 shares = self.find_shares()
313 k = random.choice(ks)
315 shares[k] = corruptor_func(shares[k])
317 self.replace_shares(shares, storage_index=self.uri.storage_index)
319 def test_check_without_verify(self):
320 """ Check says the file is healthy when none of the shares have been touched. It says
321 that the file is unhealthy when all of them have been removed. It doesn't use any reads.
323 d = defer.succeed(self.filenode)
324 def _check1(filenode):
325 before_check_reads = self._count_reads()
327 d2 = filenode.check(Monitor(), verify=False)
328 def _after_check(checkresults):
329 after_check_reads = self._count_reads()
330 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
331 self.failUnless(checkresults.is_healthy())
333 d2.addCallback(_after_check)
335 d.addCallback(_check1)
337 d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
338 def _check2(ignored):
339 before_check_reads = self._count_reads()
340 d2 = self.filenode.check(Monitor(), verify=False)
342 def _after_check(checkresults):
343 after_check_reads = self._count_reads()
344 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
345 self.failIf(checkresults.is_healthy())
347 d2.addCallback(_after_check)
349 d.addCallback(_check2)
353 def test_check_with_verify(self):
354 """ Check says the file is healthy when none of the shares have been touched. It says
355 that the file is unhealthy if any field of any share has been corrupted. It doesn't use
356 more than twice as many reads as it needs. """
357 # N == 10. 2 is the "efficiency leeway" -- we'll allow you to pass this test even if
358 # you trigger twice as many disk reads and blocks sends as would be optimal.
360 d = defer.succeed(self.filenode)
361 def _check1(filenode):
362 before_check_reads = self._count_reads()
364 d2 = filenode.check(Monitor(), verify=True)
365 def _after_check(checkresults):
366 after_check_reads = self._count_reads()
367 # print "delta was ", after_check_reads - before_check_reads
368 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
369 self.failUnless(checkresults.is_healthy())
371 d2.addCallback(_after_check)
373 d.addCallback(_check1)
375 d.addCallback(self.find_shares)
380 d.addCallback(_stash_it)
382 def _check2(ignored):
383 before_check_reads = self._count_reads()
384 d2 = self.filenode.check(Monitor(), verify=True)
386 def _after_check(checkresults):
387 after_check_reads = self._count_reads()
388 # print "delta was ", after_check_reads - before_check_reads
389 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
390 self.failIf(checkresults.is_healthy())
392 d2.addCallback(_after_check)
395 def _put_it_all_back(ignored):
396 self.replace_shares(stash[0], storage_index=self.uri.storage_index)
399 for corruptor_func in (
400 _corrupt_file_version_number,
401 _corrupt_size_of_file_data,
402 _corrupt_sharedata_version_number,
403 _corrupt_segment_size,
404 _corrupt_size_of_sharedata,
405 _corrupt_offset_of_sharedata,
406 _corrupt_offset_of_ciphertext_hash_tree,
407 _corrupt_offset_of_block_hashes,
408 _corrupt_offset_of_share_hashes,
409 _corrupt_offset_of_uri_extension,
411 _corrupt_crypttext_hash_tree,
412 _corrupt_block_hashes,
413 _corrupt_share_hashes,
414 _corrupt_length_of_uri_extension,
415 _corrupt_uri_extension,
417 d.addCallback(self._corrupt_a_random_share, corruptor_func)
418 d.addCallback(_check2)
419 d.addCallback(_put_it_all_back)
421 test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."
423 def test_repair(self):
424 """ Repair replaces a share that got deleted. """
425 # N == 10. 2 is the "efficiency leeway" -- we'll allow you to pass this test even if
426 # you trigger twice as many disk reads and blocks sends as would be optimal.
428 # We'll allow you to pass this test only if you repair the missing share using only a
432 d = defer.succeed(self.filenode)
433 d.addCallback(self._delete_a_share, sharenum=2)
435 def _repair_from_deletion_of_1(filenode):
436 before_repair_reads = self._count_reads()
437 before_repair_allocates = self._count_allocates()
439 d2 = filenode.check_and_repair(Monitor(), verify=False)
440 def _after_repair(checkandrepairresults):
441 prerepairres = checkandrepairresults.get_pre_repair_results()
442 postrepairres = checkandrepairresults.get_post_repair_results()
443 after_repair_reads = self._count_reads()
444 after_repair_allocates = self._count_allocates()
446 # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
447 self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
448 self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
449 self.failIf(prerepairres.is_healthy())
450 self.failUnless(postrepairres.is_healthy())
452 # Now we inspect the filesystem to make sure that it has 10 shares.
453 shares = self.find_shares()
454 self.failIf(len(shares) < 10)
456 # Now we delete seven of the other shares, then try to download the file and
457 # assert that it succeeds at downloading and has the right contents. This can't
458 # work unless it has already repaired the previously-deleted share #2.
459 for sharenum in range(3, 10):
460 self._delete_a_share(sharenum=sharenum)
462 return self._download_and_check_plaintext()
464 d2.addCallback(_after_repair)
466 d.addCallback(_repair_from_deletion_of_1)
468 # Now we repair again to get all of those 7 back...
469 def _repair_from_deletion_of_7(filenode):
470 before_repair_reads = self._count_reads()
471 before_repair_allocates = self._count_allocates()
473 d2 = filenode.check_and_repair(Monitor(), verify=False)
474 def _after_repair(checkandrepairresults):
475 prerepairres = checkandrepairresults.get_pre_repair_results()
476 postrepairres = checkandrepairresults.get_post_repair_results()
477 after_repair_reads = self._count_reads()
478 after_repair_allocates = self._count_allocates()
480 # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
481 self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
482 self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
483 self.failIf(prerepairres.is_healthy())
484 self.failUnless(postrepairres.is_healthy())
486 # Now we inspect the filesystem to make sure that it has 10 shares.
487 shares = self.find_shares()
488 self.failIf(len(shares) < 10)
490 return self._download_and_check_plaintext()
492 d2.addCallback(_after_repair)
494 d.addCallback(_repair_from_deletion_of_7)
496 def _repair_from_corruption(filenode):
497 before_repair_reads = self._count_reads()
498 before_repair_allocates = self._count_allocates()
500 d2 = filenode.check_and_repair(Monitor(), verify=False)
501 def _after_repair(checkandrepairresults):
502 prerepairres = checkandrepairresults.get_pre_repair_results()
503 postrepairres = checkandrepairresults.get_post_repair_results()
504 after_repair_reads = self._count_reads()
505 after_repair_allocates = self._count_allocates()
507 # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
508 self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
509 self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
510 self.failIf(prerepairres.is_healthy())
511 self.failUnless(postrepairres.is_healthy())
513 return self._download_and_check_plaintext()
515 d2.addCallback(_after_repair)
518 for corruptor_func in (
519 _corrupt_file_version_number,
520 _corrupt_size_of_file_data,
521 _corrupt_sharedata_version_number,
522 _corrupt_segment_size,
523 _corrupt_size_of_sharedata,
524 _corrupt_offset_of_sharedata,
525 _corrupt_offset_of_ciphertext_hash_tree,
526 _corrupt_offset_of_block_hashes,
527 _corrupt_offset_of_share_hashes,
528 _corrupt_offset_of_uri_extension,
530 _corrupt_crypttext_hash_tree,
531 _corrupt_block_hashes,
532 _corrupt_share_hashes,
533 _corrupt_length_of_uri_extension,
534 _corrupt_uri_extension,
536 # Now we corrupt a share...
537 d.addCallback(self._corrupt_a_random_share, corruptor_func)
539 d.addCallback(_repair_from_corruption)
542 test_repair.todo = "We haven't implemented a repairer yet."