import random
import struct

from twisted.internet import defer
from twisted.trial import unittest

from allmydata.immutable import upload
from allmydata.test.common import SystemTestMixin, ShareManglingMixin
from allmydata.monitor import Monitor
from allmydata.interfaces import IURI, NotEnoughSharesError

import common_util as testutil
# Just over the LIT-size threshold, so uploading it produces real CHK shares
# on disk (a smaller file would be stored inline in its URI, leaving nothing
# for these tests to mangle).
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
def corrupt_field(data, offset, size):
    """ Return a copy of data with the field at [offset:offset+size]
    corrupted: with probability 1/2 a single randomly-chosen bit in the field
    is flipped, otherwise the whole field is replaced with random bytes. """
    if random.random() < 0.5:
        return testutil.flip_one_bit(data, offset, size)
    else:
        return data[:offset]+testutil.insecurerandstr(size)+data[offset+size:]
def _corrupt_file_version_number(data):
    """ Scramble the file data -- the share file version number (the first 4
    bytes of the container file) will have one bit flipped or else will be
    changed to a random value. """
    return corrupt_field(data, 0x00, 4)
def _corrupt_size_of_file_data(data):
    """ Scramble the file data -- the field showing the size of the share data within the
    file will have one bit flipped or else will be changed to a random value. """
    # The size field is the 4 bytes immediately after the 4-byte container
    # version number.
    return corrupt_field(data, 0x04, 4)
def _corrupt_sharedata_version_number(data):
    """ Scramble the file data -- the share data version number (the first
    field of the share data, which begins at offset 0x0c) will have one bit
    flipped or else will be changed to a random value. """
    return corrupt_field(data, 0x0c, 4)
def _corrupt_segment_size(data):
    """ Scramble the file data -- the field showing the size of the segment
    will have one bit flipped or else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        # v1 stores the segment size in a 4-byte field right after the
        # 4-byte share-data version number.
        return corrupt_field(data, 0x0c+0x04, 4)
    else:
        # v2 widens the field to 8 bytes.
        return corrupt_field(data, 0x0c+0x04, 8)
def _corrupt_size_of_sharedata(data):
    """ Scramble the file data -- the field showing the size of the data
    within the share data will have one bit flipped or else will be changed
    to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        # v1: 4-byte data-size field after version (4) + segsize (4).
        return corrupt_field(data, 0x0c+0x08, 4)
    else:
        # v2: 8-byte data-size field after version (4) + segsize (8).
        return corrupt_field(data, 0x0c+0x0c, 8)
def _corrupt_offset_of_sharedata(data):
    """ Scramble the file data -- the field showing the offset of the data
    within the share data will have one bit flipped or else be changed to a
    random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x0c, 4)
    else:
        return corrupt_field(data, 0x0c+0x14, 8)
def _corrupt_offset_of_ciphertext_hash_tree(data):
    """ Scramble the file data -- the field showing the offset of the
    ciphertext hash tree within the share data will have one bit flipped or
    else be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x14, 4)
    else:
        return corrupt_field(data, 0x0c+0x24, 8)
def _corrupt_offset_of_block_hashes(data):
    """ Scramble the file data -- the field showing the offset of the block
    hash tree within the share data will have one bit flipped or else will be
    changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x18, 4)
    else:
        return corrupt_field(data, 0x0c+0x2c, 8)
def _corrupt_offset_of_share_hashes(data):
    """ Scramble the file data -- the field showing the offset of the share
    hash tree within the share data will have one bit flipped or else will be
    changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x1c, 4)
    else:
        return corrupt_field(data, 0x0c+0x34, 8)
def _corrupt_offset_of_uri_extension(data):
    """ Scramble the file data -- the field showing the offset of the uri
    extension will have one bit flipped or else will be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        return corrupt_field(data, 0x0c+0x20, 4)
    else:
        return corrupt_field(data, 0x0c+0x3c, 8)
def _corrupt_share_data(data):
    """ Scramble the file data -- the field containing the share data itself
    will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
        # v1 share data begins right after the nine 4-byte header fields.
        return corrupt_field(data, 0x0c+0x24, sharedatasize)
    else:
        # Read the 8-byte v2 data-size field at 0x0c+0x0c (the same field
        # _corrupt_size_of_sharedata targets).  Note: the slice must be
        # exactly 8 bytes wide or struct.unpack(">Q", ...) raises
        # struct.error.
        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
        # v2 share data begins after the wider (8-byte-field) header.
        return corrupt_field(data, 0x0c+0x44, sharedatasize)
