1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.encodingutil import quote_output
16 from allmydata.util.fileutil import abspath_expanduser_unicode
17 from allmydata.util.consumer import MemoryConsumer, download_to_data
18 from allmydata.scripts import runner
19 from allmydata.interfaces import IDirectoryNode, IFileNode, \
20 NoSuchChildError, NoSharesError
21 from allmydata.monitor import Monitor
22 from allmydata.mutable.common import NotWriteableError
23 from allmydata.mutable import layout as mutable_layout
24 from foolscap.api import DeadReferenceError
25 from twisted.python.failure import Failure
26 from twisted.web.client import getPage
27 from twisted.web.error import Error
29 from allmydata.test.common import SystemTestMixin
32 This is some data to publish to the remote grid.., which needs to be large
33 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts bytes read and can interrupt an upload.

    read() tallies the running total in bytes_read; the first time the total
    exceeds `interrupt_after`, it fires `interrupt_after_d` (exactly once --
    interrupt_after is cleared) before returning data, letting a test break
    the connection partway through an upload.
    """
    # Restored initializer: read() does `self.bytes_read += length`, which
    # raises AttributeError unless this class attribute exists (the line was
    # dropped from the damaged listing).
    bytes_read = 0
    interrupt_after = None    # byte threshold; None means "never interrupt"
    interrupt_after_d = None  # Deferred fired once the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
49 class SystemTest(SystemTestMixin, unittest.TestCase):
50 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
def test_connections(self):
    # Bring up a grid, add one extra node, and verify that every client's
    # storage broker sees all numclients+1 servers.
    # NOTE(review): this listing is missing several original lines (the
    # embedded source line numbers skip); gaps are flagged below.
    self.basedir = "system/SystemTest/test_connections"
    d = self.set_up_nodes()
    self.extra_node = None
    d.addCallback(lambda res: self.add_extra_node(self.numclients))
    def _check(extra_node):
        self.extra_node = extra_node
        for c in self.clients:
            all_peerids = c.get_storage_broker().get_all_serverids()
            self.failUnlessEqual(len(all_peerids), self.numclients+1)
            # NOTE(review): `sb` is undefined in the visible lines -- a
            # line such as `sb = c.get_storage_broker()` appears to be
            # missing just above.
            permuted_peers = sb.get_servers_for_index("a")
            self.failUnlessEqual(len(permuted_peers), self.numclients+1)
    # NOTE(review): the `d.addCallback(_check)` hookup appears to be
    # missing from this listing.
    def _shutdown_extra_node(res):
        # NOTE(review): presumably guarded by `if self.extra_node:` in the
        # original -- that line is missing here.
        return self.extra_node.stopService()
    d.addBoth(_shutdown_extra_node)
    # NOTE(review): a trailing `return d` appears to be missing.
73 # test_connections is subsumed by test_upload_and_download, and takes
74 # quite a while to run on a slow machine (because of all the TLS
75 # connections that must be established). If we ever rework the introducer
76 # code to such an extent that we're not sure if it works anymore, we can
77 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Round-trip upload/download with convergence disabled (random key)."""
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    d = self._test_upload_and_download(convergence=None)
    return d
def test_upload_and_download_convergent(self):
    """Round-trip upload/download with convergent encryption enabled."""
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    d = self._test_upload_and_download(convergence="some convergence string")
    return d
