1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 from twisted.internet import utils
10 from allmydata import uri
11 from allmydata.storage.mutable import MutableShareFile
12 from allmydata.storage.server import si_a2b
13 from allmydata.immutable import offloaded, upload
14 from allmydata.immutable.literal import LiteralFileNode
15 from allmydata.immutable.filenode import ImmutableFileNode
16 from allmydata.util import idlib, mathutil
17 from allmydata.util import log, base32
18 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
19 from allmydata.util.fileutil import abspath_expanduser_unicode
20 from allmydata.util.consumer import MemoryConsumer, download_to_data
21 from allmydata.scripts import runner
22 from allmydata.interfaces import IDirectoryNode, IFileNode, \
23 NoSuchChildError, NoSharesError
24 from allmydata.monitor import Monitor
25 from allmydata.mutable.common import NotWriteableError
26 from allmydata.mutable import layout as mutable_layout
27 from foolscap.api import DeadReferenceError
28 from twisted.python.failure import Failure
29 from twisted.web.client import getPage
30 from twisted.web.error import Error
32 from allmydata.test.common import SystemTestMixin
34 # TODO: move these to common or common_util
35 from allmydata.test.test_runner import bintahoe, SkipMixin
38 This is some data to publish to the remote grid.., which needs to be large
39 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """An upload.Data that tallies the bytes requested through read() and can
    fire a one-shot Deferred once that tally passes a threshold.

    A test sets `interrupt_after` to a byte count and `interrupt_after_d` to
    a Deferred; the first read() that pushes `bytes_read` above the threshold
    fires the Deferred with this uploadable as its result.
    """
    # running total of the `length` values passed to read(); needs a starting
    # value because read() updates it with `+=`
    bytes_read = 0
    # byte threshold; None means "never interrupt"
    interrupt_after = None
    # Deferred fired when the threshold is exceeded
    interrupt_after_d = None

    def read(self, length):
        """Count the requested length, fire the interrupt Deferred if the
        threshold has just been crossed, then delegate to upload.Data.read().
        """
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # clear the threshold before firing so the Deferred is only
            # ever called back once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
55 class SystemTest(SystemTestMixin, SkipMixin, unittest.TestCase):
56 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
58 def test_connections(self):
59 self.basedir = "system/SystemTest/test_connections"
60 d = self.set_up_nodes()
61 self.extra_node = None
62 d.addCallback(lambda res: self.add_extra_node(self.numclients))
63 def _check(extra_node):
64 self.extra_node = extra_node
65 for c in self.clients:
66 all_peerids = c.get_storage_broker().get_all_serverids()
67 self.failUnlessEqual(len(all_peerids), self.numclients+1)
69 permuted_peers = sb.get_servers_for_index("a")
70 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
73 def _shutdown_extra_node(res):
75 return self.extra_node.stopService()
77 d.addBoth(_shutdown_extra_node)
79 # test_connections is subsumed by test_upload_and_download, and takes
80 # quite a while to run on a slow machine (because of all the TLS
81 # connections that must be established). If we ever rework the introducer
82 # code to such an extent that we're not sure if it works anymore, we can
83 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    # Run the shared upload/download scenario with convergence switched off
    # (convergence=None).
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    d = self._test_upload_and_download(convergence=None)
    return d
def test_upload_and_download_convergent(self):
    # Run the shared upload/download scenario with a fixed convergence
    # string.
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    convergence_secret = "some convergence string"
    return self._test_upload_and_download(convergence=convergence_secret)