def _corrupt_crypttext_hash_tree(data):
    """ Scramble the file data -- the field containing the crypttext hash
    tree will have one bit flipped or else will be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        crypttexthashtreeoffset = struct.unpack(">L", data[0x0c+0x14:0x0c+0x14+4])[0]
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
    else:
        crypttexthashtreeoffset = struct.unpack(">Q", data[0x0c+0x24:0x0c+0x24+8])[0]
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]

    # The stored offsets are relative to the start of the share data, which
    # begins 0x0c bytes into the container file, so add that base when
    # choosing the bytes to corrupt.
    return corrupt_field(data, 0x0c+crypttexthashtreeoffset, blockhashesoffset-crypttexthashtreeoffset)
def _corrupt_block_hashes(data):
    """ Scramble the file data -- the field containing the block hash tree
    will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        blockhashesoffset = struct.unpack(">L", data[0x0c+0x18:0x0c+0x18+4])[0]
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
    else:
        blockhashesoffset = struct.unpack(">Q", data[0x0c+0x2c:0x0c+0x2c+8])[0]
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]

    # Offsets are relative to the share data, which starts at 0x0c in the
    # container file.
    return corrupt_field(data, 0x0c+blockhashesoffset, sharehashesoffset-blockhashesoffset)
def _corrupt_share_hashes(data):
    """ Scramble the file data -- the field containing the share hash chain
    will have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        sharehashesoffset = struct.unpack(">L", data[0x0c+0x1c:0x0c+0x1c+4])[0]
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
    else:
        sharehashesoffset = struct.unpack(">Q", data[0x0c+0x34:0x0c+0x34+8])[0]
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]

    # Offsets are relative to the share data, which starts at 0x0c in the
    # container file.
    return corrupt_field(data, 0x0c+sharehashesoffset, uriextoffset-sharehashesoffset)
def _corrupt_length_of_uri_extension(data):
    """ Scramble the file data -- the field showing the length of the uri
    extension will have one bit flipped or else will be changed to a random
    value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        # The length field sits at the start of the uri-extension region;
        # offsets are relative to the share data, which starts at 0x0c.
        return corrupt_field(data, 0x0c+uriextoffset, 4)
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        return corrupt_field(data, 0x0c+uriextoffset, 8)
def _corrupt_uri_extension(data):
    """ Scramble the file data -- the field containing the uri extension will
    have one bit flipped or else will be changed to a random value. """
    sharevernum = struct.unpack(">l", data[0x0c:0x0c+4])[0]
    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways."
    if sharevernum == 1:
        uriextoffset = struct.unpack(">L", data[0x0c+0x20:0x0c+0x20+4])[0]
        uriextlen = struct.unpack(">L", data[0x0c+uriextoffset:0x0c+uriextoffset+4])[0]
    else:
        uriextoffset = struct.unpack(">Q", data[0x0c+0x3c:0x0c+0x3c+8])[0]
        uriextlen = struct.unpack(">Q", data[0x0c+uriextoffset:0x0c+uriextoffset+8])[0]

    # The length field is read at 0x0c+uriextoffset above, so the corruption
    # must use the same 0x0c share-data base to hit the same region.
    return corrupt_field(data, 0x0c+uriextoffset, uriextlen)