def _test_upload_and_download(self, convergence):
    # Core immutable-file round trip over a live grid: upload, duplicate
    # upload (convergence short-circuit), full and partial downloads,
    # corrupted/nonexistent URI failures, helper uploads, and resumable
    # helper uploads. `convergence` is either None (random keys) or a
    # convergence secret string.
    # NOTE(review): this listing is missing many original lines (the
    # embedded line numbers skip); the significant gaps are flagged below.
    # we use 4000 bytes of data, which will result in about 400k written
    # to disk among all our simulated nodes
    DATA = "Some data to upload\n" * 200
    d = self.set_up_nodes()
    def _check_connections(res):
        for c in self.clients:
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
            all_peerids = c.get_storage_broker().get_all_serverids()
            self.failUnlessEqual(len(all_peerids), self.numclients)
            # NOTE(review): `sb` is undefined in the visible lines -- a
            # `sb = c.get_storage_broker()` line appears to be missing.
            permuted_peers = sb.get_servers_for_index("a")
            self.failUnlessEqual(len(permuted_peers), self.numclients)
    d.addCallback(_check_connections)

    # NOTE(review): a `def _do_upload(res):` opener appears to be missing
    # here; the following lines read like its body.
    u = self.clients[0].getServiceNamed("uploader")
    # we crank the max segsize down to 1024b for the duration of this
    # test, so we can exercise multiple segments. It is important
    # that this is not a multiple of the segment size, so that the
    # tail segment is not the same length as the others. This actually
    # gets rounded up to 1025 to be a multiple of the number of
    # required shares (since we use 25 out of 100 FEC).
    up = upload.Data(DATA, convergence=convergence)
    up.max_segment_size = 1024
    # NOTE(review): the tail of _do_upload (the `u.upload(up)` call and
    # its return) is missing from this listing.
    d.addCallback(_do_upload)
    def _upload_done(results):
        # NOTE(review): lines binding `theuri` and `self.uri` (from
        # `results`) appear to be missing here.
        log.msg("upload finished: uri is %s" % (theuri,))
        assert isinstance(self.uri, str), self.uri
        self.cap = uri.from_string(self.uri)
        self.n = self.clients[1].create_node_from_uri(self.uri)
    d.addCallback(_upload_done)

    def _upload_again(res):
        # Upload again. If using convergent encryption then this ought to be
        # short-circuited, however with the way we currently generate URIs
        # (i.e. because they include the roothash), we have to do all of the
        # encoding work, and only get to save on the upload part.
        log.msg("UPLOADING AGAIN")
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        return self.uploader.upload(up)
    d.addCallback(_upload_again)

    def _download_to_data(res):
        log.msg("DOWNLOADING")
        return download_to_data(self.n)
    d.addCallback(_download_to_data)
    def _download_to_data_done(data):
        log.msg("download finished")
        self.failUnlessEqual(data, DATA)
    d.addCallback(_download_to_data_done)

    # NOTE(review): a `def _test_read(res):` opener appears to be missing;
    # the block below rebinds `d` as its inner Deferred.
    n = self.clients[1].create_node_from_uri(self.uri)
    d = download_to_data(n)
    def _read_done(data):
        self.failUnlessEqual(data, DATA)
    d.addCallback(_read_done)
    d.addCallback(lambda ign:
                  n.read(MemoryConsumer(), offset=1, size=4))
    def _read_portion_done(mc):
        self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
    d.addCallback(_read_portion_done)
    d.addCallback(lambda ign:
                  n.read(MemoryConsumer(), offset=2, size=None))
    def _read_tail_done(mc):
        self.failUnlessEqual("".join(mc.chunks), DATA[2:])
    d.addCallback(_read_tail_done)
    d.addCallback(lambda ign:
                  n.read(MemoryConsumer(), size=len(DATA)+1000))
    def _read_too_much(mc):
        # reading past the end just returns everything
        self.failUnlessEqual("".join(mc.chunks), DATA)
    d.addCallback(_read_too_much)
    # NOTE(review): a `return d` closing _test_read appears to be missing.
    d.addCallback(_test_read)

    def _test_bad_read(res):
        bad_u = uri.from_string_filenode(self.uri)
        bad_u.key = self.flip_bit(bad_u.key)
        bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
        # this should cause an error during download
        # NOTE(review): an argument line of shouldFail2 and a trailing
        # `return d` appear to be missing below.
        d = self.shouldFail2(NoSharesError, "'download bad node'",
                             bad_n.read, MemoryConsumer(), offset=2)
    d.addCallback(_test_bad_read)

    def _download_nonexistent_uri(res):
        baduri = self.mangle_uri(self.uri)
        badnode = self.clients[1].create_node_from_uri(baduri)
        log.msg("about to download non-existent URI", level=log.UNUSUAL,
                facility="tahoe.tests")
        d1 = download_to_data(badnode)
        def _baduri_should_fail(res):
            log.msg("finished downloading non-existend URI",
                    level=log.UNUSUAL, facility="tahoe.tests")
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NoSharesError),
                            "expected NoSharesError, got %s" % res)
        d1.addBoth(_baduri_should_fail)
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_download_nonexistent_uri)

    # add a new node, which doesn't accept shares, and only uses the
    # helper for upload (an argument line of add_extra_node -- likely the
    # helper furl -- is missing from this listing)
    d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                  add_to_sparent=True))
    def _added(extra_node):
        self.extra_node = extra_node
        self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
    d.addCallback(_added)

    HELPER_DATA = "Data that needs help to upload" * 1000
    def _upload_with_helper(res):
        u = upload.Data(HELPER_DATA, convergence=convergence)
        d = self.extra_node.upload(u)
        def _uploaded(results):
            n = self.clients[1].create_node_from_uri(results.uri)
            return download_to_data(n)
        d.addCallback(_uploaded)
        # NOTE(review): a `def _check(newdata):` opener appears to be
        # missing here.
            self.failUnlessEqual(newdata, HELPER_DATA)
        d.addCallback(_check)
        # NOTE(review): `return d` appears to be missing.
    d.addCallback(_upload_with_helper)

    def _upload_duplicate_with_helper(res):
        u = upload.Data(HELPER_DATA, convergence=convergence)
        u.debug_stash_RemoteEncryptedUploadable = True
        d = self.extra_node.upload(u)
        def _uploaded(results):
            n = self.clients[1].create_node_from_uri(results.uri)
            return download_to_data(n)
        d.addCallback(_uploaded)
        # NOTE(review): a `def _check(newdata):` opener appears to be
        # missing here.
            self.failUnlessEqual(newdata, HELPER_DATA)
            self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                        "uploadable started uploading, should have been avoided")
        d.addCallback(_check)
        # NOTE(review): `return d` appears to be missing.
    if convergence is not None:
        # duplicate detection only makes sense with convergent encryption
        d.addCallback(_upload_duplicate_with_helper)

    def _upload_resumable(res):
        DATA = "Data that needs help to upload and gets interrupted" * 1000
        u1 = CountingDataUploadable(DATA, convergence=convergence)
        u2 = CountingDataUploadable(DATA, convergence=convergence)

        # we interrupt the connection after about 5kB by shutting down
        # the helper, then restarting it.
        u1.interrupt_after = 5000
        u1.interrupt_after_d = defer.Deferred()
        u1.interrupt_after_d.addCallback(lambda res:
                                         self.bounce_client(0))

        # sneak into the helper and reduce its chunk size, so that our
        # debug_interrupt will sever the connection on about the fifth
        # chunk fetched. This makes sure that we've started to write the
        # new shares before we abandon them, which exercises the
        # abort/delete-partial-share code. TODO: find a cleaner way to do
        # this. I know that this will affect later uses of the helper in
        # this same test run, but I'm not currently worried about it.
        offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

        d = self.extra_node.upload(u1)

        def _should_not_finish(res):
            self.fail("interrupted upload should have failed, not finished"
                      " with result %s" % (res,))
        # NOTE(review): a `def _interrupted(f):` opener appears to be
        # missing here (it is referenced by addCallbacks below).
            f.trap(DeadReferenceError)

            # make sure we actually interrupted it before finishing the
            # file (the closing continuation of this failUnless is missing)
            self.failUnless(u1.bytes_read < len(DATA),
                            "read %d out of %d total" % (u1.bytes_read,
            log.msg("waiting for reconnect", level=log.NOISY,
                    facility="tahoe.test.test_system")
            # now, we need to give the nodes a chance to notice that this
            # connection has gone away. When this happens, the storage
            # servers will be told to abort their uploads, removing the
            # partial shares. Unfortunately this involves TCP messages
            # going through the loopback interface, and we can't easily
            # predict how long that will take. If it were all local, we
            # could use fireEventually() to stall. Since we don't have
            # the right introduction hooks, the best we can do is use a
            # fixed delay. TODO: this is fragile.
            u1.interrupt_after_d.addCallback(self.stall, 2.0)
            return u1.interrupt_after_d
        d.addCallbacks(_should_not_finish, _interrupted)

        def _disconnected(res):
            # check to make sure the storage servers aren't still hanging
            # on to the partial share: their incoming/ directories should
            # now be empty (final comment line missing from listing)
            log.msg("disconnected", level=log.NOISY,
                    facility="tahoe.test.test_system")
            for i in range(self.numclients):
                incdir = os.path.join(self.getdir("client%d" % i),
                                      "storage", "shares", "incoming")
                self.failIf(os.path.exists(incdir) and os.listdir(incdir))
        d.addCallback(_disconnected)

        # then we need to give the reconnector a chance to
        # reestablish the connection to the helper.
        d.addCallback(lambda res:
                      log.msg("wait_for_connections", level=log.NOISY,
                              facility="tahoe.test.test_system"))
        d.addCallback(lambda res: self.wait_for_connections())

        d.addCallback(lambda res:
                      log.msg("uploading again", level=log.NOISY,
                              facility="tahoe.test.test_system"))
        d.addCallback(lambda res: self.extra_node.upload(u2))

        def _uploaded(results):
            # NOTE(review): a line binding `cap` (used below) from
            # `results` appears to be missing here.
            log.msg("Second upload complete", level=log.NOISY,
                    facility="tahoe.test.test_system")

            # this is really bytes received rather than sent, but it's
            # convenient and basically measures the same thing
            bytes_sent = results.ciphertext_fetched
            self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

            # We currently don't support resumption of upload if the data is
            # encrypted with a random key. (Because that would require us
            # to store the key locally and re-use it on the next upload of
            # this file, which isn't a bad thing to do, but we currently
            # don't.)
            if convergence is not None:
                # Make sure we did not have to read the whole file the
                # second time around .
                self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
            # NOTE(review): an `else:` line appears to be missing here.
                # Make sure we did have to read the whole file the second
                # time around -- because the one that we partially uploaded
                # earlier was encrypted with a different random key.
                self.failIf(bytes_sent < len(DATA),
                            "resumption saved us some work even though we were using random keys:"
                            " read %r bytes out of %r total" %
                            (bytes_sent, len(DATA)))
            n = self.clients[1].create_node_from_uri(cap)
            return download_to_data(n)
        d.addCallback(_uploaded)

        # NOTE(review): a `def _check(newdata):` opener appears to be
        # missing here.
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
        d.addCallback(_check)
        # NOTE(review): `return d` appears to be missing.
    d.addCallback(_upload_resumable)

    def _grab_stats(ignored):
        # the StatsProvider doesn't normally publish a FURL:
        # instead it passes a live reference to the StatsGatherer
        # (if and when it connects). To exercise the remote stats
        # interface, we manually publish client0's StatsProvider
        # and use client1 to query it.
        sp = self.clients[0].stats_provider
        sp_furl = self.clients[0].tub.registerReference(sp)
        d = self.clients[1].tub.getReference(sp_furl)
        d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
        def _got_stats(stats):
            #from pprint import pprint
            # NOTE(review): a line binding `s` (probably `s = stats["stats"]`)
            # appears to be missing here.
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            c = stats["counters"]
            self.failUnless("storage_server.allocate" in c)
        d.addCallback(_got_stats)
        # NOTE(review): `return d` appears to be missing.
    d.addCallback(_grab_stats)
    # NOTE(review): the method's trailing `return d` appears to be missing.
def _find_all_shares(self, basedir):
    # Walk the simulated clients' storage trees under `basedir` and
    # collect (client_num, storage_index, filename, shnum) for every
    # share file found; fails the test when no shares exist.
    # NOTE(review): this listing is missing lines: the accumulator
    # initialization (e.g. `shares = []`), the `continue` under the
    # "storage" check, the `if (len(pieces) ...` condition opener, the
    # append of `data`, and the final `return shares`.
    for (dirpath, dirnames, filenames) in os.walk(basedir):
        if "storage" not in dirpath:
        pieces = dirpath.split(os.sep)
        # NOTE(review): the condition opener for the next two
        # continuation lines is missing.
            and pieces[-4] == "storage"
            and pieces[-3] == "shares"):
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            assert pieces[-5].startswith("client")
            client_num = int(pieces[-5][-1])
            storage_index_s = pieces[-1]
            storage_index = si_a2b(storage_index_s)
            for sharename in filenames:
                shnum = int(sharename)
                filename = os.path.join(dirpath, sharename)
                data = (client_num, storage_index, filename, shnum)
    self.fail("unable to find any share files in %s" % basedir)
def _corrupt_mutable_share(self, filename, which):
    # Corrupt one field of an on-disk SDMF mutable share and rewrite it,
    # to exercise retrieve-time integrity checks. `which` names the field
    # to damage (seqnum/R/IV/segsize/pubkey/signature/...).
    msf = MutableShareFile(filename)
    datav = msf.readv([ (0, 1000000) ])
    final_share = datav[0]
    assert len(final_share) < 1000000 # ought to be truncated
    pieces = mutable_layout.unpack_share(final_share)
    (seqnum, root_hash, IV, k, N, segsize, datalen,
     verification_key, signature, share_hash_chain, block_hash_tree,
     share_data, enc_privkey) = pieces

    if which == "seqnum":
        # NOTE(review): the seqnum mutation line and the `elif` openers
        # for the "R" and "IV" cases are missing from this listing; the
        # next two lines read like those branch bodies.
        root_hash = self.flip_bit(root_hash)
        IV = self.flip_bit(IV)
    elif which == "segsize":
        segsize = segsize + 15
    elif which == "pubkey":
        verification_key = self.flip_bit(verification_key)
    elif which == "signature":
        signature = self.flip_bit(signature)
    elif which == "share_hash_chain":
        nodenum = share_hash_chain.keys()[0]
        share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
    elif which == "block_hash_tree":
        block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
    elif which == "share_data":
        share_data = self.flip_bit(share_data)
    elif which == "encprivkey":
        enc_privkey = self.flip_bit(enc_privkey)

    # NOTE(review): the remaining argument lines of pack_prefix and
    # pack_share are missing from this listing.
    prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
    final_share = mutable_layout.pack_share(prefix,
    msf.writev( [(0, final_share)], None)
def test_mutable(self):
    # End-to-end SDMF mutable-file test: create a mutable file, inspect a
    # share via the CLI "debug dump-share" command, download through
    # several clients, overwrite twice, corrupt most shares and confirm
    # the download still succeeds, then check empty files, dirnodes, and
    # key-generator pool accounting.
    # NOTE(review): this listing is missing many original lines (the
    # embedded line numbers skip); significant gaps are flagged below.
    self.basedir = "system/SystemTest/test_mutable"
    DATA = "initial contents go here."  # 25 bytes % 3 != 0
    NEWDATA = "new contents yay"
    NEWERDATA = "this is getting old"

    d = self.set_up_nodes(use_key_generator=True)

    def _create_mutable(res):
        # NOTE(review): a line binding `c` (probably `c = self.clients[0]`)
        # appears to be missing here.
        log.msg("starting create_mutable_file")
        d1 = c.create_mutable_file(DATA)
        # NOTE(review): a `def _done(res):` opener appears to be missing.
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
        d1.addCallback(_done)
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_create_mutable)

    def _test_debug(res):
        # find a share. It is important to run this while there is only
        # one slot in the grid.
        shares = self._find_all_shares(self.basedir)
        (client_num, storage_index, filename, shnum) = shares[0]
        # NOTE(review): the `% filename` continuation of this log.msg is
        # missing from the listing.
        log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
        log.msg(" for clients[%d]" % client_num)

        out,err = StringIO(), StringIO()
        # NOTE(review): the filename argument line of this runner call is
        # missing from the listing.
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)
        # NOTE(review): a `try:` opener for the `except unittest.FailTest`
        # handler below is missing.
        self.failUnless("Mutable slot found:\n" in output)
        self.failUnless("share_type: SDMF\n" in output)
        peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
        self.failUnless(" WE for nodeid: %s\n" % peerid in output)
        self.failUnless(" num_extra_leases: 0\n" in output)
        # NOTE(review): the `in output)` continuation is missing below.
        self.failUnless(" secrets are for nodeid: %s\n" % peerid
        self.failUnless(" SDMF contents:\n" in output)
        self.failUnless(" seqnum: 1\n" in output)
        self.failUnless(" required_shares: 3\n" in output)
        self.failUnless(" total_shares: 10\n" in output)
        self.failUnless(" segsize: 27\n" in output, (output, filename))
        self.failUnless(" datalen: 25\n" in output)
        # the exact share_hash_chain nodes depends upon the sharenum,
        # and is more of a hassle to compute than I want to deal with
        self.failUnless(" share_hash_chain: " in output)
        self.failUnless(" block_hash_tree: 1 nodes\n" in output)
        expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                    base32.b2a(storage_index))
        self.failUnless(expected in output)
    except unittest.FailTest:
        # dump the CLI output to aid debugging before re-raising
        # NOTE(review): the `print output` / `raise` lines are missing.
        print "dump-share output was:"
    d.addCallback(_test_debug)

    # first, let's see if we can use the existing node to retrieve the
    # contents. This allows it to use the cached pubkey and maybe the
    # latest-known sharemap.

    d.addCallback(lambda res: self._mutable_node_1.download_best_version())
    def _check_download_1(res):
        self.failUnlessEqual(res, DATA)
        # now we see if we can retrieve the data from a new node,
        # constructed using the URI of the original one. We do this test
        # on the same client that uploaded the data.
        uri = self._mutable_node_1.get_uri()
        log.msg("starting retrieve1")
        newnode = self.clients[0].create_node_from_uri(uri)
        newnode_2 = self.clients[0].create_node_from_uri(uri)
        self.failUnlessIdentical(newnode, newnode_2)
        return newnode.download_best_version()
    d.addCallback(_check_download_1)

    def _check_download_2(res):
        self.failUnlessEqual(res, DATA)
        # same thing, but with a different client
        uri = self._mutable_node_1.get_uri()
        newnode = self.clients[1].create_node_from_uri(uri)
        log.msg("starting retrieve2")
        d1 = newnode.download_best_version()
        d1.addCallback(lambda res: (res, newnode))
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_check_download_2)

    def _check_download_3((res, newnode)):
        self.failUnlessEqual(res, DATA)
        log.msg("starting replace1")
        d1 = newnode.overwrite(NEWDATA)
        d1.addCallback(lambda res: newnode.download_best_version())
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_check_download_3)

    def _check_download_4(res):
        self.failUnlessEqual(res, NEWDATA)
        # now create an even newer node and replace the data on it. This
        # new node has never been used for download before.
        uri = self._mutable_node_1.get_uri()
        newnode1 = self.clients[2].create_node_from_uri(uri)
        newnode2 = self.clients[3].create_node_from_uri(uri)
        self._newnode3 = self.clients[3].create_node_from_uri(uri)
        log.msg("starting replace2")
        d1 = newnode1.overwrite(NEWERDATA)
        d1.addCallback(lambda res: newnode2.download_best_version())
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_check_download_4)

    def _check_download_5(res):
        log.msg("finished replace2")
        self.failUnlessEqual(res, NEWERDATA)
    d.addCallback(_check_download_5)

    def _corrupt_shares(res):
        # run around and flip bits in all but k of the shares, to test
        # the integrity checks
        shares = self._find_all_shares(self.basedir)
        ## sort by share number
        #shares.sort( lambda a,b: cmp(a[3], b[3]) )
        # NOTE(review): the `in shares ])` continuation is missing below.
        where = dict([ (shnum, filename)
                       for (client_num, storage_index, filename, shnum)
        assert len(where) == 10 # this test is designed for 3-of-10
        for shnum, filename in where.items():
            # shares 7,8,9 are left alone. read will check
            # (share_hash_chain, block_hash_tree, share_data). New
            # seqnum+R pairs will trigger a check of (seqnum, R, IV,
            # segsize, signature).
            # NOTE(review): the per-shnum `if`/`elif` branch openers are
            # missing between the corruption calls below -- in the
            # visible text every share would receive every corruption.
            # read: this will trigger "pubkey doesn't match
            self._corrupt_mutable_share(filename, "pubkey")
            self._corrupt_mutable_share(filename, "encprivkey")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "seqnum")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "R")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "segsize")
            self._corrupt_mutable_share(filename, "share_hash_chain")
            self._corrupt_mutable_share(filename, "block_hash_tree")
            self._corrupt_mutable_share(filename, "share_data")
            # other things to correct: IV, signature
            # 7,8,9 are left alone

        # note that initial_query_count=5 means that we'll hit the
        # first 5 servers in effectively random order (based upon
        # response time), so we won't necessarily ever get a "pubkey
        # doesn't match fingerprint" error (if we hit shnum>=1 before
        # shnum=0, we pull the pubkey from there). To get repeatable
        # specific failures, we need to set initial_query_count=1,
        # but of course that will change the sequencing behavior of
        # the retrieval process. TODO: find a reasonable way to make
        # this a parameter, probably when we expand this test to test
        # for one failure mode at a time.

        # when we retrieve this, we should get three signature
        # failures (where we've mangled seqnum, R, and segsize). The
    d.addCallback(_corrupt_shares)

    d.addCallback(lambda res: self._newnode3.download_best_version())
    d.addCallback(_check_download_5)

    def _check_empty_file(res):
        # make sure we can create empty files, this usually screws up the
        d1 = self.clients[2].create_mutable_file("")
        d1.addCallback(lambda newnode: newnode.download_best_version())
        d1.addCallback(lambda res: self.failUnlessEqual("", res))
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_check_empty_file)

    d.addCallback(lambda res: self.clients[0].create_dirnode())
    def _created_dirnode(dnode):
        log.msg("_created_dirnode(%s)" % (dnode,))
        # NOTE(review): a line binding `d1` (probably `d1 = dnode.list()`)
        # appears to be missing here.
        d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
        d1.addCallback(lambda res: dnode.has_child(u"edgar"))
        d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
        d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
        d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
        d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
        d1.addCallback(lambda res: dnode.build_manifest().when_done())
        d1.addCallback(lambda res:
                       self.failUnlessEqual(len(res["manifest"]), 1))
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_created_dirnode)

    def wait_for_c3_kg_conn():
        # poll predicate: true once client3 is connected to the key generator
        return self.clients[3]._key_generator is not None
    d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

    def check_kg_poolsize(junk, size_delta):
        # the keypool shrinks by one for each key handed out
        self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                             self.key_generator_svc.key_generator.pool_size + size_delta)

    d.addCallback(check_kg_poolsize, 0)
    d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
    d.addCallback(check_kg_poolsize, -1)
    d.addCallback(lambda junk: self.clients[3].create_dirnode())
    d.addCallback(check_kg_poolsize, -2)
    # use_helper induces use of clients[3], which is the using-key_gen client
    d.addCallback(lambda junk:
                  self.POST("uri?t=mkdir&name=george", use_helper=True))
    d.addCallback(check_kg_poolsize, -3)
    # NOTE(review): the method's trailing `return d` appears to be missing.