94 def _test_upload_and_download(self, convergence):
95 # we use 4000 bytes of data, which will result in about 400k written
96 # to disk among all our simulated nodes
97 DATA = "Some data to upload\n" * 200
98 d = self.set_up_nodes()
99 def _check_connections(res):
100 for c in self.clients:
101 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
102 all_peerids = c.get_storage_broker().get_all_serverids()
103 self.failUnlessEqual(len(all_peerids), self.numclients)
104 sb = c.storage_broker
105 permuted_peers = sb.get_servers_for_index("a")
106 self.failUnlessEqual(len(permuted_peers), self.numclients)
107 d.addCallback(_check_connections)
111 u = self.clients[0].getServiceNamed("uploader")
113 # we crank the max segsize down to 1024b for the duration of this
114 # test, so we can exercise multiple segments. It is important
115 # that this is not a multiple of the segment size, so that the
116 # tail segment is not the same length as the others. This actually
117 # gets rounded up to 1025 to be a multiple of the number of
118 # required shares (since we use 25 out of 100 FEC).
119 up = upload.Data(DATA, convergence=convergence)
120 up.max_segment_size = 1024
123 d.addCallback(_do_upload)
124 def _upload_done(results):
126 log.msg("upload finished: uri is %s" % (theuri,))
128 assert isinstance(self.uri, str), self.uri
129 self.cap = uri.from_string(self.uri)
130 self.n = self.clients[1].create_node_from_uri(self.uri)
131 d.addCallback(_upload_done)
133 def _upload_again(res):
134 # Upload again. If using convergent encryption then this ought to be
135 # short-circuited, however with the way we currently generate URIs
136 # (i.e. because they include the roothash), we have to do all of the
137 # encoding work, and only get to save on the upload part.
138 log.msg("UPLOADING AGAIN")
139 up = upload.Data(DATA, convergence=convergence)
140 up.max_segment_size = 1024
141 return self.uploader.upload(up)
142 d.addCallback(_upload_again)
144 def _download_to_data(res):
145 log.msg("DOWNLOADING")
146 return download_to_data(self.n)
147 d.addCallback(_download_to_data)
148 def _download_to_data_done(data):
149 log.msg("download finished")
150 self.failUnlessEqual(data, DATA)
151 d.addCallback(_download_to_data_done)
154 n = self.clients[1].create_node_from_uri(self.uri)
155 d = download_to_data(n)
156 def _read_done(data):
157 self.failUnlessEqual(data, DATA)
158 d.addCallback(_read_done)
159 d.addCallback(lambda ign:
160 n.read(MemoryConsumer(), offset=1, size=4))
161 def _read_portion_done(mc):
162 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
163 d.addCallback(_read_portion_done)
164 d.addCallback(lambda ign:
165 n.read(MemoryConsumer(), offset=2, size=None))
166 def _read_tail_done(mc):
167 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
168 d.addCallback(_read_tail_done)
169 d.addCallback(lambda ign:
170 n.read(MemoryConsumer(), size=len(DATA)+1000))
171 def _read_too_much(mc):
172 self.failUnlessEqual("".join(mc.chunks), DATA)
173 d.addCallback(_read_too_much)
176 d.addCallback(_test_read)
178 def _test_bad_read(res):
179 bad_u = uri.from_string_filenode(self.uri)
180 bad_u.key = self.flip_bit(bad_u.key)
181 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
182 # this should cause an error during download
184 d = self.shouldFail2(NoSharesError, "'download bad node'",
186 bad_n.read, MemoryConsumer(), offset=2)
188 d.addCallback(_test_bad_read)
190 def _download_nonexistent_uri(res):
191 baduri = self.mangle_uri(self.uri)
192 badnode = self.clients[1].create_node_from_uri(baduri)
193 log.msg("about to download non-existent URI", level=log.UNUSUAL,
194 facility="tahoe.tests")
195 d1 = download_to_data(badnode)
196 def _baduri_should_fail(res):
197 log.msg("finished downloading non-existend URI",
198 level=log.UNUSUAL, facility="tahoe.tests")
199 self.failUnless(isinstance(res, Failure))
200 self.failUnless(res.check(NoSharesError),
201 "expected NoSharesError, got %s" % res)
202 d1.addBoth(_baduri_should_fail)
204 d.addCallback(_download_nonexistent_uri)
206 # add a new node, which doesn't accept shares, and only uses the
208 d.addCallback(lambda res: self.add_extra_node(self.numclients,
210 add_to_sparent=True))
211 def _added(extra_node):
212 self.extra_node = extra_node
213 self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
214 d.addCallback(_added)
216 HELPER_DATA = "Data that needs help to upload" * 1000
217 def _upload_with_helper(res):
218 u = upload.Data(HELPER_DATA, convergence=convergence)
219 d = self.extra_node.upload(u)
220 def _uploaded(results):
221 n = self.clients[1].create_node_from_uri(results.uri)
222 return download_to_data(n)
223 d.addCallback(_uploaded)
225 self.failUnlessEqual(newdata, HELPER_DATA)
226 d.addCallback(_check)
228 d.addCallback(_upload_with_helper)
230 def _upload_duplicate_with_helper(res):
231 u = upload.Data(HELPER_DATA, convergence=convergence)
232 u.debug_stash_RemoteEncryptedUploadable = True
233 d = self.extra_node.upload(u)
234 def _uploaded(results):
235 n = self.clients[1].create_node_from_uri(results.uri)
236 return download_to_data(n)
237 d.addCallback(_uploaded)
239 self.failUnlessEqual(newdata, HELPER_DATA)
240 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
241 "uploadable started uploading, should have been avoided")
242 d.addCallback(_check)
244 if convergence is not None:
245 d.addCallback(_upload_duplicate_with_helper)
247 def _upload_resumable(res):
248 DATA = "Data that needs help to upload and gets interrupted" * 1000
249 u1 = CountingDataUploadable(DATA, convergence=convergence)
250 u2 = CountingDataUploadable(DATA, convergence=convergence)
252 # we interrupt the connection after about 5kB by shutting down
253 # the helper, then restarting it.
254 u1.interrupt_after = 5000
255 u1.interrupt_after_d = defer.Deferred()
256 u1.interrupt_after_d.addCallback(lambda res:
257 self.bounce_client(0))
259 # sneak into the helper and reduce its chunk size, so that our
260 # debug_interrupt will sever the connection on about the fifth
261 # chunk fetched. This makes sure that we've started to write the
262 # new shares before we abandon them, which exercises the
263 # abort/delete-partial-share code. TODO: find a cleaner way to do
264 # this. I know that this will affect later uses of the helper in
265 # this same test run, but I'm not currently worried about it.
266 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
268 d = self.extra_node.upload(u1)
270 def _should_not_finish(res):
271 self.fail("interrupted upload should have failed, not finished"
272 " with result %s" % (res,))
274 f.trap(DeadReferenceError)
276 # make sure we actually interrupted it before finishing the
278 self.failUnless(u1.bytes_read < len(DATA),
279 "read %d out of %d total" % (u1.bytes_read,
282 log.msg("waiting for reconnect", level=log.NOISY,
283 facility="tahoe.test.test_system")
284 # now, we need to give the nodes a chance to notice that this
285 # connection has gone away. When this happens, the storage
286 # servers will be told to abort their uploads, removing the
287 # partial shares. Unfortunately this involves TCP messages
288 # going through the loopback interface, and we can't easily
289 # predict how long that will take. If it were all local, we
290 # could use fireEventually() to stall. Since we don't have
291 # the right introduction hooks, the best we can do is use a
292 # fixed delay. TODO: this is fragile.
293 u1.interrupt_after_d.addCallback(self.stall, 2.0)
294 return u1.interrupt_after_d
295 d.addCallbacks(_should_not_finish, _interrupted)
297 def _disconnected(res):
298 # check to make sure the storage servers aren't still hanging
299 # on to the partial share: their incoming/ directories should
301 log.msg("disconnected", level=log.NOISY,
302 facility="tahoe.test.test_system")
303 for i in range(self.numclients):
304 incdir = os.path.join(self.getdir("client%d" % i),
305 "storage", "shares", "incoming")
306 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
307 d.addCallback(_disconnected)
309 # then we need to give the reconnector a chance to
310 # reestablish the connection to the helper.
311 d.addCallback(lambda res:
312 log.msg("wait_for_connections", level=log.NOISY,
313 facility="tahoe.test.test_system"))
314 d.addCallback(lambda res: self.wait_for_connections())
317 d.addCallback(lambda res:
318 log.msg("uploading again", level=log.NOISY,
319 facility="tahoe.test.test_system"))
320 d.addCallback(lambda res: self.extra_node.upload(u2))
322 def _uploaded(results):
324 log.msg("Second upload complete", level=log.NOISY,
325 facility="tahoe.test.test_system")
327 # this is really bytes received rather than sent, but it's
328 # convenient and basically measures the same thing
329 bytes_sent = results.ciphertext_fetched
330 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
332 # We currently don't support resumption of upload if the data is
333 # encrypted with a random key. (Because that would require us
334 # to store the key locally and re-use it on the next upload of
335 # this file, which isn't a bad thing to do, but we currently
337 if convergence is not None:
338 # Make sure we did not have to read the whole file the
339 # second time around.
340 self.failUnless(bytes_sent < len(DATA),
341 "resumption didn't save us any work:"
342 " read %r bytes out of %r total" %
343 (bytes_sent, len(DATA)))
345 # Make sure we did have to read the whole file the second
346 # time around -- because the one that we partially uploaded
347 # earlier was encrypted with a different random key.
348 self.failIf(bytes_sent < len(DATA),
349 "resumption saved us some work even though we were using random keys:"
350 " read %r bytes out of %r total" %
351 (bytes_sent, len(DATA)))
352 n = self.clients[1].create_node_from_uri(cap)
353 return download_to_data(n)
354 d.addCallback(_uploaded)
357 self.failUnlessEqual(newdata, DATA)
358 # If using convergent encryption, then also check that the
359 # helper has removed the temp file from its directories.
360 if convergence is not None:
361 basedir = os.path.join(self.getdir("client0"), "helper")
362 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
363 self.failUnlessEqual(files, [])
364 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
365 self.failUnlessEqual(files, [])
366 d.addCallback(_check)
368 d.addCallback(_upload_resumable)
370 def _grab_stats(ignored):
371 # the StatsProvider doesn't normally publish a FURL:
372 # instead it passes a live reference to the StatsGatherer
373 # (if and when it connects). To exercise the remote stats
374 # interface, we manually publish client0's StatsProvider
375 # and use client1 to query it.
376 sp = self.clients[0].stats_provider
377 sp_furl = self.clients[0].tub.registerReference(sp)
378 d = self.clients[1].tub.getReference(sp_furl)
379 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
380 def _got_stats(stats):
382 #from pprint import pprint
385 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
386 c = stats["counters"]
387 self.failUnless("storage_server.allocate" in c)
388 d.addCallback(_got_stats)
390 d.addCallback(_grab_stats)
394 def _find_all_shares(self, basedir):
396 for (dirpath, dirnames, filenames) in os.walk(basedir):
397 if "storage" not in dirpath:
401 pieces = dirpath.split(os.sep)
403 and pieces[-4] == "storage"
404 and pieces[-3] == "shares"):
405 # we're sitting in .../storage/shares/$START/$SINDEX , and there
406 # are sharefiles here
407 assert pieces[-5].startswith("client")
408 client_num = int(pieces[-5][-1])
409 storage_index_s = pieces[-1]
410 storage_index = si_a2b(storage_index_s)
411 for sharename in filenames:
412 shnum = int(sharename)
413 filename = os.path.join(dirpath, sharename)
414 data = (client_num, storage_index, filename, shnum)
417 self.fail("unable to find any share files in %s" % basedir)
420 def _corrupt_mutable_share(self, filename, which):
421 msf = MutableShareFile(filename)
422 datav = msf.readv([ (0, 1000000) ])
423 final_share = datav[0]
424 assert len(final_share) < 1000000 # ought to be truncated
425 pieces = mutable_layout.unpack_share(final_share)
426 (seqnum, root_hash, IV, k, N, segsize, datalen,
427 verification_key, signature, share_hash_chain, block_hash_tree,
428 share_data, enc_privkey) = pieces
430 if which == "seqnum":
433 root_hash = self.flip_bit(root_hash)
435 IV = self.flip_bit(IV)
436 elif which == "segsize":
437 segsize = segsize + 15
438 elif which == "pubkey":
439 verification_key = self.flip_bit(verification_key)
440 elif which == "signature":
441 signature = self.flip_bit(signature)
442 elif which == "share_hash_chain":
443 nodenum = share_hash_chain.keys()[0]
444 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
445 elif which == "block_hash_tree":
446 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
447 elif which == "share_data":
448 share_data = self.flip_bit(share_data)
449 elif which == "encprivkey":
450 enc_privkey = self.flip_bit(enc_privkey)
452 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
454 final_share = mutable_layout.pack_share(prefix,
461 msf.writev( [(0, final_share)], None)
464 def test_mutable(self):
465 self.basedir = "system/SystemTest/test_mutable"
466 DATA = "initial contents go here." # 25 bytes % 3 != 0
467 NEWDATA = "new contents yay"
468 NEWERDATA = "this is getting old"
470 d = self.set_up_nodes(use_key_generator=True)
472 def _create_mutable(res):
474 log.msg("starting create_mutable_file")
475 d1 = c.create_mutable_file(DATA)
477 log.msg("DONE: %s" % (res,))
478 self._mutable_node_1 = res
479 d1.addCallback(_done)
481 d.addCallback(_create_mutable)
483 def _test_debug(res):
484 # find a share. It is important to run this while there is only
485 # one slot in the grid.
486 shares = self._find_all_shares(self.basedir)
487 (client_num, storage_index, filename, shnum) = shares[0]
488 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
490 log.msg(" for clients[%d]" % client_num)
492 out,err = StringIO(), StringIO()
493 rc = runner.runner(["debug", "dump-share", "--offsets",
495 stdout=out, stderr=err)
496 output = out.getvalue()
497 self.failUnlessEqual(rc, 0)
499 self.failUnless("Mutable slot found:\n" in output)
500 self.failUnless("share_type: SDMF\n" in output)
501 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
502 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
503 self.failUnless(" num_extra_leases: 0\n" in output)
504 self.failUnless(" secrets are for nodeid: %s\n" % peerid
506 self.failUnless(" SDMF contents:\n" in output)
507 self.failUnless(" seqnum: 1\n" in output)
508 self.failUnless(" required_shares: 3\n" in output)
509 self.failUnless(" total_shares: 10\n" in output)
510 self.failUnless(" segsize: 27\n" in output, (output, filename))
511 self.failUnless(" datalen: 25\n" in output)
512 # the exact share_hash_chain nodes depends upon the sharenum,
513 # and is more of a hassle to compute than I want to deal with
515 self.failUnless(" share_hash_chain: " in output)
516 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
517 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
518 base32.b2a(storage_index))
519 self.failUnless(expected in output)
520 except unittest.FailTest:
522 print "dump-share output was:"
525 d.addCallback(_test_debug)
529 # first, let's see if we can use the existing node to retrieve the
530 # contents. This allows it to use the cached pubkey and maybe the
531 # latest-known sharemap.
533 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
534 def _check_download_1(res):
535 self.failUnlessEqual(res, DATA)
536 # now we see if we can retrieve the data from a new node,
537 # constructed using the URI of the original one. We do this test
538 # on the same client that uploaded the data.
539 uri = self._mutable_node_1.get_uri()
540 log.msg("starting retrieve1")
541 newnode = self.clients[0].create_node_from_uri(uri)
542 newnode_2 = self.clients[0].create_node_from_uri(uri)
543 self.failUnlessIdentical(newnode, newnode_2)
544 return newnode.download_best_version()
545 d.addCallback(_check_download_1)
547 def _check_download_2(res):
548 self.failUnlessEqual(res, DATA)
549 # same thing, but with a different client
550 uri = self._mutable_node_1.get_uri()
551 newnode = self.clients[1].create_node_from_uri(uri)
552 log.msg("starting retrieve2")
553 d1 = newnode.download_best_version()
554 d1.addCallback(lambda res: (res, newnode))
556 d.addCallback(_check_download_2)
558 def _check_download_3((res, newnode)):
559 self.failUnlessEqual(res, DATA)
561 log.msg("starting replace1")
562 d1 = newnode.overwrite(NEWDATA)
563 d1.addCallback(lambda res: newnode.download_best_version())
565 d.addCallback(_check_download_3)
567 def _check_download_4(res):
568 self.failUnlessEqual(res, NEWDATA)
569 # now create an even newer node and replace the data on it. This
570 # new node has never been used for download before.
571 uri = self._mutable_node_1.get_uri()
572 newnode1 = self.clients[2].create_node_from_uri(uri)
573 newnode2 = self.clients[3].create_node_from_uri(uri)
574 self._newnode3 = self.clients[3].create_node_from_uri(uri)
575 log.msg("starting replace2")
576 d1 = newnode1.overwrite(NEWERDATA)
577 d1.addCallback(lambda res: newnode2.download_best_version())
579 d.addCallback(_check_download_4)
581 def _check_download_5(res):
582 log.msg("finished replace2")
583 self.failUnlessEqual(res, NEWERDATA)
584 d.addCallback(_check_download_5)
586 def _corrupt_shares(res):
587 # run around and flip bits in all but k of the shares, to test
589 shares = self._find_all_shares(self.basedir)
590 ## sort by share number
591 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
592 where = dict([ (shnum, filename)
593 for (client_num, storage_index, filename, shnum)
595 assert len(where) == 10 # this test is designed for 3-of-10
596 for shnum, filename in where.items():
597 # shares 7,8,9 are left alone. read will check
598 # (share_hash_chain, block_hash_tree, share_data). New
599 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
600 # segsize, signature).
602 # read: this will trigger "pubkey doesn't match
604 self._corrupt_mutable_share(filename, "pubkey")
605 self._corrupt_mutable_share(filename, "encprivkey")
607 # triggers "signature is invalid"
608 self._corrupt_mutable_share(filename, "seqnum")
610 # triggers "signature is invalid"
611 self._corrupt_mutable_share(filename, "R")
613 # triggers "signature is invalid"
614 self._corrupt_mutable_share(filename, "segsize")
616 self._corrupt_mutable_share(filename, "share_hash_chain")
618 self._corrupt_mutable_share(filename, "block_hash_tree")
620 self._corrupt_mutable_share(filename, "share_data")
621 # other things to correct: IV, signature
622 # 7,8,9 are left alone
624 # note that initial_query_count=5 means that we'll hit the
625 # first 5 servers in effectively random order (based upon
626 # response time), so we won't necessarily ever get a "pubkey
627 # doesn't match fingerprint" error (if we hit shnum>=1 before
628 # shnum=0, we pull the pubkey from there). To get repeatable
629 # specific failures, we need to set initial_query_count=1,
630 # but of course that will change the sequencing behavior of
631 # the retrieval process. TODO: find a reasonable way to make
632 # this a parameter, probably when we expand this test to test
633 # for one failure mode at a time.
635 # when we retrieve this, we should get three signature
636 # failures (where we've mangled seqnum, R, and segsize). The
638 d.addCallback(_corrupt_shares)
640 d.addCallback(lambda res: self._newnode3.download_best_version())
641 d.addCallback(_check_download_5)
643 def _check_empty_file(res):
644 # make sure we can create empty files, this usually screws up the
646 d1 = self.clients[2].create_mutable_file("")
647 d1.addCallback(lambda newnode: newnode.download_best_version())
648 d1.addCallback(lambda res: self.failUnlessEqual("", res))
650 d.addCallback(_check_empty_file)
652 d.addCallback(lambda res: self.clients[0].create_dirnode())
653 def _created_dirnode(dnode):
654 log.msg("_created_dirnode(%s)" % (dnode,))
656 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
657 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
658 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
659 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
660 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
661 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
662 d1.addCallback(lambda res: dnode.build_manifest().when_done())
663 d1.addCallback(lambda res:
664 self.failUnlessEqual(len(res["manifest"]), 1))
666 d.addCallback(_created_dirnode)
668 def wait_for_c3_kg_conn():
669 return self.clients[3]._key_generator is not None
670 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
672 def check_kg_poolsize(junk, size_delta):
673 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
674 self.key_generator_svc.key_generator.pool_size + size_delta)
676 d.addCallback(check_kg_poolsize, 0)
677 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
678 d.addCallback(check_kg_poolsize, -1)
679 d.addCallback(lambda junk: self.clients[3].create_dirnode())
680 d.addCallback(check_kg_poolsize, -2)
681 # use_helper induces use of clients[3], which is the using-key_gen client
682 d.addCallback(lambda junk:
683 self.POST("uri?t=mkdir&name=george", use_helper=True))
684 d.addCallback(check_kg_poolsize, -3)
def flip_bit(self, good):
    # Return a copy of `good` whose final character has its lowest bit
    # inverted; everything before the last character is left untouched.
    head, last = good[:-1], good[-1]
    flipped = chr(ord(last) ^ 0x01)
    return head + flipped
def mangle_uri(self, gooduri):
    """Return the string form of a CHK URI built from gooduri with the last
    bit of its key flipped.

    gooduri is parsed with uri.from_string(); every field other than the key
    is copied over unchanged.
    """
    # change the key, which changes the storage index, which means we'll
    # be asking about the wrong file, so nobody will have any shares
    u = uri.from_string(gooduri)
    u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                        uri_extension_hash=u.uri_extension_hash,
                        needed_shares=u.needed_shares,
                        total_shares=u.total_shares,
                        size=u.size)
    return u2.to_string()
702 # TODO: add a test which mangles the uri_extension_hash instead, and
703 # should fail due to not being able to get a valid uri_extension block.
704 # Also a test which sneakily mangles the uri_extension block to change
705 # some of the validation data, so it will fail in the post-download phase
706 # when the file's crypttext integrity check fails. Do the same thing for
707 # the key, which should cause the download to fail the post-download
708 # plaintext_hash check.
710 def test_filesystem(self):
711 self.basedir = "system/SystemTest/test_filesystem"
712 self.data = LARGE_DATA
713 d = self.set_up_nodes(use_stats_gatherer=True)
714 def _new_happy_semantics(ign):
715 for c in self.clients:
716 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
717 d.addCallback(_new_happy_semantics)
718 d.addCallback(self._test_introweb)
719 d.addCallback(self.log, "starting publish")
720 d.addCallback(self._do_publish1)
721 d.addCallback(self._test_runner)
722 d.addCallback(self._do_publish2)
723 # at this point, we have the following filesystem (where "R" denotes
724 # self._root_directory_uri):
727 # R/subdir1/mydata567
729 # R/subdir1/subdir2/mydata992
731 d.addCallback(lambda res: self.bounce_client(0))
732 d.addCallback(self.log, "bounced client0")
734 d.addCallback(self._check_publish1)
735 d.addCallback(self.log, "did _check_publish1")
736 d.addCallback(self._check_publish2)
737 d.addCallback(self.log, "did _check_publish2")
738 d.addCallback(self._do_publish_private)
739 d.addCallback(self.log, "did _do_publish_private")
740 # now we also have (where "P" denotes a new dir):
741 # P/personal/sekrit data
742 # P/s2-rw -> /subdir1/subdir2/
743 # P/s2-ro -> /subdir1/subdir2/ (read-only)
744 d.addCallback(self._check_publish_private)
745 d.addCallback(self.log, "did _check_publish_private")
746 d.addCallback(self._test_web)
747 d.addCallback(self._test_control)
748 d.addCallback(self._test_cli)
749 # P now has four top-level children:
750 # P/personal/sekrit data
753 # P/test_put/ (empty)
754 d.addCallback(self._test_checker)
757 def _test_introweb(self, res):
758 d = getPage(self.introweb_url, method="GET", followRedirect=True)
761 self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
762 verstr = str(allmydata.__version__)
764 # The Python "rational version numbering" convention
765 # disallows "-r$REV" but allows ".post$REV"
766 # instead. Eventually we'll probably move to
767 # that. When we do, this test won't go red:
768 ix = verstr.rfind('-r')
770 altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
772 ix = verstr.rfind('.post')
774 altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
778 appverstr = "%s: %s" % (allmydata.__appname__, verstr)
779 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
781 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
782 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
783 self.failUnless("Subscription Summary: storage: 5" in res)
784 except unittest.FailTest:
786 print "GET %s output was:" % self.introweb_url
789 d.addCallback(_check)
790 d.addCallback(lambda res:
791 getPage(self.introweb_url + "?t=json",
792 method="GET", followRedirect=True))
793 def _check_json(res):
794 data = simplejson.loads(res)
796 self.failUnlessEqual(data["subscription_summary"],
798 self.failUnlessEqual(data["announcement_summary"],
799 {"storage": 5, "stub_client": 5})
800 self.failUnlessEqual(data["announcement_distinct_hosts"],
801 {"storage": 1, "stub_client": 1})
802 except unittest.FailTest:
804 print "GET %s?t=json output was:" % self.introweb_url
807 d.addCallback(_check_json)
810 def _do_publish1(self, res):
811 ut = upload.Data(self.data, convergence=None)
813 d = c0.create_dirnode()
814 def _made_root(new_dirnode):
815 self._root_directory_uri = new_dirnode.get_uri()
816 return c0.create_node_from_uri(self._root_directory_uri)
817 d.addCallback(_made_root)
818 d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
819 def _made_subdir1(subdir1_node):
820 self._subdir1_node = subdir1_node
821 d1 = subdir1_node.add_file(u"mydata567", ut)
822 d1.addCallback(self.log, "publish finished")
823 def _stash_uri(filenode):
824 self.uri = filenode.get_uri()
825 assert isinstance(self.uri, str), (self.uri, filenode)
826 d1.addCallback(_stash_uri)
828 d.addCallback(_made_subdir1)
def _do_publish2(self, res):
    """Create "subdir2" under the stashed subdir1 node and add self.data to
    it as the file "mydata992".

    res is the (unused) result of the previous callback. Returns the
    Deferred for the whole operation so the calling chain waits for it.
    """
    ut = upload.Data(self.data, convergence=None)
    d = self._subdir1_node.create_subdirectory(u"subdir2")
    d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    # Hand the Deferred back: this method is used as a callback in
    # test_filesystem, and the chain only pauses for a returned Deferred.
    return d
def log(self, res, *args, **kwargs):
    """Logging step for use inside a Deferred callback chain.

    res is the result delivered by the previous callback; it is not logged.
    The remaining positional and keyword arguments are forwarded to
    log.msg(). Returns res unchanged so the next callback in the chain still
    receives it.
    """
    log.msg(*args, **kwargs)
    # Pass the incoming result through; callers chain callbacks after this
    # one that consume the previous result (e.g. the dirnode or filenode).
    return res
842 def _do_publish_private(self, res):
843 self.smalldata = "sssh, very secret stuff"
844 ut = upload.Data(self.smalldata, convergence=None)
845 d = self.clients[0].create_dirnode()
846 d.addCallback(self.log, "GOT private directory")
847 def _got_new_dir(privnode):
848 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
849 d1 = privnode.create_subdirectory(u"personal")
850 d1.addCallback(self.log, "made P/personal")
851 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
852 d1.addCallback(self.log, "made P/personal/sekrit data")
853 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
855 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
856 s2node.get_readonly_uri())
857 d2.addCallback(lambda node:
858 privnode.set_uri(u"s2-ro",
859 s2node.get_readonly_uri(),
860 s2node.get_readonly_uri()))
862 d1.addCallback(_got_s2)
863 d1.addCallback(lambda res: privnode)
865 d.addCallback(_got_new_dir)
868 def _check_publish1(self, res):
869 # this one uses the iterative API
871 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
872 d.addCallback(self.log, "check_publish1 got /")
873 d.addCallback(lambda root: root.get(u"subdir1"))
874 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
875 d.addCallback(lambda filenode: download_to_data(filenode))
876 d.addCallback(self.log, "get finished")
878 self.failUnlessEqual(data, self.data)
879 d.addCallback(_get_done)
def _check_publish2(self, res):
    """Check the published tree through the path-based dirnode API, using
    clients[1].

    Confirms that "subdir1" provides IDirectoryNode, that
    "subdir1/mydata567" downloads to self.data, and that building a node
    from that file's URI compares equal to the node fetched by path.
    Returns the Deferred for the whole sequence so the calling chain waits
    for it.
    """
    # this one uses the path-based API
    rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
    d = rootnode.get_child_at_path(u"subdir1")
    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode)))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda filenode: download_to_data(filenode))
    d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    def _got_filenode(filenode):
        fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
        # use the test-case assertion rather than a bare assert so the
        # check is not stripped under -O and fails like the other checks
        self.failUnlessEqual(fnode, filenode)
    d.addCallback(_got_filenode)
    # Hand the Deferred back so the caller's chain waits for these checks
    # and sees their failures.
    return d
