1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18 NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotWriteableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin
30 This is some data to publish to the remote grid.., which needs to be large
31 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """An upload.Data subclass that counts bytes read and can fire a
    Deferred partway through, letting a test interrupt an in-progress
    upload (e.g. by bouncing a node) at a controlled point."""
    # byte threshold after which interrupt_after_d is fired; None disables
    interrupt_after = None
    # Deferred fired (with this uploadable) once the threshold is crossed
    interrupt_after_d = None
    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # disarm first so the callback fires exactly once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class SystemTest(SystemTestMixin, unittest.TestCase):
    # Generous trial timeout: these tests start real nodes and are very slow
    # on weak hardware.
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
    def test_connections(self):
        """Every client should see all storage servers, including a node
        added after the initial grid came up."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        # bring up one node beyond the usual client count
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # runs on success or failure: always stop the extra node
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
71 # test_connections is subsumed by test_upload_and_download, and takes
72 # quite a while to run on a slow machine (because of all the TLS
73 # connections that must be established). If we ever rework the introducer
74 # code to such an extent that we're not sure if it works anymore, we can
75 # reinstate this test until it does.
78 def test_upload_and_download_random_key(self):
79 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80 return self._test_upload_and_download(convergence=None)
82 def test_upload_and_download_convergent(self):
83 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84 return self._test_upload_and_download(convergence="some convergence string")
    def _test_upload_and_download(self, convergence):
        """Drive a full upload/download cycle on a live grid: upload, repeat
        upload, full and partial reads, corrupted-key and nonexistent-URI
        error paths, helper-assisted upload, resumable upload after an
        interruption, and the remote stats interface.
        NOTE(review): several lines of this method are not visible in this
        chunk; comments below describe only the code that is shown."""
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # every client should be connected to every storage server
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)
            u = self.clients[0].getServiceNamed("uploader")
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            log.msg("upload finished: uri is %s" % (theuri,))
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            # download via a different client than the one that uploaded
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)
        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)
        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            # partial read: 4 bytes starting at offset 1
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            # tail read: size=None means "to the end"
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            # over-long read should be truncated to the actual file size
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)
        d.addCallback(_test_read)
        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            # a flipped key bit changes the storage index: no shares exist
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)
        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)
        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)
        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)
        def _upload_duplicate_with_helper(res):
            # uploading the same data again (convergent only) should be
            # short-circuited before the uploadable starts uploading
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)
        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)
            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))
            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
            d = self.extra_node.upload(u1)
            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
                # only a DeadReferenceError counts as the expected interruption
                f.trap(DeadReferenceError)
                # make sure we actually interrupted it before finishing the
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)
            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)
            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())
            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))
            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)
        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #from pprint import pprint
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
        d.addCallback(_grab_stats)
    def _find_all_shares(self, basedir):
        """Walk `basedir` collecting (client_num, storage_index, filename,
        shnum) tuples for every share file found under a
        .../storage/shares/$START/$SINDEX directory."""
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                # NOTE(review): parses the client number from the final
                # character of the "clientN" directory name, so this assumes
                # fewer than 10 clients -- confirm if the grid ever grows
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    # share files are named after their share number
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
        self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Corrupt one field of the SDMF mutable share stored in `filename`.
        The share is read, unpacked, the field selected by `which` (e.g.
        "seqnum", "segsize", "pubkey", "signature", "share_hash_chain",
        "block_hash_tree", "share_data", "encprivkey") is mangled -- mostly
        by flipping one bit -- then the share is re-packed and written back
        in place."""
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces
        if which == "seqnum":
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)
        # re-pack the (possibly modified) fields and overwrite the share
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """Exercise SDMF mutable files end to end: create, dump a share with
        the CLI debug tool, retrieve through fresh nodes on several clients,
        overwrite, survive corrupted shares, handle empty files, dirnode
        operations, and the key-generator pool.
        NOTE(review): several lines of this method are not visible in this
        chunk; comments below describe only the code that is shown."""
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here." # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"
        d = self.set_up_nodes(use_key_generator=True)
        def _create_mutable(res):
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)
        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)
            out,err = StringIO(), StringIO()
            # run the "tahoe debug dump-share" CLI against the share file
            rc = runner.runner(["debug", "dump-share", "--offsets",
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless(" secrets are for nodeid: %s\n" % peerid
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless(" seqnum: 1\n" in output)
                self.failUnless(" required_shares: 3\n" in output)
                self.failUnless(" total_shares: 10\n" in output)
                self.failUnless(" segsize: 27\n" in output, (output, filename))
                self.failUnless(" datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                self.failUnless(" share_hash_chain: " in output)
                self.failUnless(" block_hash_tree: 1 nodes\n" in output)
                expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print "dump-share output was:"
        d.addCallback(_test_debug)
        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.
        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            # the nodemaker should return the cached node for the same URI
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)
        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                    # read: this will trigger "pubkey doesn't match
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone
                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.
                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)
        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)
        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)
        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            # a directory may contain a link to itself
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
        d.addCallback(_created_dirnode)
        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
        def check_kg_poolsize(junk, size_delta):
            # keypool should shrink by one entry per key consumed
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)
680 def flip_bit(self, good):
681 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a CHK URI like `gooduri` but with one bit of the encryption
        key flipped, so the derived storage index names a file for which no
        shares exist anywhere on the grid."""
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        return u2.to_string()
694 # TODO: add a test which mangles the uri_extension_hash instead, and
695 # should fail due to not being able to get a valid uri_extension block.
696 # Also a test which sneakily mangles the uri_extension block to change
697 # some of the validation data, so it will fail in the post-download phase
698 # when the file's crypttext integrity check fails. Do the same thing for
699 # the key, which should cause the download to fail the post-download
700 # plaintext_hash check.
    def test_filesystem(self):
        """Build and verify a small directory tree, then exercise the web,
        control, CLI, and checker interfaces against it.
        NOTE(review): some lines of this method are not visible in this
        chunk; comments below describe only the code that is shown."""
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            # relax servers-of-happiness so uploads succeed on few servers
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")
        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
    def _test_introweb(self, res):
        """Fetch the introducer's status page as HTML and as JSON and check
        the version string and announcement/subscription summaries."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the page body to aid debugging before re-raising
                print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
                self.failUnlessEqual(data["subscription_summary"],
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        """Create the root directory R and publish R/subdir1/mydata567,
        stashing the root URI and the file's URI for later checks."""
        ut = upload.Data(self.data, convergence=None)
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                # remembered for later download/verify steps
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
    def _do_publish2(self, res):
        """Create R/subdir1/subdir2 and publish mydata992 into it."""
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    def log(self, res, *args, **kwargs):
        """Deferred-chain-friendly logging helper: emits a log message;
        `res` is the value arriving from the previous callback."""
        # print "MSG: %s RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        """Create the private directory P, publish P/personal/sekrit data,
        and link R/subdir1/subdir2 into P as s2-rw (read-write) and s2-ro
        (read-only)."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
                # link subdir2 twice: once writable, once read-only
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 contents using one get() per step."""
        # this one uses the iterative API
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 contents via get_child_at_path()."""
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # building a node from the same URI should compare equal
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
872 def _check_publish_private(self, resnode):
# Exercise the private ("P/") directory: read back small data, verify
# mutability/read-only flags of the s2-rw and s2-ro links, confirm that
# every mutating operation on a read-only dirnode raises
# NotWriteableError, then move children around and check manifest/stats.
# NOTE(review): this extract is missing several original lines (e.g. 875,
# 879, 881, 884 — the latter presumably "def get_path(path):") — the
# references to get_path below depend on those missing lines.
873 # this one uses the path-based API
874 self._private_node = resnode
876 d = self._private_node.get_child_at_path(u"personal")
877 def _got_personal(personal):
878 self._personal_node = personal
880 d.addCallback(_got_personal)
882 d.addCallback(lambda dirnode:
883 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
885 return self._private_node.get_child_at_path(path)
887 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
888 d.addCallback(lambda filenode: download_to_data(filenode))
889 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
890 d.addCallback(lambda res: get_path(u"s2-rw"))
891 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
892 d.addCallback(lambda res: get_path(u"s2-ro"))
893 def _got_s2ro(dirnode):
# s2-ro is a read-only view of a mutable directory: mutable but readonly.
894 self.failUnless(dirnode.is_mutable(), dirnode)
895 self.failUnless(dirnode.is_readonly(), dirnode)
896 d1 = defer.succeed(None)
897 d1.addCallback(lambda res: dirnode.list())
898 d1.addCallback(self.log, "dirnode.list")
# Every write-style operation on the read-only dirnode must fail with
# NotWriteableError.
900 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
902 d1.addCallback(self.log, "doing add_file(ro)")
903 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
904 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
906 d1.addCallback(self.log, "doing get(ro)")
907 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
908 d1.addCallback(lambda filenode:
909 self.failUnless(IFileNode.providedBy(filenode)))
911 d1.addCallback(self.log, "doing delete(ro)")
912 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
914 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
916 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
918 personal = self._personal_node
919 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
921 d1.addCallback(self.log, "doing move_child_to(ro)2")
922 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
924 d1.addCallback(self.log, "finished with _got_s2ro")
926 d.addCallback(_got_s2ro)
927 def _got_home(dummy):
# Shuffle 'sekrit data' between the private root and personal/ to
# exercise move_child_to in both directions, then check the manifest
# and deep-stats of the resulting tree.
928 home = self._private_node
929 personal = self._personal_node
930 d1 = defer.succeed(None)
931 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
932 d1.addCallback(lambda res:
933 personal.move_child_to(u"sekrit data",home,u"sekrit"))
935 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
936 d1.addCallback(lambda res:
937 home.move_child_to(u"sekrit", home, u"sekrit data"))
939 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
940 d1.addCallback(lambda res:
941 home.move_child_to(u"sekrit data", personal))
943 d1.addCallback(lambda res: home.build_manifest().when_done())
944 d1.addCallback(self.log, "manifest")
948 # P/personal/sekrit data
949 # P/s2-rw (same as P/s2-ro)
950 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
951 d1.addCallback(lambda res:
952 self.failUnlessEqual(len(res["manifest"]), 5))
953 d1.addCallback(lambda res: home.start_deep_stats().when_done())
954 def _check_stats(stats):
# Exact counts/sizes are pinned; directory sizes vary so only
# lower bounds are asserted further down.
955 expected = {"count-immutable-files": 1,
956 "count-mutable-files": 0,
957 "count-literal-files": 1,
959 "count-directories": 3,
960 "size-immutable-files": 112,
961 "size-literal-files": 23,
962 #"size-directories": 616, # varies
963 #"largest-directory": 616,
964 "largest-directory-children": 3,
965 "largest-immutable-file": 112,
967 for k,v in expected.iteritems():
968 self.failUnlessEqual(stats[k], v,
969 "stats[%s] was %s, not %s" %
971 self.failUnless(stats["size-directories"] > 1300,
972 stats["size-directories"])
973 self.failUnless(stats["largest-directory"] > 800,
974 stats["largest-directory"])
975 self.failUnlessEqual(stats["size-files-histogram"],
976 [ (11, 31, 1), (101, 316, 1) ])
977 d1.addCallback(_check_stats)
979 d.addCallback(_got_home)
982 def shouldFail(self, res, expected_failure, which, substring=None):
# addBoth-style checker: if 'res' is a Failure, require that it wraps
# expected_failure (and, when substring is given, that the failure text
# contains it); otherwise the operation unexpectedly succeeded, so fail.
# NOTE(review): original lines 985 (presumably "if substring:") and 989
# (presumably "else:") are missing from this extract.
983 if isinstance(res, Failure):
984 res.trap(expected_failure)
986 self.failUnless(substring in str(res),
987 "substring '%s' not in '%s'"
988 % (substring, str(res)))
990 self.fail("%s was supposed to raise %s, not get '%s'" %
991 (which, expected_failure, res))
993 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
# Like shouldFail, but runs 'callable(*args, **kwargs)' itself via
# maybeDeferred and checks the resulting Failure. 'which' labels the
# assertion in failure messages.
# NOTE(review): this extract is missing original line 996 (presumably
# "def _done(res):") plus the trailing addBoth/return lines — the body
# below is the interior of that missing nested function.
994 assert substring is None or isinstance(substring, str)
995 d = defer.maybeDeferred(callable, *args, **kwargs)
997 if isinstance(res, Failure):
998 res.trap(expected_failure)
1000 self.failUnless(substring in str(res),
1001 "substring '%s' not in '%s'"
1002 % (substring, str(res)))
1004 self.fail("%s was supposed to raise %s, not get '%s'" %
1005 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of `data` to this node's web API at `urlpath`."""
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Fetch `urlpath` from this node's web API, optionally following redirects."""
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
1017 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
# Hand-build a multipart/form-data body from 'fields' (a tuple value
# means (filename, content) for a file upload) and delegate to POST2.
# NOTE(review): several original lines are missing from this extract —
# the 'form = []' initialization, the per-field separator appends, the
# 'else:' branch, and the closing boundary — confirm against the full file.
1018 sepbase = "boogabooga"
1019 sep = "--" + sepbase
1022 form.append('Content-Disposition: form-data; name="_charset"')
1024 form.append('UTF-8')
1026 for name, value in fields.iteritems():
1027 if isinstance(value, tuple):
# Tuple values are file uploads: (filename, file contents).
1028 filename, value = value
1029 form.append('Content-Disposition: form-data; name="%s"; '
1030 'filename="%s"' % (name, filename.encode("utf-8")))
1032 form.append('Content-Disposition: form-data; name="%s"' % name)
1034 form.append(str(value))
1040 body = "\r\n".join(form) + "\r\n"
1041 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1042 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1044 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
# Low-level POST helper: send 'body' with 'headers' to either the
# helper-using node's web API or the default node's, per use_helper.
# NOTE(review): the signature continuation (", use_helper=False):") and
# the "if use_helper:"/"else:" lines are missing from this extract.
# NOTE(review): mutable default argument headers={} — shared across
# calls; the caller mutates it in POST(). Flagged, not changed here.
1047 url = self.helper_webish_url + urlpath
1049 url = self.webish_url + urlpath
1050 return getPage(url, method="POST", postdata=body, headers=headers,
1051 followRedirect=followRedirect)
1053 def _test_web(self, res):
# Drive the node's HTTP web API end-to-end: welcome pages, directory
# listings, downloads by path and by URI, error handling for bogus URIs,
# PUT/POST uploads (small, multi-segment, unlinked, via helper), the
# status/helper-status/statistics pages, and their JSON forms.
# NOTE(review): this extract has many numbering gaps (e.g. 1056, 1063,
# 1067, 1121, 1142, 1176/1179 with the _got_up/_got_down defs, 1200,
# 1202, 1207, 1216, 1220, 1224) — missing lines include the initial
# "d = getPage(base)", several closing parens/arguments, "f.close()"
# and "now = time.time()" calls, and expected-value continuations.
1054 base = self.webish_url
1055 public = "uri/" + self._root_directory_uri
1057 def _got_welcome(page):
1058 # XXX This test is oversensitive to formatting
1059 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1060 self.failUnless(expected in page,
1061 "I didn't see the right 'connected storage servers'"
1062 " message in: %s" % page
1064 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1065 self.failUnless(expected in page,
1066 "I didn't see the right 'My nodeid' message "
1068 self.failUnless("Helper: 0 active uploads" in page)
1069 d.addCallback(_got_welcome)
1070 d.addCallback(self.log, "done with _got_welcome")
1072 # get the welcome page from the node that uses the helper too
1073 d.addCallback(lambda res: getPage(self.helper_webish_url))
1074 def _got_welcome_helper(page):
1075 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1077 self.failUnless("Not running helper" in page)
1078 d.addCallback(_got_welcome_helper)
1080 d.addCallback(lambda res: getPage(base + public))
1081 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1082 def _got_subdir1(page):
1083 # there ought to be an href for our file
1084 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1085 self.failUnless(">mydata567</a>" in page)
1086 d.addCallback(_got_subdir1)
1087 d.addCallback(self.log, "done with _got_subdir1")
1088 d.addCallback(lambda res:
1089 getPage(base + public + "/subdir1/mydata567"))
1090 def _got_data(page):
1091 self.failUnlessEqual(page, self.data)
1092 d.addCallback(_got_data)
1094 # download from a URI embedded in a URL
1095 d.addCallback(self.log, "_get_from_uri")
1096 def _get_from_uri(res):
1097 return getPage(base + "uri/%s?filename=%s"
1098 % (self.uri, "mydata567"))
1099 d.addCallback(_get_from_uri)
1100 def _got_from_uri(page):
1101 self.failUnlessEqual(page, self.data)
1102 d.addCallback(_got_from_uri)
1104 # download from a URI embedded in a URL, second form
1105 d.addCallback(self.log, "_get_from_uri2")
1106 def _get_from_uri2(res):
1107 return getPage(base + "uri?uri=%s" % (self.uri,))
1108 d.addCallback(_get_from_uri2)
1109 d.addCallback(_got_from_uri)
1111 # download from a bogus URI, make sure we get a reasonable error
1112 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1113 def _get_from_bogus_uri(res):
1114 d1 = getPage(base + "uri/%s?filename=%s"
1115 % (self.mangle_uri(self.uri), "mydata567"))
1116 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1119 d.addCallback(_get_from_bogus_uri)
1120 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1122 # upload a file with PUT
1123 d.addCallback(self.log, "about to try PUT")
1124 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1125 "new.txt contents"))
1126 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1127 d.addCallback(self.failUnlessEqual, "new.txt contents")
1128 # and again with something large enough to use multiple segments,
1129 # and hopefully trigger pauseProducing too
1130 def _new_happy_semantics(ign):
1131 for c in self.clients:
1132 # these get reset somewhere? Whatever.
1133 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1134 d.addCallback(_new_happy_semantics)
1135 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1136 "big" * 500000)) # 1.5MB
1137 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1138 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1140 # can we replace files in place?
1141 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1143 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1144 d.addCallback(self.failUnlessEqual, "NEWER contents")
1146 # test unlinked POST
1147 d.addCallback(lambda res: self.POST("uri", t="upload",
1148 file=("new.txt", "data" * 10000)))
1149 # and again using the helper, which exercises different upload-status
1151 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1152 file=("foo.txt", "data2" * 10000)))
1154 # check that the status page exists
1155 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1156 def _got_status(res):
1157 # find an interesting upload and download to look at. LIT files
1158 # are not interesting.
1159 h = self.clients[0].get_history()
1160 for ds in h.list_all_download_statuses():
1161 if ds.get_size() > 200:
1162 self._down_status = ds.get_counter()
1163 for us in h.list_all_upload_statuses():
1164 if us.get_size() > 200:
1165 self._up_status = us.get_counter()
1166 rs = list(h.list_all_retrieve_statuses())[0]
1167 self._retrieve_status = rs.get_counter()
1168 ps = list(h.list_all_publish_statuses())[0]
1169 self._publish_status = ps.get_counter()
1170 us = list(h.list_all_mapupdate_statuses())[0]
1171 self._update_status = us.get_counter()
1173 # and that there are some upload- and download- status pages
1174 return self.GET("status/up-%d" % self._up_status)
1175 d.addCallback(_got_status)
# NOTE(review): the "def _got_up(res):" and "def _got_down(res):" header
# lines are missing from this extract; these returns belong to them.
1177 return self.GET("status/down-%d" % self._down_status)
1178 d.addCallback(_got_up)
1180 return self.GET("status/mapupdate-%d" % self._update_status)
1181 d.addCallback(_got_down)
1182 def _got_update(res):
1183 return self.GET("status/publish-%d" % self._publish_status)
1184 d.addCallback(_got_update)
1185 def _got_publish(res):
1186 return self.GET("status/retrieve-%d" % self._retrieve_status)
1187 d.addCallback(_got_publish)
1189 # check that the helper status page exists
1190 d.addCallback(lambda res:
1191 self.GET("helper_status", followRedirect=True))
1192 def _got_helper_status(res):
1193 self.failUnless("Bytes Fetched:" in res)
1194 # touch a couple of files in the helper's working directory to
1195 # exercise more code paths
1196 workdir = os.path.join(self.getdir("client0"), "helper")
1197 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1198 f = open(incfile, "wb")
1199 f.write("small file")
1201 then = time.time() - 86400*3
1203 os.utime(incfile, (now, then))
1204 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1205 f = open(encfile, "wb")
1206 f.write("less small file")
1208 os.utime(encfile, (now, then))
1209 d.addCallback(_got_helper_status)
1210 # and that the json form exists
1211 d.addCallback(lambda res:
1212 self.GET("helper_status?t=json", followRedirect=True))
1213 def _got_helper_status_json(res):
1214 data = simplejson.loads(res)
1215 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1217 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1218 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1219 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1221 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1222 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1223 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1225 d.addCallback(_got_helper_status_json)
1227 # and check that client[3] (which uses a helper but does not run one
1228 # itself) doesn't explode when you ask for its status
1229 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1230 def _got_non_helper_status(res):
1231 self.failUnless("Upload and Download Status" in res)
1232 d.addCallback(_got_non_helper_status)
1234 # or for helper status with t=json
1235 d.addCallback(lambda res:
1236 getPage(self.helper_webish_url + "helper_status?t=json"))
1237 def _got_non_helper_status_json(res):
1238 data = simplejson.loads(res)
1239 self.failUnlessEqual(data, {})
1240 d.addCallback(_got_non_helper_status_json)
1242 # see if the statistics page exists
1243 d.addCallback(lambda res: self.GET("statistics"))
1244 def _got_stats(res):
1245 self.failUnless("Node Statistics" in res)
1246 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1247 d.addCallback(_got_stats)
1248 d.addCallback(lambda res: self.GET("statistics?t=json"))
1249 def _got_stats_json(res):
1250 data = simplejson.loads(res)
1251 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1252 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1253 d.addCallback(_got_stats_json)
1255 # TODO: mangle the second segment of a file, to test errors that
1256 # occur after we've already sent some good data, which uses a
1257 # different error path.
1259 # TODO: download a URI with a form
1260 # TODO: create a directory by using a form
1261 # TODO: upload by using a form on the directory page
1262 # url = base + "somedir/subdir1/freeform_post!!upload"
1263 # TODO: delete a file by using a button on the directory page
1267 def _test_runner(self, res):
# Exercise the "tahoe debug" CLI diagnostics against the live grid:
# locate a CHK share file on disk, dump it with dump-share and assert
# its reported layout fields, then run find-shares and catalog-shares
# across all client node directories.
# NOTE(review): numbering gaps in this extract (e.g. 1269-1270, 1273-1275,
# 1286-1287, 1289, 1291, 1294, 1298, 1326, 1329, 1336, 1339) — missing
# lines include the 'filename = None' init, 'continue' statements, the
# loop 'break', part of the self.fail(...) argument, the dump-share
# argv continuation, 'out.seek(0)' calls, and the start of the
# 'matching = [line' list comprehension.
1268 # exercise some of the diagnostic tools in runner.py
1271 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1272 if "storage" not in dirpath:
1276 pieces = dirpath.split(os.sep)
1277 if (len(pieces) >= 4
1278 and pieces[-4] == "storage"
1279 and pieces[-3] == "shares"):
1280 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1281 # are sharefiles here
1282 filename = os.path.join(dirpath, filenames[0])
1283 # peek at the magic to see if it is a chk share
1284 magic = open(filename, "rb").read(4)
1285 if magic == '\x00\x00\x00\x01':
1288 self.fail("unable to find any uri_extension files in %s"
1290 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1292 out,err = StringIO(), StringIO()
1293 rc = runner.runner(["debug", "dump-share", "--offsets",
1295 stdout=out, stderr=err)
1296 output = out.getvalue()
1297 self.failUnlessEqual(rc, 0)
1299 # we only upload a single file, so we can assert some things about
1300 # its size and shares.
1301 self.failUnless(("share filename: %s" % filename) in output)
1302 self.failUnless("size: %d\n" % len(self.data) in output)
1303 self.failUnless("num_segments: 1\n" in output)
1304 # segment_size is always a multiple of needed_shares
1305 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1306 self.failUnless("total_shares: 10\n" in output)
1307 # keys which are supposed to be present
1308 for key in ("size", "num_segments", "segment_size",
1309 "needed_shares", "total_shares",
1310 "codec_name", "codec_params", "tail_codec_params",
1311 #"plaintext_hash", "plaintext_root_hash",
1312 "crypttext_hash", "crypttext_root_hash",
1313 "share_root_hash", "UEB_hash"):
1314 self.failUnless("%s: " % key in output, key)
1315 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1317 # now use its storage index to find the other shares using the
1318 # 'find-shares' tool
1319 sharedir, shnum = os.path.split(filename)
1320 storagedir, storage_index_s = os.path.split(sharedir)
1321 out,err = StringIO(), StringIO()
1322 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1323 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1324 rc = runner.runner(cmd, stdout=out, stderr=err)
1325 self.failUnlessEqual(rc, 0)
# 10 total shares were created, so find-shares should list all 10.
1327 sharefiles = [sfn.strip() for sfn in out.readlines()]
1328 self.failUnlessEqual(len(sharefiles), 10)
1330 # also exercise the 'catalog-shares' tool
1331 out,err = StringIO(), StringIO()
1332 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1333 cmd = ["debug", "catalog-shares"] + nodedirs
1334 rc = runner.runner(cmd, stdout=out, stderr=err)
1335 self.failUnlessEqual(rc, 0)
1337 descriptions = [sfn.strip() for sfn in out.readlines()]
1338 self.failUnlessEqual(len(descriptions), 30)
1340 for line in descriptions
1341 if line.startswith("CHK %s " % storage_index_s)]
1342 self.failUnlessEqual(len(matching), 10)
1344 def _test_control(self, res):
# Connect to client0's control port (a foolscap remote-control interface
# used mostly by performance tests) via its control.furl, then hand the
# remote reference to _test_control2.
# NOTE(review): the trailing "return d" (original line ~1354) is missing
# from this extract.
1345 # exercise the remote-control-the-client foolscap interfaces in
1346 # allmydata.control (mostly used for performance tests)
1347 c0 = self.clients[0]
1348 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1349 control_furl = open(control_furl_file, "r").read().strip()
1350 # it doesn't really matter which Tub we use to connect to the client,
1351 # so let's just use our IntroducerNode's
1352 d = self.introducer.tub.getReference(control_furl)
1353 d.addCallback(self._test_control2, control_furl_file)
1355 def _test_control2(self, rref, filename):
# Round-trip a file through the control interface: upload 'filename',
# download the result to control.downfile, verify the bytes match, then
# poke speed_test / get_memory_usage / measure_peer_response_time.
# NOTE(review): gaps in this extract — the download_from_uri_to_file
# argument continuation (~1360-1361) and the "def _check(res):" header
# are missing; the failUnlessEqual lines below belong inside _check.
1356 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1357 downfile = os.path.join(self.basedir, "control.downfile")
1358 d.addCallback(lambda uri:
1359 rref.callRemote("download_from_uri_to_file",
1362 self.failUnlessEqual(res, downfile)
1363 data = open(downfile, "r").read()
1364 expected_data = open(filename, "r").read()
1365 self.failUnlessEqual(data, expected_data)
1366 d.addCallback(_check)
1367 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
# get_memory_usage reads /proc, hence the linux2 platform guard.
1368 if sys.platform == "linux2":
1369 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1370 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1373 def _test_cli(self, res):
# Drive the "tahoe" command-line interface end-to-end against the live
# grid: alias handling, ls, put/get (file, stdin, mutable), mkdir,
# rm, mv, ln, cp (disk<->tahoe, tahoe<->tahoe, recursive, --caps-only).
# Commands run in a thread via _run_cli because they do blocking network
# calls.
# NOTE(review): this extract has many numbering gaps (e.g. 1375-1376,
# 1379-1383, 1393-1394, 1399, 1457-1466, 1480, 1540, 1551-1552,
# 1559-1560, 1631, 1636, 1657, 1663-1665) — missing lines include the
# 'nodeargs = [' opening, f.close() calls, the files/datas loop header,
# 'uri0 = out.strip()', 'for l in lines:' headers, os.mkdir calls, and
# the 'def _cmp(name):' header plus its first _cmp invocations.
1374 # run various CLI commands (in a thread, since they use blocking
1377 private_uri = self._private_node.get_uri()
1378 client0_basedir = self.getdir("client0")
1381 "--node-directory", client0_basedir,
1384 d = defer.succeed(None)
1386 # for compatibility with earlier versions, private/root_dir.cap is
1387 # supposed to be treated as an alias named "tahoe:". Start by making
1388 # sure that works, before we add other aliases.
1390 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1391 f = open(root_file, "w")
1392 f.write(private_uri)
1395 def run(ignored, verb, *args, **kwargs):
1396 stdin = kwargs.get("stdin", "")
1397 newargs = [verb] + nodeargs + list(args)
1398 return self._run_cli(newargs, stdin=stdin)
1400 def _check_ls((out,err), expected_children, unexpected_children=[]):
1401 self.failUnlessEqual(err, "")
1402 for s in expected_children:
1403 self.failUnless(s in out, (s,out))
1404 for s in unexpected_children:
1405 self.failIf(s in out, (s,out))
1407 def _check_ls_root((out,err)):
1408 self.failUnless("personal" in out)
1409 self.failUnless("s2-ro" in out)
1410 self.failUnless("s2-rw" in out)
1411 self.failUnlessEqual(err, "")
1413 # this should reference private_uri
1414 d.addCallback(run, "ls")
1415 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1417 d.addCallback(run, "list-aliases")
1418 def _check_aliases_1((out,err)):
1419 self.failUnlessEqual(err, "")
1420 self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1421 d.addCallback(_check_aliases_1)
1423 # now that that's out of the way, remove root_dir.cap and work with
1425 d.addCallback(lambda res: os.unlink(root_file))
1426 d.addCallback(run, "list-aliases")
1427 def _check_aliases_2((out,err)):
1428 self.failUnlessEqual(err, "")
1429 self.failUnlessEqual(out, "")
1430 d.addCallback(_check_aliases_2)
1432 d.addCallback(run, "mkdir")
1433 def _got_dir( (out,err) ):
1434 self.failUnless(uri.from_string_dirnode(out.strip()))
1436 d.addCallback(_got_dir)
1437 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1439 d.addCallback(run, "list-aliases")
1440 def _check_aliases_3((out,err)):
1441 self.failUnlessEqual(err, "")
1442 self.failUnless("tahoe: " in out)
1443 d.addCallback(_check_aliases_3)
1445 def _check_empty_dir((out,err)):
1446 self.failUnlessEqual(out, "")
1447 self.failUnlessEqual(err, "")
1448 d.addCallback(run, "ls")
1449 d.addCallback(_check_empty_dir)
1451 def _check_missing_dir((out,err)):
1452 # TODO: check that rc==2
1453 self.failUnlessEqual(out, "")
1454 self.failUnlessEqual(err, "No such file or directory\n")
1455 d.addCallback(run, "ls", "bogus")
1456 d.addCallback(_check_missing_dir)
# NOTE(review): the loop header creating local files/datas (and the
# 'files'/'datas' list initializers) is missing from this extract;
# the next three lines are its body.
1461 fn = os.path.join(self.basedir, "file%d" % i)
1463 data = "data to be uploaded: file%d\n" % i
1465 open(fn,"wb").write(data)
1467 def _check_stdout_against((out,err), filenum=None, data=None):
1468 self.failUnlessEqual(err, "")
1469 if filenum is not None:
1470 self.failUnlessEqual(out, datas[filenum])
1471 if data is not None:
1472 self.failUnlessEqual(out, data)
1474 # test all both forms of put: from a file, and from stdin
1476 d.addCallback(run, "put", files[0], "tahoe-file0")
1477 def _put_out((out,err)):
1478 self.failUnless("URI:LIT:" in out, out)
1479 self.failUnless("201 Created" in err, err)
1481 return run(None, "get", uri0)
1482 d.addCallback(_put_out)
1483 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1485 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1486 # tahoe put bar tahoe:FOO
1487 d.addCallback(run, "put", files[2], "tahoe:file2")
1488 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1489 def _check_put_mutable((out,err)):
1490 self._mutable_file3_uri = out.strip()
1491 d.addCallback(_check_put_mutable)
1492 d.addCallback(run, "get", "tahoe:file3")
1493 d.addCallback(_check_stdout_against, 3)
1496 STDIN_DATA = "This is the file to upload from stdin."
1497 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1498 # tahoe put tahoe:FOO
1499 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1500 stdin="Other file from stdin.")
1502 d.addCallback(run, "ls")
1503 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1504 "tahoe-file-stdin", "from-stdin"])
1505 d.addCallback(run, "ls", "subdir")
1506 d.addCallback(_check_ls, ["tahoe-file1"])
1509 d.addCallback(run, "mkdir", "subdir2")
1510 d.addCallback(run, "ls")
1511 # TODO: extract the URI, set an alias with it
1512 d.addCallback(_check_ls, ["subdir2"])
1514 # tahoe get: (to stdin and to a file)
1515 d.addCallback(run, "get", "tahoe-file0")
1516 d.addCallback(_check_stdout_against, 0)
1517 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1518 d.addCallback(_check_stdout_against, 1)
1519 outfile0 = os.path.join(self.basedir, "outfile0")
1520 d.addCallback(run, "get", "file2", outfile0)
1521 def _check_outfile0((out,err)):
1522 data = open(outfile0,"rb").read()
1523 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1524 d.addCallback(_check_outfile0)
# NOTE(review): outfile1 reuses the "outfile0" path name — presumably a
# copy/paste slip in the original; left unchanged here (doc-only edit).
1525 outfile1 = os.path.join(self.basedir, "outfile0")
1526 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1527 def _check_outfile1((out,err)):
1528 data = open(outfile1,"rb").read()
1529 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1530 d.addCallback(_check_outfile1)
1532 d.addCallback(run, "rm", "tahoe-file0")
1533 d.addCallback(run, "rm", "tahoe:file2")
1534 d.addCallback(run, "ls")
1535 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1537 d.addCallback(run, "ls", "-l")
1538 def _check_ls_l((out,err)):
1539 lines = out.split("\n")
1541 if "tahoe-file-stdin" in l:
1542 self.failUnless(l.startswith("-r-- "), l)
1543 self.failUnless(" %d " % len(STDIN_DATA) in l)
1545 self.failUnless(l.startswith("-rw- "), l) # mutable
1546 d.addCallback(_check_ls_l)
1548 d.addCallback(run, "ls", "--uri")
1549 def _check_ls_uri((out,err)):
1550 lines = out.split("\n")
1553 self.failUnless(self._mutable_file3_uri in l)
1554 d.addCallback(_check_ls_uri)
1556 d.addCallback(run, "ls", "--readonly-uri")
1557 def _check_ls_rouri((out,err)):
1558 lines = out.split("\n")
1561 rw_uri = self._mutable_file3_uri
1562 u = uri.from_string_mutable_filenode(rw_uri)
1563 ro_uri = u.get_readonly().to_string()
1564 self.failUnless(ro_uri in l)
1565 d.addCallback(_check_ls_rouri)
1568 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1569 d.addCallback(run, "ls")
1570 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1572 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1573 d.addCallback(run, "ls")
1574 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1576 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1577 d.addCallback(run, "ls")
1578 d.addCallback(_check_ls, ["file3", "file3-copy"])
1579 d.addCallback(run, "get", "tahoe:file3-copy")
1580 d.addCallback(_check_stdout_against, 3)
1582 # copy from disk into tahoe
1583 d.addCallback(run, "cp", files[4], "tahoe:file4")
1584 d.addCallback(run, "ls")
1585 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1586 d.addCallback(run, "get", "tahoe:file4")
1587 d.addCallback(_check_stdout_against, 4)
1589 # copy from tahoe into disk
1590 target_filename = os.path.join(self.basedir, "file-out")
1591 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1592 def _check_cp_out((out,err)):
1593 self.failUnless(os.path.exists(target_filename))
1594 got = open(target_filename,"rb").read()
1595 self.failUnlessEqual(got, datas[4])
1596 d.addCallback(_check_cp_out)
1598 # copy from disk to disk (silly case)
1599 target2_filename = os.path.join(self.basedir, "file-out-copy")
1600 d.addCallback(run, "cp", target_filename, target2_filename)
1601 def _check_cp_out2((out,err)):
1602 self.failUnless(os.path.exists(target2_filename))
1603 got = open(target2_filename,"rb").read()
1604 self.failUnlessEqual(got, datas[4])
1605 d.addCallback(_check_cp_out2)
1607 # copy from tahoe into disk, overwriting an existing file
1608 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1609 def _check_cp_out3((out,err)):
1610 self.failUnless(os.path.exists(target_filename))
1611 got = open(target_filename,"rb").read()
1612 self.failUnlessEqual(got, datas[3])
1613 d.addCallback(_check_cp_out3)
1615 # copy from disk into tahoe, overwriting an existing immutable file
1616 d.addCallback(run, "cp", files[5], "tahoe:file4")
1617 d.addCallback(run, "ls")
1618 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1619 d.addCallback(run, "get", "tahoe:file4")
1620 d.addCallback(_check_stdout_against, 5)
1622 # copy from disk into tahoe, overwriting an existing mutable file
1623 d.addCallback(run, "cp", files[5], "tahoe:file3")
1624 d.addCallback(run, "ls")
1625 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1626 d.addCallback(run, "get", "tahoe:file3")
1627 d.addCallback(_check_stdout_against, 5)
1629 # recursive copy: setup
1630 dn = os.path.join(self.basedir, "dir1")
1632 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1633 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1634 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1635 sdn2 = os.path.join(dn, "subdir2")
1637 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1638 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1640 # from disk into tahoe
1641 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1642 d.addCallback(run, "ls")
1643 d.addCallback(_check_ls, ["dir1"])
1644 d.addCallback(run, "ls", "dir1")
1645 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1646 ["rfile4", "rfile5"])
1647 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1648 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1649 ["rfile1", "rfile2", "rfile3"])
1650 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1651 d.addCallback(_check_stdout_against, data="rfile4")
1653 # and back out again
1654 dn_copy = os.path.join(self.basedir, "dir1-copy")
1655 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1656 def _check_cp_r_out((out,err)):
# NOTE(review): the "def _cmp(name):" header and the first _cmp
# calls for rfile1-3 are missing from this extract.
1658 old = open(os.path.join(dn, name), "rb").read()
1659 newfn = os.path.join(dn_copy, name)
1660 self.failUnless(os.path.exists(newfn))
1661 new = open(newfn, "rb").read()
1662 self.failUnlessEqual(old, new)
1666 _cmp(os.path.join("subdir2", "rfile4"))
1667 _cmp(os.path.join("subdir2", "rfile5"))
1668 d.addCallback(_check_cp_r_out)
1670 # and copy it a second time, which ought to overwrite the same files
1671 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1673 # and again, only writing filecaps
1674 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1675 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1676 def _check_capsonly((out,err)):
1677 # these should all be LITs
1678 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1679 y = uri.from_string_filenode(x)
1680 self.failUnlessEqual(y.data, "rfile4")
1681 d.addCallback(_check_capsonly)
1683 # and tahoe-to-tahoe
1684 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1685 d.addCallback(run, "ls")
1686 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1687 d.addCallback(run, "ls", "dir1-copy")
1688 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1689 ["rfile4", "rfile5"])
1690 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1691 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1692 ["rfile1", "rfile2", "rfile3"])
1693 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1694 d.addCallback(_check_stdout_against, data="rfile4")
1696 # and copy it a second time, which ought to overwrite the same files
1697 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1699 # tahoe_ls doesn't currently handle the error correctly: it tries to
1700 # JSON-parse a traceback.
1701 ## def _ls_missing(res):
1702 ## argv = ["ls"] + nodeargs + ["bogus"]
1703 ## return self._run_cli(argv)
1704 ## d.addCallback(_ls_missing)
1705 ## def _check_ls_missing((out,err)):
1708 ## self.failUnlessEqual(err, "")
1709 ## d.addCallback(_check_ls_missing)
1713 def _run_cli(self, argv, stdin=""):
# Run a "tahoe" CLI command in a worker thread (the CLI does blocking
# network I/O) and deliver (stdout_text, stderr_text) via the Deferred.
# NOTE(review): original lines 1719 (presumably "def _done(res):") and
# 1722 (presumably "return d") are missing from this extract; the
# 'return stdout.getvalue(), ...' line below belongs inside _done.
1715 stdout, stderr = StringIO(), StringIO()
1716 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1717 stdin=StringIO(stdin),
1718 stdout=stdout, stderr=stderr)
1720 return stdout.getvalue(), stderr.getvalue()
1721 d.addCallback(_done)
1724 def _test_checker(self, res):
1725 ut = upload.Data("too big to be literal" * 200, convergence=None)
1726 d = self._personal_node.add_file(u"big file", ut)
1728 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1729 def _check_dirnode_results(r):
1730 self.failUnless(r.is_healthy())
1731 d.addCallback(_check_dirnode_results)
1732 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1733 d.addCallback(_check_dirnode_results)
1735 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1736 def _got_chk_filenode(n):
1737 self.failUnless(isinstance(n, ImmutableFileNode))
1738 d = n.check(Monitor())
1739 def _check_filenode_results(r):
1740 self.failUnless(r.is_healthy())
1741 d.addCallback(_check_filenode_results)
1742 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1743 d.addCallback(_check_filenode_results)
1745 d.addCallback(_got_chk_filenode)
1747 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1748 def _got_lit_filenode(n):
1749 self.failUnless(isinstance(n, LiteralFileNode))
1750 d = n.check(Monitor())
1751 def _check_lit_filenode_results(r):
1752 self.failUnlessEqual(r, None)
1753 d.addCallback(_check_lit_filenode_results)
1754 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1755 d.addCallback(_check_lit_filenode_results)
1757 d.addCallback(_got_lit_filenode)