1 from base64 import b32encode
2 import os, random, struct, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from allmydata import uri, storage, offloaded
10 from allmydata.immutable import download, upload, filenode
11 from allmydata.util import idlib, mathutil, testutil
12 from allmydata.util import log, base32
13 from allmydata.scripts import runner
14 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
15 from allmydata.mutable.common import NotMutableError
16 from allmydata.mutable import layout as mutable_layout
17 from foolscap import DeadReferenceError
18 from twisted.python.failure import Failure
19 from twisted.web.client import getPage
20 from twisted.web.error import Error
22 from allmydata.test.common import SystemTestMixin
25 This is some data to publish to the virtual drive, which needs to be large
26 enough to not fit inside a LIT uri.
29 class CountingDataUploadable(upload.Data):
31 interrupt_after = None
32 interrupt_after_d = None
34 def read(self, length):
35 self.bytes_read += length
36 if self.interrupt_after is not None:
37 if self.bytes_read > self.interrupt_after:
38 self.interrupt_after = None
39 self.interrupt_after_d.callback(self)
40 return upload.Data.read(self, length)
43 class SystemTest(SystemTestMixin, unittest.TestCase):
45 def test_connections(self):
46 self.basedir = "system/SystemTest/test_connections"
47 d = self.set_up_nodes()
48 self.extra_node = None
49 d.addCallback(lambda res: self.add_extra_node(self.numclients))
50 def _check(extra_node):
51 self.extra_node = extra_node
52 for c in self.clients:
53 all_peerids = list(c.get_all_peerids())
54 self.failUnlessEqual(len(all_peerids), self.numclients+1)
55 permuted_peers = list(c.get_permuted_peers("storage", "a"))
56 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
59 def _shutdown_extra_node(res):
61 return self.extra_node.stopService()
63 d.addBoth(_shutdown_extra_node)
65 test_connections.timeout = 300
66 # test_connections is subsumed by test_upload_and_download, and takes
67 # quite a while to run on a slow machine (because of all the TLS
68 # connections that must be established). If we ever rework the introducer
69 # code to such an extent that we're not sure if it works anymore, we can
70 # reinstate this test until it does.
73 def test_upload_and_download_random_key(self):
74 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
75 return self._test_upload_and_download(convergence=None)
76 test_upload_and_download_random_key.timeout = 4800
78 def test_upload_and_download_convergent(self):
79 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
80 return self._test_upload_and_download(convergence="some convergence string")
81 test_upload_and_download_convergent.timeout = 4800
83 def _test_upload_and_download(self, convergence):
84 # we use 4000 bytes of data, which will result in about 400k written
85 # to disk among all our simulated nodes
86 DATA = "Some data to upload\n" * 200
87 d = self.set_up_nodes()
88 def _check_connections(res):
89 for c in self.clients:
90 all_peerids = list(c.get_all_peerids())
91 self.failUnlessEqual(len(all_peerids), self.numclients)
92 permuted_peers = list(c.get_permuted_peers("storage", "a"))
93 self.failUnlessEqual(len(permuted_peers), self.numclients)
94 d.addCallback(_check_connections)
98 u = self.clients[0].getServiceNamed("uploader")
100 # we crank the max segsize down to 1024b for the duration of this
101 # test, so we can exercise multiple segments. It is important
102 # that this is not a multiple of the segment size, so that the
103 # tail segment is not the same length as the others. This actualy
104 # gets rounded up to 1025 to be a multiple of the number of
105 # required shares (since we use 25 out of 100 FEC).
106 up = upload.Data(DATA, convergence=convergence)
107 up.max_segment_size = 1024
110 d.addCallback(_do_upload)
111 def _upload_done(results):
113 log.msg("upload finished: uri is %s" % (uri,))
115 dl = self.clients[1].getServiceNamed("downloader")
117 d.addCallback(_upload_done)
119 def _upload_again(res):
120 # Upload again. If using convergent encryption then this ought to be
121 # short-circuited, however with the way we currently generate URIs
122 # (i.e. because they include the roothash), we have to do all of the
123 # encoding work, and only get to save on the upload part.
124 log.msg("UPLOADING AGAIN")
125 up = upload.Data(DATA, convergence=convergence)
126 up.max_segment_size = 1024
127 d1 = self.uploader.upload(up)
128 d.addCallback(_upload_again)
130 def _download_to_data(res):
131 log.msg("DOWNLOADING")
132 return self.downloader.download_to_data(self.uri)
133 d.addCallback(_download_to_data)
134 def _download_to_data_done(data):
135 log.msg("download finished")
136 self.failUnlessEqual(data, DATA)
137 d.addCallback(_download_to_data_done)
139 target_filename = os.path.join(self.basedir, "download.target")
140 def _download_to_filename(res):
141 return self.downloader.download_to_filename(self.uri,
143 d.addCallback(_download_to_filename)
144 def _download_to_filename_done(res):
145 newdata = open(target_filename, "rb").read()
146 self.failUnlessEqual(newdata, DATA)
147 d.addCallback(_download_to_filename_done)
149 target_filename2 = os.path.join(self.basedir, "download.target2")
150 def _download_to_filehandle(res):
151 fh = open(target_filename2, "wb")
152 return self.downloader.download_to_filehandle(self.uri, fh)
153 d.addCallback(_download_to_filehandle)
154 def _download_to_filehandle_done(fh):
156 newdata = open(target_filename2, "rb").read()
157 self.failUnlessEqual(newdata, DATA)
158 d.addCallback(_download_to_filehandle_done)
160 def _download_nonexistent_uri(res):
161 baduri = self.mangle_uri(self.uri)
162 log.msg("about to download non-existent URI", level=log.UNUSUAL,
163 facility="tahoe.tests")
164 d1 = self.downloader.download_to_data(baduri)
165 def _baduri_should_fail(res):
166 log.msg("finished downloading non-existend URI",
167 level=log.UNUSUAL, facility="tahoe.tests")
168 self.failUnless(isinstance(res, Failure))
169 self.failUnless(res.check(download.NotEnoughSharesError),
170 "expected NotEnoughSharesError, got %s" % res)
171 # TODO: files that have zero peers should get a special kind
172 # of NotEnoughSharesError, which can be used to suggest that
173 # the URI might be wrong or that they've never uploaded the
174 # file in the first place.
175 d1.addBoth(_baduri_should_fail)
177 d.addCallback(_download_nonexistent_uri)
179 # add a new node, which doesn't accept shares, and only uses the
181 d.addCallback(lambda res: self.add_extra_node(self.numclients,
183 add_to_sparent=True))
184 def _added(extra_node):
185 self.extra_node = extra_node
186 extra_node.getServiceNamed("storage").sizelimit = 0
187 d.addCallback(_added)
189 HELPER_DATA = "Data that needs help to upload" * 1000
190 def _upload_with_helper(res):
191 u = upload.Data(HELPER_DATA, convergence=convergence)
192 d = self.extra_node.upload(u)
193 def _uploaded(results):
195 return self.downloader.download_to_data(uri)
196 d.addCallback(_uploaded)
198 self.failUnlessEqual(newdata, HELPER_DATA)
199 d.addCallback(_check)
201 d.addCallback(_upload_with_helper)
203 def _upload_duplicate_with_helper(res):
204 u = upload.Data(HELPER_DATA, convergence=convergence)
205 u.debug_stash_RemoteEncryptedUploadable = True
206 d = self.extra_node.upload(u)
207 def _uploaded(results):
209 return self.downloader.download_to_data(uri)
210 d.addCallback(_uploaded)
212 self.failUnlessEqual(newdata, HELPER_DATA)
213 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
214 "uploadable started uploading, should have been avoided")
215 d.addCallback(_check)
217 if convergence is not None:
218 d.addCallback(_upload_duplicate_with_helper)
220 def _upload_resumable(res):
221 DATA = "Data that needs help to upload and gets interrupted" * 1000
222 u1 = CountingDataUploadable(DATA, convergence=convergence)
223 u2 = CountingDataUploadable(DATA, convergence=convergence)
225 # we interrupt the connection after about 5kB by shutting down
226 # the helper, then restartingit.
227 u1.interrupt_after = 5000
228 u1.interrupt_after_d = defer.Deferred()
229 u1.interrupt_after_d.addCallback(lambda res:
230 self.bounce_client(0))
232 # sneak into the helper and reduce its chunk size, so that our
233 # debug_interrupt will sever the connection on about the fifth
234 # chunk fetched. This makes sure that we've started to write the
235 # new shares before we abandon them, which exercises the
236 # abort/delete-partial-share code. TODO: find a cleaner way to do
237 # this. I know that this will affect later uses of the helper in
238 # this same test run, but I'm not currently worried about it.
239 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
241 d = self.extra_node.upload(u1)
243 def _should_not_finish(res):
244 self.fail("interrupted upload should have failed, not finished"
245 " with result %s" % (res,))
247 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
249 # make sure we actually interrupted it before finishing the
251 self.failUnless(u1.bytes_read < len(DATA),
252 "read %d out of %d total" % (u1.bytes_read,
255 log.msg("waiting for reconnect", level=log.NOISY,
256 facility="tahoe.test.test_system")
257 # now, we need to give the nodes a chance to notice that this
258 # connection has gone away. When this happens, the storage
259 # servers will be told to abort their uploads, removing the
260 # partial shares. Unfortunately this involves TCP messages
261 # going through the loopback interface, and we can't easily
262 # predict how long that will take. If it were all local, we
263 # could use fireEventually() to stall. Since we don't have
264 # the right introduction hooks, the best we can do is use a
265 # fixed delay. TODO: this is fragile.
266 u1.interrupt_after_d.addCallback(self.stall, 2.0)
267 return u1.interrupt_after_d
268 d.addCallbacks(_should_not_finish, _interrupted)
270 def _disconnected(res):
271 # check to make sure the storage servers aren't still hanging
272 # on to the partial share: their incoming/ directories should
274 log.msg("disconnected", level=log.NOISY,
275 facility="tahoe.test.test_system")
276 for i in range(self.numclients):
277 incdir = os.path.join(self.getdir("client%d" % i),
278 "storage", "shares", "incoming")
279 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
280 d.addCallback(_disconnected)
282 # then we need to give the reconnector a chance to
283 # reestablish the connection to the helper.
284 d.addCallback(lambda res:
285 log.msg("wait_for_connections", level=log.NOISY,
286 facility="tahoe.test.test_system"))
287 d.addCallback(lambda res: self.wait_for_connections())
290 d.addCallback(lambda res:
291 log.msg("uploading again", level=log.NOISY,
292 facility="tahoe.test.test_system"))
293 d.addCallback(lambda res: self.extra_node.upload(u2))
295 def _uploaded(results):
297 log.msg("Second upload complete", level=log.NOISY,
298 facility="tahoe.test.test_system")
300 # this is really bytes received rather than sent, but it's
301 # convenient and basically measures the same thing
302 bytes_sent = results.ciphertext_fetched
304 # We currently don't support resumption of upload if the data is
305 # encrypted with a random key. (Because that would require us
306 # to store the key locally and re-use it on the next upload of
307 # this file, which isn't a bad thing to do, but we currently
309 if convergence is not None:
310 # Make sure we did not have to read the whole file the
311 # second time around .
312 self.failUnless(bytes_sent < len(DATA),
313 "resumption didn't save us any work:"
314 " read %d bytes out of %d total" %
315 (bytes_sent, len(DATA)))
317 # Make sure we did have to read the whole file the second
318 # time around -- because the one that we partially uploaded
319 # earlier was encrypted with a different random key.
320 self.failIf(bytes_sent < len(DATA),
321 "resumption saved us some work even though we were using random keys:"
322 " read %d bytes out of %d total" %
323 (bytes_sent, len(DATA)))
324 return self.downloader.download_to_data(uri)
325 d.addCallback(_uploaded)
328 self.failUnlessEqual(newdata, DATA)
329 # If using convergent encryption, then also check that the
330 # helper has removed the temp file from its directories.
331 if convergence is not None:
332 basedir = os.path.join(self.getdir("client0"), "helper")
333 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
334 self.failUnlessEqual(files, [])
335 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
336 self.failUnlessEqual(files, [])
337 d.addCallback(_check)
339 d.addCallback(_upload_resumable)
343 def _find_shares(self, basedir):
345 for (dirpath, dirnames, filenames) in os.walk(basedir):
346 if "storage" not in dirpath:
350 pieces = dirpath.split(os.sep)
351 if pieces[-4] == "storage" and pieces[-3] == "shares":
352 # we're sitting in .../storage/shares/$START/$SINDEX , and there
353 # are sharefiles here
354 assert pieces[-5].startswith("client")
355 client_num = int(pieces[-5][-1])
356 storage_index_s = pieces[-1]
357 storage_index = storage.si_a2b(storage_index_s)
358 for sharename in filenames:
359 shnum = int(sharename)
360 filename = os.path.join(dirpath, sharename)
361 data = (client_num, storage_index, filename, shnum)
364 self.fail("unable to find any share files in %s" % basedir)
367 def _corrupt_mutable_share(self, filename, which):
368 msf = storage.MutableShareFile(filename)
369 datav = msf.readv([ (0, 1000000) ])
370 final_share = datav[0]
371 assert len(final_share) < 1000000 # ought to be truncated
372 pieces = mutable_layout.unpack_share(final_share)
373 (seqnum, root_hash, IV, k, N, segsize, datalen,
374 verification_key, signature, share_hash_chain, block_hash_tree,
375 share_data, enc_privkey) = pieces
377 if which == "seqnum":
380 root_hash = self.flip_bit(root_hash)
382 IV = self.flip_bit(IV)
383 elif which == "segsize":
384 segsize = segsize + 15
385 elif which == "pubkey":
386 verification_key = self.flip_bit(verification_key)
387 elif which == "signature":
388 signature = self.flip_bit(signature)
389 elif which == "share_hash_chain":
390 nodenum = share_hash_chain.keys()[0]
391 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
392 elif which == "block_hash_tree":
393 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
394 elif which == "share_data":
395 share_data = self.flip_bit(share_data)
396 elif which == "encprivkey":
397 enc_privkey = self.flip_bit(enc_privkey)
399 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
401 final_share = mutable_layout.pack_share(prefix,
408 msf.writev( [(0, final_share)], None)
411 def test_mutable(self):
412 self.basedir = "system/SystemTest/test_mutable"
413 DATA = "initial contents go here." # 25 bytes % 3 != 0
414 NEWDATA = "new contents yay"
415 NEWERDATA = "this is getting old"
417 d = self.set_up_nodes(use_key_generator=True)
419 def _create_mutable(res):
421 log.msg("starting create_mutable_file")
422 d1 = c.create_mutable_file(DATA)
424 log.msg("DONE: %s" % (res,))
425 self._mutable_node_1 = res
427 d1.addCallback(_done)
429 d.addCallback(_create_mutable)
431 def _test_debug(res):
432 # find a share. It is important to run this while there is only
433 # one slot in the grid.
434 shares = self._find_shares(self.basedir)
435 (client_num, storage_index, filename, shnum) = shares[0]
436 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
438 log.msg(" for clients[%d]" % client_num)
440 out,err = StringIO(), StringIO()
441 rc = runner.runner(["dump-share",
443 stdout=out, stderr=err)
444 output = out.getvalue()
445 self.failUnlessEqual(rc, 0)
447 self.failUnless("Mutable slot found:\n" in output)
448 self.failUnless("share_type: SDMF\n" in output)
449 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
450 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
451 self.failUnless(" num_extra_leases: 0\n" in output)
452 # the pubkey size can vary by a byte, so the container might
453 # be a bit larger on some runs.
454 m = re.search(r'^ container_size: (\d+)$', output, re.M)
456 container_size = int(m.group(1))
457 self.failUnless(2037 <= container_size <= 2049, container_size)
458 m = re.search(r'^ data_length: (\d+)$', output, re.M)
460 data_length = int(m.group(1))
461 self.failUnless(2037 <= data_length <= 2049, data_length)
462 self.failUnless(" secrets are for nodeid: %s\n" % peerid
464 self.failUnless(" SDMF contents:\n" in output)
465 self.failUnless(" seqnum: 1\n" in output)
466 self.failUnless(" required_shares: 3\n" in output)
467 self.failUnless(" total_shares: 10\n" in output)
468 self.failUnless(" segsize: 27\n" in output, (output, filename))
469 self.failUnless(" datalen: 25\n" in output)
470 # the exact share_hash_chain nodes depends upon the sharenum,
471 # and is more of a hassle to compute than I want to deal with
473 self.failUnless(" share_hash_chain: " in output)
474 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
475 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
476 base32.b2a(storage_index))
477 self.failUnless(expected in output)
478 except unittest.FailTest:
480 print "dump-share output was:"
483 d.addCallback(_test_debug)
487 # first, let's see if we can use the existing node to retrieve the
488 # contents. This allows it to use the cached pubkey and maybe the
489 # latest-known sharemap.
491 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
492 def _check_download_1(res):
493 self.failUnlessEqual(res, DATA)
494 # now we see if we can retrieve the data from a new node,
495 # constructed using the URI of the original one. We do this test
496 # on the same client that uploaded the data.
497 uri = self._mutable_node_1.get_uri()
498 log.msg("starting retrieve1")
499 newnode = self.clients[0].create_node_from_uri(uri)
500 newnode_2 = self.clients[0].create_node_from_uri(uri)
501 self.failUnlessIdentical(newnode, newnode_2)
502 return newnode.download_best_version()
503 d.addCallback(_check_download_1)
505 def _check_download_2(res):
506 self.failUnlessEqual(res, DATA)
507 # same thing, but with a different client
508 uri = self._mutable_node_1.get_uri()
509 newnode = self.clients[1].create_node_from_uri(uri)
510 log.msg("starting retrieve2")
511 d1 = newnode.download_best_version()
512 d1.addCallback(lambda res: (res, newnode))
514 d.addCallback(_check_download_2)
516 def _check_download_3((res, newnode)):
517 self.failUnlessEqual(res, DATA)
519 log.msg("starting replace1")
520 d1 = newnode.overwrite(NEWDATA)
521 d1.addCallback(lambda res: newnode.download_best_version())
523 d.addCallback(_check_download_3)
525 def _check_download_4(res):
526 self.failUnlessEqual(res, NEWDATA)
527 # now create an even newer node and replace the data on it. This
528 # new node has never been used for download before.
529 uri = self._mutable_node_1.get_uri()
530 newnode1 = self.clients[2].create_node_from_uri(uri)
531 newnode2 = self.clients[3].create_node_from_uri(uri)
532 self._newnode3 = self.clients[3].create_node_from_uri(uri)
533 log.msg("starting replace2")
534 d1 = newnode1.overwrite(NEWERDATA)
535 d1.addCallback(lambda res: newnode2.download_best_version())
537 d.addCallback(_check_download_4)
539 def _check_download_5(res):
540 log.msg("finished replace2")
541 self.failUnlessEqual(res, NEWERDATA)
542 d.addCallback(_check_download_5)
544 def _corrupt_shares(res):
545 # run around and flip bits in all but k of the shares, to test
547 shares = self._find_shares(self.basedir)
548 ## sort by share number
549 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
550 where = dict([ (shnum, filename)
551 for (client_num, storage_index, filename, shnum)
553 assert len(where) == 10 # this test is designed for 3-of-10
554 for shnum, filename in where.items():
555 # shares 7,8,9 are left alone. read will check
556 # (share_hash_chain, block_hash_tree, share_data). New
557 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
558 # segsize, signature).
560 # read: this will trigger "pubkey doesn't match
562 self._corrupt_mutable_share(filename, "pubkey")
563 self._corrupt_mutable_share(filename, "encprivkey")
565 # triggers "signature is invalid"
566 self._corrupt_mutable_share(filename, "seqnum")
568 # triggers "signature is invalid"
569 self._corrupt_mutable_share(filename, "R")
571 # triggers "signature is invalid"
572 self._corrupt_mutable_share(filename, "segsize")
574 self._corrupt_mutable_share(filename, "share_hash_chain")
576 self._corrupt_mutable_share(filename, "block_hash_tree")
578 self._corrupt_mutable_share(filename, "share_data")
579 # other things to correct: IV, signature
580 # 7,8,9 are left alone
582 # note that initial_query_count=5 means that we'll hit the
583 # first 5 servers in effectively random order (based upon
584 # response time), so we won't necessarily ever get a "pubkey
585 # doesn't match fingerprint" error (if we hit shnum>=1 before
586 # shnum=0, we pull the pubkey from there). To get repeatable
587 # specific failures, we need to set initial_query_count=1,
588 # but of course that will change the sequencing behavior of
589 # the retrieval process. TODO: find a reasonable way to make
590 # this a parameter, probably when we expand this test to test
591 # for one failure mode at a time.
593 # when we retrieve this, we should get three signature
594 # failures (where we've mangled seqnum, R, and segsize). The
596 d.addCallback(_corrupt_shares)
598 d.addCallback(lambda res: self._newnode3.download_best_version())
599 d.addCallback(_check_download_5)
601 def _check_empty_file(res):
602 # make sure we can create empty files, this usually screws up the
604 d1 = self.clients[2].create_mutable_file("")
605 d1.addCallback(lambda newnode: newnode.download_best_version())
606 d1.addCallback(lambda res: self.failUnlessEqual("", res))
608 d.addCallback(_check_empty_file)
610 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
611 def _created_dirnode(dnode):
612 log.msg("_created_dirnode(%s)" % (dnode,))
614 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
615 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
616 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
617 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
618 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
619 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
620 d1.addCallback(lambda res: dnode.build_manifest())
621 d1.addCallback(lambda manifest:
622 self.failUnlessEqual(len(manifest), 1))
624 d.addCallback(_created_dirnode)
626 def wait_for_c3_kg_conn():
627 return self.clients[3]._key_generator is not None
628 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
630 def check_kg_poolsize(junk, size_delta):
631 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
632 self.key_generator_svc.key_generator.pool_size + size_delta)
634 d.addCallback(check_kg_poolsize, 0)
635 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
636 d.addCallback(check_kg_poolsize, -1)
637 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
638 d.addCallback(check_kg_poolsize, -2)
639 # use_helper induces use of clients[3], which is the using-key_gen client
640 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
641 d.addCallback(check_kg_poolsize, -3)
644 # The default 120 second timeout went off when running it under valgrind
645 # on my old Windows laptop, so I'm bumping up the timeout.
646 test_mutable.timeout = 240
648 def flip_bit(self, good):
649 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
651 def mangle_uri(self, gooduri):
652 # change the key, which changes the storage index, which means we'll
653 # be asking about the wrong file, so nobody will have any shares
654 u = IFileURI(gooduri)
655 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
656 uri_extension_hash=u.uri_extension_hash,
657 needed_shares=u.needed_shares,
658 total_shares=u.total_shares,
660 return u2.to_string()
662 # TODO: add a test which mangles the uri_extension_hash instead, and
663 # should fail due to not being able to get a valid uri_extension block.
664 # Also a test which sneakily mangles the uri_extension block to change
665 # some of the validation data, so it will fail in the post-download phase
666 # when the file's crypttext integrity check fails. Do the same thing for
667 # the key, which should cause the download to fail the post-download
668 # plaintext_hash check.
670 def test_vdrive(self):
671 self.basedir = "system/SystemTest/test_vdrive"
672 self.data = LARGE_DATA
673 d = self.set_up_nodes(use_stats_gatherer=True)
674 d.addCallback(self._test_introweb)
675 d.addCallback(self.log, "starting publish")
676 d.addCallback(self._do_publish1)
677 d.addCallback(self._test_runner)
678 d.addCallback(self._do_publish2)
679 # at this point, we have the following filesystem (where "R" denotes
680 # self._root_directory_uri):
683 # R/subdir1/mydata567
685 # R/subdir1/subdir2/mydata992
687 d.addCallback(lambda res: self.bounce_client(0))
688 d.addCallback(self.log, "bounced client0")
690 d.addCallback(self._check_publish1)
691 d.addCallback(self.log, "did _check_publish1")
692 d.addCallback(self._check_publish2)
693 d.addCallback(self.log, "did _check_publish2")
694 d.addCallback(self._do_publish_private)
695 d.addCallback(self.log, "did _do_publish_private")
696 # now we also have (where "P" denotes a new dir):
697 # P/personal/sekrit data
698 # P/s2-rw -> /subdir1/subdir2/
699 # P/s2-ro -> /subdir1/subdir2/ (read-only)
700 d.addCallback(self._check_publish_private)
701 d.addCallback(self.log, "did _check_publish_private")
702 d.addCallback(self._test_web)
703 d.addCallback(self._test_control)
704 d.addCallback(self._test_cli)
705 # P now has four top-level children:
706 # P/personal/sekrit data
709 # P/test_put/ (empty)
710 d.addCallback(self._test_checker)
711 d.addCallback(self._grab_stats)
713 test_vdrive.timeout = 1100
715 def _test_introweb(self, res):
716 d = getPage(self.introweb_url, method="GET", followRedirect=True)
719 self.failUnless("allmydata: %s" % str(allmydata.__version__)
721 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
722 self.failUnless("Subscription Summary: storage: 5" in res)
723 except unittest.FailTest:
725 print "GET %s output was:" % self.introweb_url
728 d.addCallback(_check)
729 d.addCallback(lambda res:
730 getPage(self.introweb_url + "?t=json",
731 method="GET", followRedirect=True))
732 def _check_json(res):
733 data = simplejson.loads(res)
735 self.failUnlessEqual(data["subscription_summary"],
737 self.failUnlessEqual(data["announcement_summary"],
738 {"storage": 5, "stub_client": 5})
739 except unittest.FailTest:
741 print "GET %s?t=json output was:" % self.introweb_url
744 d.addCallback(_check_json)
747 def _do_publish1(self, res):
748 ut = upload.Data(self.data, convergence=None)
750 d = c0.create_empty_dirnode()
751 def _made_root(new_dirnode):
752 self._root_directory_uri = new_dirnode.get_uri()
753 return c0.create_node_from_uri(self._root_directory_uri)
754 d.addCallback(_made_root)
755 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
756 def _made_subdir1(subdir1_node):
757 self._subdir1_node = subdir1_node
758 d1 = subdir1_node.add_file(u"mydata567", ut)
759 d1.addCallback(self.log, "publish finished")
760 def _stash_uri(filenode):
761 self.uri = filenode.get_uri()
762 d1.addCallback(_stash_uri)
764 d.addCallback(_made_subdir1)
767 def _do_publish2(self, res):
768 ut = upload.Data(self.data, convergence=None)
769 d = self._subdir1_node.create_empty_directory(u"subdir2")
770 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
773 def log(self, res, msg, **kwargs):
774 # print "MSG: %s RES: %s" % (msg, res)
775 log.msg(msg, **kwargs)
778 def _do_publish_private(self, res):
779 self.smalldata = "sssh, very secret stuff"
780 ut = upload.Data(self.smalldata, convergence=None)
781 d = self.clients[0].create_empty_dirnode()
782 d.addCallback(self.log, "GOT private directory")
783 def _got_new_dir(privnode):
784 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
785 d1 = privnode.create_empty_directory(u"personal")
786 d1.addCallback(self.log, "made P/personal")
787 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
788 d1.addCallback(self.log, "made P/personal/sekrit data")
789 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
791 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
792 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
794 d1.addCallback(_got_s2)
795 d1.addCallback(lambda res: privnode)
797 d.addCallback(_got_new_dir)
800 def _check_publish1(self, res):
801 # this one uses the iterative API
803 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
804 d.addCallback(self.log, "check_publish1 got /")
805 d.addCallback(lambda root: root.get(u"subdir1"))
806 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
807 d.addCallback(lambda filenode: filenode.download_to_data())
808 d.addCallback(self.log, "get finished")
810 self.failUnlessEqual(data, self.data)
811 d.addCallback(_get_done)
814 def _check_publish2(self, res):
815 # this one uses the path-based API
816 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
817 d = rootnode.get_child_at_path(u"subdir1")
818 d.addCallback(lambda dirnode:
819 self.failUnless(IDirectoryNode.providedBy(dirnode)))
820 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
821 d.addCallback(lambda filenode: filenode.download_to_data())
822 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
824 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
825 def _got_filenode(filenode):
826 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
827 assert fnode == filenode
828 d.addCallback(_got_filenode)
831 def _check_publish_private(self, resnode):
832 # this one uses the path-based API
833 self._private_node = resnode
835 d = self._private_node.get_child_at_path(u"personal")
836 def _got_personal(personal):
837 self._personal_node = personal
839 d.addCallback(_got_personal)
841 d.addCallback(lambda dirnode:
842 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
844 return self._private_node.get_child_at_path(path)
846 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
847 d.addCallback(lambda filenode: filenode.download_to_data())
848 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
849 d.addCallback(lambda res: get_path(u"s2-rw"))
850 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
851 d.addCallback(lambda res: get_path(u"s2-ro"))
852 def _got_s2ro(dirnode):
853 self.failUnless(dirnode.is_mutable(), dirnode)
854 self.failUnless(dirnode.is_readonly(), dirnode)
855 d1 = defer.succeed(None)
856 d1.addCallback(lambda res: dirnode.list())
857 d1.addCallback(self.log, "dirnode.list")
859 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
861 d1.addCallback(self.log, "doing add_file(ro)")
862 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
863 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
865 d1.addCallback(self.log, "doing get(ro)")
866 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
867 d1.addCallback(lambda filenode:
868 self.failUnless(IFileNode.providedBy(filenode)))
870 d1.addCallback(self.log, "doing delete(ro)")
871 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
873 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
875 d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))
877 personal = self._personal_node
878 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
880 d1.addCallback(self.log, "doing move_child_to(ro)2")
881 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
883 d1.addCallback(self.log, "finished with _got_s2ro")
885 d.addCallback(_got_s2ro)
886 def _got_home(dummy):
887 home = self._private_node
888 personal = self._personal_node
889 d1 = defer.succeed(None)
890 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
891 d1.addCallback(lambda res:
892 personal.move_child_to(u"sekrit data",home,u"sekrit"))
894 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
895 d1.addCallback(lambda res:
896 home.move_child_to(u"sekrit", home, u"sekrit data"))
898 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
899 d1.addCallback(lambda res:
900 home.move_child_to(u"sekrit data", personal))
902 d1.addCallback(lambda res: home.build_manifest())
903 d1.addCallback(self.log, "manifest")
906 # P/personal/sekrit data
907 # P/s2-rw (same as P/s2-ro)
908 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
909 d1.addCallback(lambda manifest:
910 self.failUnlessEqual(len(manifest), 4))
911 d1.addCallback(lambda res: home.deep_stats())
912 def _check_stats(stats):
913 expected = {"count-immutable-files": 1,
914 "count-mutable-files": 0,
915 "count-literal-files": 1,
917 "count-directories": 3,
918 "size-immutable-files": 112,
919 "size-literal-files": 23,
920 #"size-directories": 616, # varies
921 #"largest-directory": 616,
922 "largest-directory-children": 3,
923 "largest-immutable-file": 112,
925 for k,v in expected.iteritems():
926 self.failUnlessEqual(stats[k], v,
927 "stats[%s] was %s, not %s" %
929 self.failUnless(stats["size-directories"] > 1300,
930 stats["size-directories"])
931 self.failUnless(stats["largest-directory"] > 800,
932 stats["largest-directory"])
933 self.failUnlessEqual(stats["size-files-histogram"],
934 [ (11, 31, 1), (101, 316, 1) ])
935 d1.addCallback(_check_stats)
937 d.addCallback(_got_home)
940 def shouldFail(self, res, expected_failure, which, substring=None):
941 if isinstance(res, Failure):
942 res.trap(expected_failure)
944 self.failUnless(substring in str(res),
945 "substring '%s' not in '%s'"
946 % (substring, str(res)))
948 self.fail("%s was supposed to raise %s, not get '%s'" %
949 (which, expected_failure, res))
951 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
952 assert substring is None or isinstance(substring, str)
953 d = defer.maybeDeferred(callable, *args, **kwargs)
955 if isinstance(res, Failure):
956 res.trap(expected_failure)
958 self.failUnless(substring in str(res),
959 "substring '%s' not in '%s'"
960 % (substring, str(res)))
962 self.fail("%s was supposed to raise %s, not get '%s'" %
963 (which, expected_failure, res))
967 def PUT(self, urlpath, data):
968 url = self.webish_url + urlpath
969 return getPage(url, method="PUT", postdata=data)
971 def GET(self, urlpath, followRedirect=False):
972 url = self.webish_url + urlpath
973 return getPage(url, method="GET", followRedirect=followRedirect)
975 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
977 url = self.helper_webish_url + urlpath
979 url = self.webish_url + urlpath
980 sepbase = "boogabooga"
984 form.append('Content-Disposition: form-data; name="_charset"')
988 for name, value in fields.iteritems():
989 if isinstance(value, tuple):
990 filename, value = value
991 form.append('Content-Disposition: form-data; name="%s"; '
992 'filename="%s"' % (name, filename.encode("utf-8")))
994 form.append('Content-Disposition: form-data; name="%s"' % name)
996 form.append(str(value))
999 body = "\r\n".join(form) + "\r\n"
1000 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1002 return getPage(url, method="POST", postdata=body,
1003 headers=headers, followRedirect=followRedirect)
1005 def _test_web(self, res):
1006 base = self.webish_url
1007 public = "uri/" + self._root_directory_uri
1009 def _got_welcome(page):
1010 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1011 self.failUnless(expected in page,
1012 "I didn't see the right 'connected storage servers'"
1013 " message in: %s" % page
1015 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1016 self.failUnless(expected in page,
1017 "I didn't see the right 'My nodeid' message "
1019 self.failUnless("Helper: 0 active uploads" in page)
1020 d.addCallback(_got_welcome)
1021 d.addCallback(self.log, "done with _got_welcome")
1023 # get the welcome page from the node that uses the helper too
1024 d.addCallback(lambda res: getPage(self.helper_webish_url))
1025 def _got_welcome_helper(page):
1026 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1028 self.failUnless("Not running helper" in page)
1029 d.addCallback(_got_welcome_helper)
1031 d.addCallback(lambda res: getPage(base + public))
1032 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1033 def _got_subdir1(page):
1034 # there ought to be an href for our file
1035 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1036 self.failUnless(">mydata567</a>" in page)
1037 d.addCallback(_got_subdir1)
1038 d.addCallback(self.log, "done with _got_subdir1")
1039 d.addCallback(lambda res:
1040 getPage(base + public + "/subdir1/mydata567"))
1041 def _got_data(page):
1042 self.failUnlessEqual(page, self.data)
1043 d.addCallback(_got_data)
1045 # download from a URI embedded in a URL
1046 d.addCallback(self.log, "_get_from_uri")
1047 def _get_from_uri(res):
1048 return getPage(base + "uri/%s?filename=%s"
1049 % (self.uri, "mydata567"))
1050 d.addCallback(_get_from_uri)
1051 def _got_from_uri(page):
1052 self.failUnlessEqual(page, self.data)
1053 d.addCallback(_got_from_uri)
1055 # download from a URI embedded in a URL, second form
1056 d.addCallback(self.log, "_get_from_uri2")
1057 def _get_from_uri2(res):
1058 return getPage(base + "uri?uri=%s" % (self.uri,))
1059 d.addCallback(_get_from_uri2)
1060 d.addCallback(_got_from_uri)
1062 # download from a bogus URI, make sure we get a reasonable error
1063 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1064 def _get_from_bogus_uri(res):
1065 d1 = getPage(base + "uri/%s?filename=%s"
1066 % (self.mangle_uri(self.uri), "mydata567"))
1067 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1070 d.addCallback(_get_from_bogus_uri)
1071 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1073 # upload a file with PUT
1074 d.addCallback(self.log, "about to try PUT")
1075 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1076 "new.txt contents"))
1077 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1078 d.addCallback(self.failUnlessEqual, "new.txt contents")
1079 # and again with something large enough to use multiple segments,
1080 # and hopefully trigger pauseProducing too
1081 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1082 "big" * 500000)) # 1.5MB
1083 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1084 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1086 # can we replace files in place?
1087 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1089 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1090 d.addCallback(self.failUnlessEqual, "NEWER contents")
1092 # test unlinked POST
1093 d.addCallback(lambda res: self.POST("uri", t="upload",
1094 file=("new.txt", "data" * 10000)))
1095 # and again using the helper, which exercises different upload-status
1097 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1098 file=("foo.txt", "data2" * 10000)))
1100 # check that the status page exists
1101 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1102 def _got_status(res):
1103 # find an interesting upload and download to look at. LIT files
1104 # are not interesting.
1105 for ds in self.clients[0].list_all_download_statuses():
1106 if ds.get_size() > 200:
1107 self._down_status = ds.get_counter()
1108 for us in self.clients[0].list_all_upload_statuses():
1109 if us.get_size() > 200:
1110 self._up_status = us.get_counter()
1111 rs = self.clients[0].list_all_retrieve_statuses()[0]
1112 self._retrieve_status = rs.get_counter()
1113 ps = self.clients[0].list_all_publish_statuses()[0]
1114 self._publish_status = ps.get_counter()
1115 us = self.clients[0].list_all_mapupdate_statuses()[0]
1116 self._update_status = us.get_counter()
1118 # and that there are some upload- and download- status pages
1119 return self.GET("status/up-%d" % self._up_status)
1120 d.addCallback(_got_status)
1122 return self.GET("status/down-%d" % self._down_status)
1123 d.addCallback(_got_up)
1125 return self.GET("status/mapupdate-%d" % self._update_status)
1126 d.addCallback(_got_down)
1127 def _got_update(res):
1128 return self.GET("status/publish-%d" % self._publish_status)
1129 d.addCallback(_got_update)
1130 def _got_publish(res):
1131 return self.GET("status/retrieve-%d" % self._retrieve_status)
1132 d.addCallback(_got_publish)
1134 # check that the helper status page exists
1135 d.addCallback(lambda res:
1136 self.GET("helper_status", followRedirect=True))
1137 def _got_helper_status(res):
1138 self.failUnless("Bytes Fetched:" in res)
1139 # touch a couple of files in the helper's working directory to
1140 # exercise more code paths
1141 workdir = os.path.join(self.getdir("client0"), "helper")
1142 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1143 f = open(incfile, "wb")
1144 f.write("small file")
1146 then = time.time() - 86400*3
1148 os.utime(incfile, (now, then))
1149 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1150 f = open(encfile, "wb")
1151 f.write("less small file")
1153 os.utime(encfile, (now, then))
1154 d.addCallback(_got_helper_status)
1155 # and that the json form exists
1156 d.addCallback(lambda res:
1157 self.GET("helper_status?t=json", followRedirect=True))
1158 def _got_helper_status_json(res):
1159 data = simplejson.loads(res)
1160 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1162 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1163 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1164 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1166 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1167 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1168 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1170 d.addCallback(_got_helper_status_json)
1172 # and check that client[3] (which uses a helper but does not run one
1173 # itself) doesn't explode when you ask for its status
1174 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1175 def _got_non_helper_status(res):
1176 self.failUnless("Upload and Download Status" in res)
1177 d.addCallback(_got_non_helper_status)
1179 # or for helper status with t=json
1180 d.addCallback(lambda res:
1181 getPage(self.helper_webish_url + "helper_status?t=json"))
1182 def _got_non_helper_status_json(res):
1183 data = simplejson.loads(res)
1184 self.failUnlessEqual(data, {})
1185 d.addCallback(_got_non_helper_status_json)
1187 # see if the statistics page exists
1188 d.addCallback(lambda res: self.GET("statistics"))
1189 def _got_stats(res):
1190 self.failUnless("Node Statistics" in res)
1191 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1192 d.addCallback(_got_stats)
1193 d.addCallback(lambda res: self.GET("statistics?t=json"))
1194 def _got_stats_json(res):
1195 data = simplejson.loads(res)
1196 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1197 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1198 d.addCallback(_got_stats_json)
1200 # TODO: mangle the second segment of a file, to test errors that
1201 # occur after we've already sent some good data, which uses a
1202 # different error path.
1204 # TODO: download a URI with a form
1205 # TODO: create a directory by using a form
1206 # TODO: upload by using a form on the directory page
1207 # url = base + "somedir/subdir1/freeform_post!!upload"
1208 # TODO: delete a file by using a button on the directory page
1212 def _test_runner(self, res):
1213 # exercise some of the diagnostic tools in runner.py
1216 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1217 if "storage" not in dirpath:
1221 pieces = dirpath.split(os.sep)
1222 if pieces[-4] == "storage" and pieces[-3] == "shares":
1223 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1224 # are sharefiles here
1225 filename = os.path.join(dirpath, filenames[0])
1226 # peek at the magic to see if it is a chk share
1227 magic = open(filename, "rb").read(4)
1228 if magic == '\x00\x00\x00\x01':
1231 self.fail("unable to find any uri_extension files in %s"
1233 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1235 out,err = StringIO(), StringIO()
1236 rc = runner.runner(["dump-share",
1238 stdout=out, stderr=err)
1239 output = out.getvalue()
1240 self.failUnlessEqual(rc, 0)
1242 # we only upload a single file, so we can assert some things about
1243 # its size and shares.
1244 self.failUnless(("share filename: %s" % filename) in output)
1245 self.failUnless("size: %d\n" % len(self.data) in output)
1246 self.failUnless("num_segments: 1\n" in output)
1247 # segment_size is always a multiple of needed_shares
1248 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1249 self.failUnless("total_shares: 10\n" in output)
1250 # keys which are supposed to be present
1251 for key in ("size", "num_segments", "segment_size",
1252 "needed_shares", "total_shares",
1253 "codec_name", "codec_params", "tail_codec_params",
1254 #"plaintext_hash", "plaintext_root_hash",
1255 "crypttext_hash", "crypttext_root_hash",
1256 "share_root_hash", "UEB_hash"):
1257 self.failUnless("%s: " % key in output, key)
1258 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1260 # now use its storage index to find the other shares using the
1261 # 'find-shares' tool
1262 sharedir, shnum = os.path.split(filename)
1263 storagedir, storage_index_s = os.path.split(sharedir)
1264 out,err = StringIO(), StringIO()
1265 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1266 cmd = ["find-shares", storage_index_s] + nodedirs
1267 rc = runner.runner(cmd, stdout=out, stderr=err)
1268 self.failUnlessEqual(rc, 0)
1270 sharefiles = [sfn.strip() for sfn in out.readlines()]
1271 self.failUnlessEqual(len(sharefiles), 10)
1273 # also exercise the 'catalog-shares' tool
1274 out,err = StringIO(), StringIO()
1275 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1276 cmd = ["catalog-shares"] + nodedirs
1277 rc = runner.runner(cmd, stdout=out, stderr=err)
1278 self.failUnlessEqual(rc, 0)
1280 descriptions = [sfn.strip() for sfn in out.readlines()]
1281 self.failUnlessEqual(len(descriptions), 30)
1283 for line in descriptions
1284 if line.startswith("CHK %s " % storage_index_s)]
1285 self.failUnlessEqual(len(matching), 10)
1287 def _test_control(self, res):
1288 # exercise the remote-control-the-client foolscap interfaces in
1289 # allmydata.control (mostly used for performance tests)
1290 c0 = self.clients[0]
1291 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1292 control_furl = open(control_furl_file, "r").read().strip()
1293 # it doesn't really matter which Tub we use to connect to the client,
1294 # so let's just use our IntroducerNode's
1295 d = self.introducer.tub.getReference(control_furl)
1296 d.addCallback(self._test_control2, control_furl_file)
1298 def _test_control2(self, rref, filename):
1299 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1300 downfile = os.path.join(self.basedir, "control.downfile")
1301 d.addCallback(lambda uri:
1302 rref.callRemote("download_from_uri_to_file",
1305 self.failUnlessEqual(res, downfile)
1306 data = open(downfile, "r").read()
1307 expected_data = open(filename, "r").read()
1308 self.failUnlessEqual(data, expected_data)
1309 d.addCallback(_check)
1310 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1311 if sys.platform == "linux2":
1312 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1313 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1316 def _test_cli(self, res):
1317 # run various CLI commands (in a thread, since they use blocking
1320 private_uri = self._private_node.get_uri()
1321 some_uri = self._root_directory_uri
1322 client0_basedir = self.getdir("client0")
1325 "--node-directory", client0_basedir,
1327 TESTDATA = "I will not write the same thing over and over.\n" * 100
1329 d = defer.succeed(None)
1331 # for compatibility with earlier versions, private/root_dir.cap is
1332 # supposed to be treated as an alias named "tahoe:". Start by making
1333 # sure that works, before we add other aliases.
1335 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1336 f = open(root_file, "w")
1337 f.write(private_uri)
1340 def run(ignored, verb, *args, **kwargs):
1341 stdin = kwargs.get("stdin", "")
1342 newargs = [verb] + nodeargs + list(args)
1343 return self._run_cli(newargs, stdin=stdin)
1345 def _check_ls((out,err), expected_children, unexpected_children=[]):
1346 self.failUnlessEqual(err, "")
1347 for s in expected_children:
1348 self.failUnless(s in out, (s,out))
1349 for s in unexpected_children:
1350 self.failIf(s in out, (s,out))
1352 def _check_ls_root((out,err)):
1353 self.failUnless("personal" in out)
1354 self.failUnless("s2-ro" in out)
1355 self.failUnless("s2-rw" in out)
1356 self.failUnlessEqual(err, "")
1358 # this should reference private_uri
1359 d.addCallback(run, "ls")
1360 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1362 d.addCallback(run, "list-aliases")
1363 def _check_aliases_1((out,err)):
1364 self.failUnlessEqual(err, "")
1365 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1366 d.addCallback(_check_aliases_1)
1368 # now that that's out of the way, remove root_dir.cap and work with
1370 d.addCallback(lambda res: os.unlink(root_file))
1371 d.addCallback(run, "list-aliases")
1372 def _check_aliases_2((out,err)):
1373 self.failUnlessEqual(err, "")
1374 self.failUnlessEqual(out, "")
1375 d.addCallback(_check_aliases_2)
1377 d.addCallback(run, "mkdir")
1378 def _got_dir( (out,err) ):
1379 self.failUnless(uri.from_string_dirnode(out.strip()))
1381 d.addCallback(_got_dir)
1382 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1384 d.addCallback(run, "list-aliases")
1385 def _check_aliases_3((out,err)):
1386 self.failUnlessEqual(err, "")
1387 self.failUnless("tahoe: " in out)
1388 d.addCallback(_check_aliases_3)
1390 def _check_empty_dir((out,err)):
1391 self.failUnlessEqual(out, "")
1392 self.failUnlessEqual(err, "")
1393 d.addCallback(run, "ls")
1394 d.addCallback(_check_empty_dir)
1396 def _check_missing_dir((out,err)):
1397 # TODO: check that rc==2
1398 self.failUnlessEqual(out, "")
1399 self.failUnlessEqual(err, "No such file or directory\n")
1400 d.addCallback(run, "ls", "bogus")
1401 d.addCallback(_check_missing_dir)
1406 fn = os.path.join(self.basedir, "file%d" % i)
1408 data = "data to be uploaded: file%d\n" % i
1410 open(fn,"wb").write(data)
1412 def _check_stdout_against((out,err), filenum=None, data=None):
1413 self.failUnlessEqual(err, "")
1414 if filenum is not None:
1415 self.failUnlessEqual(out, datas[filenum])
1416 if data is not None:
1417 self.failUnlessEqual(out, data)
1419 # test all both forms of put: from a file, and from stdin
1421 d.addCallback(run, "put", files[0], "tahoe-file0")
1422 def _put_out((out,err)):
1423 self.failUnless("URI:LIT:" in out, out)
1424 self.failUnless("201 Created" in err, err)
1426 return run(None, "get", uri0)
1427 d.addCallback(_put_out)
1428 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1430 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1431 # tahoe put bar tahoe:FOO
1432 d.addCallback(run, "put", files[2], "tahoe:file2")
1433 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1434 def _check_put_mutable((out,err)):
1435 self._mutable_file3_uri = out.strip()
1436 d.addCallback(_check_put_mutable)
1437 d.addCallback(run, "get", "tahoe:file3")
1438 d.addCallback(_check_stdout_against, 3)
1441 STDIN_DATA = "This is the file to upload from stdin."
1442 d.addCallback(run, "put", "tahoe-file-stdin", stdin=STDIN_DATA)
1443 # tahoe put tahoe:FOO
1444 d.addCallback(run, "put", "tahoe:from-stdin",
1445 stdin="Other file from stdin.")
1447 d.addCallback(run, "ls")
1448 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1449 "tahoe-file-stdin", "from-stdin"])
1450 d.addCallback(run, "ls", "subdir")
1451 d.addCallback(_check_ls, ["tahoe-file1"])
1454 d.addCallback(run, "mkdir", "subdir2")
1455 d.addCallback(run, "ls")
1456 # TODO: extract the URI, set an alias with it
1457 d.addCallback(_check_ls, ["subdir2"])
1459 # tahoe get: (to stdin and to a file)
1460 d.addCallback(run, "get", "tahoe-file0")
1461 d.addCallback(_check_stdout_against, 0)
1462 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1463 d.addCallback(_check_stdout_against, 1)
1464 outfile0 = os.path.join(self.basedir, "outfile0")
1465 d.addCallback(run, "get", "file2", outfile0)
1466 def _check_outfile0((out,err)):
1467 data = open(outfile0,"rb").read()
1468 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1469 d.addCallback(_check_outfile0)
1470 outfile1 = os.path.join(self.basedir, "outfile0")
1471 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1472 def _check_outfile1((out,err)):
1473 data = open(outfile1,"rb").read()
1474 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1475 d.addCallback(_check_outfile1)
1477 d.addCallback(run, "rm", "tahoe-file0")
1478 d.addCallback(run, "rm", "tahoe:file2")
1479 d.addCallback(run, "ls")
1480 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1482 d.addCallback(run, "ls", "-l")
1483 def _check_ls_l((out,err)):
1484 lines = out.split("\n")
1486 if "tahoe-file-stdin" in l:
1487 self.failUnless(l.startswith("-r-- "), l)
1488 self.failUnless(" %d " % len(STDIN_DATA) in l)
1490 self.failUnless(l.startswith("-rw- "), l) # mutable
1491 d.addCallback(_check_ls_l)
1493 d.addCallback(run, "ls", "--uri")
1494 def _check_ls_uri((out,err)):
1495 lines = out.split("\n")
1498 self.failUnless(self._mutable_file3_uri in l)
1499 d.addCallback(_check_ls_uri)
1501 d.addCallback(run, "ls", "--readonly-uri")
1502 def _check_ls_rouri((out,err)):
1503 lines = out.split("\n")
1506 rw_uri = self._mutable_file3_uri
1507 u = uri.from_string_mutable_filenode(rw_uri)
1508 ro_uri = u.get_readonly().to_string()
1509 self.failUnless(ro_uri in l)
1510 d.addCallback(_check_ls_rouri)
1513 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1514 d.addCallback(run, "ls")
1515 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1517 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1518 d.addCallback(run, "ls")
1519 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1521 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1522 d.addCallback(run, "ls")
1523 d.addCallback(_check_ls, ["file3", "file3-copy"])
1524 d.addCallback(run, "get", "tahoe:file3-copy")
1525 d.addCallback(_check_stdout_against, 3)
1527 # copy from disk into tahoe
1528 d.addCallback(run, "cp", files[4], "tahoe:file4")
1529 d.addCallback(run, "ls")
1530 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1531 d.addCallback(run, "get", "tahoe:file4")
1532 d.addCallback(_check_stdout_against, 4)
1534 # copy from tahoe into disk
1535 target_filename = os.path.join(self.basedir, "file-out")
1536 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1537 def _check_cp_out((out,err)):
1538 self.failUnless(os.path.exists(target_filename))
1539 got = open(target_filename,"rb").read()
1540 self.failUnlessEqual(got, datas[4])
1541 d.addCallback(_check_cp_out)
1543 # copy from disk to disk (silly case)
1544 target2_filename = os.path.join(self.basedir, "file-out-copy")
1545 d.addCallback(run, "cp", target_filename, target2_filename)
1546 def _check_cp_out2((out,err)):
1547 self.failUnless(os.path.exists(target2_filename))
1548 got = open(target2_filename,"rb").read()
1549 self.failUnlessEqual(got, datas[4])
1550 d.addCallback(_check_cp_out2)
1552 # copy from tahoe into disk, overwriting an existing file
1553 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1554 def _check_cp_out3((out,err)):
1555 self.failUnless(os.path.exists(target_filename))
1556 got = open(target_filename,"rb").read()
1557 self.failUnlessEqual(got, datas[3])
1558 d.addCallback(_check_cp_out3)
1560 # copy from disk into tahoe, overwriting an existing immutable file
1561 d.addCallback(run, "cp", files[5], "tahoe:file4")
1562 d.addCallback(run, "ls")
1563 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1564 d.addCallback(run, "get", "tahoe:file4")
1565 d.addCallback(_check_stdout_against, 5)
1567 # copy from disk into tahoe, overwriting an existing mutable file
1568 d.addCallback(run, "cp", files[5], "tahoe:file3")
1569 d.addCallback(run, "ls")
1570 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1571 d.addCallback(run, "get", "tahoe:file3")
1572 d.addCallback(_check_stdout_against, 5)
1574 # recursive copy: setup
1575 dn = os.path.join(self.basedir, "dir1")
1577 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1578 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1579 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1580 sdn2 = os.path.join(dn, "subdir2")
1582 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1583 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1585 # from disk into tahoe
1586 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1587 d.addCallback(run, "ls")
1588 d.addCallback(_check_ls, ["dir1"])
1589 d.addCallback(run, "ls", "dir1")
1590 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1591 ["rfile4", "rfile5"])
1592 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1593 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1594 ["rfile1", "rfile2", "rfile3"])
1595 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1596 d.addCallback(_check_stdout_against, data="rfile4")
1598 # and back out again
1599 dn_copy = os.path.join(self.basedir, "dir1-copy")
1600 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1601 def _check_cp_r_out((out,err)):
1603 old = open(os.path.join(dn, name), "rb").read()
1604 newfn = os.path.join(dn_copy, name)
1605 self.failUnless(os.path.exists(newfn))
1606 new = open(newfn, "rb").read()
1607 self.failUnlessEqual(old, new)
1611 _cmp(os.path.join("subdir2", "rfile4"))
1612 _cmp(os.path.join("subdir2", "rfile5"))
1613 d.addCallback(_check_cp_r_out)
1615 # and copy it a second time, which ought to overwrite the same files
1616 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1618 # and tahoe-to-tahoe
1619 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1620 d.addCallback(run, "ls")
1621 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1622 d.addCallback(run, "ls", "dir1-copy")
1623 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1624 ["rfile4", "rfile5"])
1625 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1626 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1627 ["rfile1", "rfile2", "rfile3"])
1628 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1629 d.addCallback(_check_stdout_against, data="rfile4")
1631 # and copy it a second time, which ought to overwrite the same files
1632 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1634 # tahoe_ls doesn't currently handle the error correctly: it tries to
1635 # JSON-parse a traceback.
1636 ## def _ls_missing(res):
1637 ## argv = ["ls"] + nodeargs + ["bogus"]
1638 ## return self._run_cli(argv)
1639 ## d.addCallback(_ls_missing)
1640 ## def _check_ls_missing((out,err)):
1643 ## self.failUnlessEqual(err, "")
1644 ## d.addCallback(_check_ls_missing)
1648 def _run_cli(self, argv, stdin=""):
1650 stdout, stderr = StringIO(), StringIO()
1651 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1652 stdin=StringIO(stdin),
1653 stdout=stdout, stderr=stderr)
1655 return stdout.getvalue(), stderr.getvalue()
1656 d.addCallback(_done)
1659 def _test_checker(self, res):
1660 ut = upload.Data("too big to be literal" * 200, convergence=None)
1661 d = self._personal_node.add_file(u"big file", ut)
1663 d.addCallback(lambda res: self._personal_node.check())
1664 def _check_dirnode_results(r):
1665 self.failUnless(r.is_healthy())
1666 d.addCallback(_check_dirnode_results)
1667 d.addCallback(lambda res: self._personal_node.check(verify=True))
1668 d.addCallback(_check_dirnode_results)
1670 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1671 def _got_chk_filenode(n):
1672 self.failUnless(isinstance(n, filenode.FileNode))
1674 def _check_filenode_results(r):
1675 self.failUnless(r.is_healthy())
1676 d.addCallback(_check_filenode_results)
1677 d.addCallback(lambda res: n.check(verify=True))
1678 d.addCallback(_check_filenode_results)
1680 d.addCallback(_got_chk_filenode)
1682 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1683 def _got_lit_filenode(n):
1684 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1686 def _check_filenode_results(r):
1687 self.failUnless(r.is_healthy())
1688 d.addCallback(_check_filenode_results)
1689 d.addCallback(lambda res: n.check(verify=True))
1690 d.addCallback(_check_filenode_results)
1692 d.addCallback(_got_lit_filenode)
1696 class Checker(SystemTestMixin, unittest.TestCase):
1698 # Set self.basedir to a temp dir which has the name of the current test method in its
1700 self.basedir = self.mktemp()
1701 TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
1703 d = defer.maybeDeferred(SystemTestMixin.setUp, self)
1704 d.addCallback(lambda x: self.set_up_nodes())
1706 def _upload_a_file(ignored):
1707 d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
1708 d2.addCallback(lambda u: self.clients[0].create_node_from_uri(u.uri))
1710 d.addCallback(_upload_a_file)
1712 def _stash_it(filenode):
1713 self.filenode = filenode
1714 d.addCallback(_stash_it)
1717 def _find_shares(self, unused=None):
1718 shares = {} # k: (i, sharenum), v: data
1720 for i, c in enumerate(self.clients):
1721 sharedir = c.getServiceNamed("storage").sharedir
1722 for (dirp, dirns, fns) in os.walk(sharedir):
1727 # Whoops, I guess that's not a share file then.
1730 data = open(os.path.join(sharedir, dirp, fn), "r").read()
1731 shares[(i, sharenum)] = data
1735 def _replace_shares(self, newshares):
1736 for i, c in enumerate(self.clients):
1737 sharedir = c.getServiceNamed("storage").sharedir
1738 for (dirp, dirns, fns) in os.walk(sharedir):
1743 # Whoops, I guess that's not a share file then.
1746 pathtosharefile = os.path.join(sharedir, dirp, fn)
1747 os.unlink(pathtosharefile)
1748 newdata = newshares.get((i, sharenum))
1749 if newdata is not None:
1750 open(pathtosharefile, "w").write(newdata)
1752 def _corrupt_a_share(self, unused=None):
1753 """ Exactly one bit of exactly one share on disk will be flipped (randomly selected from
1754 among the bits of the 'share data' -- the verifiable bits)."""
1756 shares = self._find_shares()
1758 k = random.choice(ks)
1761 (version, size, num_leases) = struct.unpack(">LLL", data[:0xc])
1762 sharedata = data[0xc:0xc+size]
1764 corruptedsharedata = testutil.flip_one_bit(sharedata)
1765 corrupteddata = data[:0xc]+corruptedsharedata+data[0xc+size:]
1766 shares[k] = corrupteddata
1768 self._replace_shares(shares)
1770 def test_test_code(self):
1771 # The following process of stashing the shares, running
1772 # _replace_shares, and asserting that the new set of shares equals the
1773 # old is more to test this test code than to test the Tahoe code...
1774 d = defer.succeed(None)
1775 d.addCallback(self._find_shares)
1781 d.addCallback(_stash_it)
1782 d.addCallback(self._replace_shares)
1785 oldshares = stash[0]
1786 self.failUnless(isinstance(oldshares, dict), oldshares)
1787 self.failUnlessEqual(oldshares, res)
1789 d.addCallback(self._find_shares)
1790 d.addCallback(_compare)
1792 d.addCallback(lambda ignore: self._replace_shares({}))
1793 d.addCallback(self._find_shares)
1794 d.addCallback(lambda x: self.failUnlessEqual(x, {}))
1798 def _count_reads(self):
1799 sum_of_read_counts = 0
1800 for client in self.clients:
1801 counters = client.stats_provider.get_stats()['counters']
1802 sum_of_read_counts += counters.get('storage_server.read', 0)
1803 return sum_of_read_counts
1805 def test_check_without_verify(self):
1806 """ Check says the file is healthy when none of the shares have been
1807 touched. It says that the file is unhealthy when all of them have
1808 been removed. It says that the file is healthy if one bit of one share
1809 has been flipped."""
1810 d = defer.succeed(self.filenode)
1811 def _check1(filenode):
1812 before_check_reads = self._count_reads()
1814 d2 = filenode.check(verify=False, repair=False)
1815 def _after_check(checkresults):
1816 after_check_reads = self._count_reads()
1817 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1818 self.failUnless(checkresults.is_healthy())
1820 d2.addCallback(_after_check)
1822 d.addCallback(_check1)
1824 d.addCallback(self._corrupt_a_share)
1825 def _check2(ignored):
1826 before_check_reads = self._count_reads()
1827 d2 = self.filenode.check(verify=False, repair=False)
1829 def _after_check(checkresults):
1830 after_check_reads = self._count_reads()
1831 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1833 d2.addCallback(_after_check)
1835 d.addCallback(_check2)
1838 d.addCallback(lambda ignore: self._replace_shares({}))
1839 def _check3(ignored):
1840 before_check_reads = self._count_reads()
1841 d2 = self.filenode.check(verify=False, repair=False)
1843 def _after_check(checkresults):
1844 after_check_reads = self._count_reads()
1845 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1846 self.failIf(checkresults.is_healthy())
1848 d2.addCallback(_after_check)
1850 d.addCallback(_check3)
1854 def test_check_with_verify(self):
1855 """ Check says the file is healthy when none of the shares have been touched. It says
1856 that the file is unhealthy if one bit of one share has been flipped."""
1857 DELTA_READS = 10 * 2 # N == 10
1858 d = defer.succeed(self.filenode)
1859 def _check1(filenode):
1860 before_check_reads = self._count_reads()
1862 d2 = filenode.check(verify=True, repair=False)
1863 def _after_check(checkresults):
1864 after_check_reads = self._count_reads()
1865 # print "delta was ", after_check_reads - before_check_reads
1866 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
1867 self.failUnless(checkresults.is_healthy())
1869 d2.addCallback(_after_check)
1871 d.addCallback(_check1)
1873 d.addCallback(self._corrupt_a_share)
1874 def _check2(ignored):
1875 before_check_reads = self._count_reads()
1876 d2 = self.filenode.check(verify=True, repair=False)
1878 def _after_check(checkresults):
1879 after_check_reads = self._count_reads()
1880 # print "delta was ", after_check_reads - before_check_reads
1881 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
1882 self.failIf(checkresults.is_healthy())
1884 d2.addCallback(_after_check)
1886 d.addCallback(_check2)
1888 test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."