1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18 NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotWriteableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin
# NOTE(review): in this copy the assignment header and closing quotes of this
# module constant were lost; reconstructed as a triple-quoted string. The
# payload must exceed the LIT-URI threshold (~55 bytes) so uploads of it
# produce real CHK shares rather than an inline literal cap.
LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts how many bytes have been read, and can
    fire a Deferred once a byte threshold is crossed (used to interrupt an
    upload partway through)."""
    # Running total of bytes handed out by read(). This initializer was
    # missing in the reviewed copy, which made the first
    # `self.bytes_read += length` raise AttributeError.
    bytes_read = 0
    # Once bytes_read exceeds this threshold, interrupt_after_d fires (once).
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # disarm first so the callback fires exactly once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
47 class SystemTest(SystemTestMixin, unittest.TestCase):
48 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
    def test_connections(self):
        """Every client (plus one freshly-added extra node) should see all
        numclients+1 storage servers via its storage broker."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                # NOTE(review): `sb` is undefined in this copy — a line such
                # as `sb = c.get_storage_broker()` appears to be missing.
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        # NOTE(review): a `d.addCallback(_check)` line appears to be missing
        # here — as written, _check is never run.
        def _shutdown_extra_node(res):
            # NOTE(review): the original likely guarded this with
            # `if self.extra_node:` and fell through to `return res`; those
            # lines appear to be missing from this copy.
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
        # NOTE(review): no `return d` is visible; trial needs the Deferred
        # returned for this test to actually wait — presumably lost here.