def flip_bit(self, good):
    """Return `good` with the lowest bit of its final byte inverted."""
    head, last = good[:-1], good[-1]
    return head + chr(ord(last) ^ 0x01)
def mangle_uri(self, gooduri):
    # change the key, which changes the storage index, which means we'll
    # be asking about the wrong file, so nobody will have any shares
    u = uri.from_string(gooduri)
    # NOTE(review): the final argument line of this constructor call
    # (likely `size=u.size)`) is missing from this listing.
    u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                        uri_extension_hash=u.uri_extension_hash,
                        needed_shares=u.needed_shares,
                        total_shares=u.total_shares,
    return u2.to_string()
696 # TODO: add a test which mangles the uri_extension_hash instead, and
697 # should fail due to not being able to get a valid uri_extension block.
698 # Also a test which sneakily mangles the uri_extension block to change
699 # some of the validation data, so it will fail in the post-download phase
700 # when the file's crypttext integrity check fails. Do the same thing for
701 # the key, which should cause the download to fail the post-download
702 # plaintext_hash check.
def test_filesystem(self):
    # Build and publish a small directory tree, bounce a client, then
    # re-check the tree through the node API, web UI, control port, CLI,
    # and checker. Mostly a driver that chains the _test_*/_do_*/_check_*
    # helper methods.
    self.basedir = "system/SystemTest/test_filesystem"
    self.data = LARGE_DATA
    d = self.set_up_nodes(use_stats_gatherer=True)
    def _new_happy_semantics(ign):
        # servers-of-happiness=1 so small grids don't fail uploads
        for c in self.clients:
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
    d.addCallback(_new_happy_semantics)
    d.addCallback(self._test_introweb)
    d.addCallback(self.log, "starting publish")
    d.addCallback(self._do_publish1)
    d.addCallback(self._test_runner)
    d.addCallback(self._do_publish2)
    # at this point, we have the following filesystem (where "R" denotes
    # self._root_directory_uri):
    # R/subdir1/mydata567
    # R/subdir1/subdir2/mydata992

    d.addCallback(lambda res: self.bounce_client(0))
    d.addCallback(self.log, "bounced client0")

    d.addCallback(self._check_publish1)
    d.addCallback(self.log, "did _check_publish1")
    d.addCallback(self._check_publish2)
    d.addCallback(self.log, "did _check_publish2")
    d.addCallback(self._do_publish_private)
    d.addCallback(self.log, "did _do_publish_private")
    # now we also have (where "P" denotes a new dir):
    # P/personal/sekrit data
    # P/s2-rw -> /subdir1/subdir2/
    # P/s2-ro -> /subdir1/subdir2/ (read-only)
    d.addCallback(self._check_publish_private)
    d.addCallback(self.log, "did _check_publish_private")
    d.addCallback(self._test_web)
    d.addCallback(self._test_control)
    d.addCallback(self._test_cli)
    # P now has four top-level children:
    # P/personal/sekrit data
    # P/test_put/ (empty)
    # (two more children are listed on lines missing from this listing)
    d.addCallback(self._test_checker)
    # NOTE(review): the trailing `return d` appears to be missing.