def _check_publish_private(self, resnode):
    """Verify the private directory tree handed over in ``resnode``.

    Stores ``resnode`` as ``self._private_node`` and its "personal" child
    as ``self._personal_node``, then checks:
      * "personal/sekrit data" downloads to ``self.smalldata``;
      * "s2-rw" is mutable, "s2-ro" is mutable and read-only;
      * modifying operations on "s2-ro" fail with NotWriteableError while
        reads still work;
      * moving a child out of, back into, and between writeable
        directories works;
      * the manifest and deep-stats of the private directory have the
        expected shape.

    Returns the Deferred driving all of the above.
    """
    # this one uses the path-based API
    self._private_node = resnode

    d = self._private_node.get_child_at_path(u"personal")

    def _got_personal(personal):
        self._personal_node = personal
        # Passed on because the next callback inspects the directory node.
        return personal
    d.addCallback(_got_personal)

    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))

    def get_path(path):
        return self._private_node.get_child_at_path(path)

    d.addCallback(lambda res: get_path(u"personal/sekrit data"))
    d.addCallback(lambda filenode: download_to_data(filenode))
    d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
    d.addCallback(lambda res: get_path(u"s2-rw"))
    d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
    d.addCallback(lambda res: get_path(u"s2-ro"))

    def _got_s2ro(dirnode):
        self.failUnless(dirnode.is_mutable(), dirnode)
        self.failUnless(dirnode.is_readonly(), dirnode)
        d1 = defer.succeed(None)
        d1.addCallback(lambda res: dirnode.list())
        d1.addCallback(self.log, "dirnode.list")

        d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

        d1.addCallback(self.log, "doing add_file(ro)")
        ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
        d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

        d1.addCallback(self.log, "doing get(ro)")
        d1.addCallback(lambda res: dirnode.get(u"mydata992"))
        d1.addCallback(lambda filenode:
                       self.failUnless(IFileNode.providedBy(filenode)))

        d1.addCallback(self.log, "doing delete(ro)")
        d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

        d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

        d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

        personal = self._personal_node
        d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

        d1.addCallback(self.log, "doing move_child_to(ro)2")
        d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

        d1.addCallback(self.log, "finished with _got_s2ro")
        return d1
    d.addCallback(_got_s2ro)

    def _got_home(dummy):
        home = self._private_node
        personal = self._personal_node
        d1 = defer.succeed(None)
        d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
        d1.addCallback(lambda res:
                       personal.move_child_to(u"sekrit data", home, u"sekrit"))

        d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit", home, u"sekrit data"))

        d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit data", personal))

        d1.addCallback(lambda res: home.build_manifest().when_done())
        d1.addCallback(self.log, "manifest")
        # five items:
        # P/
        # P/personal/
        # P/personal/sekrit data
        # P/s2-rw (same as P/s2-ro)
        # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
        d1.addCallback(lambda res:
                       self.failUnlessEqual(len(res["manifest"]), 5))
        d1.addCallback(lambda res: home.start_deep_stats().when_done())

        def _check_stats(stats):
            expected = {"count-immutable-files": 1,
                        "count-mutable-files": 0,
                        "count-literal-files": 1,
                        # NOTE(review): this entry was absent from the
                        # reviewed text; 2 = 1 immutable + 1 literal -- confirm.
                        "count-files": 2,
                        "count-directories": 3,
                        "size-immutable-files": 112,
                        "size-literal-files": 23,
                        #"size-directories": 616, # varies
                        #"largest-directory": 616,
                        "largest-directory-children": 3,
                        "largest-immutable-file": 112,
                        }
            for k, v in expected.items():
                self.failUnlessEqual(stats[k], v,
                                     "stats[%s] was %s, not %s" %
                                     (k, stats[k], v))
            # Directory sizes vary between runs, so only lower bounds are checked.
            self.failUnless(stats["size-directories"] > 1300,
                            stats["size-directories"])
            self.failUnless(stats["largest-directory"] > 800,
                            stats["largest-directory"])
            self.failUnlessEqual(stats["size-files-histogram"],
                                 [ (11, 31, 1), (101, 316, 1) ])
        d1.addCallback(_check_stats)
        return d1
    d.addCallback(_got_home)
    return d
