1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18 NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin
30 This is some data to publish to the virtual drive, which needs to be large
31 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts how many bytes have been read from it,
    and can fire a Deferred once a given number of bytes has been passed.

    Used by the resumable-upload tests to sever the helper connection
    partway through an upload.
    """
    # Running total of bytes handed out by read(). Must be initialized at
    # class level: without it the first `self.bytes_read += length` would
    # raise AttributeError.
    bytes_read = 0
    interrupt_after = None    # byte threshold at which to fire interrupt_after_d
    interrupt_after_d = None  # Deferred fired (with self) when threshold is crossed

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so the Deferred fires exactly once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class SystemTest(SystemTestMixin, unittest.TestCase):
    # Trial per-test timeout, in seconds.
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
    def test_connections(self):
        """Bring up the grid plus one extra node, and check that every
        client then sees numclients+1 storage servers."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                # NOTE(review): `sb` is not defined in this excerpt --
                # presumably an elided `sb = c.get_storage_broker()`;
                # confirm against the full source.
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        # NOTE(review): `_check` is never added to the Deferred chain in
        # this excerpt, and no `return d` is visible -- both appear to be
        # elided; confirm against the full source.
        def _shutdown_extra_node(res):
            # runs on success or failure, so the extra node never leaks
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
71 # test_connections is subsumed by test_upload_and_download, and takes
72 # quite a while to run on a slow machine (because of all the TLS
73 # connections that must be established). If we ever rework the introducer
74 # code to such an extent that we're not sure if it works anymore, we can
75 # reinstate this test until it does.
78 def test_upload_and_download_random_key(self):
79 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80 return self._test_upload_and_download(convergence=None)
82 def test_upload_and_download_convergent(self):
83 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84 return self._test_upload_and_download(convergence="some convergence string")
    def _test_upload_and_download(self, convergence):
        """Upload DATA (convergently, or with a random key when
        *convergence* is None), download it back several ways, exercise
        partial reads, corrupted/nonexistent URIs, helper-assisted and
        resumable uploads, and finally the remote stats interface.

        NOTE(review): this excerpt appears to have lines elided (several
        helper-function headers, assignments, and `return` statements
        referenced below are not visible); each suspect spot is marked
        inline -- confirm against the full source.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # every client should see every storage server
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                # NOTE(review): `sb` is undefined here -- presumably an
                # elided `sb = c.get_storage_broker()`.
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        # NOTE(review): a `def _do_upload(res):` header appears to be
        # elided here -- the following lines read like its body, and
        # `_do_upload` is added to the chain below.
        u = self.clients[0].getServiceNamed("uploader")
        # we crank the max segsize down to 1024b for the duration of this
        # test, so we can exercise multiple segments. It is important
        # that this is not a multiple of the segment size, so that the
        # tail segment is not the same length as the others. This actually
        # gets rounded up to 1025 to be a multiple of the number of
        # required shares (since we use 25 out of 100 FEC).
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            # NOTE(review): `theuri` is undefined here -- presumably elided
            # `theuri = results.uri` / `self.uri = theuri`.
            log.msg("upload finished: uri is %s" % (theuri,))
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        # NOTE(review): a `def _test_read(res):` header appears to be
        # elided here; `_test_read` is added to the chain below.
        n = self.clients[1].create_node_from_uri(self.uri)
        d = download_to_data(n)
        def _read_done(data):
            self.failUnlessEqual(data, DATA)
        d.addCallback(_read_done)
        # partial read: a 4-byte slice starting at offset 1
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=1, size=4))
        def _read_portion_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
        d.addCallback(_read_portion_done)
        # tail read: from offset 2 to the end of the file
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=2, size=None))
        def _read_tail_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[2:])
        d.addCallback(_read_tail_done)
        # over-long read should be truncated at the file size
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), size=len(DATA)+1000))
        def _read_too_much(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA)
        d.addCallback(_read_too_much)
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            # NOTE(review): shouldFail2's `which` argument and a trailing
            # `return d` appear to be elided here.
            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                # the download must deliver a Failure wrapping NoSharesError
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            # NOTE(review): a `def _check(newdata):` header appears elided.
            self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            # NOTE(review): a `def _check(newdata):` header appears elided.
            self.failUnlessEqual(newdata, HELPER_DATA)
            # the helper should have short-circuited the second upload
            self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                        "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        if convergence is not None:
            # duplicate-suppression only works with convergent encryption
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            # NOTE(review): a `def _interrupted(f):` header appears to be
            # elided here; the lines below read like its body.
            f.trap(DeadReferenceError)

            # make sure we actually interrupted it before finishing the
            # NOTE(review): the continuation `len(DATA)))` of this
            # assertion appears to be elided.
            self.failUnless(u1.bytes_read < len(DATA),
                            "read %d out of %d total" % (u1.bytes_read,

            log.msg("waiting for reconnect", level=log.NOISY,
                    facility="tahoe.test.test_system")
            # now, we need to give the nodes a chance to notice that this
            # connection has gone away. When this happens, the storage
            # servers will be told to abort their uploads, removing the
            # partial shares. Unfortunately this involves TCP messages
            # going through the loopback interface, and we can't easily
            # predict how long that will take. If it were all local, we
            # could use fireEventually() to stall. Since we don't have
            # the right introduction hooks, the best we can do is use a
            # fixed delay. TODO: this is fragile.
            u1.interrupt_after_d.addCallback(self.stall, 2.0)
            return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                # NOTE(review): `cap` used below is presumably an elided
                # `cap = results.uri`.
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                # NOTE(review): an `else:` line appears to be elided here;
                # the failIf below is the random-key branch.
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            # NOTE(review): a `def _check(newdata):` header appears elided.
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #from pprint import pprint
                # NOTE(review): `s` is presumably an elided
                # `s = stats["stats"]`.
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
        d.addCallback(_grab_stats)
    def _find_shares(self, basedir):
        """Walk *basedir* and collect (client_num, storage_index, filename,
        shnum) tuples for every share file found under a
        .../storage/shares/$START/$SINDEX directory; fail the test if none
        are found.

        NOTE(review): the `shares` list initialization, the `continue`,
        the head of the `if` over `pieces`, the `shares.append(data)` and
        the final `return shares` appear to be elided from this excerpt --
        confirm against the full source.
        """
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
        self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Read the mutable share at *filename*, mangle the single field
        selected by *which* (mostly by flipping one bit), repack it and
        write it back in place.

        NOTE(review): the bodies of the "seqnum"/"R"/"IV" branches and the
        trailing argument lines of pack_prefix/pack_share appear to be
        elided from this excerpt -- confirm against the full source.
        """
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """Create an SDMF mutable file, inspect one of its shares via the
        CLI debug tools, read and overwrite it from several clients,
        corrupt most shares and confirm download still works, then
        exercise empty files, dirnodes, and the key-generator pool.

        NOTE(review): several helper headers, assignments and `return`
        statements appear to be elided from this excerpt; each suspect
        spot is marked inline -- confirm against the full source.
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here." # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            # NOTE(review): `c` is presumably an elided
            # `c = self.clients[0]`.
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            # NOTE(review): a `def _done(res):` header appears elided.
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            # NOTE(review): several continuation lines (log.msg/runner
            # arguments) and the `try:` matching the `except` below appear
            # to be elided from this excerpt.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)
            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                        base32.b2a(storage_index))
            self.failUnless(expected in output)
            except unittest.FailTest:
            print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            # nodes for the same URI should be cached and identical
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            # NOTE(review): the `in shares])` tail of this dict
            # comprehension appears to be elided.
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                # NOTE(review): the per-shnum `if shnum == N:` dispatch
                # appears to be elided here; the calls below were
                # presumably each under their own branch.
                # read: this will trigger "pubkey doesn't match
                self._corrupt_mutable_share(filename, "pubkey")
                self._corrupt_mutable_share(filename, "encprivkey")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "seqnum")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "R")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "segsize")
                self._corrupt_mutable_share(filename, "share_hash_chain")
                self._corrupt_mutable_share(filename, "block_hash_tree")
                self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # NOTE(review): `d1` is presumably an elided
            # `d1 = dnode.list()`.
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            # a directory linked to itself must not break manifest building
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # each key consumed shrinks the pool by one until it refills
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)
678 def flip_bit(self, good):
679 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a CHK URI derived from *gooduri* but with a flipped key,
        so its storage index names a file for which no shares exist.

        NOTE(review): the trailing `size=u.size)` argument line of the
        CHKFileURI call appears to be elided from this excerpt -- confirm
        against the full source.
        """
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        return u2.to_string()
692 # TODO: add a test which mangles the uri_extension_hash instead, and
693 # should fail due to not being able to get a valid uri_extension block.
694 # Also a test which sneakily mangles the uri_extension block to change
695 # some of the validation data, so it will fail in the post-download phase
696 # when the file's crypttext integrity check fails. Do the same thing for
697 # the key, which should cause the download to fail the post-download
698 # plaintext_hash check.
    def test_vdrive(self):
        """End-to-end virtual-drive test: publish a directory tree, bounce
        client0, verify the published contents, publish a private tree,
        then exercise the web, control, CLI and checker interfaces.

        NOTE(review): no `return d` is visible at the end of this excerpt
        -- presumably elided; confirm against the full source.
        """
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
    def _test_introweb(self, res):
        """Fetch the introducer's status page (HTML and ?t=json) and sanity
        check the version string plus announcement/subscription summaries.

        NOTE(review): the `def _check(res):` header, the `try:` lines
        matching the `except` clauses, several continuation lines
        (`in res)`, `{"storage": 5})`), the diagnostic `print res`
        statements, and the final `return d` appear to be elided from this
        excerpt -- confirm against the full source. Also `allmydata` is
        referenced but `import allmydata` is not visible.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
        self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
        self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
        print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["subscription_summary"],
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
            self.failUnlessEqual(data["announcement_distinct_hosts"],
                                 {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
            print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        """Create the root dirnode, remember its URI in
        self._root_directory_uri, create subdir1 and publish mydata567
        beneath it, stashing the new file's URI in self.uri.

        NOTE(review): the `c0 = self.clients[0]` assignment and the
        `return d1` / `return d` lines appear to be elided from this
        excerpt -- confirm against the full source.
        """
        ut = upload.Data(self.data, convergence=None)
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
798 def _do_publish2(self, res):
799 ut = upload.Data(self.data, convergence=None)
800 d = self._subdir1_node.create_subdirectory(u"subdir2")
801 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
804 def log(self, res, *args, **kwargs):
805 # print "MSG: %s RES: %s" % (msg, args)
806 log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        """Create a new private directory P containing
        P/personal/sekrit data plus read-write (s2-rw) and read-only
        (s2-ro) links to subdir1/subdir2; the chain ends by yielding the
        private dirnode itself.

        NOTE(review): the `def _got_s2(s2node):` header and the
        `return d2` / `return d1` / `return d` lines appear to be elided
        from this excerpt -- confirm against the full source.
        """
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                  s2node.get_readonly_uri())
            d2.addCallback(lambda node:
                           privnode.set_uri(u"s2-ro",
                                            s2node.get_readonly_uri(),
                                            s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            # hand the private dirnode to the next callback in the chain
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 contents via step-by-step get()
        calls (the iterative API).

        NOTE(review): `c1` (presumably an elided `c1 = self.clients[1]`),
        the `def _get_done(data):` header, and the final `return d` are
        not visible in this excerpt -- confirm against the full source.
        """
        # this one uses the iterative API
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 via the path-based API from a second
        client, and confirm node caching for the same URI.

        NOTE(review): the final `return d` is not visible in this excerpt
        -- presumably elided; confirm against the full source.
        """
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # creating a node for the same URI should yield the cached node
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
    def _check_publish_private(self, resnode):
        # Verify the private (rootcap-based) tree: read back files by path,
        # confirm that the read-only dirnode rejects all mutations, then
        # exercise move_child_to, build_manifest and start_deep_stats.
        # NOTE(review): several physical lines appear to be missing from this
        # excerpt (e.g. the "def get_path(path):" header, a dict-closing
        # brace, and inner "return d1" lines); confirm against the full file.
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash the "personal" dirnode for later move_child_to tests
            self._personal_node = personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): the enclosing "def get_path(path):" line is not
        # visible here -- 'path' is otherwise unbound.
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # s2-ro is a read-only view: every mutating call must raise
            # NotMutableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # rename/move exercises against the writable private root
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                # exact counts/sizes are known because the test uploaded a
                # fixed set of files; directory sizes vary, so only bounded
                # NOTE(review): the dict-closing line appears truncated here.
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
        d.addCallback(_got_home)
    def shouldFail(self, res, expected_failure, which, substring=None):
        # Errback-style checker, used via addBoth(): if 'res' is a Failure,
        # verify it wraps 'expected_failure' (and mentions 'substring' when
        # given); a non-Failure means the call unexpectedly succeeded.
        # NOTE(review): the "if substring:" / "else:" lines appear to be
        # missing from this excerpt; confirm against the full file.
        if isinstance(res, Failure):
            res.trap(expected_failure)
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        # Like shouldFail, but invokes 'callable' itself (via maybeDeferred)
        # and checks that the resulting Deferred errbacks with
        # expected_failure.
        # NOTE(review): the inner "def done(res):", "else:" and trailing
        # "d.addBoth(done); return d" lines appear to be missing from this
        # excerpt; confirm against the full file.
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
            if isinstance(res, Failure):
                res.trap(expected_failure)
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
1003 def PUT(self, urlpath, data):
1004 url = self.webish_url + urlpath
1005 return getPage(url, method="PUT", postdata=data)
1007 def GET(self, urlpath, followRedirect=False):
1008 url = self.webish_url + urlpath
1009 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        # Build a multipart/form-data request body from 'fields' -- a
        # (filename, data) tuple becomes a file-upload part, anything else a
        # plain form field -- and submit it via POST2.
        # NOTE(review): the "form = []" initialization, the separator
        # appends, the inner "else:" and the headers setup appear to be
        # missing from this excerpt; confirm against the full file.
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('UTF-8')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        body = "\r\n".join(form) + "\r\n"
        headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)
    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
        # POST 'body' to the given path, targeting either the helper-using
        # node's webapi or client0's.
        # NOTE(review): the rest of the signature (presumably a use_helper
        # flag) and the if/else around the two 'url =' assignments appear to
        # be missing from this excerpt; confirm against the full file.
            url = self.helper_webish_url + urlpath
            url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)
    def _test_web(self, res):
        # Exercise the HTTP webapi end-to-end: welcome pages (direct and via
        # the helper-using node), directory listings, downloads by path and
        # by URI, PUT/POST uploads, per-operation status pages, the helper
        # status page (HTML and JSON), and the node statistics pages.
        # NOTE(review): a number of physical lines appear to be missing from
        # this excerpt (the initial "d = getPage(base)", several closing
        # parens, a few inner "def" headers, f.close() calls, and the final
        # "return d"); confirm against the full file before relying on this.
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): the "def _got_up(res):" and "def _got_down(res):"
        # headers appear to be missing from this excerpt.
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # the 10- and 15-byte sizes match the two spurious files written
            # by _got_helper_status above
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
    def _test_runner(self, res):
        # exercise some of the diagnostic tools in runner.py
        # Walk the storage directories to locate one CHK share file, then
        # run the "tahoe debug" subcommands (dump-share, find-shares,
        # catalog-shares) against it and check their output.
        # NOTE(review): several physical lines appear to be missing from this
        # excerpt (loop "continue"s, a "break"/"else:" around the magic
        # check, list-literal delimiters, and out.seek(0) calls); confirm
        # against the full file.
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 4
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
            self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        # NOTE(review): the "matching = [line" opener for this comprehension
        # appears to be missing from this excerpt.
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
        # NOTE(review): no "return d" is visible at the end of this excerpt;
        # likely truncated -- confirm against the full file.
    def _test_control2(self, rref, filename):
        # Via the control port: upload 'filename', download it back to disk
        # and compare contents, then poke the speed-test and (on Linux)
        # memory-usage and peer-latency probes.
        # NOTE(review): the download_from_uri_to_file argument lines, the
        # "def _check(res):" header and the trailing "return d" appear to be
        # missing from this excerpt; confirm against the full file.
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
            self.failUnlessEqual(res, downfile)
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        # Drive the command-line interface end-to-end against the running
        # grid: aliases, put/get (file, stdin, mutable), ls variants,
        # rm/mv/ln, cp in every direction (disk<->tahoe, tahoe<->tahoe,
        # recursive, --caps-only), checking stdout/stderr at each step.
        # NOTE(review): many physical lines appear to be missing from this
        # excerpt (the nodeargs list delimiters, f.close(), the
        # files/datas setup loop header, several "for l in lines:" headers,
        # os.mkdir calls, "def _cmp(name):", and the final "return d");
        # confirm against the full file before relying on this.
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        client0_basedir = self.getdir("client0")
            "--node-directory", client0_basedir,
        TESTDATA = "I will not write the same thing over and over.\n" * 100

        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.

        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)

        def run(ignored, verb, *args, **kwargs):
            # helper: run "tahoe <verb> <args>" against client0's node dir
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
                if "tahoe-file-stdin" in l:
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
                    self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
                    rw_uri = self._mutable_file3_uri
                    u = uri.from_string_mutable_filenode(rw_uri)
                    ro_uri = u.get_readonly().to_string()
                    self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and again, only writing filecaps
        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
        def _check_capsonly((out,err)):
            # these should all be LITs
            x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
            y = uri.from_string_filenode(x)
            self.failUnlessEqual(y.data, "rfile4")
        d.addCallback(_check_capsonly)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
##         def _ls_missing(res):
##             argv = ["ls"] + nodeargs + ["bogus"]
##             return self._run_cli(argv)
##         d.addCallback(_ls_missing)
##         def _check_ls_missing((out,err)):
##             self.failUnlessEqual(err, "")
##         d.addCallback(_check_ls_missing)
    def _run_cli(self, argv, stdin=""):
        # Run a CLI command in a worker thread (runner uses blocking network
        # calls); fires with the captured (stdout, stderr) strings.
        # NOTE(review): the "def _done(res):" header and the trailing
        # "return d" appear to be missing from this excerpt; confirm against
        # the full file.
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdin=StringIO(stdin),
                                  stdout=stdout, stderr=stderr)
            return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
1715 def _test_checker(self, res):
1716 ut = upload.Data("too big to be literal" * 200, convergence=None)
1717 d = self._personal_node.add_file(u"big file", ut)
1719 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1720 def _check_dirnode_results(r):
1721 self.failUnless(r.is_healthy())
1722 d.addCallback(_check_dirnode_results)
1723 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1724 d.addCallback(_check_dirnode_results)
1726 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1727 def _got_chk_filenode(n):
1728 self.failUnless(isinstance(n, ImmutableFileNode))
1729 d = n.check(Monitor())
1730 def _check_filenode_results(r):
1731 self.failUnless(r.is_healthy())
1732 d.addCallback(_check_filenode_results)
1733 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1734 d.addCallback(_check_filenode_results)
1736 d.addCallback(_got_chk_filenode)
1738 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1739 def _got_lit_filenode(n):
1740 self.failUnless(isinstance(n, LiteralFileNode))
1741 d = n.check(Monitor())
1742 def _check_lit_filenode_results(r):
1743 self.failUnlessEqual(r, None)
1744 d.addCallback(_check_lit_filenode_results)
1745 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1746 d.addCallback(_check_lit_filenode_results)
1748 d.addCallback(_got_lit_filenode)