def _test_introweb(self, res):
    # Fetch the introducer's status page (HTML, then ?t=json) and verify
    # the version banner plus announcement/subscription summaries.
    d = getPage(self.introweb_url, method="GET", followRedirect=True)
    # NOTE(review): a `def _check(res):` opener and a `try:` line appear
    # to be missing here; `allmydata` is also not imported in the visible
    # import block (its import line is likely among the missing lines).
        # NOTE(review): the `in res)` continuation is missing below.
        self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__)
        self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
        self.failUnless("Subscription Summary: storage: 5" in res)
    except unittest.FailTest:
        # dump the page to aid debugging before re-raising
        # NOTE(review): the `print res` / `raise` lines are missing.
        print "GET %s output was:" % self.introweb_url
    d.addCallback(_check)
    d.addCallback(lambda res:
                  getPage(self.introweb_url + "?t=json",
                          method="GET", followRedirect=True))
    def _check_json(res):
        data = simplejson.loads(res)
        # NOTE(review): a `try:` opener and the `{"storage": 5})`
        # continuation of the first assertion are missing below.
        self.failUnlessEqual(data["subscription_summary"],
        self.failUnlessEqual(data["announcement_summary"],
                             {"storage": 5, "stub_client": 5})
        self.failUnlessEqual(data["announcement_distinct_hosts"],
                             {"storage": 1, "stub_client": 1})
    except unittest.FailTest:
        # NOTE(review): the `print res` / `raise` lines and the trailing
        # `return d` are missing.
        print "GET %s?t=json output was:" % self.introweb_url
    d.addCallback(_check_json)
def _do_publish1(self, res):
    # Create the root dirnode and R/subdir1/mydata567, stashing
    # self._root_directory_uri, self._subdir1_node, and self.uri.
    ut = upload.Data(self.data, convergence=None)
    # NOTE(review): a line binding `c0` (probably `c0 = self.clients[0]`)
    # appears to be missing here.
    d = c0.create_dirnode()
    def _made_root(new_dirnode):
        self._root_directory_uri = new_dirnode.get_uri()
        return c0.create_node_from_uri(self._root_directory_uri)
    d.addCallback(_made_root)
    d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
    def _made_subdir1(subdir1_node):
        self._subdir1_node = subdir1_node
        d1 = subdir1_node.add_file(u"mydata567", ut)
        d1.addCallback(self.log, "publish finished")
        def _stash_uri(filenode):
            self.uri = filenode.get_uri()
            assert isinstance(self.uri, str), (self.uri, filenode)
        d1.addCallback(_stash_uri)
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_made_subdir1)
    # NOTE(review): the trailing `return d` appears to be missing.
