1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.literal import LiteralFileNode
13 from allmydata.immutable.filenode import ImmutableFileNode
14 from allmydata.util import idlib, mathutil
15 from allmydata.util import log, base32
16 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
17 from allmydata.util.fileutil import abspath_expanduser_unicode
18 from allmydata.util.consumer import MemoryConsumer, download_to_data
19 from allmydata.scripts import runner
20 from allmydata.interfaces import IDirectoryNode, IFileNode, \
21 NoSuchChildError, NoSharesError
22 from allmydata.monitor import Monitor
23 from allmydata.mutable.common import NotWriteableError
24 from allmydata.mutable import layout as mutable_layout
25 from foolscap.api import DeadReferenceError
26 from twisted.python.failure import Failure
27 from twisted.web.client import getPage
28 from twisted.web.error import Error
30 from allmydata.test.common import SystemTestMixin
33 This is some data to publish to the remote grid.., which needs to be large
34 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts bytes read and can interrupt itself.

    Set `interrupt_after` to a byte count and `interrupt_after_d` to a
    Deferred before uploading; once more than that many bytes have been
    read, the Deferred is fired (with this uploadable) exactly once.
    """
    # Running total of bytes handed out by read(). Must start at zero:
    # without this class attribute the first `self.bytes_read += length`
    # below would raise AttributeError.
    bytes_read = 0
    interrupt_after = None    # byte threshold, or None for no interruption
    interrupt_after_d = None  # Deferred fired once the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so the Deferred fires only once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
50 class SystemTest(SystemTestMixin, unittest.TestCase):
51 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
    def test_connections(self):
        # Bring up a grid, add one extra node, and verify that every client
        # sees all storage servers (self.numclients peers plus the extra
        # node). The extra node is always shut down afterwards.
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                # NOTE(review): 'sb' is used without a visible assignment
                # here -- presumably c.get_storage_broker(); confirm against
                # the full source.
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # addBoth: stop the extra node whether the checks passed or not
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
74 # test_connections is subsumed by test_upload_and_download, and takes
75 # quite a while to run on a slow machine (because of all the TLS
76 # connections that must be established). If we ever rework the introducer
77 # code to such an extent that we're not sure if it works anymore, we can
78 # reinstate this test until it does.
81 def test_upload_and_download_random_key(self):
82 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
83 return self._test_upload_and_download(convergence=None)
85 def test_upload_and_download_convergent(self):
86 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
87 return self._test_upload_and_download(convergence="some convergence string")
    def _test_upload_and_download(self, convergence):
        """Upload an immutable file, download it back several ways, then
        exercise the helper (including an interrupted and resumed upload)
        and the remote stats interface.

        `convergence` is a convergence secret string, or None to use a
        fresh random encryption key per upload.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                # NOTE(review): 'sb' has no visible assignment here --
                # presumably c.get_storage_broker(); confirm.
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

            u = self.clients[0].getServiceNamed("uploader")
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            log.msg("upload finished: uri is %s" % (theuri,))
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            # download with a different client than the one that uploaded
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

            # exercise whole-file, offset-limited, tail, and over-long reads
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                # reading past the end just returns the whole file
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            # a mangled key changes the storage index, so no shares exist
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            # upload via the helper-using extra node, then verify the data
            # by downloading from a normal client
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            # a second helper upload of the same (convergent) data should be
            # short-circuited before the uploadable ever starts uploading
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
                f.trap(DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                # NOTE(review): 'cap' has no visible assignment -- presumably
                # taken from results; confirm against the full source.
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #from pprint import pprint
                # NOTE(review): 's' has no visible assignment -- presumably a
                # sub-dict of `stats`; confirm against the full source.
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
        d.addCallback(_grab_stats)
    def _find_all_shares(self, basedir):
        # Walk `basedir` looking for share files laid out under
        # client*/storage/shares/$START/$SINDEX, and collect
        # (client_num, storage_index, filename, shnum) tuples.
        # Fails the test if no share files are found at all.
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                # only works for single-digit client numbers (last char)
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    # share files are named by their share number
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
            self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        # Damage one field of an SDMF mutable share on disk and write it
        # back. `which` selects the field: most are corrupted by flipping
        # one bit, "segsize" by adding 15.
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack the (possibly corrupted) fields and overwrite the share
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """End-to-end SDMF mutable-file test: create a mutable file,
        debug-dump one of its shares, download it from several clients,
        overwrite it twice, corrupt most shares and re-download, then
        exercise empty mutable files, dirnodes, and the key-generator
        keypool."""
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            log.msg("starting create_mutable_file")
            # NOTE(review): 'c' has no visible assignment here -- presumably
            # self.clients[0]; confirm against the full source.
            d1 = c.create_mutable_file(DATA)
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)

            # run "tahoe debug dump-share" on the share file and check the
            # human-readable dump against the known parameters of the slot
            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                        base32.b2a(storage_index))
            self.failUnless(expected in output)
            except unittest.FailTest:
                print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            # the nodemaker caches: same URI must give the same node object
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data on the node we downloaded from
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                    # read: this will trigger "pubkey doesn't match
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        # with 3-of-10 encoding and only shares 7,8,9 intact, the download
        # must still succeed and return the newest contents
        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # NOTE(review): 'd1' has no visible assignment here -- presumably
            # d1 = dnode.list(); confirm against the full source.
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # each key taken from the pool shifts its size by -1 until the
            # generator refills it
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)
683 def flip_bit(self, good):
684 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
686 def mangle_uri(self, gooduri):
687 # change the key, which changes the storage index, which means we'll
688 # be asking about the wrong file, so nobody will have any shares
689 u = uri.from_string(gooduri)
690 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
691 uri_extension_hash=u.uri_extension_hash,
692 needed_shares=u.needed_shares,
693 total_shares=u.total_shares,
695 return u2.to_string()
697 # TODO: add a test which mangles the uri_extension_hash instead, and
698 # should fail due to not being able to get a valid uri_extension block.
699 # Also a test which sneakily mangles the uri_extension block to change
700 # some of the validation data, so it will fail in the post-download phase
701 # when the file's crypttext integrity check fails. Do the same thing for
702 # the key, which should cause the download to fail the post-download
703 # plaintext_hash check.
    def test_filesystem(self):
        # Exercise the full virtual filesystem: publish a directory tree,
        # bounce client0, verify the published data through two APIs, build
        # a private tree, then run the web/control/CLI/checker sub-tests.
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            # relax servers-of-happiness so a small grid suffices
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")
        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
    def _test_introweb(self, res):
        # Fetch the introducer's web status page and sanity-check both the
        # HTML rendering and the ?t=json machine-readable version. On a
        # FailTest the raw page is printed to aid debugging.
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
            self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__)
            self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
            self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
            print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
                self.failUnlessEqual(data["subscription_summary"],
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        # Create the root directory (stashing its URI in
        # self._root_directory_uri), make R/subdir1, upload
        # R/subdir1/mydata567, and stash that file's URI in self.uri.
        ut = upload.Data(self.data, convergence=None)
        # NOTE(review): 'c0' has no visible assignment here -- presumably
        # self.clients[0]; confirm against the full source.
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
807 def _do_publish2(self, res):
808 ut = upload.Data(self.data, convergence=None)
809 d = self._subdir1_node.create_subdirectory(u"subdir2")
810 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
813 def log(self, res, *args, **kwargs):
814 # print "MSG: %s RES: %s" % (msg, args)
815 log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        # Build the private tree: P/personal/sekrit data, plus s2-rw and
        # s2-ro entries linking to R/subdir1/subdir2 (read-write and
        # read-only respectively).
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
                # s2-rw keeps the writecap; s2-ro only gets the readcap
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        # this one uses the iterative API
        # (get each child node step by step, then download the file and
        # compare it with what _do_publish1 uploaded)
        # NOTE(review): 'c1' has no visible assignment here -- presumably
        # self.clients[1]; confirm against the full source.
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        # this one uses the path-based API
        # (resolve "subdir1/mydata567" in one call, download it, and check
        # that the nodemaker returns the identical cached node for its URI)
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # creating a node from the same URI must yield the cached node
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
    def _check_publish_private(self, resnode):
        """Exercise the private ("P/") directory tree via the path-based
        dirnode API: reads, read-only enforcement on s2-ro, child moves,
        manifest and deep-stats.

        NOTE(review): this copy of the file appears to have lost several
        lines (an inner 'def get_path(path):' header, a dict closer,
        'return' statements); the load-bearing gaps are flagged inline.
        Compare with the upstream source before relying on it.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember the personal/ dirnode for later checks
            self._personal_node = personal
            # NOTE(review): a 'return personal' appears to be missing here,
            # so the next callback would receive None — confirm upstream
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): the enclosing 'def get_path(path):' header is
        # missing here; as written this 'return' is dangling
        return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # s2-ro must be mutable-but-readonly; every write operation on
            # it must fail with NotWriteableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            # NOTE(review): a 'return d1' appears to be missing here
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # shuffle 'sekrit data' between P/ and P/personal/, then check
            # the manifest and deep-stats results
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            # NOTE(review): the first line(s) of this expected-manifest
            # comment appear to be missing here
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                # NOTE(review): the closing '}' of 'expected' is missing in
                # this copy
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                # NOTE(review): the '(k, stats[k], v))' continuation line is
                # missing in this copy
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            # NOTE(review): a 'return d1' appears to be missing here
        d.addCallback(_got_home)
        # NOTE(review): a final 'return d' appears to be missing here
985 def shouldFail(self, res, expected_failure, which, substring=None):
986 if isinstance(res, Failure):
987 res.trap(expected_failure)
989 self.failUnless(substring in str(res),
990 "substring '%s' not in '%s'"
991 % (substring, str(res)))
993 self.fail("%s was supposed to raise %s, not get '%s'" %
994 (which, expected_failure, res))
996 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
997 assert substring is None or isinstance(substring, str)
998 d = defer.maybeDeferred(callable, *args, **kwargs)
1000 if isinstance(res, Failure):
1001 res.trap(expected_failure)
1003 self.failUnless(substring in str(res),
1004 "substring '%s' not in '%s'"
1005 % (substring, str(res)))
1007 self.fail("%s was supposed to raise %s, not get '%s'" %
1008 (which, expected_failure, res))
1012 def PUT(self, urlpath, data):
1013 url = self.webish_url + urlpath
1014 return getPage(url, method="PUT", postdata=data)
1016 def GET(self, urlpath, followRedirect=False):
1017 url = self.webish_url + urlpath
1018 return getPage(url, method="GET", followRedirect=followRedirect)
1020 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1021 sepbase = "boogabooga"
1022 sep = "--" + sepbase
1025 form.append('Content-Disposition: form-data; name="_charset"')
1027 form.append('UTF-8')
1029 for name, value in fields.iteritems():
1030 if isinstance(value, tuple):
1031 filename, value = value
1032 form.append('Content-Disposition: form-data; name="%s"; '
1033 'filename="%s"' % (name, filename.encode("utf-8")))
1035 form.append('Content-Disposition: form-data; name="%s"' % name)
1037 form.append(str(value))
1043 body = "\r\n".join(form) + "\r\n"
1044 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1045 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1047 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1050 url = self.helper_webish_url + urlpath
1052 url = self.webish_url + urlpath
1053 return getPage(url, method="POST", postdata=body, headers=headers,
1054 followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the node's HTTP web API end-to-end: welcome pages,
        directory listings, downloads by URI, PUT/POST uploads, the status
        pages, helper status, and the statistics pages.

        NOTE(review): this copy of the file appears to have lost scattered
        lines (the initial Deferred creation, some closing parens, a few
        inner 'def' headers and f.close() calls); the load-bearing gaps
        are flagged inline. Compare with the upstream source.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        # NOTE(review): the line creating the initial Deferred (presumably
        # 'd = getPage(base)') is missing here
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            # NOTE(review): the closing paren line is missing here
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            # NOTE(review): the continuation/closing line is missing here
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            # NOTE(review): the closing 'page)' line is missing here
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
            # NOTE(review): the expected-substring argument, closing paren,
            # and 'return d1' appear to be missing here
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        def _new_happy_semantics(ign):
            for c in self.clients:
                # these get reset somewhere? Whatever.
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        # NOTE(review): the replacement-contents argument line is missing
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()
            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): the 'def _got_up(res):' header is missing here
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        # NOTE(review): the 'def _got_down(res):' header is missing here
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # NOTE(review): an 'f.close()' appears to be missing here
            then = time.time() - 86400*3
            # NOTE(review): the 'now = time.time()' binding appears to be
            # missing here — 'now' is otherwise undefined below
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            # NOTE(review): an 'f.close()' appears to be missing here
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            # NOTE(review): the expected-value line is missing here
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            # NOTE(review): the expected-value line is missing here
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
            # NOTE(review): the expected-value line is missing here
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
        # NOTE(review): a final 'return d' appears to be missing here
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' diagnostic tools (dump-share,
        find-shares, catalog-shares) against the shares created by the
        earlier upload.

        NOTE(review): this copy appears to have lost scattered lines
        (loop-control statements, StringIO rewinds, and the opener of a
        list comprehension); the load-bearing gaps are flagged inline.
        """
        # exercise some of the diagnostic tools in runner.py

        # walk the storage directories of all clients looking for a CHK
        # sharefile to dump
        for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
            if "storage" not in dirpath:
            # NOTE(review): the 'continue' (and probably a 'not filenames'
            # guard) is missing here
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 4
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                # NOTE(review): the 'break' ending the search (and the
                # for/else structure guarding the failure below) is missing
        self.fail("unable to find any uri_extension files in %r"
        # NOTE(review): the '% self.basedir)' continuation is missing here
        log.msg("test_system.SystemTest._test_runner using %r" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            unicode_to_argv(filename)],
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
        self.failUnlessIn("size: %d\n" % len(self.data), output)
        self.failUnlessIn("num_segments: 1\n", output)
        # segment_size is always a multiple of needed_shares
        self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
        self.failUnlessIn("total_shares: 10\n", output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnlessIn("%s: " % key, output)
        self.failUnlessIn(" verify-cap: URI:CHK-Verifier:", output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        storage_index_s = str(storage_index_s)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): an 'out.seek(0)' rewind appears to be missing here —
        # readlines() on a freshly-written StringIO would yield nothing
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): an 'out.seek(0)' rewind appears to be missing here
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        # NOTE(review): the 'matching = [line' opener of this comprehension
        # is missing here
        for line in descriptions
        if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1348 def _test_control(self, res):
1349 # exercise the remote-control-the-client foolscap interfaces in
1350 # allmydata.control (mostly used for performance tests)
1351 c0 = self.clients[0]
1352 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1353 control_furl = open(control_furl_file, "r").read().strip()
1354 # it doesn't really matter which Tub we use to connect to the client,
1355 # so let's just use our IntroducerNode's
1356 d = self.introducer.tub.getReference(control_furl)
1357 d.addCallback(self._test_control2, control_furl_file)
1359 def _test_control2(self, rref, filename):
1360 d = rref.callRemote("upload_from_file_to_uri",
1361 filename.encode(get_filesystem_encoding()), convergence=None)
1362 downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1363 d.addCallback(lambda uri:
1364 rref.callRemote("download_from_uri_to_file",
1367 self.failUnlessEqual(res, downfile)
1368 data = open(downfile, "r").read()
1369 expected_data = open(filename, "r").read()
1370 self.failUnlessEqual(data, expected_data)
1371 d.addCallback(_check)
1372 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1373 if sys.platform == "linux2":
1374 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1375 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Drive the 'tahoe' command-line tool (via _run_cli, in a thread)
        through aliases, put/get, ls, mv/ln/rm and recursive cp, checking
        stdout/stderr at each step.

        NOTE(review): this copy appears to have lost scattered lines (list
        initializers, loop headers, f.close() calls, a final 'return d');
        the load-bearing gaps are flagged inline. Compare with upstream.
        """
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        client0_basedir = self.getdir("client0")
        # NOTE(review): the 'nodeargs = [' opener is missing above the next
        # line
            "--node-directory", client0_basedir,
        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.
        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)
        # NOTE(review): an 'f.close()' appears to be missing here

        def run(ignored, verb, *args, **kwargs):
            # helper: run one CLI verb against client0, firing with (out,err)
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
            # NOTE(review): a 'return out.strip()' appears to be missing
            # here — the next callback expects the new dircap as 'newcap'
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

        # NOTE(review): the 'files = []'/'datas = []' initializers and the
        # 'for i in range(...):' loop header (plus the files.append /
        # datas.append lines) are missing around the next three lines
            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            # NOTE(review): a 'uri0 = out.strip()' binding appears to be
            # missing here
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            # remember the writecap for the ls --uri checks below
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        # NOTE(review): 'outfile1' pointing at the "outfile0" path looks
        # suspicious — presumably it should be "outfile1"; confirm upstream
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
            # NOTE(review): the 'for l in lines:' loop header is missing here
                if "tahoe-file-stdin" in l:
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                # NOTE(review): an 'if "file3" in l:' guard appears to be
                # missing here
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
            # NOTE(review): the 'for l in lines:' header and an
            # 'if "file3" in l:' guard are missing here
                    self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
            # NOTE(review): the 'for l in lines:' header and an
            # 'if "file3" in l:' guard are missing here
                    rw_uri = self._mutable_file3_uri
                    u = uri.from_string_mutable_filenode(rw_uri)
                    ro_uri = u.get_readonly().to_string()
                    self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        # NOTE(review): an 'os.makedirs(dn)' appears to be missing here
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        # NOTE(review): an 'os.makedirs(sdn2)' appears to be missing here
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
            # NOTE(review): the 'def _cmp(name):' header is missing here
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            # NOTE(review): the _cmp("rfile1"/"rfile2"/"rfile3") calls
            # appear to be missing here
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and again, only writing filecaps
        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
        def _check_capsonly((out,err)):
            # these should all be LITs
            x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
            y = uri.from_string_filenode(x)
            self.failUnlessEqual(y.data, "rfile4")
        d.addCallback(_check_capsonly)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
##        def _ls_missing(res):
##            argv = ["ls"] + nodeargs + ["bogus"]
##            return self._run_cli(argv)
##        d.addCallback(_ls_missing)
##        def _check_ls_missing((out,err)):
##            self.failUnlessEqual(err, "")
##        d.addCallback(_check_ls_missing)
        # NOTE(review): a final 'return d' appears to be missing here
1718 def _run_cli(self, argv, stdin=""):
1720 stdout, stderr = StringIO(), StringIO()
1721 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1722 stdin=StringIO(stdin),
1723 stdout=stdout, stderr=stderr)
1725 return stdout.getvalue(), stderr.getvalue()
1726 d.addCallback(_done)
1729 def _test_checker(self, res):
1730 ut = upload.Data("too big to be literal" * 200, convergence=None)
1731 d = self._personal_node.add_file(u"big file", ut)
1733 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1734 def _check_dirnode_results(r):
1735 self.failUnless(r.is_healthy())
1736 d.addCallback(_check_dirnode_results)
1737 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1738 d.addCallback(_check_dirnode_results)
1740 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1741 def _got_chk_filenode(n):
1742 self.failUnless(isinstance(n, ImmutableFileNode))
1743 d = n.check(Monitor())
1744 def _check_filenode_results(r):
1745 self.failUnless(r.is_healthy())
1746 d.addCallback(_check_filenode_results)
1747 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1748 d.addCallback(_check_filenode_results)
1750 d.addCallback(_got_chk_filenode)
1752 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1753 def _got_lit_filenode(n):
1754 self.failUnless(isinstance(n, LiteralFileNode))
1755 d = n.check(Monitor())
1756 def _check_lit_filenode_results(r):
1757 self.failUnlessEqual(r, None)
1758 d.addCallback(_check_lit_filenode_results)
1759 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1760 d.addCallback(_check_lit_filenode_results)
1762 d.addCallback(_got_lit_filenode)