def shouldFail(self, res, expected_failure, which, substring=None):
    """``addBoth`` handler asserting that the preceding step failed.

    res: the callback/errback value delivered by the Deferred.
    expected_failure: exception class (or classes) handed to ``Failure.trap``.
    which: label used in the message when the step unexpectedly succeeded.
    substring: optional text that must occur in ``str(res)``.

    A Failure of an unexpected type is re-raised by ``trap``; a
    non-Failure result fails the test.
    """
    if isinstance(res, Failure):
        res.trap(expected_failure)
        # Only inspect the text when a substring was requested; the default
        # None must not reach the ``in`` test.
        if substring:
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
    else:
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
    """Invoke ``callable(*args, **kwargs)`` and assert that it fails.

    expected_failure: exception class (or classes) handed to ``Failure.trap``.
    which: label used in the message when the call unexpectedly succeeds.
    substring: ``None`` or a ``str`` that must occur in ``str(failure)``.
    callable: the function to run; it may raise synchronously or return a
        Deferred (``defer.maybeDeferred`` covers both).

    Returns a Deferred that fires once the outcome has been checked.
    """
    assert substring is None or isinstance(substring, str)
    d = defer.maybeDeferred(callable, *args, **kwargs)

    def done(res):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    # addBoth: the check has to run for success and failure alike.
    d.addBoth(done)
    return d
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of ``data`` to ``self.webish_url + urlpath``.

    Returns whatever ``getPage`` returns for the request.
    """
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET for ``self.webish_url + urlpath``.

    ``followRedirect`` is forwarded to ``getPage``; the return value is
    whatever ``getPage`` returns.
    """
    target = self.webish_url + urlpath
    return getPage(target, followRedirect=followRedirect, method="GET")