71 # test_connections is subsumed by test_upload_and_download, and takes
72 # quite a while to run on a slow machine (because of all the TLS
73 # connections that must be established). If we ever rework the introducer
74 # code to such an extent that we're not sure if it works anymore, we can
75 # reinstate this test until it does.
78 def test_upload_and_download_random_key(self):
79 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80 return self._test_upload_and_download(convergence=None)
82 def test_upload_and_download_convergent(self):
83 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84 return self._test_upload_and_download(convergence="some convergence string")
    def _test_upload_and_download(self, convergence):
        """Upload DATA as an immutable file, then exercise: full download,
        partial reads, a corrupted cap, a nonexistent cap, helper-assisted
        uploads, resumable (interrupted) uploads, and remote stats.

        NOTE(review): a number of original lines appear to have been dropped
        from this copy; each apparent gap is flagged inline below.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                # NOTE(review): `sb` is undefined here — a line such as
                # `sb = c.get_storage_broker()` appears to be missing.
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        # NOTE(review): the following lines appear to belong to a missing
        # `def _do_upload(res):` wrapper (note the `d.addCallback(_do_upload)`
        # below); its `def` line, the `self.uploader = u` stash, and the
        # `return u.upload(up)` appear to be missing from this copy.
        u = self.clients[0].getServiceNamed("uploader")
        # we crank the max segsize down to 1024b for the duration of this
        # test, so we can exercise multiple segments. It is important
        # that this is not a multiple of the segment size, so that the
        # tail segment is not the same length as the others. This actually
        # gets rounded up to 1025 to be a multiple of the number of
        # required shares (since we use 25 out of 100 FEC).
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            # NOTE(review): `theuri` / `self.uri` are never assigned in this
            # copy — lines like `theuri = results.uri` and `self.uri = theuri`
            # appear to be missing.
            log.msg("upload finished: uri is %s" % (theuri,))
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            # NOTE(review): `self.uploader` is presumably stashed in the
            # missing `_do_upload` body above — confirm.
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        # NOTE(review): a `def _test_read(res):` wrapper appears to be
        # missing above this section (note the shadowed local `d` and the
        # `d.addCallback(_test_read)` below), along with its `return d`.
        n = self.clients[1].create_node_from_uri(self.uri)
        d = download_to_data(n)
        def _read_done(data):
            self.failUnlessEqual(data, DATA)
        d.addCallback(_read_done)
        # partial read: offset=1, size=4 must yield DATA[1:5]
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=1, size=4))
        def _read_portion_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
        d.addCallback(_read_portion_done)
        # tail read: offset=2 through the end of the file
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=2, size=None))
        def _read_tail_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[2:])
        d.addCallback(_read_tail_done)
        # over-long size request must be clamped to the full contents
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), size=len(DATA)+1000))
        def _read_too_much(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA)
        d.addCallback(_read_too_much)
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 # NOTE(review): an argument line (the
                                 # expected-substring parameter) appears to be
                                 # missing here.
                                 bad_n.read, MemoryConsumer(), offset=2)
            # NOTE(review): `return d` appears to be missing here.
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      # NOTE(review): an
                                                      # argument line (likely
                                                      # the helper furl) is
                                                      # missing here.
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            # NOTE(review): a `def _check(newdata):` line appears to be
            # missing here.
            self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            # NOTE(review): `return d` appears to be missing here.
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            # NOTE(review): a `def _check(newdata):` line appears to be
            # missing here.
            self.failUnlessEqual(newdata, HELPER_DATA)
            self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                        "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            # NOTE(review): `return d` appears to be missing here.
        # duplicate-upload short-circuiting only makes sense with convergence
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            # NOTE(review): a `def _interrupted(f):` line appears to be
            # missing here; the following lines presumably belong to it.
                f.trap(DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                # NOTE(review): the closing continuation line (likely
                # `len(DATA)))`) is missing here.

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                # NOTE(review): an assignment such as `cap = results.uri`
                # appears to be missing here (`cap` is used below).
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                # NOTE(review): an `else:` line appears to be missing here;
                # the following check presumably belongs to the random-key
                # branch.
                # Make sure we did have to read the whole file the second
                # time around -- because the one that we partially uploaded
                # earlier was encrypted with a different random key.
                self.failIf(bytes_sent < len(DATA),
                            "resumption saved us some work even though we were using random keys:"
                            " read %d bytes out of %d total" %
                            (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            # NOTE(review): a `def _check(newdata):` line appears to be
            # missing here.
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
            d.addCallback(_check)
            # NOTE(review): `return d` appears to be missing here.
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #from pprint import pprint
                # NOTE(review): an assignment such as `s = stats["stats"]`
                # appears to be missing here (`s` is used below).
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            # NOTE(review): `return d` appears to be missing here.
        d.addCallback(_grab_stats)
        # NOTE(review): the method's final `return d` appears to be missing
        # from this copy.
385 def _find_shares(self, basedir):
387 for (dirpath, dirnames, filenames) in os.walk(basedir):
388 if "storage" not in dirpath:
392 pieces = dirpath.split(os.sep)
394 and pieces[-4] == "storage"
395 and pieces[-3] == "shares"):
396 # we're sitting in .../storage/shares/$START/$SINDEX , and there
397 # are sharefiles here
398 assert pieces[-5].startswith("client")
399 client_num = int(pieces[-5][-1])
400 storage_index_s = pieces[-1]
401 storage_index = si_a2b(storage_index_s)
402 for sharename in filenames:
403 shnum = int(sharename)
404 filename = os.path.join(dirpath, sharename)
405 data = (client_num, storage_index, filename, shnum)
408 self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Mangle one field of the SDMF mutable share stored at `filename`;
        `which` names the field to corrupt (most are bit-flipped)."""
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            # NOTE(review): the "seqnum" branch body and the
            # `elif which == "R":` / `elif which == "IV":` headers appear to
            # be missing from this copy; the two statements below presumably
            # belonged to those branches.
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack the (possibly modified) fields and write the share back
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            # NOTE(review): the closing
                                            # argument continuation (likely
                                            # `segsize, datalen)`) is missing
                                            # here.
        final_share = mutable_layout.pack_share(prefix,
                                                # NOTE(review): the remaining
                                                # pack_share argument lines
                                                # are missing from this copy.
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """Create an SDMF mutable file; read and overwrite it from several
        clients; dump a share with the CLI debugger; corrupt most shares and
        verify the download still succeeds; exercise dirnodes and the
        key-generator pool.

        NOTE(review): a number of original lines appear to have been dropped
        from this copy; each apparent gap is flagged inline below.
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            # NOTE(review): `c` is undefined here — a line such as
            # `c = self.clients[0]` appears to be missing.
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            # NOTE(review): a `def _done(res):` line appears to be missing
            # here.
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
            d1.addCallback(_done)
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    # NOTE(review): the `% filename)` continuation appears to
                    # be missing here.
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                # NOTE(review): the `filename],` continuation
                                # line appears to be missing here.
                                stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            # NOTE(review): a `try:` line appears to be missing here (see the
            # `except unittest.FailTest:` below, which dumps the output on
            # failure).
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
                            # NOTE(review): the `in output)` continuation
                            # appears to be missing here.
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            # now
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                        base32.b2a(storage_index))
            self.failUnless(expected in output)
            except unittest.FailTest:
                print "dump-share output was:"
                # NOTE(review): `print output` and `raise` appear to be
                # missing here.
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the contents with NEWDATA and read them back
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)
        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks during retrieval
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           # NOTE(review): the `in shares])` continuation line
                           # appears to be missing here.
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                # NOTE(review): the `if shnum == 0:` / `elif shnum == N:`
                # dispatch lines appear to be missing from this copy; as
                # written every corruption would apply to every share.
                # read: this will trigger "pubkey doesn't match
                # fingerprint"
                self._corrupt_mutable_share(filename, "pubkey")
                self._corrupt_mutable_share(filename, "encprivkey")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "seqnum")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "R")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "segsize")
                self._corrupt_mutable_share(filename, "share_hash_chain")
                self._corrupt_mutable_share(filename, "block_hash_tree")
                self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
            # download should still succeed with the intact shares.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # NOTE(review): a `d1 = dnode.list()` line appears to be missing
            # here (`d1` is used below).
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)
        # NOTE(review): the method's final `return d` appears to be missing
        # from this copy.
679 def flip_bit(self, good):
680 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a structurally-valid CHK URI whose key (and therefore
        storage index) differs from `gooduri`, so no server holds shares
        for it."""
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            # NOTE(review): a final argument line (likely
                            # `size=u.size)`) appears to be missing from this
                            # copy.
        return u2.to_string()
693 # TODO: add a test which mangles the uri_extension_hash instead, and
694 # should fail due to not being able to get a valid uri_extension block.
695 # Also a test which sneakily mangles the uri_extension block to change
696 # some of the validation data, so it will fail in the post-download phase
697 # when the file's crypttext integrity check fails. Do the same thing for
698 # the key, which should cause the download to fail the post-download
699 # plaintext_hash check.
    def test_filesystem(self):
        """Build a small directory tree via two clients, then run the web,
        CLI, control-port, and checker sub-tests against it (each implemented
        as a helper method chained below)."""
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            # happy=1 so a tiny grid can always place shares
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
        # NOTE(review): the final `return d` appears to be missing from this
        # copy; trial needs the Deferred returned to wait on the chain.
    def _test_introweb(self, res):
        """Fetch the introducer's status page (HTML, then ?t=json) and check
        the version string and announcement/subscription summaries.

        NOTE(review): several lines appear to be missing from this copy;
        gaps are flagged inline.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        # NOTE(review): a `def _check(res):` line and a `try:` appear to be
        # missing here (see the `except unittest.FailTest:` and
        # `d.addCallback(_check)` below).
        self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                        # NOTE(review): the `in res)` continuation appears to
                        # be missing here.
        self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
        self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
            print "GET %s output was:" % self.introweb_url
            # NOTE(review): `print res` and `raise` appear to be missing
            # here.
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            # NOTE(review): a `try:` line appears to be missing here.
            self.failUnlessEqual(data["subscription_summary"],
                                 # NOTE(review): the expected-value
                                 # continuation (likely `{"storage": 5})`) is
                                 # missing here.
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
            self.failUnlessEqual(data["announcement_distinct_hosts"],
                                 {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
                # NOTE(review): `print res` and `raise` appear to be missing
                # here.
        d.addCallback(_check_json)
        # NOTE(review): `return d` appears to be missing here.
    def _do_publish1(self, res):
        """Create the root dirnode R, R/subdir1, and upload self.data as
        R/subdir1/mydata567; stash the file cap in self.uri."""
        ut = upload.Data(self.data, convergence=None)
        # NOTE(review): `c0` is undefined here — a line such as
        # `c0 = self.clients[0]` appears to be missing.
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                # later sub-tests read self.uri
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_made_subdir1)
        # NOTE(review): `return d` appears to be missing here.
803 def _do_publish2(self, res):
804 ut = upload.Data(self.data, convergence=None)
805 d = self._subdir1_node.create_subdirectory(u"subdir2")
806 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
809 def log(self, res, *args, **kwargs):
810 # print "MSG: %s RES: %s" % (msg, args)
811 log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        """Create a private dirnode P with P/personal/sekrit data, plus
        P/s2-rw and P/s2-ro links to R/subdir1/subdir2 (read-write and
        read-only respectively)."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            # NOTE(review): a `def _got_s2(s2node):` line appears to be
            # missing here (`s2node` is used below, and `_got_s2` is chained
            # afterwards).
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                  s2node.get_readonly_uri())
            d2.addCallback(lambda node:
                           privnode.set_uri(u"s2-ro",
                                            s2node.get_readonly_uri(),
                                            s2node.get_readonly_uri()))
            # NOTE(review): `return d2` appears to be missing here.
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            # NOTE(review): `return d1` appears to be missing here.
        d.addCallback(_got_new_dir)
        # NOTE(review): `return d` appears to be missing here.
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 contents from client1 using the
        iterative get() API."""
        # this one uses the iterative API
        # NOTE(review): `c1` is undefined here — a line such as
        # `c1 = self.clients[1]` appears to be missing.
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        # NOTE(review): a `def _get_done(data):` line appears to be missing
        # here.
        self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        # NOTE(review): `return d` appears to be missing here.
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 contents from client1 using the
        path-based get_child_at_path() API, and check node caching."""
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # the nodemaker should hand back an equivalent node for the
            # same cap
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        # NOTE(review): `return d` appears to be missing here.
    def _check_publish_private(self, resnode):
        """Exercise the private directory tree: path-based reads, then verify
        that a read-only dircap (s2-ro) rejects every mutating operation,
        then move files around and check manifest/deep-stats results.

        NOTE(review): several lines appear lost in extraction (the
        `def get_path(path):` helper header, `return d1`, parts of the
        expected-stats dict, and the trailing `return d`) -- reconcile
        against version control before relying on this text.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember the personal/ dirnode for later move tests
            self._personal_node = personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): this `return` belongs inside the elided
        # `def get_path(path):` helper used by the callbacks below.
        return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # a read-only cap still reports mutable, but also readonly
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            # every mutating operation on the read-only dirnode must fail
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # shuffle 'sekrit data' around the writable tree, then check the
            # manifest and deep-stats numbers
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                # NOTE(review): some dict entries and the closing brace of
                # `expected` appear elided -- confirm.
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
        d.addCallback(_got_home)
    def shouldFail(self, res, expected_failure, which, substring=None):
        """addBoth-style checker: assert that *res* is a Failure wrapping
        *expected_failure*, optionally containing *substring* in its string
        form; a non-Failure result fails the test with message *which*.

        NOTE(review): the `if substring:` guard and the `else:` introducing
        the failure branch appear elided in extraction -- confirm.
        """
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Invoke *callable* (via maybeDeferred) and assert the resulting
        Deferred fires with *expected_failure*, optionally containing
        *substring*; a success fails the test with message *which*.

        NOTE(review): the `def _done(res):` callback header, the `else:`
        branch, the `d.addBoth(_done)` hookup and the trailing `return d`
        appear elided in extraction -- confirm.
        """
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
1008 def PUT(self, urlpath, data):
1009 url = self.webish_url + urlpath
1010 return getPage(url, method="PUT", postdata=data)
1012 def GET(self, urlpath, followRedirect=False):
1013 url = self.webish_url + urlpath
1014 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        """Build a multipart/form-data body from *fields* and POST it via
        POST2. A tuple value is treated as (filename, data) and becomes a
        file-upload part; other values are stringified.

        NOTE(review): the `headers = {}` / `form = []` setup lines, the
        per-field separator appends, and the `else:` pairing the two
        Content-Disposition branches appear elided in extraction -- confirm.
        """
        sepbase = "boogabooga"
        sep = "--" + sepbase
        # the _charset part tells the server how the fields are encoded
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('UTF-8')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                # (filename, data): emit a file-upload part
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        body = "\r\n".join(form) + "\r\n"
        headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)
    # POST *body* with *headers* to either the helper-using node's webapi or
    # the primary node's, depending on use_helper.
    # NOTE(review): the signature continuation (`use_helper=False):`), the
    # `if use_helper:` test and its `else:` appear elided in extraction --
    # confirm against version control.
    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
        url = self.helper_webish_url + urlpath
        url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)
    def _test_web(self, res):
        """Drive the webapi/WUI end to end: welcome pages (direct and via the
        helper node), directory listings, URI-based downloads, PUT/POST
        uploads, per-operation status pages, helper status (HTML and JSON),
        and the node statistics page.

        NOTE(review): several lines appear lost in extraction (e.g. the
        initial `d = getPage(base)` seeding the chain, some closing
        parentheses, the `_got_up`/`_got_down` callback headers, `f.close()`
        and `now = time.time()` in _got_helper_status, and the trailing
        `return d`) -- reconcile against version control before use.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        def _new_happy_semantics(ign):
            for c in self.clients:
                # these get reset somewhere? Whatever.
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): the next two `return` lines belong inside the elided
        # `def _got_up(res):` / `def _got_down(res):` callback headers.
        return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' diagnostic tools (dump-share,
        find-shares, catalog-shares) against the shares written by the
        earlier uploads, walking the storage directories to locate a CHK
        sharefile first.

        NOTE(review): several lines appear lost in extraction (the
        `continue`/`break` controlling the os.walk loop, the dump-share
        argument closing the list, `out.seek(0)` before each readlines(),
        and the `matching = [line` opening of the final comprehension) --
        reconcile against version control before use.
        """
        # exercise some of the diagnostic tools in runner.py
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 4
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
        self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # 10 total shares were created for the single uploaded file
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        """Read client0's control.furl, obtain a remote reference to its
        control interface through the introducer's Tub, and hand the
        reference to _test_control2.

        NOTE(review): the trailing `return d` appears lost in extraction --
        confirm against version control.
        """
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
    def _test_control2(self, rref, filename):
        """Round-trip *filename* through the control interface: upload it,
        download the result to a scratch file, compare contents, then run
        the speed test and (on Linux) the memory/response-time probes.

        NOTE(review): the remaining download_from_uri_to_file arguments, the
        `def _check(res):` header and the trailing `return d` appear lost in
        extraction -- confirm against version control.
        """
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
        self.failUnlessEqual(res, downfile)
        data = open(downfile, "r").read()
        expected_data = open(filename, "r").read()
        self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Drive the 'tahoe' command-line tool against client0's node
        directory: alias handling, ls/put/get/rm/mv/ln/cp (including
        --mutable, --uri, --readonly-uri, and recursive/caps-only copies),
        all funnelled through _run_cli in a worker thread.

        NOTE(review): numerous lines appear lost in extraction (the
        `nodeargs = [` opening, the `for i in range(...)` loop creating
        `files`/`datas`, `f.close()`, the `uri0 = out.strip()` assignment,
        loop headers inside the ls checks, os.mkdir calls for the recursive
        setup, the `def _cmp(name):` header, and the trailing `return d`) --
        reconcile against version control before relying on this text.
        """
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        client0_basedir = self.getdir("client0")
        "--node-directory", client0_basedir,
        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.
        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)
        def run(ignored, verb, *args, **kwargs):
            # helper: invoke "tahoe <verb> <nodeargs> <args...>" via _run_cli
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)
        def _check_ls((out,err), expected_children, unexpected_children=[]):
            # assert that ls output names all expected children and none of
            # the unexpected ones
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))
        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

        # NOTE(review): these three lines belong inside the elided
        # `for i in range(...)` loop that builds the `files`/`datas` lists.
        fn = os.path.join(self.basedir, "file%d" % i)
        data = "data to be uploaded: file%d\n" % i
        open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            # NOTE(review): the `for l in lines:` header (and matching
            # branches) appears elided here -- confirm.
            lines = out.split("\n")
            if "tahoe-file-stdin" in l:
                self.failUnless(l.startswith("-r-- "), l)
                self.failUnless(" %d " % len(STDIN_DATA) in l)
            self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
            self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
            rw_uri = self._mutable_file3_uri
            u = uri.from_string_mutable_filenode(rw_uri)
            ro_uri = u.get_readonly().to_string()
            self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
            # NOTE(review): the `def _cmp(name):` header and the first _cmp
            # calls appear elided here -- confirm.
            old = open(os.path.join(dn, name), "rb").read()
            newfn = os.path.join(dn_copy, name)
            self.failUnless(os.path.exists(newfn))
            new = open(newfn, "rb").read()
            self.failUnlessEqual(old, new)
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and again, only writing filecaps
        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
        def _check_capsonly((out,err)):
            # these should all be LITs
            x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
            y = uri.from_string_filenode(x)
            self.failUnlessEqual(y.data, "rfile4")
        d.addCallback(_check_capsonly)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
        ## def _ls_missing(res):
        ##     argv = ["ls"] + nodeargs + ["bogus"]
        ##     return self._run_cli(argv)
        ## d.addCallback(_ls_missing)
        ## def _check_ls_missing((out,err)):
        ##     self.failUnlessEqual(err, "")
        ## d.addCallback(_check_ls_missing)
    def _run_cli(self, argv, stdin=""):
        """Run the tahoe CLI entry point (runner.runner) in a worker thread,
        returning a Deferred that fires with the captured (stdout, stderr)
        strings.

        NOTE(review): the `def _done(res):` header wrapping the getvalue()
        line and the trailing `return d` appear lost in extraction --
        confirm against version control.
        """
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdin=StringIO(stdin),
                                  stdout=stdout, stderr=stderr)
        return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
1723 def _test_checker(self, res):
1724 ut = upload.Data("too big to be literal" * 200, convergence=None)
1725 d = self._personal_node.add_file(u"big file", ut)
1727 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1728 def _check_dirnode_results(r):
1729 self.failUnless(r.is_healthy())
1730 d.addCallback(_check_dirnode_results)
1731 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1732 d.addCallback(_check_dirnode_results)
1734 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1735 def _got_chk_filenode(n):
1736 self.failUnless(isinstance(n, ImmutableFileNode))
1737 d = n.check(Monitor())
1738 def _check_filenode_results(r):
1739 self.failUnless(r.is_healthy())
1740 d.addCallback(_check_filenode_results)
1741 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1742 d.addCallback(_check_filenode_results)
1744 d.addCallback(_got_chk_filenode)
1746 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1747 def _got_lit_filenode(n):
1748 self.failUnless(isinstance(n, LiteralFileNode))
1749 d = n.check(Monitor())
1750 def _check_lit_filenode_results(r):
1751 self.failUnlessEqual(r, None)
1752 d.addCallback(_check_lit_filenode_results)
1753 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1754 d.addCallback(_check_lit_filenode_results)
1756 d.addCallback(_got_lit_filenode)