def _do_publish2(self, res):
    # Add R/subdir1/subdir2/mydata992 beneath the subdir made by
    # _do_publish1 (relies on self._subdir1_node being set there).
    ut = upload.Data(self.data, convergence=None)
    d = self._subdir1_node.create_subdirectory(u"subdir2")
    d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    # NOTE(review): the trailing `return d` appears to be missing.
def log(self, res, *args, **kwargs):
    """Deferred-callback-friendly logger.

    Emits *args/**kwargs via log.msg and passes `res` straight through,
    so it can be chained as `d.addCallback(self.log, "message")` without
    clobbering the value flowing down the callback chain. The visible
    body returned None, which would hand None to every downstream
    callback (e.g. the data-consuming callbacks that follow
    `d.addCallback(self.log, "get finished")` elsewhere in this class);
    restoring `return res` fixes that.
    """
    log.msg(*args, **kwargs)
    return res
def _do_publish_private(self, res):
    # Create the private directory P with P/personal/sekrit data, plus
    # s2-rw and s2-ro links to R/subdir1/subdir2; the callback chain ends
    # by yielding the private dirnode itself.
    self.smalldata = "sssh, very secret stuff"
    ut = upload.Data(self.smalldata, convergence=None)
    d = self.clients[0].create_dirnode()
    d.addCallback(self.log, "GOT private directory")
    def _got_new_dir(privnode):
        rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
        d1 = privnode.create_subdirectory(u"personal")
        d1.addCallback(self.log, "made P/personal")
        d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
        d1.addCallback(self.log, "made P/personal/sekrit data")
        d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
        # NOTE(review): a `def _got_s2(s2node):` opener appears to be
        # missing here (it is referenced by the addCallback below).
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                  s2node.get_readonly_uri())
            d2.addCallback(lambda node:
                           privnode.set_uri(u"s2-ro",
                                            s2node.get_readonly_uri(),
                                            s2node.get_readonly_uri()))
            # NOTE(review): `return d2` appears to be missing.
        d1.addCallback(_got_s2)
        d1.addCallback(lambda res: privnode)
        # NOTE(review): `return d1` appears to be missing.
    d.addCallback(_got_new_dir)
    # NOTE(review): the trailing `return d` appears to be missing.