def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
    """Encode ``fields`` as multipart/form-data and send them via ``POST2``.

    urlpath, followRedirect, use_helper: forwarded to ``POST2`` unchanged.
    fields: form fields. A plain value is sent as ``str(value)``; a
        ``(filename, contents)`` tuple is sent as a file upload.

    With no fields at all an empty body and no extra headers are sent.
    Returns whatever ``POST2`` returns.
    """
    sepbase = "boogabooga"
    sep = "--" + sepbase
    # Every form starts with a _charset part announcing UTF-8.
    form = [sep,
            'Content-Disposition: form-data; name="_charset"',
            '',
            'UTF-8',
            sep]
    for name, value in fields.items():
        if isinstance(value, tuple):
            filename, value = value
            form.append('Content-Disposition: form-data; name="%s"; '
                        'filename="%s"' % (name, filename.encode("utf-8")))
        else:
            form.append('Content-Disposition: form-data; name="%s"' % name)
        form.append('')
        form.append(str(value))
        form.append(sep)
    # The last boundary gets a trailing "--" to close the multipart body.
    form[-1] += "--"
    body = ""
    headers = {}
    if fields:
        body = "\r\n".join(form) + "\r\n"
        headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
    return self.POST2(urlpath, body, headers, followRedirect, use_helper)
def POST2(self, urlpath, body="", headers=None, followRedirect=False,
          use_helper=False):
    """Send a raw HTTP POST and return ``getPage``'s result.

    urlpath: path appended to the node's base URL.
    body: request body, sent as-is.
    headers: optional dict of request headers; ``None`` means no extra headers.
    followRedirect: forwarded to ``getPage``.
    use_helper: when true, target ``self.helper_webish_url`` instead of
        ``self.webish_url``.
    """
    # A fresh dict per call: a ``{}`` default would be one object shared by
    # every invocation.
    if headers is None:
        headers = {}
    if use_helper:
        url = self.helper_webish_url + urlpath
    else:
        url = self.webish_url + urlpath
    return getPage(url, method="POST", postdata=body, headers=headers,
                   followRedirect=followRedirect)
