1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18 NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin
30 This is some data to publish to the remote grid.., which needs to be large
31 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """An upload.Data that counts the bytes read from it, and can fire a
    Deferred once more than a given number of bytes have been consumed.

    Used to interrupt an in-progress upload partway through: set
    'interrupt_after' to a byte threshold and 'interrupt_after_d' to a
    Deferred; the Deferred fires (exactly once, with this uploadable as
    its result) the first time the running total crosses the threshold.
    """
    # Fix: 'bytes_read' must have a class-level default of 0. read() does
    # 'self.bytes_read += length', which would raise AttributeError on the
    # first call if the attribute were never initialized (the initializer
    # line is missing from this damaged copy of the file).
    bytes_read = 0
    interrupt_after = None    # byte threshold, or None for "never interrupt"
    interrupt_after_d = None  # Deferred fired once the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # Clear the threshold before firing so the callback runs
                # exactly once even if read() is called again.
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
# Full-system integration test: SystemTestMixin brings up a small local
# grid of client/storage nodes, and the methods below exercise upload,
# download, mutable files, the upload helper, and the webapi end-to-end.
47 class SystemTest(SystemTestMixin, unittest.TestCase):
48 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
# After adding one extra node, every client's storage broker should see
# numclients+1 servers (both the full serverid list and the permuted
# list for a sample index); the extra node is shut down afterwards.
# NOTE(review): this listing is a damaged extraction with embedded
# original line numbers; several lines are missing here (the assignment
# of 'sb' used on line 61, the d.addCallback(_check) registration, the
# guard inside _shutdown_extra_node, and the final 'return d').
50 def test_connections(self):
51 self.basedir = "system/SystemTest/test_connections"
52 d = self.set_up_nodes()
53 self.extra_node = None
54 d.addCallback(lambda res: self.add_extra_node(self.numclients))
55 def _check(extra_node):
56 self.extra_node = extra_node
57 for c in self.clients:
58 all_peerids = c.get_storage_broker().get_all_serverids()
59 self.failUnlessEqual(len(all_peerids), self.numclients+1)
61 permuted_peers = sb.get_servers_for_index("a")
62 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
65 def _shutdown_extra_node(res):
67 return self.extra_node.stopService()
69 d.addBoth(_shutdown_extra_node)
71 # test_connections is subsumed by test_upload_and_download, and takes
72 # quite a while to run on a slow machine (because of all the TLS
73 # connections that must be established). If we ever rework the introducer
74 # code to such an extent that we're not sure if it works anymore, we can
75 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Run the shared upload/download exercise with convergent encryption
    disabled: convergence=None means each upload gets a fresh random key.
    """
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    return self._test_upload_and_download(convergence=None)
def test_upload_and_download_convergent(self):
    """Run the shared upload/download exercise with convergent encryption
    enabled, using a fixed convergence secret so repeated uploads of the
    same data map to the same storage index.
    """
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    return self._test_upload_and_download(convergence="some convergence string")
# End-to-end immutable-file exercise, parameterized by 'convergence'
# (None = random per-upload key; a string = convergent encryption).
# Covers, in order: connection sanity, first upload, re-upload of the
# same data, full downloads, partial reads (offset/size), a corrupted-key
# download (expects NoSharesError), a nonexistent-URI download, uploads
# via the helper node, duplicate-upload short-circuiting, an interrupted
# and resumed helper upload, and remote stats retrieval over foolscap.
# NOTE(review): damaged extraction -- many original lines are missing
# (gaps in the embedded numbering). In particular 'sb' (line 96),
# 'theuri' (line 117) and 'cap' (line 341) have no visible assignments,
# and the defs of _do_upload, _test_read, _interrupted and the final
# _check/_grab_stats plumbing are partially absent. Do not expect this
# block to run as-is; restore from the upstream file before editing.
86 def _test_upload_and_download(self, convergence):
87 # we use 4000 bytes of data, which will result in about 400k written
88 # to disk among all our simulated nodes
89 DATA = "Some data to upload\n" * 200
90 d = self.set_up_nodes()
91 def _check_connections(res):
92 for c in self.clients:
93 all_peerids = c.get_storage_broker().get_all_serverids()
94 self.failUnlessEqual(len(all_peerids), self.numclients)
96 permuted_peers = sb.get_servers_for_index("a")
97 self.failUnlessEqual(len(permuted_peers), self.numclients)
98 d.addCallback(_check_connections)
102 u = self.clients[0].getServiceNamed("uploader")
104 # we crank the max segsize down to 1024b for the duration of this
105 # test, so we can exercise multiple segments. It is important
106 # that this is not a multiple of the segment size, so that the
107 # tail segment is not the same length as the others. This actualy
108 # gets rounded up to 1025 to be a multiple of the number of
109 # required shares (since we use 25 out of 100 FEC).
110 up = upload.Data(DATA, convergence=convergence)
111 up.max_segment_size = 1024
114 d.addCallback(_do_upload)
115 def _upload_done(results):
117 log.msg("upload finished: uri is %s" % (theuri,))
119 assert isinstance(self.uri, str), self.uri
120 self.cap = uri.from_string(self.uri)
121 self.n = self.clients[1].create_node_from_uri(self.uri)
122 d.addCallback(_upload_done)
124 def _upload_again(res):
125 # Upload again. If using convergent encryption then this ought to be
126 # short-circuited, however with the way we currently generate URIs
127 # (i.e. because they include the roothash), we have to do all of the
128 # encoding work, and only get to save on the upload part.
129 log.msg("UPLOADING AGAIN")
130 up = upload.Data(DATA, convergence=convergence)
131 up.max_segment_size = 1024
132 d1 = self.uploader.upload(up)
133 d.addCallback(_upload_again)
135 def _download_to_data(res):
136 log.msg("DOWNLOADING")
137 return download_to_data(self.n)
138 d.addCallback(_download_to_data)
139 def _download_to_data_done(data):
140 log.msg("download finished")
141 self.failUnlessEqual(data, DATA)
142 d.addCallback(_download_to_data_done)
# Partial-read checks: offset/size combinations against the same node.
145 n = self.clients[1].create_node_from_uri(self.uri)
146 d = download_to_data(n)
147 def _read_done(data):
148 self.failUnlessEqual(data, DATA)
149 d.addCallback(_read_done)
150 d.addCallback(lambda ign:
151 n.read(MemoryConsumer(), offset=1, size=4))
152 def _read_portion_done(mc):
153 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
154 d.addCallback(_read_portion_done)
155 d.addCallback(lambda ign:
156 n.read(MemoryConsumer(), offset=2, size=None))
157 def _read_tail_done(mc):
158 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
159 d.addCallback(_read_tail_done)
160 d.addCallback(lambda ign:
161 n.read(MemoryConsumer(), size=len(DATA)+1000))
162 def _read_too_much(mc):
163 self.failUnlessEqual("".join(mc.chunks), DATA)
164 d.addCallback(_read_too_much)
167 d.addCallback(_test_read)
169 def _test_bad_read(res):
170 bad_u = uri.from_string_filenode(self.uri)
171 bad_u.key = self.flip_bit(bad_u.key)
172 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
173 # this should cause an error during download
175 d = self.shouldFail2(NoSharesError, "'download bad node'",
177 bad_n.read, MemoryConsumer(), offset=2)
179 d.addCallback(_test_bad_read)
181 def _download_nonexistent_uri(res):
182 baduri = self.mangle_uri(self.uri)
183 badnode = self.clients[1].create_node_from_uri(baduri)
184 log.msg("about to download non-existent URI", level=log.UNUSUAL,
185 facility="tahoe.tests")
186 d1 = download_to_data(badnode)
187 def _baduri_should_fail(res):
188 log.msg("finished downloading non-existend URI",
189 level=log.UNUSUAL, facility="tahoe.tests")
190 self.failUnless(isinstance(res, Failure))
191 self.failUnless(res.check(NoSharesError),
192 "expected NoSharesError, got %s" % res)
193 d1.addBoth(_baduri_should_fail)
195 d.addCallback(_download_nonexistent_uri)
197 # add a new node, which doesn't accept shares, and only uses the
199 d.addCallback(lambda res: self.add_extra_node(self.numclients,
201 add_to_sparent=True))
202 def _added(extra_node):
203 self.extra_node = extra_node
204 d.addCallback(_added)
206 HELPER_DATA = "Data that needs help to upload" * 1000
207 def _upload_with_helper(res):
208 u = upload.Data(HELPER_DATA, convergence=convergence)
209 d = self.extra_node.upload(u)
210 def _uploaded(results):
211 n = self.clients[1].create_node_from_uri(results.uri)
212 return download_to_data(n)
213 d.addCallback(_uploaded)
215 self.failUnlessEqual(newdata, HELPER_DATA)
216 d.addCallback(_check)
218 d.addCallback(_upload_with_helper)
220 def _upload_duplicate_with_helper(res):
221 u = upload.Data(HELPER_DATA, convergence=convergence)
222 u.debug_stash_RemoteEncryptedUploadable = True
223 d = self.extra_node.upload(u)
224 def _uploaded(results):
225 n = self.clients[1].create_node_from_uri(results.uri)
226 return download_to_data(n)
227 d.addCallback(_uploaded)
229 self.failUnlessEqual(newdata, HELPER_DATA)
230 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
231 "uploadable started uploading, should have been avoided")
232 d.addCallback(_check)
234 if convergence is not None:
235 d.addCallback(_upload_duplicate_with_helper)
237 def _upload_resumable(res):
238 DATA = "Data that needs help to upload and gets interrupted" * 1000
239 u1 = CountingDataUploadable(DATA, convergence=convergence)
240 u2 = CountingDataUploadable(DATA, convergence=convergence)
242 # we interrupt the connection after about 5kB by shutting down
243 # the helper, then restartingit.
244 u1.interrupt_after = 5000
245 u1.interrupt_after_d = defer.Deferred()
246 u1.interrupt_after_d.addCallback(lambda res:
247 self.bounce_client(0))
249 # sneak into the helper and reduce its chunk size, so that our
250 # debug_interrupt will sever the connection on about the fifth
251 # chunk fetched. This makes sure that we've started to write the
252 # new shares before we abandon them, which exercises the
253 # abort/delete-partial-share code. TODO: find a cleaner way to do
254 # this. I know that this will affect later uses of the helper in
255 # this same test run, but I'm not currently worried about it.
256 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
258 d = self.extra_node.upload(u1)
260 def _should_not_finish(res):
261 self.fail("interrupted upload should have failed, not finished"
262 " with result %s" % (res,))
264 f.trap(DeadReferenceError)
266 # make sure we actually interrupted it before finishing the
268 self.failUnless(u1.bytes_read < len(DATA),
269 "read %d out of %d total" % (u1.bytes_read,
272 log.msg("waiting for reconnect", level=log.NOISY,
273 facility="tahoe.test.test_system")
274 # now, we need to give the nodes a chance to notice that this
275 # connection has gone away. When this happens, the storage
276 # servers will be told to abort their uploads, removing the
277 # partial shares. Unfortunately this involves TCP messages
278 # going through the loopback interface, and we can't easily
279 # predict how long that will take. If it were all local, we
280 # could use fireEventually() to stall. Since we don't have
281 # the right introduction hooks, the best we can do is use a
282 # fixed delay. TODO: this is fragile.
283 u1.interrupt_after_d.addCallback(self.stall, 2.0)
284 return u1.interrupt_after_d
285 d.addCallbacks(_should_not_finish, _interrupted)
287 def _disconnected(res):
288 # check to make sure the storage servers aren't still hanging
289 # on to the partial share: their incoming/ directories should
291 log.msg("disconnected", level=log.NOISY,
292 facility="tahoe.test.test_system")
293 for i in range(self.numclients):
294 incdir = os.path.join(self.getdir("client%d" % i),
295 "storage", "shares", "incoming")
296 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
297 d.addCallback(_disconnected)
299 # then we need to give the reconnector a chance to
300 # reestablish the connection to the helper.
301 d.addCallback(lambda res:
302 log.msg("wait_for_connections", level=log.NOISY,
303 facility="tahoe.test.test_system"))
304 d.addCallback(lambda res: self.wait_for_connections())
307 d.addCallback(lambda res:
308 log.msg("uploading again", level=log.NOISY,
309 facility="tahoe.test.test_system"))
310 d.addCallback(lambda res: self.extra_node.upload(u2))
312 def _uploaded(results):
314 log.msg("Second upload complete", level=log.NOISY,
315 facility="tahoe.test.test_system")
317 # this is really bytes received rather than sent, but it's
318 # convenient and basically measures the same thing
319 bytes_sent = results.ciphertext_fetched
321 # We currently don't support resumption of upload if the data is
322 # encrypted with a random key. (Because that would require us
323 # to store the key locally and re-use it on the next upload of
324 # this file, which isn't a bad thing to do, but we currently
326 if convergence is not None:
327 # Make sure we did not have to read the whole file the
328 # second time around .
329 self.failUnless(bytes_sent < len(DATA),
330 "resumption didn't save us any work:"
331 " read %d bytes out of %d total" %
332 (bytes_sent, len(DATA)))
334 # Make sure we did have to read the whole file the second
335 # time around -- because the one that we partially uploaded
336 # earlier was encrypted with a different random key.
337 self.failIf(bytes_sent < len(DATA),
338 "resumption saved us some work even though we were using random keys:"
339 " read %d bytes out of %d total" %
340 (bytes_sent, len(DATA)))
341 n = self.clients[1].create_node_from_uri(cap)
342 return download_to_data(n)
343 d.addCallback(_uploaded)
346 self.failUnlessEqual(newdata, DATA)
347 # If using convergent encryption, then also check that the
348 # helper has removed the temp file from its directories.
349 if convergence is not None:
350 basedir = os.path.join(self.getdir("client0"), "helper")
351 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
352 self.failUnlessEqual(files, [])
353 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
354 self.failUnlessEqual(files, [])
355 d.addCallback(_check)
357 d.addCallback(_upload_resumable)
359 def _grab_stats(ignored):
360 # the StatsProvider doesn't normally publish a FURL:
361 # instead it passes a live reference to the StatsGatherer
362 # (if and when it connects). To exercise the remote stats
363 # interface, we manually publish client0's StatsProvider
364 # and use client1 to query it.
365 sp = self.clients[0].stats_provider
366 sp_furl = self.clients[0].tub.registerReference(sp)
367 d = self.clients[1].tub.getReference(sp_furl)
368 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
369 def _got_stats(stats):
371 #from pprint import pprint
374 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
375 c = stats["counters"]
376 self.failUnless("storage_server.allocate" in c)
377 d.addCallback(_got_stats)
379 d.addCallback(_grab_stats)
# Walk 'basedir' and collect every immutable/mutable share file found
# under a .../clientN/storage/shares/$START/$SINDEX directory, as
# (client_num, storage_index, filename, shnum) tuples; fails the test
# if no share files exist at all.
# NOTE(review): damaged extraction -- the accumulator initialization,
# the 'continue' for non-storage dirs, the append of 'data', and the
# final return of the collected list are among the missing lines.
383 def _find_shares(self, basedir):
385 for (dirpath, dirnames, filenames) in os.walk(basedir):
386 if "storage" not in dirpath:
390 pieces = dirpath.split(os.sep)
392 and pieces[-4] == "storage"
393 and pieces[-3] == "shares"):
394 # we're sitting in .../storage/shares/$START/$SINDEX , and there
395 # are sharefiles here
396 assert pieces[-5].startswith("client")
397 client_num = int(pieces[-5][-1])
398 storage_index_s = pieces[-1]
399 storage_index = si_a2b(storage_index_s)
400 for sharename in filenames:
401 shnum = int(sharename)
402 filename = os.path.join(dirpath, sharename)
403 data = (client_num, storage_index, filename, shnum)
406 self.fail("unable to find any share files in %s" % basedir)
# Corrupt one named field of an SDMF mutable share on disk: read the
# share, unpack it, flip a bit in (or perturb) the field selected by
# 'which', repack, and write it back. Used by test_mutable to verify
# that reads survive per-field corruption.
# NOTE(review): damaged extraction -- the branches for "seqnum" body,
# "R" (root_hash) and "IV" elif headers, and the argument lines of the
# pack_prefix/pack_share calls (orig lines 442, 444-449) are missing.
409 def _corrupt_mutable_share(self, filename, which):
410 msf = MutableShareFile(filename)
411 datav = msf.readv([ (0, 1000000) ])
412 final_share = datav[0]
413 assert len(final_share) < 1000000 # ought to be truncated
414 pieces = mutable_layout.unpack_share(final_share)
415 (seqnum, root_hash, IV, k, N, segsize, datalen,
416 verification_key, signature, share_hash_chain, block_hash_tree,
417 share_data, enc_privkey) = pieces
419 if which == "seqnum":
422 root_hash = self.flip_bit(root_hash)
424 IV = self.flip_bit(IV)
425 elif which == "segsize":
426 segsize = segsize + 15
427 elif which == "pubkey":
428 verification_key = self.flip_bit(verification_key)
429 elif which == "signature":
430 signature = self.flip_bit(signature)
431 elif which == "share_hash_chain":
432 nodenum = share_hash_chain.keys()[0]
433 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
434 elif which == "block_hash_tree":
435 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
436 elif which == "share_data":
437 share_data = self.flip_bit(share_data)
438 elif which == "encprivkey":
439 enc_privkey = self.flip_bit(enc_privkey)
441 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
443 final_share = mutable_layout.pack_share(prefix,
450 msf.writev( [(0, final_share)], None)
# Mutable-file (SDMF) exercise: create a mutable file, inspect a share
# with the 'tahoe debug dump-share' CLI, read it back via fresh nodes on
# several clients, overwrite it twice, corrupt all but 3 of the 10
# shares and verify the content is still retrievable, check empty
# mutable files, basic dirnode operations, and key-generator pool
# accounting.
# NOTE(review): damaged extraction -- missing lines include the 'c'
# assignment used at line 464, the '_done' def header, the try: headers
# matched by the 'except unittest.FailTest' clauses, the dump-share
# argument line, shnum-dispatch if/elif headers inside _corrupt_shares,
# 'd1 = dnode.list()' before line 645, and the final 'return d'.
453 def test_mutable(self):
454 self.basedir = "system/SystemTest/test_mutable"
455 DATA = "initial contents go here." # 25 bytes % 3 != 0
456 NEWDATA = "new contents yay"
457 NEWERDATA = "this is getting old"
459 d = self.set_up_nodes(use_key_generator=True)
461 def _create_mutable(res):
463 log.msg("starting create_mutable_file")
464 d1 = c.create_mutable_file(DATA)
466 log.msg("DONE: %s" % (res,))
467 self._mutable_node_1 = res
468 d1.addCallback(_done)
470 d.addCallback(_create_mutable)
472 def _test_debug(res):
473 # find a share. It is important to run this while there is only
474 # one slot in the grid.
475 shares = self._find_shares(self.basedir)
476 (client_num, storage_index, filename, shnum) = shares[0]
477 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
479 log.msg(" for clients[%d]" % client_num)
481 out,err = StringIO(), StringIO()
482 rc = runner.runner(["debug", "dump-share", "--offsets",
484 stdout=out, stderr=err)
485 output = out.getvalue()
486 self.failUnlessEqual(rc, 0)
488 self.failUnless("Mutable slot found:\n" in output)
489 self.failUnless("share_type: SDMF\n" in output)
490 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
491 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
492 self.failUnless(" num_extra_leases: 0\n" in output)
493 self.failUnless(" secrets are for nodeid: %s\n" % peerid
495 self.failUnless(" SDMF contents:\n" in output)
496 self.failUnless(" seqnum: 1\n" in output)
497 self.failUnless(" required_shares: 3\n" in output)
498 self.failUnless(" total_shares: 10\n" in output)
499 self.failUnless(" segsize: 27\n" in output, (output, filename))
500 self.failUnless(" datalen: 25\n" in output)
501 # the exact share_hash_chain nodes depends upon the sharenum,
502 # and is more of a hassle to compute than I want to deal with
504 self.failUnless(" share_hash_chain: " in output)
505 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
506 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
507 base32.b2a(storage_index))
508 self.failUnless(expected in output)
509 except unittest.FailTest:
511 print "dump-share output was:"
514 d.addCallback(_test_debug)
518 # first, let's see if we can use the existing node to retrieve the
519 # contents. This allows it to use the cached pubkey and maybe the
520 # latest-known sharemap.
522 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
523 def _check_download_1(res):
524 self.failUnlessEqual(res, DATA)
525 # now we see if we can retrieve the data from a new node,
526 # constructed using the URI of the original one. We do this test
527 # on the same client that uploaded the data.
528 uri = self._mutable_node_1.get_uri()
529 log.msg("starting retrieve1")
530 newnode = self.clients[0].create_node_from_uri(uri)
531 newnode_2 = self.clients[0].create_node_from_uri(uri)
532 self.failUnlessIdentical(newnode, newnode_2)
533 return newnode.download_best_version()
534 d.addCallback(_check_download_1)
536 def _check_download_2(res):
537 self.failUnlessEqual(res, DATA)
538 # same thing, but with a different client
539 uri = self._mutable_node_1.get_uri()
540 newnode = self.clients[1].create_node_from_uri(uri)
541 log.msg("starting retrieve2")
542 d1 = newnode.download_best_version()
543 d1.addCallback(lambda res: (res, newnode))
545 d.addCallback(_check_download_2)
547 def _check_download_3((res, newnode)):
548 self.failUnlessEqual(res, DATA)
550 log.msg("starting replace1")
551 d1 = newnode.overwrite(NEWDATA)
552 d1.addCallback(lambda res: newnode.download_best_version())
554 d.addCallback(_check_download_3)
556 def _check_download_4(res):
557 self.failUnlessEqual(res, NEWDATA)
558 # now create an even newer node and replace the data on it. This
559 # new node has never been used for download before.
560 uri = self._mutable_node_1.get_uri()
561 newnode1 = self.clients[2].create_node_from_uri(uri)
562 newnode2 = self.clients[3].create_node_from_uri(uri)
563 self._newnode3 = self.clients[3].create_node_from_uri(uri)
564 log.msg("starting replace2")
565 d1 = newnode1.overwrite(NEWERDATA)
566 d1.addCallback(lambda res: newnode2.download_best_version())
568 d.addCallback(_check_download_4)
570 def _check_download_5(res):
571 log.msg("finished replace2")
572 self.failUnlessEqual(res, NEWERDATA)
573 d.addCallback(_check_download_5)
575 def _corrupt_shares(res):
576 # run around and flip bits in all but k of the shares, to test
578 shares = self._find_shares(self.basedir)
579 ## sort by share number
580 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
581 where = dict([ (shnum, filename)
582 for (client_num, storage_index, filename, shnum)
584 assert len(where) == 10 # this test is designed for 3-of-10
585 for shnum, filename in where.items():
586 # shares 7,8,9 are left alone. read will check
587 # (share_hash_chain, block_hash_tree, share_data). New
588 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
589 # segsize, signature).
591 # read: this will trigger "pubkey doesn't match
593 self._corrupt_mutable_share(filename, "pubkey")
594 self._corrupt_mutable_share(filename, "encprivkey")
596 # triggers "signature is invalid"
597 self._corrupt_mutable_share(filename, "seqnum")
599 # triggers "signature is invalid"
600 self._corrupt_mutable_share(filename, "R")
602 # triggers "signature is invalid"
603 self._corrupt_mutable_share(filename, "segsize")
605 self._corrupt_mutable_share(filename, "share_hash_chain")
607 self._corrupt_mutable_share(filename, "block_hash_tree")
609 self._corrupt_mutable_share(filename, "share_data")
610 # other things to correct: IV, signature
611 # 7,8,9 are left alone
613 # note that initial_query_count=5 means that we'll hit the
614 # first 5 servers in effectively random order (based upon
615 # response time), so we won't necessarily ever get a "pubkey
616 # doesn't match fingerprint" error (if we hit shnum>=1 before
617 # shnum=0, we pull the pubkey from there). To get repeatable
618 # specific failures, we need to set initial_query_count=1,
619 # but of course that will change the sequencing behavior of
620 # the retrieval process. TODO: find a reasonable way to make
621 # this a parameter, probably when we expand this test to test
622 # for one failure mode at a time.
624 # when we retrieve this, we should get three signature
625 # failures (where we've mangled seqnum, R, and segsize). The
627 d.addCallback(_corrupt_shares)
629 d.addCallback(lambda res: self._newnode3.download_best_version())
630 d.addCallback(_check_download_5)
632 def _check_empty_file(res):
633 # make sure we can create empty files, this usually screws up the
635 d1 = self.clients[2].create_mutable_file("")
636 d1.addCallback(lambda newnode: newnode.download_best_version())
637 d1.addCallback(lambda res: self.failUnlessEqual("", res))
639 d.addCallback(_check_empty_file)
641 d.addCallback(lambda res: self.clients[0].create_dirnode())
642 def _created_dirnode(dnode):
643 log.msg("_created_dirnode(%s)" % (dnode,))
645 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
646 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
647 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
648 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
649 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
650 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
651 d1.addCallback(lambda res: dnode.build_manifest().when_done())
652 d1.addCallback(lambda res:
653 self.failUnlessEqual(len(res["manifest"]), 1))
655 d.addCallback(_created_dirnode)
657 def wait_for_c3_kg_conn():
658 return self.clients[3]._key_generator is not None
659 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
661 def check_kg_poolsize(junk, size_delta):
662 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
663 self.key_generator_svc.key_generator.pool_size + size_delta)
665 d.addCallback(check_kg_poolsize, 0)
666 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
667 d.addCallback(check_kg_poolsize, -1)
668 d.addCallback(lambda junk: self.clients[3].create_dirnode())
669 d.addCallback(check_kg_poolsize, -2)
670 # use_helper induces use of clients[3], which is the using-key_gen client
671 d.addCallback(lambda junk:
672 self.POST("uri?t=mkdir&name=george", use_helper=True))
673 d.addCallback(check_kg_poolsize, -3)
def flip_bit(self, good):
    """Return a copy of 'good' with the least-significant bit of its
    final byte inverted -- used to corrupt keys and hashes by exactly
    one bit when exercising error paths.
    """
    head, tail = good[:-1], good[-1]
    return head + chr(ord(tail) ^ 0x01)
# Return a syntactically-valid CHK URI that refers to a file nobody
# holds shares for, by flipping a bit in the key (which changes the
# derived storage index) while keeping all other URI fields intact.
# NOTE(review): damaged extraction -- the final constructor argument
# (orig line 688, presumably 'size=u.size)') is missing, so the
# CHKFileURI(...) call is truncated as shown.
680 def mangle_uri(self, gooduri):
681 # change the key, which changes the storage index, which means we'll
682 # be asking about the wrong file, so nobody will have any shares
683 u = uri.from_string(gooduri)
684 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
685 uri_extension_hash=u.uri_extension_hash,
686 needed_shares=u.needed_shares,
687 total_shares=u.total_shares,
689 return u2.to_string()
691 # TODO: add a test which mangles the uri_extension_hash instead, and
692 # should fail due to not being able to get a valid uri_extension block.
693 # Also a test which sneakily mangles the uri_extension block to change
694 # some of the validation data, so it will fail in the post-download phase
695 # when the file's crypttext integrity check fails. Do the same thing for
696 # the key, which should cause the download to fail the post-download
697 # plaintext_hash check.
# Top-level filesystem exercise: builds a directory tree (R and a
# private tree P), bounces client0, re-checks the published data, then
# drives the webapi, control port, CLI and checker against it. The
# heavy lifting is delegated to the _do_publish*/_check_publish*/_test_*
# helper methods chained below.
# NOTE(review): damaged extraction -- several comment lines of the tree
# diagrams and the final 'return d' (orig lines 740-741) are missing.
699 def test_filesystem(self):
700 self.basedir = "system/SystemTest/test_filesystem"
701 self.data = LARGE_DATA
702 d = self.set_up_nodes(use_stats_gatherer=True)
703 d.addCallback(self._test_introweb)
704 d.addCallback(self.log, "starting publish")
705 d.addCallback(self._do_publish1)
706 d.addCallback(self._test_runner)
707 d.addCallback(self._do_publish2)
708 # at this point, we have the following filesystem (where "R" denotes
709 # self._root_directory_uri):
712 # R/subdir1/mydata567
714 # R/subdir1/subdir2/mydata992
716 d.addCallback(lambda res: self.bounce_client(0))
717 d.addCallback(self.log, "bounced client0")
719 d.addCallback(self._check_publish1)
720 d.addCallback(self.log, "did _check_publish1")
721 d.addCallback(self._check_publish2)
722 d.addCallback(self.log, "did _check_publish2")
723 d.addCallback(self._do_publish_private)
724 d.addCallback(self.log, "did _do_publish_private")
725 # now we also have (where "P" denotes a new dir):
726 # P/personal/sekrit data
727 # P/s2-rw -> /subdir1/subdir2/
728 # P/s2-ro -> /subdir1/subdir2/ (read-only)
729 d.addCallback(self._check_publish_private)
730 d.addCallback(self.log, "did _check_publish_private")
731 d.addCallback(self._test_web)
732 d.addCallback(self._test_control)
733 d.addCallback(self._test_cli)
734 # P now has four top-level children:
735 # P/personal/sekrit data
738 # P/test_put/ (empty)
739 d.addCallback(self._test_checker)
# Fetch the introducer's web status page (HTML and ?t=json) and verify
# the advertised version and the storage/stub_client announcement and
# subscription summaries; on assertion failure the raw page is printed
# for debugging.
# NOTE(review): damaged extraction -- the 'def _check(res):' header and
# the try: lines matched by the 'except unittest.FailTest' clauses, plus
# the 'raise'/print bodies and the final 'return d', are missing.
742 def _test_introweb(self, res):
743 d = getPage(self.introweb_url, method="GET", followRedirect=True)
746 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
748 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
749 self.failUnless("Subscription Summary: storage: 5" in res)
750 except unittest.FailTest:
752 print "GET %s output was:" % self.introweb_url
755 d.addCallback(_check)
756 d.addCallback(lambda res:
757 getPage(self.introweb_url + "?t=json",
758 method="GET", followRedirect=True))
759 def _check_json(res):
760 data = simplejson.loads(res)
762 self.failUnlessEqual(data["subscription_summary"],
764 self.failUnlessEqual(data["announcement_summary"],
765 {"storage": 5, "stub_client": 5})
766 self.failUnlessEqual(data["announcement_distinct_hosts"],
767 {"storage": 1, "stub_client": 1})
768 except unittest.FailTest:
770 print "GET %s?t=json output was:" % self.introweb_url
773 d.addCallback(_check_json)
# Create the root directory R on client0, stash its URI in
# self._root_directory_uri, create R/subdir1 and publish self.data as
# R/subdir1/mydata567, stashing the file's URI in self.uri.
# NOTE(review): damaged extraction -- the 'c0 = self.clients[0]'
# assignment (used at lines 779/782) and the 'return d1'/'return d'
# lines are missing.
776 def _do_publish1(self, res):
777 ut = upload.Data(self.data, convergence=None)
779 d = c0.create_dirnode()
780 def _made_root(new_dirnode):
781 self._root_directory_uri = new_dirnode.get_uri()
782 return c0.create_node_from_uri(self._root_directory_uri)
783 d.addCallback(_made_root)
784 d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
785 def _made_subdir1(subdir1_node):
786 self._subdir1_node = subdir1_node
787 d1 = subdir1_node.add_file(u"mydata567", ut)
788 d1.addCallback(self.log, "publish finished")
789 def _stash_uri(filenode):
790 self.uri = filenode.get_uri()
791 assert isinstance(self.uri, str), (self.uri, filenode)
792 d1.addCallback(_stash_uri)
794 d.addCallback(_made_subdir1)
# Create R/subdir1/subdir2 and publish self.data again as
# R/subdir1/subdir2/mydata992 (uses self._subdir1_node stashed by
# _do_publish1).
# NOTE(review): the trailing 'return d' (orig line 801) is missing from
# this damaged copy; without it the caller cannot wait on completion.
797 def _do_publish2(self, res):
798 ut = upload.Data(self.data, convergence=None)
799 d = self._subdir1_node.create_subdirectory(u"subdir2")
800 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
def log(self, res, *args, **kwargs):
    """Log a message and return 'res' unchanged.

    This method is inserted into Deferred callback chains as
    d.addCallback(self.log, "message"); returning 'res' lets the chained
    value flow through untouched. Without the return (the line is missing
    from this damaged copy), downstream callbacks -- e.g. _got_new_dir in
    _do_publish_private, which needs the freshly-created dirnode -- would
    receive None instead of the real result.
    """
    log.msg(*args, **kwargs)
    return res
# Build the private tree P: create a new dirnode, add P/personal with
# the small secret file, then link R/subdir1/subdir2 into P twice as
# s2-rw (read-write) and s2-ro (read-only); the Deferred fires with the
# private node so _check_publish_private can receive it.
# NOTE(review): damaged extraction -- the 'def _got_s2(s2node):' header,
# the 'return d2'/'return d1' lines, and the final 'return d' are among
# the missing lines.
808 def _do_publish_private(self, res):
809 self.smalldata = "sssh, very secret stuff"
810 ut = upload.Data(self.smalldata, convergence=None)
811 d = self.clients[0].create_dirnode()
812 d.addCallback(self.log, "GOT private directory")
813 def _got_new_dir(privnode):
814 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
815 d1 = privnode.create_subdirectory(u"personal")
816 d1.addCallback(self.log, "made P/personal")
817 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
818 d1.addCallback(self.log, "made P/personal/sekrit data")
819 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
821 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
822 s2node.get_readonly_uri())
823 d2.addCallback(lambda node:
824 privnode.set_uri(u"s2-ro",
825 s2node.get_readonly_uri(),
826 s2node.get_readonly_uri()))
828 d1.addCallback(_got_s2)
829 d1.addCallback(lambda res: privnode)
831 d.addCallback(_got_new_dir)
# Re-read R/subdir1/mydata567 from client1 using the step-by-step
# (iterative) dirnode API and verify the downloaded bytes match
# self.data.
# NOTE(review): damaged extraction -- the 'c1 = self.clients[1]'
# assignment (used at line 837), the 'def _get_done(data):' header for
# the body at line 844, and the final 'return d' are missing.
834 def _check_publish1(self, res):
835 # this one uses the iterative API
837 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
838 d.addCallback(self.log, "check_publish1 got /")
839 d.addCallback(lambda root: root.get(u"subdir1"))
840 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
841 d.addCallback(lambda filenode: download_to_data(filenode))
842 d.addCallback(self.log, "get finished")
844 self.failUnlessEqual(data, self.data)
845 d.addCallback(_get_done)
# Re-read the same file from client1 using the path-based
# get_child_at_path API, verify the directory type and the content, and
# confirm that re-creating a node from the file's URI yields the cached
# (identical) node.
# NOTE(review): damaged extraction -- orig line 857 and the final
# 'return d' are missing.
848 def _check_publish2(self, res):
849 # this one uses the path-based API
850 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
851 d = rootnode.get_child_at_path(u"subdir1")
852 d.addCallback(lambda dirnode:
853 self.failUnless(IDirectoryNode.providedBy(dirnode)))
854 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
855 d.addCallback(lambda filenode: download_to_data(filenode))
856 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
858 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
859 def _got_filenode(filenode):
860 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
861 assert fnode == filenode
862 d.addCallback(_got_filenode)
865 def _check_publish_private(self, resnode):
866 # this one uses the path-based API
867 self._private_node = resnode
869 d = self._private_node.get_child_at_path(u"personal")
870 def _got_personal(personal):
871 self._personal_node = personal
873 d.addCallback(_got_personal)
875 d.addCallback(lambda dirnode:
876 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
878 return self._private_node.get_child_at_path(path)
880 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
881 d.addCallback(lambda filenode: download_to_data(filenode))
882 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
883 d.addCallback(lambda res: get_path(u"s2-rw"))
884 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
885 d.addCallback(lambda res: get_path(u"s2-ro"))
886 def _got_s2ro(dirnode):
887 self.failUnless(dirnode.is_mutable(), dirnode)
888 self.failUnless(dirnode.is_readonly(), dirnode)
889 d1 = defer.succeed(None)
890 d1.addCallback(lambda res: dirnode.list())
891 d1.addCallback(self.log, "dirnode.list")
893 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
895 d1.addCallback(self.log, "doing add_file(ro)")
896 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
897 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
899 d1.addCallback(self.log, "doing get(ro)")
900 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
901 d1.addCallback(lambda filenode:
902 self.failUnless(IFileNode.providedBy(filenode)))
904 d1.addCallback(self.log, "doing delete(ro)")
905 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
907 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
909 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
911 personal = self._personal_node
912 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
914 d1.addCallback(self.log, "doing move_child_to(ro)2")
915 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
917 d1.addCallback(self.log, "finished with _got_s2ro")
919 d.addCallback(_got_s2ro)
920 def _got_home(dummy):
921 home = self._private_node
922 personal = self._personal_node
923 d1 = defer.succeed(None)
924 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
925 d1.addCallback(lambda res:
926 personal.move_child_to(u"sekrit data",home,u"sekrit"))
928 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
929 d1.addCallback(lambda res:
930 home.move_child_to(u"sekrit", home, u"sekrit data"))
932 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
933 d1.addCallback(lambda res:
934 home.move_child_to(u"sekrit data", personal))
936 d1.addCallback(lambda res: home.build_manifest().when_done())
937 d1.addCallback(self.log, "manifest")
941 # P/personal/sekrit data
942 # P/s2-rw (same as P/s2-ro)
943 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
944 d1.addCallback(lambda res:
945 self.failUnlessEqual(len(res["manifest"]), 5))
946 d1.addCallback(lambda res: home.start_deep_stats().when_done())
947 def _check_stats(stats):
948 expected = {"count-immutable-files": 1,
949 "count-mutable-files": 0,
950 "count-literal-files": 1,
952 "count-directories": 3,
953 "size-immutable-files": 112,
954 "size-literal-files": 23,
955 #"size-directories": 616, # varies
956 #"largest-directory": 616,
957 "largest-directory-children": 3,
958 "largest-immutable-file": 112,
960 for k,v in expected.iteritems():
961 self.failUnlessEqual(stats[k], v,
962 "stats[%s] was %s, not %s" %
964 self.failUnless(stats["size-directories"] > 1300,
965 stats["size-directories"])
966 self.failUnless(stats["largest-directory"] > 800,
967 stats["largest-directory"])
968 self.failUnlessEqual(stats["size-files-histogram"],
969 [ (11, 31, 1), (101, 316, 1) ])
970 d1.addCallback(_check_stats)
972 d.addCallback(_got_home)
975 def shouldFail(self, res, expected_failure, which, substring=None):
976 if isinstance(res, Failure):
977 res.trap(expected_failure)
979 self.failUnless(substring in str(res),
980 "substring '%s' not in '%s'"
981 % (substring, str(res)))
983 self.fail("%s was supposed to raise %s, not get '%s'" %
984 (which, expected_failure, res))
986 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
987 assert substring is None or isinstance(substring, str)
988 d = defer.maybeDeferred(callable, *args, **kwargs)
990 if isinstance(res, Failure):
991 res.trap(expected_failure)
993 self.failUnless(substring in str(res),
994 "substring '%s' not in '%s'"
995 % (substring, str(res)))
997 self.fail("%s was supposed to raise %s, not get '%s'" %
998 (which, expected_failure, res))
1002 def PUT(self, urlpath, data):
1003 url = self.webish_url + urlpath
1004 return getPage(url, method="PUT", postdata=data)
1006 def GET(self, urlpath, followRedirect=False):
1007 url = self.webish_url + urlpath
1008 return getPage(url, method="GET", followRedirect=followRedirect)
1010 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1011 sepbase = "boogabooga"
1012 sep = "--" + sepbase
1015 form.append('Content-Disposition: form-data; name="_charset"')
1017 form.append('UTF-8')
1019 for name, value in fields.iteritems():
1020 if isinstance(value, tuple):
1021 filename, value = value
1022 form.append('Content-Disposition: form-data; name="%s"; '
1023 'filename="%s"' % (name, filename.encode("utf-8")))
1025 form.append('Content-Disposition: form-data; name="%s"' % name)
1027 form.append(str(value))
1033 body = "\r\n".join(form) + "\r\n"
1034 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1035 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1037 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1040 url = self.helper_webish_url + urlpath
1042 url = self.webish_url + urlpath
1043 return getPage(url, method="POST", postdata=body, headers=headers,
1044 followRedirect=followRedirect)
1046 def _test_web(self, res):
1047 base = self.webish_url
1048 public = "uri/" + self._root_directory_uri
1050 def _got_welcome(page):
1051 # XXX This test is oversensitive to formatting
1052 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1053 self.failUnless(expected in page,
1054 "I didn't see the right 'connected storage servers'"
1055 " message in: %s" % page
1057 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1058 self.failUnless(expected in page,
1059 "I didn't see the right 'My nodeid' message "
1061 self.failUnless("Helper: 0 active uploads" in page)
1062 d.addCallback(_got_welcome)
1063 d.addCallback(self.log, "done with _got_welcome")
1065 # get the welcome page from the node that uses the helper too
1066 d.addCallback(lambda res: getPage(self.helper_webish_url))
1067 def _got_welcome_helper(page):
1068 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1070 self.failUnless("Not running helper" in page)
1071 d.addCallback(_got_welcome_helper)
1073 d.addCallback(lambda res: getPage(base + public))
1074 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1075 def _got_subdir1(page):
1076 # there ought to be an href for our file
1077 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1078 self.failUnless(">mydata567</a>" in page)
1079 d.addCallback(_got_subdir1)
1080 d.addCallback(self.log, "done with _got_subdir1")
1081 d.addCallback(lambda res:
1082 getPage(base + public + "/subdir1/mydata567"))
1083 def _got_data(page):
1084 self.failUnlessEqual(page, self.data)
1085 d.addCallback(_got_data)
1087 # download from a URI embedded in a URL
1088 d.addCallback(self.log, "_get_from_uri")
1089 def _get_from_uri(res):
1090 return getPage(base + "uri/%s?filename=%s"
1091 % (self.uri, "mydata567"))
1092 d.addCallback(_get_from_uri)
1093 def _got_from_uri(page):
1094 self.failUnlessEqual(page, self.data)
1095 d.addCallback(_got_from_uri)
1097 # download from a URI embedded in a URL, second form
1098 d.addCallback(self.log, "_get_from_uri2")
1099 def _get_from_uri2(res):
1100 return getPage(base + "uri?uri=%s" % (self.uri,))
1101 d.addCallback(_get_from_uri2)
1102 d.addCallback(_got_from_uri)
1104 # download from a bogus URI, make sure we get a reasonable error
1105 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1106 def _get_from_bogus_uri(res):
1107 d1 = getPage(base + "uri/%s?filename=%s"
1108 % (self.mangle_uri(self.uri), "mydata567"))
1109 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1112 d.addCallback(_get_from_bogus_uri)
1113 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1115 # upload a file with PUT
1116 d.addCallback(self.log, "about to try PUT")
1117 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1118 "new.txt contents"))
1119 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1120 d.addCallback(self.failUnlessEqual, "new.txt contents")
1121 # and again with something large enough to use multiple segments,
1122 # and hopefully trigger pauseProducing too
1123 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1124 "big" * 500000)) # 1.5MB
1125 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1126 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1128 # can we replace files in place?
1129 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1131 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1132 d.addCallback(self.failUnlessEqual, "NEWER contents")
1134 # test unlinked POST
1135 d.addCallback(lambda res: self.POST("uri", t="upload",
1136 file=("new.txt", "data" * 10000)))
1137 # and again using the helper, which exercises different upload-status
1139 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1140 file=("foo.txt", "data2" * 10000)))
1142 # check that the status page exists
1143 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1144 def _got_status(res):
1145 # find an interesting upload and download to look at. LIT files
1146 # are not interesting.
1147 h = self.clients[0].get_history()
1148 for ds in h.list_all_download_statuses():
1149 if ds.get_size() > 200:
1150 self._down_status = ds.get_counter()
1151 for us in h.list_all_upload_statuses():
1152 if us.get_size() > 200:
1153 self._up_status = us.get_counter()
1154 rs = list(h.list_all_retrieve_statuses())[0]
1155 self._retrieve_status = rs.get_counter()
1156 ps = list(h.list_all_publish_statuses())[0]
1157 self._publish_status = ps.get_counter()
1158 us = list(h.list_all_mapupdate_statuses())[0]
1159 self._update_status = us.get_counter()
1161 # and that there are some upload- and download- status pages
1162 return self.GET("status/up-%d" % self._up_status)
1163 d.addCallback(_got_status)
1165 return self.GET("status/down-%d" % self._down_status)
1166 d.addCallback(_got_up)
1168 return self.GET("status/mapupdate-%d" % self._update_status)
1169 d.addCallback(_got_down)
1170 def _got_update(res):
1171 return self.GET("status/publish-%d" % self._publish_status)
1172 d.addCallback(_got_update)
1173 def _got_publish(res):
1174 return self.GET("status/retrieve-%d" % self._retrieve_status)
1175 d.addCallback(_got_publish)
1177 # check that the helper status page exists
1178 d.addCallback(lambda res:
1179 self.GET("helper_status", followRedirect=True))
1180 def _got_helper_status(res):
1181 self.failUnless("Bytes Fetched:" in res)
1182 # touch a couple of files in the helper's working directory to
1183 # exercise more code paths
1184 workdir = os.path.join(self.getdir("client0"), "helper")
1185 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1186 f = open(incfile, "wb")
1187 f.write("small file")
1189 then = time.time() - 86400*3
1191 os.utime(incfile, (now, then))
1192 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1193 f = open(encfile, "wb")
1194 f.write("less small file")
1196 os.utime(encfile, (now, then))
1197 d.addCallback(_got_helper_status)
1198 # and that the json form exists
1199 d.addCallback(lambda res:
1200 self.GET("helper_status?t=json", followRedirect=True))
1201 def _got_helper_status_json(res):
1202 data = simplejson.loads(res)
1203 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1205 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1206 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1207 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1209 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1210 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1211 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1213 d.addCallback(_got_helper_status_json)
1215 # and check that client[3] (which uses a helper but does not run one
1216 # itself) doesn't explode when you ask for its status
1217 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1218 def _got_non_helper_status(res):
1219 self.failUnless("Upload and Download Status" in res)
1220 d.addCallback(_got_non_helper_status)
1222 # or for helper status with t=json
1223 d.addCallback(lambda res:
1224 getPage(self.helper_webish_url + "helper_status?t=json"))
1225 def _got_non_helper_status_json(res):
1226 data = simplejson.loads(res)
1227 self.failUnlessEqual(data, {})
1228 d.addCallback(_got_non_helper_status_json)
1230 # see if the statistics page exists
1231 d.addCallback(lambda res: self.GET("statistics"))
1232 def _got_stats(res):
1233 self.failUnless("Node Statistics" in res)
1234 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1235 d.addCallback(_got_stats)
1236 d.addCallback(lambda res: self.GET("statistics?t=json"))
1237 def _got_stats_json(res):
1238 data = simplejson.loads(res)
1239 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1240 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1241 d.addCallback(_got_stats_json)
1243 # TODO: mangle the second segment of a file, to test errors that
1244 # occur after we've already sent some good data, which uses a
1245 # different error path.
1247 # TODO: download a URI with a form
1248 # TODO: create a directory by using a form
1249 # TODO: upload by using a form on the directory page
1250 # url = base + "somedir/subdir1/freeform_post!!upload"
1251 # TODO: delete a file by using a button on the directory page
1255 def _test_runner(self, res):
1256 # exercise some of the diagnostic tools in runner.py
1259 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1260 if "storage" not in dirpath:
1264 pieces = dirpath.split(os.sep)
1265 if (len(pieces) >= 4
1266 and pieces[-4] == "storage"
1267 and pieces[-3] == "shares"):
1268 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1269 # are sharefiles here
1270 filename = os.path.join(dirpath, filenames[0])
1271 # peek at the magic to see if it is a chk share
1272 magic = open(filename, "rb").read(4)
1273 if magic == '\x00\x00\x00\x01':
1276 self.fail("unable to find any uri_extension files in %s"
1278 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1280 out,err = StringIO(), StringIO()
1281 rc = runner.runner(["debug", "dump-share", "--offsets",
1283 stdout=out, stderr=err)
1284 output = out.getvalue()
1285 self.failUnlessEqual(rc, 0)
1287 # we only upload a single file, so we can assert some things about
1288 # its size and shares.
1289 self.failUnless(("share filename: %s" % filename) in output)
1290 self.failUnless("size: %d\n" % len(self.data) in output)
1291 self.failUnless("num_segments: 1\n" in output)
1292 # segment_size is always a multiple of needed_shares
1293 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1294 self.failUnless("total_shares: 10\n" in output)
1295 # keys which are supposed to be present
1296 for key in ("size", "num_segments", "segment_size",
1297 "needed_shares", "total_shares",
1298 "codec_name", "codec_params", "tail_codec_params",
1299 #"plaintext_hash", "plaintext_root_hash",
1300 "crypttext_hash", "crypttext_root_hash",
1301 "share_root_hash", "UEB_hash"):
1302 self.failUnless("%s: " % key in output, key)
1303 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1305 # now use its storage index to find the other shares using the
1306 # 'find-shares' tool
1307 sharedir, shnum = os.path.split(filename)
1308 storagedir, storage_index_s = os.path.split(sharedir)
1309 out,err = StringIO(), StringIO()
1310 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1311 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1312 rc = runner.runner(cmd, stdout=out, stderr=err)
1313 self.failUnlessEqual(rc, 0)
1315 sharefiles = [sfn.strip() for sfn in out.readlines()]
1316 self.failUnlessEqual(len(sharefiles), 10)
1318 # also exercise the 'catalog-shares' tool
1319 out,err = StringIO(), StringIO()
1320 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1321 cmd = ["debug", "catalog-shares"] + nodedirs
1322 rc = runner.runner(cmd, stdout=out, stderr=err)
1323 self.failUnlessEqual(rc, 0)
1325 descriptions = [sfn.strip() for sfn in out.readlines()]
1326 self.failUnlessEqual(len(descriptions), 30)
1328 for line in descriptions
1329 if line.startswith("CHK %s " % storage_index_s)]
1330 self.failUnlessEqual(len(matching), 10)
1332 def _test_control(self, res):
1333 # exercise the remote-control-the-client foolscap interfaces in
1334 # allmydata.control (mostly used for performance tests)
1335 c0 = self.clients[0]
1336 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1337 control_furl = open(control_furl_file, "r").read().strip()
1338 # it doesn't really matter which Tub we use to connect to the client,
1339 # so let's just use our IntroducerNode's
1340 d = self.introducer.tub.getReference(control_furl)
1341 d.addCallback(self._test_control2, control_furl_file)
1343 def _test_control2(self, rref, filename):
1344 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1345 downfile = os.path.join(self.basedir, "control.downfile")
1346 d.addCallback(lambda uri:
1347 rref.callRemote("download_from_uri_to_file",
1350 self.failUnlessEqual(res, downfile)
1351 data = open(downfile, "r").read()
1352 expected_data = open(filename, "r").read()
1353 self.failUnlessEqual(data, expected_data)
1354 d.addCallback(_check)
1355 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1356 if sys.platform == "linux2":
1357 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1358 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1361 def _test_cli(self, res):
1362 # run various CLI commands (in a thread, since they use blocking
1365 private_uri = self._private_node.get_uri()
1366 client0_basedir = self.getdir("client0")
1369 "--node-directory", client0_basedir,
1372 d = defer.succeed(None)
1374 # for compatibility with earlier versions, private/root_dir.cap is
1375 # supposed to be treated as an alias named "tahoe:". Start by making
1376 # sure that works, before we add other aliases.
1378 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1379 f = open(root_file, "w")
1380 f.write(private_uri)
1383 def run(ignored, verb, *args, **kwargs):
1384 stdin = kwargs.get("stdin", "")
1385 newargs = [verb] + nodeargs + list(args)
1386 return self._run_cli(newargs, stdin=stdin)
1388 def _check_ls((out,err), expected_children, unexpected_children=[]):
1389 self.failUnlessEqual(err, "")
1390 for s in expected_children:
1391 self.failUnless(s in out, (s,out))
1392 for s in unexpected_children:
1393 self.failIf(s in out, (s,out))
1395 def _check_ls_root((out,err)):
1396 self.failUnless("personal" in out)
1397 self.failUnless("s2-ro" in out)
1398 self.failUnless("s2-rw" in out)
1399 self.failUnlessEqual(err, "")
1401 # this should reference private_uri
1402 d.addCallback(run, "ls")
1403 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1405 d.addCallback(run, "list-aliases")
1406 def _check_aliases_1((out,err)):
1407 self.failUnlessEqual(err, "")
1408 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1409 d.addCallback(_check_aliases_1)
1411 # now that that's out of the way, remove root_dir.cap and work with
1413 d.addCallback(lambda res: os.unlink(root_file))
1414 d.addCallback(run, "list-aliases")
1415 def _check_aliases_2((out,err)):
1416 self.failUnlessEqual(err, "")
1417 self.failUnlessEqual(out, "")
1418 d.addCallback(_check_aliases_2)
1420 d.addCallback(run, "mkdir")
1421 def _got_dir( (out,err) ):
1422 self.failUnless(uri.from_string_dirnode(out.strip()))
1424 d.addCallback(_got_dir)
1425 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1427 d.addCallback(run, "list-aliases")
1428 def _check_aliases_3((out,err)):
1429 self.failUnlessEqual(err, "")
1430 self.failUnless("tahoe: " in out)
1431 d.addCallback(_check_aliases_3)
1433 def _check_empty_dir((out,err)):
1434 self.failUnlessEqual(out, "")
1435 self.failUnlessEqual(err, "")
1436 d.addCallback(run, "ls")
1437 d.addCallback(_check_empty_dir)
1439 def _check_missing_dir((out,err)):
1440 # TODO: check that rc==2
1441 self.failUnlessEqual(out, "")
1442 self.failUnlessEqual(err, "No such file or directory\n")
1443 d.addCallback(run, "ls", "bogus")
1444 d.addCallback(_check_missing_dir)
1449 fn = os.path.join(self.basedir, "file%d" % i)
1451 data = "data to be uploaded: file%d\n" % i
1453 open(fn,"wb").write(data)
1455 def _check_stdout_against((out,err), filenum=None, data=None):
1456 self.failUnlessEqual(err, "")
1457 if filenum is not None:
1458 self.failUnlessEqual(out, datas[filenum])
1459 if data is not None:
1460 self.failUnlessEqual(out, data)
1462 # test all both forms of put: from a file, and from stdin
1464 d.addCallback(run, "put", files[0], "tahoe-file0")
1465 def _put_out((out,err)):
1466 self.failUnless("URI:LIT:" in out, out)
1467 self.failUnless("201 Created" in err, err)
1469 return run(None, "get", uri0)
1470 d.addCallback(_put_out)
1471 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1473 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1474 # tahoe put bar tahoe:FOO
1475 d.addCallback(run, "put", files[2], "tahoe:file2")
1476 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1477 def _check_put_mutable((out,err)):
1478 self._mutable_file3_uri = out.strip()
1479 d.addCallback(_check_put_mutable)
1480 d.addCallback(run, "get", "tahoe:file3")
1481 d.addCallback(_check_stdout_against, 3)
1484 STDIN_DATA = "This is the file to upload from stdin."
1485 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1486 # tahoe put tahoe:FOO
1487 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1488 stdin="Other file from stdin.")
1490 d.addCallback(run, "ls")
1491 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1492 "tahoe-file-stdin", "from-stdin"])
1493 d.addCallback(run, "ls", "subdir")
1494 d.addCallback(_check_ls, ["tahoe-file1"])
1497 d.addCallback(run, "mkdir", "subdir2")
1498 d.addCallback(run, "ls")
1499 # TODO: extract the URI, set an alias with it
1500 d.addCallback(_check_ls, ["subdir2"])
1502 # tahoe get: (to stdin and to a file)
1503 d.addCallback(run, "get", "tahoe-file0")
1504 d.addCallback(_check_stdout_against, 0)
1505 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1506 d.addCallback(_check_stdout_against, 1)
1507 outfile0 = os.path.join(self.basedir, "outfile0")
1508 d.addCallback(run, "get", "file2", outfile0)
1509 def _check_outfile0((out,err)):
1510 data = open(outfile0,"rb").read()
1511 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1512 d.addCallback(_check_outfile0)
1513 outfile1 = os.path.join(self.basedir, "outfile0")
1514 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1515 def _check_outfile1((out,err)):
1516 data = open(outfile1,"rb").read()
1517 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1518 d.addCallback(_check_outfile1)
1520 d.addCallback(run, "rm", "tahoe-file0")
1521 d.addCallback(run, "rm", "tahoe:file2")
1522 d.addCallback(run, "ls")
1523 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1525 d.addCallback(run, "ls", "-l")
1526 def _check_ls_l((out,err)):
1527 lines = out.split("\n")
1529 if "tahoe-file-stdin" in l:
1530 self.failUnless(l.startswith("-r-- "), l)
1531 self.failUnless(" %d " % len(STDIN_DATA) in l)
1533 self.failUnless(l.startswith("-rw- "), l) # mutable
1534 d.addCallback(_check_ls_l)
1536 d.addCallback(run, "ls", "--uri")
1537 def _check_ls_uri((out,err)):
1538 lines = out.split("\n")
1541 self.failUnless(self._mutable_file3_uri in l)
1542 d.addCallback(_check_ls_uri)
1544 d.addCallback(run, "ls", "--readonly-uri")
1545 def _check_ls_rouri((out,err)):
1546 lines = out.split("\n")
1549 rw_uri = self._mutable_file3_uri
1550 u = uri.from_string_mutable_filenode(rw_uri)
1551 ro_uri = u.get_readonly().to_string()
1552 self.failUnless(ro_uri in l)
1553 d.addCallback(_check_ls_rouri)
1556 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1557 d.addCallback(run, "ls")
1558 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1560 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1561 d.addCallback(run, "ls")
1562 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1564 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1565 d.addCallback(run, "ls")
1566 d.addCallback(_check_ls, ["file3", "file3-copy"])
1567 d.addCallback(run, "get", "tahoe:file3-copy")
1568 d.addCallback(_check_stdout_against, 3)
1570 # copy from disk into tahoe
1571 d.addCallback(run, "cp", files[4], "tahoe:file4")
1572 d.addCallback(run, "ls")
1573 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1574 d.addCallback(run, "get", "tahoe:file4")
1575 d.addCallback(_check_stdout_against, 4)
1577 # copy from tahoe into disk
1578 target_filename = os.path.join(self.basedir, "file-out")
1579 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1580 def _check_cp_out((out,err)):
1581 self.failUnless(os.path.exists(target_filename))
1582 got = open(target_filename,"rb").read()
1583 self.failUnlessEqual(got, datas[4])
1584 d.addCallback(_check_cp_out)
1586 # copy from disk to disk (silly case)
1587 target2_filename = os.path.join(self.basedir, "file-out-copy")
1588 d.addCallback(run, "cp", target_filename, target2_filename)
1589 def _check_cp_out2((out,err)):
1590 self.failUnless(os.path.exists(target2_filename))
1591 got = open(target2_filename,"rb").read()
1592 self.failUnlessEqual(got, datas[4])
1593 d.addCallback(_check_cp_out2)
1595 # copy from tahoe into disk, overwriting an existing file
1596 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1597 def _check_cp_out3((out,err)):
1598 self.failUnless(os.path.exists(target_filename))
1599 got = open(target_filename,"rb").read()
1600 self.failUnlessEqual(got, datas[3])
1601 d.addCallback(_check_cp_out3)
1603 # copy from disk into tahoe, overwriting an existing immutable file
1604 d.addCallback(run, "cp", files[5], "tahoe:file4")
1605 d.addCallback(run, "ls")
1606 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1607 d.addCallback(run, "get", "tahoe:file4")
1608 d.addCallback(_check_stdout_against, 5)
1610 # copy from disk into tahoe, overwriting an existing mutable file
1611 d.addCallback(run, "cp", files[5], "tahoe:file3")
1612 d.addCallback(run, "ls")
1613 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1614 d.addCallback(run, "get", "tahoe:file3")
1615 d.addCallback(_check_stdout_against, 5)
1617 # recursive copy: setup
1618 dn = os.path.join(self.basedir, "dir1")
1620 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1621 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1622 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1623 sdn2 = os.path.join(dn, "subdir2")
1625 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1626 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1628 # from disk into tahoe
1629 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1630 d.addCallback(run, "ls")
1631 d.addCallback(_check_ls, ["dir1"])
1632 d.addCallback(run, "ls", "dir1")
1633 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1634 ["rfile4", "rfile5"])
1635 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1636 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1637 ["rfile1", "rfile2", "rfile3"])
1638 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1639 d.addCallback(_check_stdout_against, data="rfile4")
1641 # and back out again
1642 dn_copy = os.path.join(self.basedir, "dir1-copy")
1643 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1644 def _check_cp_r_out((out,err)):
1646 old = open(os.path.join(dn, name), "rb").read()
1647 newfn = os.path.join(dn_copy, name)
1648 self.failUnless(os.path.exists(newfn))
1649 new = open(newfn, "rb").read()
1650 self.failUnlessEqual(old, new)
1654 _cmp(os.path.join("subdir2", "rfile4"))
1655 _cmp(os.path.join("subdir2", "rfile5"))
1656 d.addCallback(_check_cp_r_out)
1658 # and copy it a second time, which ought to overwrite the same files
1659 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1661 # and again, only writing filecaps
1662 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1663 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1664 def _check_capsonly((out,err)):
1665 # these should all be LITs
1666 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1667 y = uri.from_string_filenode(x)
1668 self.failUnlessEqual(y.data, "rfile4")
1669 d.addCallback(_check_capsonly)
1671 # and tahoe-to-tahoe
1672 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1673 d.addCallback(run, "ls")
1674 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1675 d.addCallback(run, "ls", "dir1-copy")
1676 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1677 ["rfile4", "rfile5"])
1678 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1679 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1680 ["rfile1", "rfile2", "rfile3"])
1681 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1682 d.addCallback(_check_stdout_against, data="rfile4")
1684 # and copy it a second time, which ought to overwrite the same files
1685 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1687 # tahoe_ls doesn't currently handle the error correctly: it tries to
1688 # JSON-parse a traceback.
1689 ## def _ls_missing(res):
1690 ## argv = ["ls"] + nodeargs + ["bogus"]
1691 ## return self._run_cli(argv)
1692 ## d.addCallback(_ls_missing)
1693 ## def _check_ls_missing((out,err)):
1696 ## self.failUnlessEqual(err, "")
1697 ## d.addCallback(_check_ls_missing)
1701 def _run_cli(self, argv, stdin=""):
1703 stdout, stderr = StringIO(), StringIO()
1704 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1705 stdin=StringIO(stdin),
1706 stdout=stdout, stderr=stderr)
1708 return stdout.getvalue(), stderr.getvalue()
1709 d.addCallback(_done)
1712 def _test_checker(self, res):
1713 ut = upload.Data("too big to be literal" * 200, convergence=None)
1714 d = self._personal_node.add_file(u"big file", ut)
1716 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1717 def _check_dirnode_results(r):
1718 self.failUnless(r.is_healthy())
1719 d.addCallback(_check_dirnode_results)
1720 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1721 d.addCallback(_check_dirnode_results)
1723 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1724 def _got_chk_filenode(n):
1725 self.failUnless(isinstance(n, ImmutableFileNode))
1726 d = n.check(Monitor())
1727 def _check_filenode_results(r):
1728 self.failUnless(r.is_healthy())
1729 d.addCallback(_check_filenode_results)
1730 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1731 d.addCallback(_check_filenode_results)
1733 d.addCallback(_got_chk_filenode)
1735 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1736 def _got_lit_filenode(n):
1737 self.failUnless(isinstance(n, LiteralFileNode))
1738 d = n.check(Monitor())
1739 def _check_lit_filenode_results(r):
1740 self.failUnlessEqual(r, None)
1741 d.addCallback(_check_lit_filenode_results)
1742 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1743 d.addCallback(_check_lit_filenode_results)
1745 d.addCallback(_got_lit_filenode)