def _check_publish1(self, res):
    # this one uses the iterative API
    # NOTE(review): a line binding `c1` (probably `c1 = self.clients[1]`)
    # appears to be missing here.
    d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
    d.addCallback(self.log, "check_publish1 got /")
    d.addCallback(lambda root: root.get(u"subdir1"))
    d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
    d.addCallback(lambda filenode: download_to_data(filenode))
    d.addCallback(self.log, "get finished")
    # NOTE(review): a `def _get_done(data):` opener appears to be missing.
        self.failUnlessEqual(data, self.data)
    d.addCallback(_get_done)
    # NOTE(review): the trailing `return d` appears to be missing.
def _check_publish2(self, res):
    # this one uses the path-based API
    rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
    d = rootnode.get_child_at_path(u"subdir1")
    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode)))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda filenode: download_to_data(filenode))
    d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    def _got_filenode(filenode):
        # nodes for the same URI should compare equal (cached instance)
        fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
        assert fnode == filenode
    d.addCallback(_got_filenode)
    # NOTE(review): the trailing `return d` appears to be missing.
874 def _check_publish_private(self, resnode):
875 # this one uses the path-based API
876 self._private_node = resnode
878 d = self._private_node.get_child_at_path(u"personal")
879 def _got_personal(personal):
880 self._personal_node = personal
882 d.addCallback(_got_personal)
884 d.addCallback(lambda dirnode:
885 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
887 return self._private_node.get_child_at_path(path)
889 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
890 d.addCallback(lambda filenode: download_to_data(filenode))
891 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
892 d.addCallback(lambda res: get_path(u"s2-rw"))
893 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
894 d.addCallback(lambda res: get_path(u"s2-ro"))
895 def _got_s2ro(dirnode):
896 self.failUnless(dirnode.is_mutable(), dirnode)
897 self.failUnless(dirnode.is_readonly(), dirnode)
898 d1 = defer.succeed(None)
899 d1.addCallback(lambda res: dirnode.list())
900 d1.addCallback(self.log, "dirnode.list")
902 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
904 d1.addCallback(self.log, "doing add_file(ro)")
905 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
906 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
908 d1.addCallback(self.log, "doing get(ro)")
909 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
910 d1.addCallback(lambda filenode:
911 self.failUnless(IFileNode.providedBy(filenode)))
913 d1.addCallback(self.log, "doing delete(ro)")
914 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
916 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
918 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
920 personal = self._personal_node
921 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
923 d1.addCallback(self.log, "doing move_child_to(ro)2")
924 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
926 d1.addCallback(self.log, "finished with _got_s2ro")
928 d.addCallback(_got_s2ro)
929 def _got_home(dummy):
930 home = self._private_node
931 personal = self._personal_node
932 d1 = defer.succeed(None)
933 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
934 d1.addCallback(lambda res:
935 personal.move_child_to(u"sekrit data",home,u"sekrit"))
937 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
938 d1.addCallback(lambda res:
939 home.move_child_to(u"sekrit", home, u"sekrit data"))
941 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
942 d1.addCallback(lambda res:
943 home.move_child_to(u"sekrit data", personal))
945 d1.addCallback(lambda res: home.build_manifest().when_done())
946 d1.addCallback(self.log, "manifest")
950 # P/personal/sekrit data
951 # P/s2-rw (same as P/s2-ro)
952 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
953 d1.addCallback(lambda res:
954 self.failUnlessEqual(len(res["manifest"]), 5))
955 d1.addCallback(lambda res: home.start_deep_stats().when_done())
956 def _check_stats(stats):
957 expected = {"count-immutable-files": 1,
958 "count-mutable-files": 0,
959 "count-literal-files": 1,
961 "count-directories": 3,
962 "size-immutable-files": 112,
963 "size-literal-files": 23,
964 #"size-directories": 616, # varies
965 #"largest-directory": 616,
966 "largest-directory-children": 3,
967 "largest-immutable-file": 112,
969 for k,v in expected.iteritems():
970 self.failUnlessEqual(stats[k], v,
971 "stats[%s] was %s, not %s" %
973 self.failUnless(stats["size-directories"] > 1300,
974 stats["size-directories"])
975 self.failUnless(stats["largest-directory"] > 800,
976 stats["largest-directory"])
977 self.failUnlessEqual(stats["size-files-histogram"],
978 [ (11, 31, 1), (101, 316, 1) ])
979 d1.addCallback(_check_stats)
981 d.addCallback(_got_home)
984 def shouldFail(self, res, expected_failure, which, substring=None):
985 if isinstance(res, Failure):
986 res.trap(expected_failure)
988 self.failUnless(substring in str(res),
989 "substring '%s' not in '%s'"
990 % (substring, str(res)))
992 self.fail("%s was supposed to raise %s, not get '%s'" %
993 (which, expected_failure, res))
995 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
996 assert substring is None or isinstance(substring, str)
997 d = defer.maybeDeferred(callable, *args, **kwargs)
999 if isinstance(res, Failure):
1000 res.trap(expected_failure)
1002 self.failUnless(substring in str(res),
1003 "substring '%s' not in '%s'"
1004 % (substring, str(res)))
1006 self.fail("%s was supposed to raise %s, not get '%s'" %
1007 (which, expected_failure, res))
1011 def PUT(self, urlpath, data):
1012 url = self.webish_url + urlpath
1013 return getPage(url, method="PUT", postdata=data)
1015 def GET(self, urlpath, followRedirect=False):
1016 url = self.webish_url + urlpath
1017 return getPage(url, method="GET", followRedirect=followRedirect)
1019 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1020 sepbase = "boogabooga"
1021 sep = "--" + sepbase
1024 form.append('Content-Disposition: form-data; name="_charset"')
1026 form.append('UTF-8')
1028 for name, value in fields.iteritems():
1029 if isinstance(value, tuple):
1030 filename, value = value
1031 form.append('Content-Disposition: form-data; name="%s"; '
1032 'filename="%s"' % (name, filename.encode("utf-8")))
1034 form.append('Content-Disposition: form-data; name="%s"' % name)
1036 form.append(str(value))
1042 body = "\r\n".join(form) + "\r\n"
1043 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1044 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1046 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1049 url = self.helper_webish_url + urlpath
1051 url = self.webish_url + urlpath
1052 return getPage(url, method="POST", postdata=body, headers=headers,
1053 followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the HTTP front-end end-to-end: welcome pages, directory
        listings, downloads by URI, PUT/POST uploads, and the status,
        helper-status and statistics pages.

        NOTE(review): several lines of this method appear to have been lost
        (the initial fetch that binds ``d``, a few closing parentheses,
        some nested-callback ``def`` headers, and the final ``return d``);
        the NOTE comments below mark the spots — restore from version
        control before running.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        # NOTE(review): the statement that binds ``d`` (presumably a
        # getPage() of the welcome page) appears elided here
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            # NOTE(review): closing ")" of the failUnless above appears elided
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            # NOTE(review): message tail and closing ")" appear elided
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            # NOTE(review): failure-message argument and ")" appear elided
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
            # NOTE(review): expected-substring argument, closing ")" and the
            # ``return d1`` appear elided
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        def _new_happy_semantics(ign):
            for c in self.clients:
                # these get reset somewhere? Whatever.
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        # NOTE(review): the replacement contents (presumably
        # "NEWER contents") and "))" appear elided
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): ``def _got_up(res):`` appears elided before this line
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        # NOTE(review): ``def _got_down(res):`` appears elided before this line
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # NOTE(review): ``f.close()`` and the binding of ``now``
            # (presumably ``now = time.time()``) appear elided here; ``now``
            # is used below
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            # NOTE(review): ``f.close()`` appears elided here too
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            # NOTE(review): expected-value argument and ")" appear elided
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            # NOTE(review): expected-value argument and ")" appear elided
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
            # NOTE(review): expected-value argument and ")" appear elided
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
        # NOTE(review): ``return d`` appears elided at the end of this method
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' diagnostic commands (dump-share,
        find-shares, catalog-shares) against the shares created by the
        earlier upload steps.

        NOTE(review): several lines appear elided (the share-search loop's
        ``continue``/``break`` statements, the dump-share filename argument,
        ``out.seek(0)`` calls, and the opening of the ``matching`` list
        comprehension); see the NOTE comments below.
        """
        # exercise some of the diagnostic tools in runner.py

        # walk the storage directories looking for one CHK sharefile
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            # NOTE(review): the body of the test above (presumably
            # ``continue``) appears elided
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 4
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                # NOTE(review): the "found it, stop walking" body (presumably
                # ``break``) appears elided
        self.fail("unable to find any uri_extension files in %r"
        # NOTE(review): the "%" argument and closing ")" appear elided; this
        # fail() is presumably guarded by an elided "nothing found" test
        log.msg("test_system.SystemTest._test_runner using %r" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
        # NOTE(review): the sharefile path argument and "]," appear elided
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
        self.failUnlessIn("size: %d\n" % len(self.data), output)
        self.failUnlessIn("num_segments: 1\n", output)
        # segment_size is always a multiple of needed_shares
        self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
        self.failUnlessIn("total_shares: 10\n", output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnlessIn("%s: " % key, output)
        self.failUnlessIn(" verify-cap: URI:CHK-Verifier:", output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): ``out.seek(0)`` appears elided before readlines()
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): ``out.seek(0)`` appears elided before readlines()
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        # NOTE(review): the start of the comprehension (presumably
        # ``matching = [line``) appears elided before the next two lines
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1346 def _test_control(self, res):
1347 # exercise the remote-control-the-client foolscap interfaces in
1348 # allmydata.control (mostly used for performance tests)
1349 c0 = self.clients[0]
1350 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1351 control_furl = open(control_furl_file, "r").read().strip()
1352 # it doesn't really matter which Tub we use to connect to the client,
1353 # so let's just use our IntroducerNode's
1354 d = self.introducer.tub.getReference(control_furl)
1355 d.addCallback(self._test_control2, control_furl_file)
1357 def _test_control2(self, rref, filename):
1358 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1359 downfile = os.path.join(self.basedir, "control.downfile")
1360 d.addCallback(lambda uri:
1361 rref.callRemote("download_from_uri_to_file",
1364 self.failUnlessEqual(res, downfile)
1365 data = open(downfile, "r").read()
1366 expected_data = open(filename, "r").read()
1367 self.failUnlessEqual(data, expected_data)
1368 d.addCallback(_check)
1369 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1370 if sys.platform == "linux2":
1371 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1372 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Drive the 'tahoe' CLI (ls, put, get, rm, mv, ln, cp, aliases,
        recursive copy) against the running grid via _run_cli.

        NOTE(review): a number of lines appear elided (the ``nodeargs`` list
        header, ``f.close()`` calls, loop headers inside the ls checkers,
        the files/datas fixture loop, and the final ``return d``); see the
        NOTE comments below.
        """
        # run various CLI commands (in a thread, since they use blocking
        # network calls)

        private_uri = self._private_node.get_uri()
        client0_basedir = self.getdir("client0")
        # NOTE(review): ``nodeargs = [`` appears elided before this line
            "--node-directory", client0_basedir,
        # NOTE(review): the closing ``]`` appears elided

        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.

        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)
        # NOTE(review): ``f.close()`` appears elided

        def run(ignored, verb, *args, **kwargs):
            # helper: run one CLI verb with the standard node arguments
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        # new-style aliases
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
            # NOTE(review): ``return out.strip()`` appears elided — the next
            # callback expects the new dircap as its argument
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

        # NOTE(review): the fixture loop that builds ``files``/``datas``
        # (presumably ``for i in range(...):`` plus the two list appends)
        # appears elided around the next three lines
            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            # NOTE(review): ``uri0 = out.strip()`` appears elided
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            # remember the writecap; the ls --uri checks below use it
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        # put from stdin
        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        # tahoe mkdir
        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        # NOTE(review): this reuses the on-disk name "outfile0", clobbering
        # the file checked just above — presumably should be "outfile1";
        # confirm against version control
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        # tahoe rm
        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
            # NOTE(review): the ``for l in lines:`` loop header (and an
            # ``if "file3" in l:`` guard before the mutable check) appear
            # elided here
                if "tahoe-file-stdin" in l:
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
            # NOTE(review): loop and guard headers appear elided before this
            # line (``l`` is otherwise unbound)
                self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
            # NOTE(review): loop and guard headers appear elided before this
            # line (``l`` is otherwise unbound)
                rw_uri = self._mutable_file3_uri
                u = uri.from_string_mutable_filenode(rw_uri)
                ro_uri = u.get_readonly().to_string()
                self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        # tahoe mv
        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        # tahoe ln
        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        # tahoe cp, tahoe-to-tahoe
        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        # NOTE(review): ``os.makedirs(dn)`` appears elided here
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        # NOTE(review): ``os.makedirs(sdn2)`` appears elided here
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
            # NOTE(review): the ``def _cmp(name):`` header and the first
            # three ``_cmp(...)`` calls appear elided around here
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and again, only writing filecaps
        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
        def _check_capsonly((out,err)):
            # these should all be LITs
            x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
            y = uri.from_string_filenode(x)
            self.failUnlessEqual(y.data, "rfile4")
        d.addCallback(_check_capsonly)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
##         def _ls_missing(res):
##             argv = ["ls"] + nodeargs + ["bogus"]
##             return self._run_cli(argv)
##         d.addCallback(_ls_missing)
##         def _check_ls_missing((out,err)):
##             self.failUnlessEqual(err, "")
##         d.addCallback(_check_ls_missing)
        # NOTE(review): ``return d`` appears elided at the end of this method
1715 def _run_cli(self, argv, stdin=""):
1717 stdout, stderr = StringIO(), StringIO()
1718 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1719 stdin=StringIO(stdin),
1720 stdout=stdout, stderr=stderr)
1722 return stdout.getvalue(), stderr.getvalue()
1723 d.addCallback(_done)
1726 def _test_checker(self, res):
1727 ut = upload.Data("too big to be literal" * 200, convergence=None)
1728 d = self._personal_node.add_file(u"big file", ut)
1730 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1731 def _check_dirnode_results(r):
1732 self.failUnless(r.is_healthy())
1733 d.addCallback(_check_dirnode_results)
1734 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1735 d.addCallback(_check_dirnode_results)
1737 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1738 def _got_chk_filenode(n):
1739 self.failUnless(isinstance(n, ImmutableFileNode))
1740 d = n.check(Monitor())
1741 def _check_filenode_results(r):
1742 self.failUnless(r.is_healthy())
1743 d.addCallback(_check_filenode_results)
1744 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1745 d.addCallback(_check_filenode_results)
1747 d.addCallback(_got_chk_filenode)
1749 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1750 def _got_lit_filenode(n):
1751 self.failUnless(isinstance(n, LiteralFileNode))
1752 d = n.check(Monitor())
1753 def _check_lit_filenode_results(r):
1754 self.failUnlessEqual(r, None)
1755 d.addCallback(_check_lit_filenode_results)
1756 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1757 d.addCallback(_check_lit_filenode_results)
1759 d.addCallback(_got_lit_filenode)