def _test_web(self, res):
    """Exercise the web front end of the running grid.

    In order: the welcome pages of client 0 and of the helper-using client,
    directory listings and file downloads by path and by URI, the error
    for a mangled URI, PUT uploads (small, large, overwrite), unlinked POST
    uploads with and without the helper, the status and helper_status
    pages (HTML and JSON), and the statistics pages.

    ``res`` is ignored. Returns the Deferred driving the whole sequence.
    """
    base = self.webish_url
    public = "uri/" + self._root_directory_uri
    # NOTE(review): the statement creating ``d`` was absent from the reviewed
    # text; fetching the welcome page matches the first callback -- confirm.
    d = getPage(base)

    def _got_welcome(page):
        # XXX This test is oversensitive to formatting
        expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
        self.failUnless(expected in page,
                        "I didn't see the right 'connected storage servers'"
                        " message in: %s" % page
                        )
        expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
        self.failUnless(expected in page,
                        "I didn't see the right 'My nodeid' message "
                        "in: %s" % page)
        self.failUnless("Helper: 0 active uploads" in page)
    d.addCallback(_got_welcome)
    d.addCallback(self.log, "done with _got_welcome")

    # get the welcome page from the node that uses the helper too
    d.addCallback(lambda res: getPage(self.helper_webish_url))

    def _got_welcome_helper(page):
        self.failUnless("Connected to helper?: <span>yes</span>" in page,
                        page)
        self.failUnless("Not running helper" in page)
    d.addCallback(_got_welcome_helper)

    d.addCallback(lambda res: getPage(base + public))
    d.addCallback(lambda res: getPage(base + public + "/subdir1"))

    def _got_subdir1(page):
        # there ought to be an href for our file
        self.failUnless(("<td>%d</td>" % len(self.data)) in page)
        self.failUnless(">mydata567</a>" in page)
    d.addCallback(_got_subdir1)
    d.addCallback(self.log, "done with _got_subdir1")
    d.addCallback(lambda res:
                  getPage(base + public + "/subdir1/mydata567"))

    def _got_data(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_data)

    # download from a URI embedded in a URL
    d.addCallback(self.log, "_get_from_uri")

    def _get_from_uri(res):
        return getPage(base + "uri/%s?filename=%s"
                       % (self.uri, "mydata567"))
    d.addCallback(_get_from_uri)

    def _got_from_uri(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_from_uri)

    # download from a URI embedded in a URL, second form
    d.addCallback(self.log, "_get_from_uri2")

    def _get_from_uri2(res):
        return getPage(base + "uri?uri=%s" % (self.uri,))
    d.addCallback(_get_from_uri2)
    d.addCallback(_got_from_uri)

    # download from a bogus URI, make sure we get a reasonable error
    d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)

    def _get_from_bogus_uri(res):
        d1 = getPage(base + "uri/%s?filename=%s"
                     % (self.mangle_uri(self.uri), "mydata567"))
        # NOTE(review): the final argument was cut off in the reviewed text;
        # "410" (the expected status substring) is an assumption -- confirm.
        d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                   "410")
        return d1
    d.addCallback(_get_from_bogus_uri)
    d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

    # upload a file with PUT
    d.addCallback(self.log, "about to try PUT")
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                       "new.txt contents"))
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "new.txt contents")
    # and again with something large enough to use multiple segments,
    # and hopefully trigger pauseProducing too

    def _new_happy_semantics(ign):
        for c in self.clients:
            # these get reset somewhere? Whatever.
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
    d.addCallback(_new_happy_semantics)
    d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                       "big" * 500000)) # 1.5MB
    d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
    d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

    # can we replace files in place?
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                       "NEWER contents"))
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "NEWER contents")

    # test unlinked POST
    d.addCallback(lambda res: self.POST("uri", t="upload",
                                        file=("new.txt", "data" * 10000)))
    # and again using the helper, which exercises different upload-status
    # display code
    d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                        file=("foo.txt", "data2" * 10000)))

    # check that the status page exists
    d.addCallback(lambda res: self.GET("status", followRedirect=True))

    def _got_status(res):
        # find an interesting upload and download to look at. LIT files
        # are not interesting.
        h = self.clients[0].get_history()
        for ds in h.list_all_download_statuses():
            if ds.get_size() > 200:
                self._down_status = ds.get_counter()
        for us in h.list_all_upload_statuses():
            if us.get_size() > 200:
                self._up_status = us.get_counter()
        rs = list(h.list_all_retrieve_statuses())[0]
        self._retrieve_status = rs.get_counter()
        ps = list(h.list_all_publish_statuses())[0]
        self._publish_status = ps.get_counter()
        ms = list(h.list_all_mapupdate_statuses())[0]
        self._update_status = ms.get_counter()

        # and that there are some upload- and download- status pages
        return self.GET("status/up-%d" % self._up_status)
    d.addCallback(_got_status)

    def _got_up(res):
        return self.GET("status/down-%d" % self._down_status)
    d.addCallback(_got_up)

    def _got_down(res):
        return self.GET("status/mapupdate-%d" % self._update_status)
    d.addCallback(_got_down)

    def _got_update(res):
        return self.GET("status/publish-%d" % self._publish_status)
    d.addCallback(_got_update)

    def _got_publish(res):
        return self.GET("status/retrieve-%d" % self._retrieve_status)
    d.addCallback(_got_publish)

    # check that the helper status page exists
    d.addCallback(lambda res:
                  self.GET("helper_status", followRedirect=True))

    def _got_helper_status(res):
        self.failUnless("Bytes Fetched:" in res)
        # touch a couple of files in the helper's working directory to
        # exercise more code paths
        workdir = os.path.join(self.getdir("client0"), "helper")
        # Backdate the modification time by three days; access time stays "now".
        then = time.time() - 86400*3
        now = time.time()
        for subdir, contents in (("CHK_incoming", "small file"),
                                 ("CHK_encoding", "less small file")):
            spurious = os.path.join(workdir, subdir, "spurious")
            f = open(spurious, "wb")
            try:
                f.write(contents)
            finally:
                # Close before utime so the write cannot bump mtime afterwards.
                f.close()
            os.utime(spurious, (now, then))
    d.addCallback(_got_helper_status)
    # and that the json form exists
    d.addCallback(lambda res:
                  self.GET("helper_status?t=json", followRedirect=True))

    def _got_helper_status_json(res):
        data = simplejson.loads(res)
        # NOTE(review): the expected values on the three wrapped assertions
        # below (1, 10, 15) were cut off in the reviewed text. 1 matches the
        # same counter checked in _got_stats_json; 10 and 15 are the sizes
        # of the two backdated files -- confirm.
        self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                             1)
        self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                             10)
        self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                             15)
    d.addCallback(_got_helper_status_json)

    # and check that client[3] (which uses a helper but does not run one
    # itself) doesn't explode when you ask for its status
    d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))

    def _got_non_helper_status(res):
        self.failUnless("Upload and Download Status" in res)
    d.addCallback(_got_non_helper_status)

    # or for helper status with t=json
    d.addCallback(lambda res:
                  getPage(self.helper_webish_url + "helper_status?t=json"))

    def _got_non_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data, {})
    d.addCallback(_got_non_helper_status_json)

    # see if the statistics page exists
    d.addCallback(lambda res: self.GET("statistics"))

    def _got_stats(res):
        self.failUnless("Node Statistics" in res)
        self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
    d.addCallback(_got_stats)
    d.addCallback(lambda res: self.GET("statistics?t=json"))

    def _got_stats_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
        self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
    d.addCallback(_got_stats_json)

    # TODO: mangle the second segment of a file, to test errors that
    # occur after we've already sent some good data, which uses a
    # different error path.

    # TODO: download a URI with a form
    # TODO: create a directory by using a form
    # TODO: upload by using a form on the directory page
    # url = base + "somedir/subdir1/freeform_post!!upload"
    # TODO: delete a file by using a button on the directory page

    return d
def _test_runner(self, res):
    """Run the ``debug`` sub-commands of the CLI runner against real shares.

    Finds one CHK share file below ``self.basedir``, then checks the output
    of ``debug dump-share --offsets``, ``debug find-shares`` and
    ``debug catalog-shares``. Runs synchronously; ``res`` is ignored.
    """
    # exercise some of the diagnostic tools in runner.py

    # find a share
    for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
        if "storage" not in dirpath:
            continue
        if not filenames:
            continue
        pieces = dirpath.split(os.sep)
        if (len(pieces) >= 4
            and pieces[-4] == "storage"
            and pieces[-3] == "shares"):
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            filename = os.path.join(dirpath, filenames[0])
            # peek at the magic to see if it is a chk share
            f = open(filename, "rb")
            try:
                magic = f.read(4)
            finally:
                f.close()
            if magic == '\x00\x00\x00\x01':
                break
    else:
        # Reached only when the walk finished without ``break``.
        self.fail("unable to find any uri_extension files in %r"
                  % self.basedir)
    log.msg("test_system.SystemTest._test_runner using %r" % filename)

    out,err = StringIO(), StringIO()
    rc = runner.runner(["debug", "dump-share", "--offsets",
                        unicode_to_argv(filename)],
                       stdout=out, stderr=err)
    output = out.getvalue()
    self.failUnlessEqual(rc, 0)

    # we only upload a single file, so we can assert some things about
    # its size and shares.
    self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
    self.failUnlessIn("size: %d\n" % len(self.data), output)
    self.failUnlessIn("num_segments: 1\n", output)
    # segment_size is always a multiple of needed_shares
    self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
    self.failUnlessIn("total_shares: 10\n", output)
    # keys which are supposed to be present
    for key in ("size", "num_segments", "segment_size",
                "needed_shares", "total_shares",
                "codec_name", "codec_params", "tail_codec_params",
                #"plaintext_hash", "plaintext_root_hash",
                "crypttext_hash", "crypttext_root_hash",
                "share_root_hash", "UEB_hash"):
        self.failUnlessIn("%s: " % key, output)
    self.failUnlessIn(" verify-cap: URI:CHK-Verifier:", output)

    # now use its storage index to find the other shares using the
    # 'find-shares' tool
    sharedir, shnum = os.path.split(filename)
    storagedir, storage_index_s = os.path.split(sharedir)
    storage_index_s = str(storage_index_s)
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["debug", "find-shares", storage_index_s] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    # Rewind: the runner left the position at the end of what it wrote, so
    # readlines() would otherwise return nothing.
    out.seek(0)
    sharefiles = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(sharefiles), 10)

    # also exercise the 'catalog-shares' tool
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["debug", "catalog-shares"] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    out.seek(0)
    descriptions = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(descriptions), 30)
    matching = [line
                for line in descriptions
                if line.startswith("CHK %s " % storage_index_s)]
    self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
    """Connect to client 0's control port and hand over to ``_test_control2``.

    Reads the control FURL from client 0's ``private/control.furl`` and
    obtains a remote reference through the introducer's Tub. ``res`` is
    ignored. Returns the Deferred covering the remote-control checks.
    """
    # exercise the remote-control-the-client foolscap interfaces in
    # allmydata.control (mostly used for performance tests)
    c0 = self.clients[0]
    control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
    f = open(control_furl_file, "r")
    try:
        control_furl = f.read().strip()
    finally:
        f.close()
    # it doesn't really matter which Tub we use to connect to the client,
    # so let's just use our IntroducerNode's
    d = self.introducer.tub.getReference(control_furl)
    # The furl file doubles as the payload that _test_control2 uploads.
    d.addCallback(self._test_control2, control_furl_file)
    return d