class Test(ShareManglingMixin, unittest.TestCase):
    """ Upload an immutable file, then mangle its shares on disk and exercise
    check, download, and repair against the mangled shares. """

    def setUp(self):
        # Set self.basedir to a temp dir which has the name of the current
        # test method in its name, so failures are easy to attribute.
        self.basedir = self.mktemp()

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            # Upload TEST_DATA with client 0 and turn the resulting URI into
            # a filenode for later check/repair calls.
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            def _after_upload(u):
                self.uri = IURI(u.uri)
                return self.clients[0].create_node_from_uri(self.uri)
            d2.addCallback(_after_upload)
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d

    def _download_and_check_plaintext(self, unused=None):
        """ Download the file with client 1 and assert that the plaintext
        round-trips intact. """
        self.downloader = self.clients[1].getServiceNamed("downloader")
        d = self.downloader.download_to_data(self.uri)

        def _after_download(result):
            self.failUnlessEqual(result, TEST_DATA)
        d.addCallback(_after_download)
        return d

    def _delete_a_share(self, unused=None, sharenum=None):
        """ Delete one share: the one numbered sharenum if given, else a
        random one.  Returns its `unused` argument so it can sit in a
        Deferred callback chain without disturbing the chained value. """
        shares = self.find_shares()
        ks = shares.keys()
        if sharenum is not None:
            k = [ key for key in ks if key[1] == sharenum ][0]
        else:
            k = random.choice(ks)
        del shares[k]
        self.replace_shares(shares, storage_index=self.uri.storage_index)
        return unused

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)
        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        # Replacing the shares with an empty dict should leave none on disk.
        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        # The following process of deleting 8 of the shares and asserting that you can't
        # download it is more to test this test code than to test the Tahoe code...
        def _then_delete_8(unused=None):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            for sharenum in range(2, 10):
                self._delete_a_share()
        d.addCallback(_then_delete_8)

        def _then_download(unused=None):
            self.downloader = self.clients[1].getServiceNamed("downloader")
            d2 = self.downloader.download_to_data(self.uri)

            def _after_download_callb(result):
                self.fail() # should have gotten an errback instead
            def _after_download_errb(failure):
                failure.trap(NotEnoughSharesError)
                return None # success!
            d2.addCallbacks(_after_download_callb, _after_download_errb)
            # Return d2 so trial waits for the download (and its assertions)
            # before moving on.
            return d2
        d.addCallback(_then_download)

        # The following process of leaving 8 of the shares deleted and asserting that you can't
        # repair it is more to test this test code than to test the Tahoe code...
        def _then_repair(unused=None):
            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                # With only 2 of 10 shares left, repair cannot succeed.
                self.failIf(prerepairres.is_healthy())
                self.failIf(postrepairres.is_healthy())
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_then_repair)
        return d

    def _count_reads(self):
        """ Sum the 'storage_server.read' counter over every client's stats
        provider. """
        sum_of_read_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        """ Sum the 'storage_server.allocate' counter over every client's
        stats provider. """
        sum_of_allocate_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _corrupt_a_random_share(self, unused, corruptor_func):
        """ Exactly one share on disk will be corrupted by corruptor_func.
        Returns its `unused` argument to pass the chained Deferred value
        through. """
        shares = self.find_shares()
        ks = shares.keys()
        k = random.choice(ks)

        shares[k] = corruptor_func(shares[k])

        self.replace_shares(shares, storage_index=self.uri.storage_index)
        return unused

    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been
        touched.  It says that the file is unhealthy when all of them have
        been removed.  It doesn't use any reads. """
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # A verify=False check must not read any share data at all.
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())
            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())
            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)

        return d

    def test_check_with_verify(self):
        """ Check says the file is healthy when none of the shares have been
        touched.  It says that the file is unhealthy if any field of any
        share has been corrupted.  It doesn't use more than twice as many
        reads as it needs. """
        # N == 10. 2 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger twice as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * 2
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(Monitor(), verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads, DELTA_READS))
                self.failUnless(checkresults.is_healthy())
            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        # Stash the pristine shares so each corruption round can start from
        # a healthy file.
        d.addCallback(self.find_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)

        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(Monitor(), verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failIf(checkresults.is_healthy())
            d2.addCallback(_after_check)
            return d2

        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
            return ignored

        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_size_of_file_data,
            _corrupt_sharedata_version_number,
            _corrupt_segment_size,
            _corrupt_size_of_sharedata,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            d.addCallback(_check2)
            d.addCallback(_put_it_all_back)
        return d
    test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."

    def test_repair(self):
        """ Repair replaces a share that got deleted. """
        # N == 10. 2 is the "efficiency leeway" -- we'll allow you to pass this test even if
        # you trigger twice as many disk reads and blocks sends as would be optimal.
        DELTA_READS = 10 * 2
        # We'll allow you to pass this test only if you repair the missing share using only a
        # single allocate per repaired share.
        DELTA_ALLOCATES = 1

        d = defer.succeed(self.filenode)
        d.addCallback(self._delete_a_share, sharenum=2)

        def _repair_from_deletion_of_1(filenode):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                # Now we delete seven of the other shares, then try to download the file and
                # assert that it succeeds at downloading and has the right contents. This can't
                # work unless it has already repaired the previously-deleted share #2.
                for sharenum in range(3, 10):
                    self._delete_a_share(sharenum=sharenum)

                return self._download_and_check_plaintext()
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_1)

        # Now we repair again to get all of those 7 back...
        def _repair_from_deletion_of_7(ignored):
            # The chained value here is the result of the download check, not
            # a filenode, so use self.filenode directly.
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                # Seven shares were missing, so allow seven allocates.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_ALLOCATES*7))
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                # Now we inspect the filesystem to make sure that it has 10 shares.
                shares = self.find_shares()
                self.failIf(len(shares) < 10)

                return self._download_and_check_plaintext()
            d2.addCallback(_after_repair)
            return d2
        d.addCallback(_repair_from_deletion_of_7)

        def _repair_from_corruption(ignored):
            # As above, the chained value is not a filenode; use
            # self.filenode.
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_allocates()

            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_allocates()

                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_ALLOCATES)
                self.failIf(prerepairres.is_healthy())
                self.failUnless(postrepairres.is_healthy())

                return self._download_and_check_plaintext()
            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            _corrupt_file_version_number,
            _corrupt_size_of_file_data,
            _corrupt_sharedata_version_number,
            _corrupt_segment_size,
            _corrupt_size_of_sharedata,
            _corrupt_offset_of_sharedata,
            _corrupt_offset_of_ciphertext_hash_tree,
            _corrupt_offset_of_block_hashes,
            _corrupt_offset_of_share_hashes,
            _corrupt_offset_of_uri_extension,
            _corrupt_share_data,
            _corrupt_crypttext_hash_tree,
            _corrupt_block_hashes,
            _corrupt_share_hashes,
            _corrupt_length_of_uri_extension,
            _corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # ... and repair the file.
            d.addCallback(_repair_from_corruption)
        return d
    test_repair.todo = "We haven't implemented a repairer yet."