def _test_control2(self, rref, filename):
    """Drive the control interface behind ``rref``.

    Uploads ``filename`` remotely, downloads the resulting URI to
    ``control.downfile`` under ``self.basedir``, compares the two files,
    then calls ``speed_test``, ``get_memory_usage`` (only when
    ``sys.platform == "linux2"``) and ``measure_peer_response_time``.

    Returns the Deferred driving these remote calls.
    """
    d = rref.callRemote("upload_from_file_to_uri",
                        filename.encode(get_filesystem_encoding()), convergence=None)
    downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
    d.addCallback(lambda uri:
                  rref.callRemote("download_from_uri_to_file",
                                  uri, downfile))

    def _read(path):
        f = open(path, "r")
        try:
            return f.read()
        finally:
            f.close()

    def _check(res):
        # The remote download call answers with the path it wrote to.
        self.failUnlessEqual(res, downfile)
        data = _read(downfile)
        expected_data = _read(filename)
        self.failUnlessEqual(data, expected_data)
    d.addCallback(_check)
    d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
    if sys.platform == "linux2":
        d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
    d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    return d
def _test_cli(self, res):
    """Run a long scripted session of CLI commands against client 0.

    Covers alias handling (legacy ``root_dir.cap`` and ``add-alias``),
    ``ls`` in several output modes, ``put`` from file and stdin, ``get`` to
    stdout and to a file, ``mkdir``, ``rm``, ``mv`` (including one run in a
    subprocess with a bogus HTTP proxy configured), ``ln`` and ``cp`` in
    every direction, including recursive and ``--caps-only`` copies.

    ``res`` is ignored. Returns the Deferred driving the whole session.
    """
    # run various CLI commands (in a thread, since they use blocking
    # network calls)

    private_uri = self._private_node.get_uri()
    client0_basedir = self.getdir("client0")

    nodeargs = [
        "--node-directory", client0_basedir,
        ]

    def _write_file(path, contents, mode="wb"):
        # Write and close right away so later CLI runs see complete files.
        f = open(path, mode)
        try:
            f.write(contents)
        finally:
            f.close()

    def _read_file(path, mode="rb"):
        f = open(path, mode)
        try:
            return f.read()
        finally:
            f.close()

    d = defer.succeed(None)

    # for compatibility with earlier versions, private/root_dir.cap is
    # supposed to be treated as an alias named "tahoe:". Start by making
    # sure that works, before we add other aliases.

    root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
    _write_file(root_file, private_uri, "w")

    def run(ignored, verb, *args, **kwargs):
        # Callback-shaped wrapper: the first argument is the previous
        # result, which is dropped.
        stdin = kwargs.get("stdin", "")
        newargs = [verb] + nodeargs + list(args)
        return self._run_cli(newargs, stdin=stdin)

    def _check_ls(res, expected_children, unexpected_children=()):
        out, err = res
        self.failUnlessEqual(err, "")
        for s in expected_children:
            self.failUnless(s in out, (s,out))
        for s in unexpected_children:
            self.failIf(s in out, (s,out))

    # this should reference private_uri
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

    d.addCallback(run, "list-aliases")

    def _check_aliases_1(res):
        out, err = res
        self.failUnlessEqual(err, "")
        self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
    d.addCallback(_check_aliases_1)

    # now that that's out of the way, remove root_dir.cap and work with
    # new files
    d.addCallback(lambda res: os.unlink(root_file))
    d.addCallback(run, "list-aliases")

    def _check_aliases_2(res):
        out, err = res
        self.failUnlessEqual(err, "")
        self.failUnlessEqual(out, "")
    d.addCallback(_check_aliases_2)

    d.addCallback(run, "mkdir")

    def _got_dir(res):
        out, err = res
        self.failUnless(uri.from_string_dirnode(out.strip()))
        # The new cap feeds the add-alias call that follows.
        return out.strip()
    d.addCallback(_got_dir)
    d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

    d.addCallback(run, "list-aliases")

    def _check_aliases_3(res):
        out, err = res
        self.failUnlessEqual(err, "")
        self.failUnless("tahoe: " in out)
    d.addCallback(_check_aliases_3)

    def _check_empty_dir(res):
        out, err = res
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "")
    d.addCallback(run, "ls")
    d.addCallback(_check_empty_dir)

    def _check_missing_dir(res):
        # TODO: check that rc==2
        out, err = res
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "No such file or directory\n")
    d.addCallback(run, "ls", "bogus")
    d.addCallback(_check_missing_dir)

    files = []
    datas = []
    # NOTE(review): the loop header was absent from the reviewed text; indexes
    # up to 5 are used below, range(10) is an assumption -- confirm.
    for i in range(10):
        fn = os.path.join(self.basedir, "file%d" % i)
        files.append(fn)
        data = "data to be uploaded: file%d\n" % i
        datas.append(data)
        _write_file(fn, data)

    def _check_stdout_against(res, filenum=None, data=None):
        out, err = res
        self.failUnlessEqual(err, "")
        if filenum is not None:
            self.failUnlessEqual(out, datas[filenum])
        if data is not None:
            self.failUnlessEqual(out, data)

    # test all both forms of put: from a file, and from stdin
    # tahoe put bar FOO
    d.addCallback(run, "put", files[0], "tahoe-file0")

    def _put_out(res):
        out, err = res
        self.failUnless("URI:LIT:" in out, out)
        self.failUnless("201 Created" in err, err)
        uri0 = out.strip()
        return run(None, "get", uri0)
    d.addCallback(_put_out)
    d.addCallback(lambda res: self.failUnlessEqual(res[0], datas[0]))

    d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
    # tahoe put bar tahoe:FOO
    d.addCallback(run, "put", files[2], "tahoe:file2")
    d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")

    def _check_put_mutable(res):
        out, err = res
        self._mutable_file3_uri = out.strip()
    d.addCallback(_check_put_mutable)
    d.addCallback(run, "get", "tahoe:file3")
    d.addCallback(_check_stdout_against, 3)

    # tahoe put FOO
    STDIN_DATA = "This is the file to upload from stdin."
    d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
    # tahoe put tahoe:FOO
    d.addCallback(run, "put", "-", "tahoe:from-stdin",
                  stdin="Other file from stdin.")

    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                              "tahoe-file-stdin", "from-stdin"])
    d.addCallback(run, "ls", "subdir")
    d.addCallback(_check_ls, ["tahoe-file1"])

    # tahoe mkdir FOO
    d.addCallback(run, "mkdir", "subdir2")
    d.addCallback(run, "ls")
    # TODO: extract the URI, set an alias with it
    d.addCallback(_check_ls, ["subdir2"])

    # tahoe get: (to stdin and to a file)
    d.addCallback(run, "get", "tahoe-file0")
    d.addCallback(_check_stdout_against, 0)
    d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
    d.addCallback(_check_stdout_against, 1)
    outfile0 = os.path.join(self.basedir, "outfile0")
    d.addCallback(run, "get", "file2", outfile0)

    def _check_outfile0(res):
        data = _read_file(outfile0)
        self.failUnlessEqual(data, "data to be uploaded: file2\n")
    d.addCallback(_check_outfile0)
    # Separate target file: reusing "outfile0" would let a stale file from
    # the previous step stand in for this download.
    outfile1 = os.path.join(self.basedir, "outfile1")
    d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)

    def _check_outfile1(res):
        data = _read_file(outfile1)
        self.failUnlessEqual(data, "data to be uploaded: file1\n")
    d.addCallback(_check_outfile1)

    d.addCallback(run, "rm", "tahoe-file0")
    d.addCallback(run, "rm", "tahoe:file2")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

    d.addCallback(run, "ls", "-l")

    def _check_ls_l(res):
        out, err = res
        lines = out.split("\n")
        for l in lines:
            if "tahoe-file-stdin" in l:
                self.failUnless(l.startswith("-r-- "), l)
                self.failUnless(" %d " % len(STDIN_DATA) in l)
            if "file3" in l:
                self.failUnless(l.startswith("-rw- "), l) # mutable
    d.addCallback(_check_ls_l)

    d.addCallback(run, "ls", "--uri")

    def _check_ls_uri(res):
        out, err = res
        lines = out.split("\n")
        for l in lines:
            if "file3" in l:
                self.failUnless(self._mutable_file3_uri in l)
    d.addCallback(_check_ls_uri)

    d.addCallback(run, "ls", "--readonly-uri")

    def _check_ls_rouri(res):
        out, err = res
        lines = out.split("\n")
        for l in lines:
            if "file3" in l:
                rw_uri = self._mutable_file3_uri
                u = uri.from_string_mutable_filenode(rw_uri)
                ro_uri = u.get_readonly().to_string()
                self.failUnless(ro_uri in l)
    d.addCallback(_check_ls_rouri)

    d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved-first-time")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-moved-first-time"], ["tahoe-file-stdin"])

    def _mv_with_http_proxy(ign):
        # Work on a copy so the bogus proxy only affects the subprocess and
        # does not leak into this process's environment.
        env = dict(os.environ)
        env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345" # invalid address
        return self._run_cli_in_subprocess(["mv"] + nodeargs + ["tahoe-moved-first-time", "tahoe-moved"], env=env)
    d.addCallback(_mv_with_http_proxy)

    def _check_mv_with_http_proxy(res):
        out, err, rc_or_sig = res
        self.failUnlessEqual(rc_or_sig, 0, str(res))
    d.addCallback(_check_mv_with_http_proxy)

    d.addCallback(run, "ls")
    # "tahoe-moved" is a substring of the old name, so only the absence of
    # the full old name proves the move; it must be spelled exactly.
    d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-moved-first-time"])

    d.addCallback(run, "ln", "tahoe-moved", "newlink")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

    d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy"])
    d.addCallback(run, "get", "tahoe:file3-copy")
    d.addCallback(_check_stdout_against, 3)

    # copy from disk into tahoe
    d.addCallback(run, "cp", files[4], "tahoe:file4")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file4")
    d.addCallback(_check_stdout_against, 4)

    # copy from tahoe into disk
    target_filename = os.path.join(self.basedir, "file-out")
    d.addCallback(run, "cp", "tahoe:file4", target_filename)

    def _check_cp_out(res):
        self.failUnless(os.path.exists(target_filename))
        got = _read_file(target_filename)
        self.failUnlessEqual(got, datas[4])
    d.addCallback(_check_cp_out)

    # copy from disk to disk (silly case)
    target2_filename = os.path.join(self.basedir, "file-out-copy")
    d.addCallback(run, "cp", target_filename, target2_filename)

    def _check_cp_out2(res):
        self.failUnless(os.path.exists(target2_filename))
        got = _read_file(target2_filename)
        self.failUnlessEqual(got, datas[4])
    d.addCallback(_check_cp_out2)

    # copy from tahoe into disk, overwriting an existing file
    d.addCallback(run, "cp", "tahoe:file3", target_filename)

    def _check_cp_out3(res):
        self.failUnless(os.path.exists(target_filename))
        got = _read_file(target_filename)
        self.failUnlessEqual(got, datas[3])
    d.addCallback(_check_cp_out3)

    # copy from disk into tahoe, overwriting an existing immutable file
    d.addCallback(run, "cp", files[5], "tahoe:file4")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file4")
    d.addCallback(_check_stdout_against, 5)

    # copy from disk into tahoe, overwriting an existing mutable file
    d.addCallback(run, "cp", files[5], "tahoe:file3")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file3")
    d.addCallback(_check_stdout_against, 5)

    # recursive copy: setup
    dn = os.path.join(self.basedir, "dir1")
    os.makedirs(dn)
    _write_file(os.path.join(dn, "rfile1"), "rfile1")
    _write_file(os.path.join(dn, "rfile2"), "rfile2")
    _write_file(os.path.join(dn, "rfile3"), "rfile3")
    sdn2 = os.path.join(dn, "subdir2")
    os.makedirs(sdn2)
    _write_file(os.path.join(sdn2, "rfile4"), "rfile4")
    _write_file(os.path.join(sdn2, "rfile5"), "rfile5")

    # from disk into tahoe
    d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["dir1"])
    d.addCallback(run, "ls", "dir1")
    d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                  ["rfile4", "rfile5"])
    d.addCallback(run, "ls", "tahoe:dir1/subdir2")
    d.addCallback(_check_ls, ["rfile4", "rfile5"],
                  ["rfile1", "rfile2", "rfile3"])
    d.addCallback(run, "get", "dir1/subdir2/rfile4")
    d.addCallback(_check_stdout_against, data="rfile4")

    # and back out again
    dn_copy = os.path.join(self.basedir, "dir1-copy")
    d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)

    def _check_cp_r_out(res):
        def _cmp(name):
            old = _read_file(os.path.join(dn, name))
            newfn = os.path.join(dn_copy, name)
            self.failUnless(os.path.exists(newfn))
            new = _read_file(newfn)
            self.failUnlessEqual(old, new)
        _cmp("rfile1")
        _cmp("rfile2")
        _cmp("rfile3")
        _cmp(os.path.join("subdir2", "rfile4"))
        _cmp(os.path.join("subdir2", "rfile5"))
    d.addCallback(_check_cp_r_out)

    # and copy it a second time, which ought to overwrite the same files
    d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

    # and again, only writing filecaps
    dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
    d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)

    def _check_capsonly(res):
        # these should all be LITs
        x = _read_file(os.path.join(dn_copy2, "subdir2", "rfile4"), "r")
        y = uri.from_string_filenode(x)
        self.failUnlessEqual(y.data, "rfile4")
    d.addCallback(_check_capsonly)

    # and tahoe-to-tahoe
    d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["dir1", "dir1-copy"])
    d.addCallback(run, "ls", "dir1-copy")
    d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                  ["rfile4", "rfile5"])
    d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
    d.addCallback(_check_ls, ["rfile4", "rfile5"],
                  ["rfile1", "rfile2", "rfile3"])
    d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
    d.addCallback(_check_stdout_against, data="rfile4")

    # and copy it a second time, which ought to overwrite the same files
    d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

    # tahoe_ls doesn't currently handle the error correctly: it tries to
    # JSON-parse a traceback.
    ## def _ls_missing(res):
    ##     argv = ["ls"] + nodeargs + ["bogus"]
    ##     return self._run_cli(argv)
    ## d.addCallback(_ls_missing)
    ## def _check_ls_missing((out,err)):
    ##     self.failUnlessEqual(err, "")
    ## d.addCallback(_check_ls_missing)

    return d
def _run_cli(self, argv, stdin=""):
    """Run the CLI entry point in a worker thread and capture its output.

    `runner.runner` is invoked via `threads.deferToThread` with in-memory
    stdin/stdout/stderr streams, so the call does not block the reactor.

    argv: list of command-line arguments handed to `runner.runner`.
    stdin: string supplied to the command as its standard input.

    Returns a Deferred that fires with a `(stdout, stderr)` tuple of
    strings once the command has finished.
    """
    stdout, stderr = StringIO(), StringIO()
    d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                              stdin=StringIO(stdin),
                              stdout=stdout, stderr=stderr)
    def _done(res):
        # The buffers are only complete after the worker thread has
        # returned, so they must be read from inside the callback. The
        # value returned by runner.runner (res) is not used.
        return stdout.getvalue(), stderr.getvalue()
    d.addCallback(_done)
    return d
def _run_cli_in_subprocess(self, argv, env=None):
    """Run the bin/tahoe script in a child process.

    The test is skipped first (via `skip_if_cannot_run_bintahoe`) when
    the script cannot be executed in this environment.

    argv: list of arguments appended after the `bintahoe` script path.
    env: environment mapping for the child process; when None, this
        process's own `os.environ` is passed through.

    Returns the Deferred produced by `utils.getProcessOutputAndValue`,
    which per the Twisted documentation fires with an
    `(out, err, exit_code)` tuple.
    """
    self.skip_if_cannot_run_bintahoe()

    if env is None:
        # NOTE(review): defaulting to the parent's environment is assumed
        # to be the intended behaviour for env=None -- confirm.
        env = os.environ
    # Run the script under the same interpreter that runs the tests.
    d = utils.getProcessOutputAndValue(sys.executable, args=[bintahoe] + argv,
                                       env=env)
    return d
1776 def _test_checker(self, res):
1777 ut = upload.Data("too big to be literal" * 200, convergence=None)
1778 d = self._personal_node.add_file(u"big file", ut)
1780 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1781 def _check_dirnode_results(r):
1782 self.failUnless(r.is_healthy())
1783 d.addCallback(_check_dirnode_results)
1784 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1785 d.addCallback(_check_dirnode_results)
1787 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1788 def _got_chk_filenode(n):
1789 self.failUnless(isinstance(n, ImmutableFileNode))
1790 d = n.check(Monitor())
1791 def _check_filenode_results(r):
1792 self.failUnless(r.is_healthy())
1793 d.addCallback(_check_filenode_results)
1794 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1795 d.addCallback(_check_filenode_results)
1797 d.addCallback(_got_chk_filenode)
1799 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1800 def _got_lit_filenode(n):
1801 self.failUnless(isinstance(n, LiteralFileNode))
1802 d = n.check(Monitor())
1803 def _check_lit_filenode_results(r):
1804 self.failUnlessEqual(r, None)
1805 d.addCallback(_check_lit_filenode_results)
1806 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1807 d.addCallback(_check_lit_filenode_results)
1809 d.addCallback(_got_lit_filenode)