1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.literal import LiteralFileNode
13 from allmydata.immutable.filenode import ImmutableFileNode
14 from allmydata.util import idlib, mathutil
15 from allmydata.util import log, base32
16 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
17 from allmydata.util.fileutil import abspath_expanduser_unicode
18 from allmydata.util.consumer import MemoryConsumer, download_to_data
19 from allmydata.scripts import runner
20 from allmydata.interfaces import IDirectoryNode, IFileNode, \
21 NoSuchChildError, NoSharesError
22 from allmydata.monitor import Monitor
23 from allmydata.mutable.common import NotWriteableError
24 from allmydata.mutable import layout as mutable_layout
25 from foolscap.api import DeadReferenceError
26 from twisted.python.failure import Failure
27 from twisted.web.client import getPage
28 from twisted.web.error import Error
30 from allmydata.test.common import SystemTestMixin
33 This is some data to publish to the remote grid.., which needs to be large
34 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """An upload.Data that counts bytes read and can interrupt an upload.

    read() accumulates self.bytes_read; once it crosses interrupt_after,
    interrupt_after_d is fired (with this uploadable) exactly once, letting
    a test bounce a node mid-upload.

    NOTE(review): this listing is sampled; the class-attribute line that
    initializes the bytes_read counter is elided here -- read() increments
    it, so presumably it starts at 0. TODO confirm against the full source.
    """
    interrupt_after = None      # byte threshold after which to fire the Deferred
    interrupt_after_d = None    # Deferred fired once bytes_read exceeds the threshold

    def read(self, length):
        # Count what the uploader has consumed so far.
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so the Deferred fires only once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class SystemTest(SystemTestMixin, unittest.TestCase):
    """End-to-end system tests that spin up a simulated multi-node grid and
    exercise upload/download, mutable files, the helper, the dirnode layer,
    the introducer web page, and the CLI against real (local) nodes."""

    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
def test_connections(self):
    """Every client should see all storage servers, including a late-added
    extra node.

    NOTE(review): this listing is sampled; the lines that bind 'sb', add
    _check to the chain, and return the Deferred are elided below.
    """
    self.basedir = "system/SystemTest/test_connections"
    d = self.set_up_nodes()
    self.extra_node = None
    d.addCallback(lambda res: self.add_extra_node(self.numclients))
    def _check(extra_node):
        self.extra_node = extra_node
        for c in self.clients:
            all_peerids = c.get_storage_broker().get_all_serverids()
            # +1 accounts for the extra node added above
            self.failUnlessEqual(len(all_peerids), self.numclients+1)
            # 'sb' is bound in an elided line -- presumably the client's
            # storage broker. TODO confirm against the full source.
            permuted_peers = sb.get_servers_for_index("a")
            self.failUnlessEqual(len(permuted_peers), self.numclients+1)
    def _shutdown_extra_node(res):
        # addBoth: shut the extra node down on success or failure
        return self.extra_node.stopService()
    d.addBoth(_shutdown_extra_node)
74 # test_connections is subsumed by test_upload_and_download, and takes
75 # quite a while to run on a slow machine (because of all the TLS
76 # connections that must be established). If we ever rework the introducer
77 # code to such an extent that we're not sure if it works anymore, we can
78 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Round-trip upload/download where the uploader picks a random key.

    Passing convergence=None disables convergent encryption, so repeated
    uploads of identical data yield distinct URIs.
    """
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    return self._test_upload_and_download(convergence=None)
def test_upload_and_download_convergent(self):
    """Round-trip upload/download using convergent encryption.

    A fixed convergence secret means re-uploading the same data should
    short-circuit to the same URI.
    """
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    return self._test_upload_and_download(convergence="some convergence string")
def _test_upload_and_download(self, convergence):
    """Shared body for the upload/download system tests.

    Uploads multi-segment data (directly, again for convergence, and via
    the helper), reads it back whole and in slices, checks failure modes
    (corrupted key, nonexistent URI), exercises interrupted/resumed helper
    uploads, and finally polls the remote stats interface.

    NOTE(review): this listing is sampled; a number of original source
    lines are elided between the statements below -- gap notes mark the
    spots where a name would otherwise dangle.
    """
    # we use 4000 bytes of data, which will result in about 400k written
    # to disk among all our simulated nodes
    DATA = "Some data to upload\n" * 200
    d = self.set_up_nodes()
    def _check_connections(res):
        for c in self.clients:
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
            all_peerids = c.get_storage_broker().get_all_serverids()
            self.failUnlessEqual(len(all_peerids), self.numclients)
            # 'sb' is bound in an elided line -- presumably the client's
            # storage broker. TODO confirm against the full source.
            permuted_peers = sb.get_servers_for_index("a")
            self.failUnlessEqual(len(permuted_peers), self.numclients)
    d.addCallback(_check_connections)

    # NOTE(review): the enclosing "def _do_upload(res):" line is elided;
    # the statements down to d.addCallback(_do_upload) belong to it.
    u = self.clients[0].getServiceNamed("uploader")

    # we crank the max segsize down to 1024b for the duration of this
    # test, so we can exercise multiple segments. It is important
    # that this is not a multiple of the segment size, so that the
    # tail segment is not the same length as the others. This actually
    # gets rounded up to 1025 to be a multiple of the number of
    # required shares (since we use 25 out of 100 FEC).
    up = upload.Data(DATA, convergence=convergence)
    up.max_segment_size = 1024

    d.addCallback(_do_upload)
    def _upload_done(results):
        # 'theuri' and self.uri are bound from 'results' in elided lines
        log.msg("upload finished: uri is %s" % (theuri,))
        assert isinstance(self.uri, str), self.uri
        self.cap = uri.from_string(self.uri)
        self.n = self.clients[1].create_node_from_uri(self.uri)
    d.addCallback(_upload_done)

    def _upload_again(res):
        # Upload again. If using convergent encryption then this ought to be
        # short-circuited, however with the way we currently generate URIs
        # (i.e. because they include the roothash), we have to do all of the
        # encoding work, and only get to save on the upload part.
        log.msg("UPLOADING AGAIN")
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        return self.uploader.upload(up)
    d.addCallback(_upload_again)

    def _download_to_data(res):
        log.msg("DOWNLOADING")
        return download_to_data(self.n)
    d.addCallback(_download_to_data)
    def _download_to_data_done(data):
        log.msg("download finished")
        self.failUnlessEqual(data, DATA)
    d.addCallback(_download_to_data_done)

    # NOTE(review): the enclosing "def _test_read(res):" line is elided;
    # the read-slicing checks below (which shadow 'd' locally) belong to it.
    n = self.clients[1].create_node_from_uri(self.uri)
    d = download_to_data(n)
    def _read_done(data):
        self.failUnlessEqual(data, DATA)
    d.addCallback(_read_done)
    d.addCallback(lambda ign:
                  n.read(MemoryConsumer(), offset=1, size=4))
    def _read_portion_done(mc):
        self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
    d.addCallback(_read_portion_done)
    d.addCallback(lambda ign:
                  n.read(MemoryConsumer(), offset=2, size=None))
    def _read_tail_done(mc):
        # size=None means "read to the end"
        self.failUnlessEqual("".join(mc.chunks), DATA[2:])
    d.addCallback(_read_tail_done)
    d.addCallback(lambda ign:
                  n.read(MemoryConsumer(), size=len(DATA)+1000))
    def _read_too_much(mc):
        # over-large size is clamped to the file length
        self.failUnlessEqual("".join(mc.chunks), DATA)
    d.addCallback(_read_too_much)

    d.addCallback(_test_read)

    def _test_bad_read(res):
        bad_u = uri.from_string_filenode(self.uri)
        # corrupting the key changes the storage index, so no shares match
        bad_u.key = self.flip_bit(bad_u.key)
        bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
        # this should cause an error during download
        # NOTE(review): one argument line of shouldFail2 is elided here
        d = self.shouldFail2(NoSharesError, "'download bad node'",
                             bad_n.read, MemoryConsumer(), offset=2)
    d.addCallback(_test_bad_read)

    def _download_nonexistent_uri(res):
        baduri = self.mangle_uri(self.uri)
        badnode = self.clients[1].create_node_from_uri(baduri)
        log.msg("about to download non-existent URI", level=log.UNUSUAL,
                facility="tahoe.tests")
        d1 = download_to_data(badnode)
        def _baduri_should_fail(res):
            log.msg("finished downloading non-existend URI",
                    level=log.UNUSUAL, facility="tahoe.tests")
            # addBoth delivers either result or Failure; we demand a Failure
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(NoSharesError),
                            "expected NoSharesError, got %s" % res)
        d1.addBoth(_baduri_should_fail)
    d.addCallback(_download_nonexistent_uri)

    # add a new node, which doesn't accept shares, and only uses the
    # helper for its uploads (continuation of this comment is elided)
    d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                  add_to_sparent=True))
    def _added(extra_node):
        self.extra_node = extra_node
        self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
    d.addCallback(_added)

    HELPER_DATA = "Data that needs help to upload" * 1000
    def _upload_with_helper(res):
        u = upload.Data(HELPER_DATA, convergence=convergence)
        d = self.extra_node.upload(u)
        def _uploaded(results):
            n = self.clients[1].create_node_from_uri(results.uri)
            return download_to_data(n)
        d.addCallback(_uploaded)
        # NOTE(review): the enclosing "def _check(newdata):" line is elided
        self.failUnlessEqual(newdata, HELPER_DATA)
        d.addCallback(_check)
    d.addCallback(_upload_with_helper)

    def _upload_duplicate_with_helper(res):
        u = upload.Data(HELPER_DATA, convergence=convergence)
        u.debug_stash_RemoteEncryptedUploadable = True
        d = self.extra_node.upload(u)
        def _uploaded(results):
            n = self.clients[1].create_node_from_uri(results.uri)
            return download_to_data(n)
        d.addCallback(_uploaded)
        # NOTE(review): the enclosing "def _check(newdata):" line is elided
        self.failUnlessEqual(newdata, HELPER_DATA)
        # a duplicate convergent upload should be satisfied from the
        # helper's cache without ever starting the uploadable
        self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                    "uploadable started uploading, should have been avoided")
        d.addCallback(_check)
    if convergence is not None:
        d.addCallback(_upload_duplicate_with_helper)

    def _upload_resumable(res):
        DATA = "Data that needs help to upload and gets interrupted" * 1000
        u1 = CountingDataUploadable(DATA, convergence=convergence)
        u2 = CountingDataUploadable(DATA, convergence=convergence)

        # we interrupt the connection after about 5kB by shutting down
        # the helper, then restarting it.
        u1.interrupt_after = 5000
        u1.interrupt_after_d = defer.Deferred()
        u1.interrupt_after_d.addCallback(lambda res:
                                         self.bounce_client(0))

        # sneak into the helper and reduce its chunk size, so that our
        # debug_interrupt will sever the connection on about the fifth
        # chunk fetched. This makes sure that we've started to write the
        # new shares before we abandon them, which exercises the
        # abort/delete-partial-share code. TODO: find a cleaner way to do
        # this. I know that this will affect later uses of the helper in
        # this same test run, but I'm not currently worried about it.
        offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

        d = self.extra_node.upload(u1)

        def _should_not_finish(res):
            self.fail("interrupted upload should have failed, not finished"
                      " with result %s" % (res,))
        # NOTE(review): the enclosing "def _interrupted(f):" line is
        # elided; the errback body below belongs to it.
            f.trap(DeadReferenceError)

            # make sure we actually interrupted it before finishing the
            # whole file (comment continuation elided)
            # NOTE(review): the continuation of this format tuple is elided
            self.failUnless(u1.bytes_read < len(DATA),
                            "read %d out of %d total" % (u1.bytes_read,

            log.msg("waiting for reconnect", level=log.NOISY,
                    facility="tahoe.test.test_system")
            # now, we need to give the nodes a chance to notice that this
            # connection has gone away. When this happens, the storage
            # servers will be told to abort their uploads, removing the
            # partial shares. Unfortunately this involves TCP messages
            # going through the loopback interface, and we can't easily
            # predict how long that will take. If it were all local, we
            # could use fireEventually() to stall. Since we don't have
            # the right introduction hooks, the best we can do is use a
            # fixed delay. TODO: this is fragile.
            u1.interrupt_after_d.addCallback(self.stall, 2.0)
            return u1.interrupt_after_d
        d.addCallbacks(_should_not_finish, _interrupted)

        def _disconnected(res):
            # check to make sure the storage servers aren't still hanging
            # on to the partial share: their incoming/ directories should
            # now be empty (comment continuation elided)
            log.msg("disconnected", level=log.NOISY,
                    facility="tahoe.test.test_system")
            for i in range(self.numclients):
                incdir = os.path.join(self.getdir("client%d" % i),
                                      "storage", "shares", "incoming")
                self.failIf(os.path.exists(incdir) and os.listdir(incdir))
        d.addCallback(_disconnected)

        # then we need to give the reconnector a chance to
        # reestablish the connection to the helper.
        d.addCallback(lambda res:
                      log.msg("wait_for_connections", level=log.NOISY,
                              facility="tahoe.test.test_system"))
        d.addCallback(lambda res: self.wait_for_connections())

        d.addCallback(lambda res:
                      log.msg("uploading again", level=log.NOISY,
                              facility="tahoe.test.test_system"))
        d.addCallback(lambda res: self.extra_node.upload(u2))

        def _uploaded(results):
            # 'cap' is bound from 'results' in an elided line
            log.msg("Second upload complete", level=log.NOISY,
                    facility="tahoe.test.test_system")

            # this is really bytes received rather than sent, but it's
            # convenient and basically measures the same thing
            bytes_sent = results.ciphertext_fetched
            self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

            # We currently don't support resumption of upload if the data is
            # encrypted with a random key. (Because that would require us
            # to store the key locally and re-use it on the next upload of
            # this file, which isn't a bad thing to do, but we currently
            # don't.)
            if convergence is not None:
                # Make sure we did not have to read the whole file the
                # second time around.
                self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
            # NOTE(review): an "else:" line is elided here; the failIf
            # below is the random-key branch.
                # Make sure we did have to read the whole file the second
                # time around -- because the one that we partially uploaded
                # earlier was encrypted with a different random key.
                self.failIf(bytes_sent < len(DATA),
                            "resumption saved us some work even though we were using random keys:"
                            " read %r bytes out of %r total" %
                            (bytes_sent, len(DATA)))
            n = self.clients[1].create_node_from_uri(cap)
            return download_to_data(n)
        d.addCallback(_uploaded)

        # NOTE(review): the enclosing "def _check(newdata):" line is elided
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
        d.addCallback(_check)
    d.addCallback(_upload_resumable)

    def _grab_stats(ignored):
        # the StatsProvider doesn't normally publish a FURL:
        # instead it passes a live reference to the StatsGatherer
        # (if and when it connects). To exercise the remote stats
        # interface, we manually publish client0's StatsProvider
        # and use client1 to query it.
        sp = self.clients[0].stats_provider
        sp_furl = self.clients[0].tub.registerReference(sp)
        d = self.clients[1].tub.getReference(sp_furl)
        d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
        def _got_stats(stats):
            #from pprint import pprint
            # 's' is bound from 'stats' in an elided line -- presumably
            # the "stats" sub-dict. TODO confirm against the full source.
            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
            c = stats["counters"]
            self.failUnless("storage_server.allocate" in c)
        d.addCallback(_got_stats)
    d.addCallback(_grab_stats)
def _find_all_shares(self, basedir):
    """Walk basedir and collect (client_num, storage_index, filename, shnum)
    for every share file found under .../storage/shares/$START/$SINDEX.

    NOTE(review): this listing is sampled; the accumulator initialization,
    some guard-clause bodies, the append, and the final return are elided.
    """
    for (dirpath, dirnames, filenames) in os.walk(basedir):
        if "storage" not in dirpath:
        # (elided 'continue' / guard lines here)
        pieces = dirpath.split(os.sep)
        # (the opening "if (len(pieces) >= ..." line of this condition is
        # elided)
            and pieces[-4] == "storage"
            and pieces[-3] == "shares"):
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            assert pieces[-5].startswith("client")
            # client directories are named "clientN"; last char is N
            client_num = int(pieces[-5][-1])
            storage_index_s = pieces[-1]
            storage_index = si_a2b(storage_index_s)
            for sharename in filenames:
                shnum = int(sharename)
                filename = os.path.join(dirpath, sharename)
                data = (client_num, storage_index, filename, shnum)
    # reached only when no shares were found anywhere under basedir
    self.fail("unable to find any share files in %s" % basedir)
def _corrupt_mutable_share(self, filename, which):
    """Mangle one field of the SDMF share at *filename* (selected by
    *which*), then re-pack and write the share back to disk.

    NOTE(review): this listing is sampled; the "seqnum" branch body, the
    "R"/"IV" elif headers, and the pack_prefix/pack_share argument
    continuations are elided below.
    """
    msf = MutableShareFile(filename)
    # read far more than any share can hold, to get the whole thing
    datav = msf.readv([ (0, 1000000) ])
    final_share = datav[0]
    assert len(final_share) < 1000000 # ought to be truncated
    pieces = mutable_layout.unpack_share(final_share)
    (seqnum, root_hash, IV, k, N, segsize, datalen,
     verification_key, signature, share_hash_chain, block_hash_tree,
     share_data, enc_privkey) = pieces

    if which == "seqnum":
        # NOTE(review): the "seqnum" branch body and the "R" and "IV"
        # elif headers are elided; the next two lines belong to those
        # elided branches.
        root_hash = self.flip_bit(root_hash)
        IV = self.flip_bit(IV)
    elif which == "segsize":
        segsize = segsize + 15
    elif which == "pubkey":
        verification_key = self.flip_bit(verification_key)
    elif which == "signature":
        signature = self.flip_bit(signature)
    elif which == "share_hash_chain":
        nodenum = share_hash_chain.keys()[0]
        share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
    elif which == "block_hash_tree":
        block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
    elif which == "share_data":
        share_data = self.flip_bit(share_data)
    elif which == "encprivkey":
        enc_privkey = self.flip_bit(enc_privkey)

    # (argument continuations of these two pack calls are elided)
    prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
    final_share = mutable_layout.pack_share(prefix,
    msf.writev( [(0, final_share)], None)
def test_mutable(self):
    """End-to-end SDMF mutable-file test: create, dump-share debug output,
    retrieve from several clients, overwrite twice, survive corrupted
    shares, handle empty files, exercise dirnodes, and verify the key
    generator's pool bookkeeping.

    NOTE(review): this listing is sampled; a number of original lines are
    elided between the statements below -- gap notes mark the spots where
    a name would otherwise dangle.
    """
    self.basedir = "system/SystemTest/test_mutable"
    DATA = "initial contents go here." # 25 bytes % 3 != 0
    NEWDATA = "new contents yay"
    NEWERDATA = "this is getting old"

    d = self.set_up_nodes(use_key_generator=True)

    def _create_mutable(res):
        # 'c' is bound in an elided line -- presumably self.clients[0]
        log.msg("starting create_mutable_file")
        d1 = c.create_mutable_file(DATA)
        # NOTE(review): the enclosing "def _done(res):" line is elided
        log.msg("DONE: %s" % (res,))
        self._mutable_node_1 = res
        d1.addCallback(_done)
    d.addCallback(_create_mutable)

    def _test_debug(res):
        # find a share. It is important to run this while there is only
        # one slot in the grid.
        shares = self._find_all_shares(self.basedir)
        (client_num, storage_index, filename, shnum) = shares[0]
        # (continuation lines of these calls are elided)
        log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
        log.msg(" for clients[%d]" % client_num)
        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)
        # (the "try:" wrapping these assertions is elided)
        self.failUnless("Mutable slot found:\n" in output)
        self.failUnless("share_type: SDMF\n" in output)
        peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
        self.failUnless(" WE for nodeid: %s\n" % peerid in output)
        self.failUnless(" num_extra_leases: 0\n" in output)
        self.failUnless(" secrets are for nodeid: %s\n" % peerid
        self.failUnless(" SDMF contents:\n" in output)
        self.failUnless(" seqnum: 1\n" in output)
        self.failUnless(" required_shares: 3\n" in output)
        self.failUnless(" total_shares: 10\n" in output)
        self.failUnless(" segsize: 27\n" in output, (output, filename))
        self.failUnless(" datalen: 25\n" in output)
        # the exact share_hash_chain nodes depends upon the sharenum,
        # and is more of a hassle to compute than I want to deal with
        self.failUnless(" share_hash_chain: " in output)
        self.failUnless(" block_hash_tree: 1 nodes\n" in output)
        expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                    base32.b2a(storage_index))
        self.failUnless(expected in output)
        except unittest.FailTest:
        # on failure, dump the CLI output for diagnosis (print lines
        # partially elided)
        print "dump-share output was:"
    d.addCallback(_test_debug)

    # first, let's see if we can use the existing node to retrieve the
    # contents. This allows it to use the cached pubkey and maybe the
    # latest-known sharemap.

    d.addCallback(lambda res: self._mutable_node_1.download_best_version())
    def _check_download_1(res):
        self.failUnlessEqual(res, DATA)
        # now we see if we can retrieve the data from a new node,
        # constructed using the URI of the original one. We do this test
        # on the same client that uploaded the data.
        uri = self._mutable_node_1.get_uri()
        log.msg("starting retrieve1")
        newnode = self.clients[0].create_node_from_uri(uri)
        newnode_2 = self.clients[0].create_node_from_uri(uri)
        # the nodemaker caches: same URI must yield the same node object
        self.failUnlessIdentical(newnode, newnode_2)
        return newnode.download_best_version()
    d.addCallback(_check_download_1)
    def _check_download_2(res):
        self.failUnlessEqual(res, DATA)
        # same thing, but with a different client
        uri = self._mutable_node_1.get_uri()
        newnode = self.clients[1].create_node_from_uri(uri)
        log.msg("starting retrieve2")
        d1 = newnode.download_best_version()
        d1.addCallback(lambda res: (res, newnode))
    d.addCallback(_check_download_2)
    def _check_download_3((res, newnode)):
        self.failUnlessEqual(res, DATA)
        log.msg("starting replace1")
        d1 = newnode.overwrite(NEWDATA)
        d1.addCallback(lambda res: newnode.download_best_version())
    d.addCallback(_check_download_3)
    def _check_download_4(res):
        self.failUnlessEqual(res, NEWDATA)
        # now create an even newer node and replace the data on it. This
        # new node has never been used for download before.
        uri = self._mutable_node_1.get_uri()
        newnode1 = self.clients[2].create_node_from_uri(uri)
        newnode2 = self.clients[3].create_node_from_uri(uri)
        self._newnode3 = self.clients[3].create_node_from_uri(uri)
        log.msg("starting replace2")
        d1 = newnode1.overwrite(NEWERDATA)
        d1.addCallback(lambda res: newnode2.download_best_version())
    d.addCallback(_check_download_4)
    def _check_download_5(res):
        log.msg("finished replace2")
        self.failUnlessEqual(res, NEWERDATA)
    d.addCallback(_check_download_5)

    def _corrupt_shares(res):
        # run around and flip bits in all but k of the shares, to test
        # the hash checks (comment continuation elided)
        shares = self._find_all_shares(self.basedir)
        ## sort by share number
        #shares.sort( lambda a,b: cmp(a[3], b[3]) )
        # (the "in shares])" continuation of this dict-comprehension is
        # elided)
        where = dict([ (shnum, filename)
                       for (client_num, storage_index, filename, shnum)
        assert len(where) == 10 # this test is designed for 3-of-10
        for shnum, filename in where.items():
            # shares 7,8,9 are left alone. read will check
            # (share_hash_chain, block_hash_tree, share_data). New
            # seqnum+R pairs will trigger a check of (seqnum, R, IV,
            # segsize, signature).
            # NOTE(review): the "if shnum == 0:" / "elif shnum == N:"
            # dispatch lines are elided from this sampled listing; each
            # corruption below targets a different share number.
            # read: this will trigger "pubkey doesn't match fingerprint"
            self._corrupt_mutable_share(filename, "pubkey")
            self._corrupt_mutable_share(filename, "encprivkey")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "seqnum")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "R")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "segsize")
            self._corrupt_mutable_share(filename, "share_hash_chain")
            self._corrupt_mutable_share(filename, "block_hash_tree")
            self._corrupt_mutable_share(filename, "share_data")
            # other things to corrupt: IV, signature
            # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
    d.addCallback(_corrupt_shares)

    # even with 7 of 10 shares corrupted, 3-of-10 decoding must succeed
    d.addCallback(lambda res: self._newnode3.download_best_version())
    d.addCallback(_check_download_5)

    def _check_empty_file(res):
        # make sure we can create empty files, this usually screws up the
        # segsize math (comment continuation elided)
        d1 = self.clients[2].create_mutable_file("")
        d1.addCallback(lambda newnode: newnode.download_best_version())
        d1.addCallback(lambda res: self.failUnlessEqual("", res))
    d.addCallback(_check_empty_file)

    d.addCallback(lambda res: self.clients[0].create_dirnode())
    def _created_dirnode(dnode):
        log.msg("_created_dirnode(%s)" % (dnode,))
        # 'd1' is bound in an elided line -- presumably d1 = dnode.list()
        d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
        d1.addCallback(lambda res: dnode.has_child(u"edgar"))
        d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
        # a directory may safely contain a link to itself
        d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
        d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
        d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
        d1.addCallback(lambda res: dnode.build_manifest().when_done())
        d1.addCallback(lambda res:
                       self.failUnlessEqual(len(res["manifest"]), 1))
    d.addCallback(_created_dirnode)

    def wait_for_c3_kg_conn():
        return self.clients[3]._key_generator is not None
    d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

    def check_kg_poolsize(junk, size_delta):
        # the keypool shrinks by one per key handed out, until refilled
        self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                             self.key_generator_svc.key_generator.pool_size + size_delta)
    d.addCallback(check_kg_poolsize, 0)
    d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
    d.addCallback(check_kg_poolsize, -1)
    d.addCallback(lambda junk: self.clients[3].create_dirnode())
    d.addCallback(check_kg_poolsize, -2)
    # use_helper induces use of clients[3], which is the using-key_gen client
    d.addCallback(lambda junk:
                  self.POST("uri?t=mkdir&name=george", use_helper=True))
    d.addCallback(check_kg_poolsize, -3)
def flip_bit(self, good):
    """Return *good* with the least-significant bit of its final byte
    inverted -- the minimal corruption used to defeat integrity checks."""
    head, tail = good[:-1], good[-1]
    return head + chr(ord(tail) ^ 0x01)
def mangle_uri(self, gooduri):
    """Return a CHK URI string whose key (and therefore storage index) has
    been flipped, so lookups for it will find no shares.

    NOTE(review): this listing is sampled; one keyword-argument line of
    the CHKFileURI call (presumably size=u.size) is elided below.
    """
    # change the key, which changes the storage index, which means we'll
    # be asking about the wrong file, so nobody will have any shares
    u = uri.from_string(gooduri)
    u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                        uri_extension_hash=u.uri_extension_hash,
                        needed_shares=u.needed_shares,
                        total_shares=u.total_shares,
    return u2.to_string()
697 # TODO: add a test which mangles the uri_extension_hash instead, and
698 # should fail due to not being able to get a valid uri_extension block.
699 # Also a test which sneakily mangles the uri_extension block to change
700 # some of the validation data, so it will fail in the post-download phase
701 # when the file's crypttext integrity check fails. Do the same thing for
702 # the key, which should cause the download to fail the post-download
703 # plaintext_hash check.
def test_filesystem(self):
    """Build a small directory tree through the full stack and verify it
    via the web API, the control port, the CLI, and the checker.

    NOTE(review): this listing is sampled; a few lines (including the
    trailing "return d") are elided.
    """
    self.basedir = "system/SystemTest/test_filesystem"
    self.data = LARGE_DATA
    d = self.set_up_nodes(use_stats_gatherer=True)
    def _new_happy_semantics(ign):
        # relax servers-of-happiness so small grids can place shares
        for c in self.clients:
            c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
    d.addCallback(_new_happy_semantics)
    d.addCallback(self._test_introweb)
    d.addCallback(self.log, "starting publish")
    d.addCallback(self._do_publish1)
    d.addCallback(self._test_runner)
    d.addCallback(self._do_publish2)
    # at this point, we have the following filesystem (where "R" denotes
    # self._root_directory_uri):
    # R/subdir1/mydata567
    # R/subdir1/subdir2/mydata992

    d.addCallback(lambda res: self.bounce_client(0))
    d.addCallback(self.log, "bounced client0")

    d.addCallback(self._check_publish1)
    d.addCallback(self.log, "did _check_publish1")
    d.addCallback(self._check_publish2)
    d.addCallback(self.log, "did _check_publish2")
    d.addCallback(self._do_publish_private)
    d.addCallback(self.log, "did _do_publish_private")
    # now we also have (where "P" denotes a new dir):
    # P/personal/sekrit data
    # P/s2-rw -> /subdir1/subdir2/
    # P/s2-ro -> /subdir1/subdir2/ (read-only)
    d.addCallback(self._check_publish_private)
    d.addCallback(self.log, "did _check_publish_private")
    d.addCallback(self._test_web)
    d.addCallback(self._test_control)
    d.addCallback(self._test_cli)
    # P now has four top-level children:
    # P/personal/sekrit data
    # P/test_put/ (empty)
    d.addCallback(self._test_checker)
def _test_introweb(self, res):
    """Fetch the introducer's status page (HTML and ?t=json) and check the
    version banner plus announcement/subscription summaries.

    NOTE(review): this listing is sampled; the "def _check(res):"/"try:"
    wrapper lines, several conditional lines around the version-string
    rewriting, the failure print/raise lines, and "return d" are elided.
    """
    d = getPage(self.introweb_url, method="GET", followRedirect=True)
    self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
    verstr = str(allmydata.__version__)

    # The Python "rational version numbering" convention
    # disallows "-r$REV" but allows ".post$REV"
    # instead. Eventually we'll probably move to
    # that. When we do, this test won't go red:
    ix = verstr.rfind('-r')
    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
    ix = verstr.rfind('.post')
    altverstr = verstr[:ix] + '-r' + verstr[ix+5:]

    appverstr = "%s: %s" % (allmydata.__appname__, verstr)
    newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

    # accept either spelling of the version in the page
    self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
    self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
    self.failUnless("Subscription Summary: storage: 5" in res)
    except unittest.FailTest:
    # on failure, dump the fetched page for diagnosis
    print "GET %s output was:" % self.introweb_url
    d.addCallback(_check)
    d.addCallback(lambda res:
                  getPage(self.introweb_url + "?t=json",
                          method="GET", followRedirect=True))
    def _check_json(res):
        data = simplejson.loads(res)
        # (the expected-value line of this first assertion is elided)
        self.failUnlessEqual(data["subscription_summary"],
        self.failUnlessEqual(data["announcement_summary"],
                             {"storage": 5, "stub_client": 5})
        # all simulated nodes run on one host
        self.failUnlessEqual(data["announcement_distinct_hosts"],
                             {"storage": 1, "stub_client": 1})
        except unittest.FailTest:
        print "GET %s?t=json output was:" % self.introweb_url
    d.addCallback(_check_json)
def _do_publish1(self, res):
    """Create the root directory R and publish R/subdir1/mydata567,
    stashing the root URI and the file's URI on self.

    NOTE(review): this listing is sampled; the line binding 'c0'
    (presumably self.clients[0]) and the trailing returns are elided.
    """
    ut = upload.Data(self.data, convergence=None)
    d = c0.create_dirnode()
    def _made_root(new_dirnode):
        self._root_directory_uri = new_dirnode.get_uri()
        return c0.create_node_from_uri(self._root_directory_uri)
    d.addCallback(_made_root)
    d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
    def _made_subdir1(subdir1_node):
        self._subdir1_node = subdir1_node
        d1 = subdir1_node.add_file(u"mydata567", ut)
        d1.addCallback(self.log, "publish finished")
        def _stash_uri(filenode):
            self.uri = filenode.get_uri()
            assert isinstance(self.uri, str), (self.uri, filenode)
        d1.addCallback(_stash_uri)
    d.addCallback(_made_subdir1)
def _do_publish2(self, res):
    """Publish R/subdir1/subdir2/mydata992 under the subdir created by
    _do_publish1.

    NOTE(review): the trailing "return d" is elided from this listing.
    """
    ut = upload.Data(self.data, convergence=None)
    d = self._subdir1_node.create_subdirectory(u"subdir2")
    d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
def log(self, res, *args, **kwargs):
    """Deferred-chain logging helper: log *args* and pass *res* through.

    NOTE(review): the trailing "return res" (which makes this transparent
    in a callback chain) is elided from this sampled listing.
    """
    # print "MSG: %s RES: %s" % (msg, args)
    log.msg(*args, **kwargs)
def _do_publish_private(self, res):
    """Create the private directory P with P/personal/sekrit data, plus
    s2-rw and s2-ro links to R/subdir1/subdir2.

    NOTE(review): this listing is sampled; the "def _got_s2(s2node):"
    line and the trailing returns are elided.
    """
    self.smalldata = "sssh, very secret stuff"
    ut = upload.Data(self.smalldata, convergence=None)
    d = self.clients[0].create_dirnode()
    d.addCallback(self.log, "GOT private directory")
    def _got_new_dir(privnode):
        rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
        d1 = privnode.create_subdirectory(u"personal")
        d1.addCallback(self.log, "made P/personal")
        d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
        d1.addCallback(self.log, "made P/personal/sekrit data")
        d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
        # NOTE(review): the enclosing "def _got_s2(s2node):" line is elided
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                  s2node.get_readonly_uri())
            d2.addCallback(lambda node:
                           privnode.set_uri(u"s2-ro",
                                            s2node.get_readonly_uri(),
                                            s2node.get_readonly_uri()))
        d1.addCallback(_got_s2)
        # hand the new private node to the next callback in the chain
        d1.addCallback(lambda res: privnode)
    d.addCallback(_got_new_dir)
def _check_publish1(self, res):
    """Verify R/subdir1/mydata567 via step-by-step child lookups.

    NOTE(review): this listing is sampled; the line binding 'c1'
    (presumably self.clients[1]), the "def _get_done(data):" line, and
    "return d" are elided.
    """
    # this one uses the iterative API
    d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
    d.addCallback(self.log, "check_publish1 got /")
    d.addCallback(lambda root: root.get(u"subdir1"))
    d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
    d.addCallback(lambda filenode: download_to_data(filenode))
    d.addCallback(self.log, "get finished")
        self.failUnlessEqual(data, self.data)
    d.addCallback(_get_done)
def _check_publish2(self, res):
    """Verify R/subdir1/mydata567 via the path-based lookup API.

    NOTE(review): the trailing "return d" is elided from this listing.
    """
    # this one uses the path-based API
    rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
    d = rootnode.get_child_at_path(u"subdir1")
    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode)))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda filenode: download_to_data(filenode))
    d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    def _got_filenode(filenode):
        # the nodemaker caches, so the same URI yields the same node
        fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
        assert fnode == filenode
    d.addCallback(_got_filenode)
# _check_publish_private: exercise the private tree built by
# _do_publish_private.  Verifies the LIT file contents, checks that the
# read-only alias (s2-ro) rejects every mutating operation with
# NotWriteableError, then moves children around under P/ and finally checks
# the deep-manifest and deep-stats results for the whole private tree.
894 def _check_publish_private(self, resnode):
895 # this one uses the path-based API
896 self._private_node = resnode
898 d = self._private_node.get_child_at_path(u"personal")
899 def _got_personal(personal):
900 self._personal_node = personal
902 d.addCallback(_got_personal)
904 d.addCallback(lambda dirnode:
905 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
# body of a get_path(path) helper whose 'def' line is elided here
907 return self._private_node.get_child_at_path(path)
909 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
910 d.addCallback(lambda filenode: download_to_data(filenode))
911 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
912 d.addCallback(lambda res: get_path(u"s2-rw"))
913 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
914 d.addCallback(lambda res: get_path(u"s2-ro"))
# every write-style operation on the read-only dirnode must raise
# NotWriteableError; shouldFail2 wraps the call and asserts that
915 def _got_s2ro(dirnode):
916 self.failUnless(dirnode.is_mutable(), dirnode)
917 self.failUnless(dirnode.is_readonly(), dirnode)
918 d1 = defer.succeed(None)
919 d1.addCallback(lambda res: dirnode.list())
920 d1.addCallback(self.log, "dirnode.list")
922 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
924 d1.addCallback(self.log, "doing add_file(ro)")
925 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
926 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
928 d1.addCallback(self.log, "doing get(ro)")
929 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
930 d1.addCallback(lambda filenode:
931 self.failUnless(IFileNode.providedBy(filenode)))
933 d1.addCallback(self.log, "doing delete(ro)")
934 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
936 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
938 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
940 personal = self._personal_node
941 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
943 d1.addCallback(self.log, "doing move_child_to(ro)2")
944 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
946 d1.addCallback(self.log, "finished with _got_s2ro")
948 d.addCallback(_got_s2ro)
# _got_home: shuffle "sekrit data" between P/ and P/personal/ via
# move_child_to, then verify the deep manifest and deep stats of the tree
949 def _got_home(dummy):
950 home = self._private_node
951 personal = self._personal_node
952 d1 = defer.succeed(None)
953 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
954 d1.addCallback(lambda res:
955 personal.move_child_to(u"sekrit data",home,u"sekrit"))
957 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
958 d1.addCallback(lambda res:
959 home.move_child_to(u"sekrit", home, u"sekrit data"))
961 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
962 d1.addCallback(lambda res:
963 home.move_child_to(u"sekrit data", personal))
965 d1.addCallback(lambda res: home.build_manifest().when_done())
966 d1.addCallback(self.log, "manifest")
970 # P/personal/sekrit data
971 # P/s2-rw (same as P/s2-ro)
972 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
973 d1.addCallback(lambda res:
974 self.failUnlessEqual(len(res["manifest"]), 5))
975 d1.addCallback(lambda res: home.start_deep_stats().when_done())
# exact sizes below depend on the data published earlier in the test;
# directory sizes vary, so those are range-checked instead of equality
976 def _check_stats(stats):
977 expected = {"count-immutable-files": 1,
978 "count-mutable-files": 0,
979 "count-literal-files": 1,
981 "count-directories": 3,
982 "size-immutable-files": 112,
983 "size-literal-files": 23,
984 #"size-directories": 616, # varies
985 #"largest-directory": 616,
986 "largest-directory-children": 3,
987 "largest-immutable-file": 112,
989 for k,v in expected.iteritems():
990 self.failUnlessEqual(stats[k], v,
991 "stats[%s] was %s, not %s" %
993 self.failUnless(stats["size-directories"] > 1300,
994 stats["size-directories"])
995 self.failUnless(stats["largest-directory"] > 800,
996 stats["largest-directory"])
997 self.failUnlessEqual(stats["size-files-histogram"],
998 [ (11, 31, 1), (101, 316, 1) ])
999 d1.addCallback(_check_stats)
1001 d.addCallback(_got_home)
# shouldFail: errback-style failure checker, meant to be attached with
# d.addBoth(self.shouldFail, ExpectedError, "label", substring).
#   res              -- the result or Failure delivered by the Deferred
#   expected_failure -- exception class the Failure must wrap (via trap)
#   which            -- human-readable label used in the failure message
#   substring        -- optional text that must appear in str(res)
# If res is not a Failure at all, the test fails: the operation was
# supposed to raise.  (The 'if substring:' / 'else:' lines are elided
# from this excerpt.)
1004 def shouldFail(self, res, expected_failure, which, substring=None):
1005 if isinstance(res, Failure):
1006 res.trap(expected_failure)
1008 self.failUnless(substring in str(res),
1009 "substring '%s' not in '%s'"
1010 % (substring, str(res)))
1012 self.fail("%s was supposed to raise %s, not get '%s'" %
1013 (which, expected_failure, res))
# shouldFail2: call-style variant of shouldFail.  Invokes
# callable(*args, **kwargs) via maybeDeferred and asserts that it fails
# with expected_failure (optionally containing substring).  Returns a
# Deferred.  (The inner '_done' def and the trailing addBoth/return are
# on elided lines.)
1015 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1016 assert substring is None or isinstance(substring, str)
1017 d = defer.maybeDeferred(callable, *args, **kwargs)
1019 if isinstance(res, Failure):
1020 res.trap(expected_failure)
1022 self.failUnless(substring in str(res),
1023 "substring '%s' not in '%s'"
1024 % (substring, str(res)))
1026 self.fail("%s was supposed to raise %s, not get '%s'" %
1027 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of 'data' to the node's webapi at 'urlpath'.

    Returns a Deferred that fires with the response body (via
    twisted.web.client.getPage, imported at the top of the file).
    """
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET for 'urlpath' relative to the node's webapi root.

    Returns a Deferred that fires with the response body.  Redirects are
    only followed when followRedirect is True.
    """
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
# POST: build a multipart/form-data body from **fields and hand it to
# POST2.  A tuple-valued field is treated as (filename, contents) for a
# file-upload part; anything else becomes a plain form field.
# NOTE(review): 'form' and 'headers' are initialized on lines elided from
# this excerpt (presumably form = [...] with the leading boundary, and
# headers = {}).
1039 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
# 'sepbase' is the MIME boundary; 'sep' is the leading "--boundary" form
1040 sepbase = "boogabooga"
1041 sep = "--" + sepbase
1044 form.append('Content-Disposition: form-data; name="_charset"')
1046 form.append('UTF-8')
1048 for name, value in fields.iteritems():
1049 if isinstance(value, tuple):
1050 filename, value = value
1051 form.append('Content-Disposition: form-data; name="%s"; '
1052 'filename="%s"' % (name, filename.encode("utf-8")))
1054 form.append('Content-Disposition: form-data; name="%s"' % name)
1056 form.append(str(value))
# multipart bodies use CRLF line endings per the MIME convention
1062 body = "\r\n".join(form) + "\r\n"
1063 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1064 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
# POST2: low-level POST helper; chooses the helper node's webapi URL when
# use_helper is set (the 'use_helper=False):' continuation and the
# 'if use_helper:' / 'else:' lines are elided from this excerpt).
# NOTE(review): 'headers={}' is a mutable default argument — harmless only
# as long as no caller/callee mutates the default dict; POST always passes
# its own dict.  Worth switching to headers=None in a behavior change.
1066 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1069 url = self.helper_webish_url + urlpath
1071 url = self.webish_url + urlpath
1072 return getPage(url, method="POST", postdata=body, headers=headers,
1073 followRedirect=followRedirect)
# _test_web: end-to-end exercise of the node's web interface — welcome
# page, directory listings, file download by path and by URI, error pages
# for a mangled URI, PUT/GET/POST uploads (with and without the helper),
# the status/helper_status/statistics pages and their t=json forms.
# Several callback 'def' lines and a few statements fall on lines elided
# from this excerpt.
1075 def _test_web(self, res):
1076 base = self.webish_url
1077 public = "uri/" + self._root_directory_uri
1079 def _got_welcome(page):
1080 # XXX This test is oversensitive to formatting
1081 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1082 self.failUnless(expected in page,
1083 "I didn't see the right 'connected storage servers'"
1084 " message in: %s" % page
1086 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1087 self.failUnless(expected in page,
1088 "I didn't see the right 'My nodeid' message "
1090 self.failUnless("Helper: 0 active uploads" in page)
1091 d.addCallback(_got_welcome)
1092 d.addCallback(self.log, "done with _got_welcome")
1094 # get the welcome page from the node that uses the helper too
1095 d.addCallback(lambda res: getPage(self.helper_webish_url))
1096 def _got_welcome_helper(page):
1097 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1099 self.failUnless("Not running helper" in page)
1100 d.addCallback(_got_welcome_helper)
1102 d.addCallback(lambda res: getPage(base + public))
1103 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1104 def _got_subdir1(page):
1105 # there ought to be an href for our file
1106 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1107 self.failUnless(">mydata567</a>" in page)
1108 d.addCallback(_got_subdir1)
1109 d.addCallback(self.log, "done with _got_subdir1")
1110 d.addCallback(lambda res:
1111 getPage(base + public + "/subdir1/mydata567"))
1112 def _got_data(page):
1113 self.failUnlessEqual(page, self.data)
1114 d.addCallback(_got_data)
1116 # download from a URI embedded in a URL
1117 d.addCallback(self.log, "_get_from_uri")
1118 def _get_from_uri(res):
1119 return getPage(base + "uri/%s?filename=%s"
1120 % (self.uri, "mydata567"))
1121 d.addCallback(_get_from_uri)
1122 def _got_from_uri(page):
1123 self.failUnlessEqual(page, self.data)
1124 d.addCallback(_got_from_uri)
1126 # download from a URI embedded in a URL, second form
1127 d.addCallback(self.log, "_get_from_uri2")
1128 def _get_from_uri2(res):
1129 return getPage(base + "uri?uri=%s" % (self.uri,))
1130 d.addCallback(_get_from_uri2)
1131 d.addCallback(_got_from_uri)
1133 # download from a bogus URI, make sure we get a reasonable error
1134 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1135 def _get_from_bogus_uri(res):
1136 d1 = getPage(base + "uri/%s?filename=%s"
1137 % (self.mangle_uri(self.uri), "mydata567"))
# shouldFail is attached with addBoth: a successful fetch of the mangled
# URI would itself be a test failure
1138 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1141 d.addCallback(_get_from_bogus_uri)
1142 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1144 # upload a file with PUT
1145 d.addCallback(self.log, "about to try PUT")
1146 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1147 "new.txt contents"))
1148 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1149 d.addCallback(self.failUnlessEqual, "new.txt contents")
1150 # and again with something large enough to use multiple segments,
1151 # and hopefully trigger pauseProducing too
1152 def _new_happy_semantics(ign):
1153 for c in self.clients:
1154 # these get reset somewhere? Whatever.
1155 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1156 d.addCallback(_new_happy_semantics)
1157 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1158 "big" * 500000)) # 1.5MB
1159 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1160 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1162 # can we replace files in place?
1163 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1165 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1166 d.addCallback(self.failUnlessEqual, "NEWER contents")
1168 # test unlinked POST
1169 d.addCallback(lambda res: self.POST("uri", t="upload",
1170 file=("new.txt", "data" * 10000)))
1171 # and again using the helper, which exercises different upload-status
1173 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1174 file=("foo.txt", "data2" * 10000)))
1176 # check that the status page exists
1177 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1178 def _got_status(res):
1179 # find an interesting upload and download to look at. LIT files
1180 # are not interesting.
1181 h = self.clients[0].get_history()
1182 for ds in h.list_all_download_statuses():
1183 if ds.get_size() > 200:
1184 self._down_status = ds.get_counter()
1185 for us in h.list_all_upload_statuses():
1186 if us.get_size() > 200:
1187 self._up_status = us.get_counter()
1188 rs = list(h.list_all_retrieve_statuses())[0]
1189 self._retrieve_status = rs.get_counter()
1190 ps = list(h.list_all_publish_statuses())[0]
1191 self._publish_status = ps.get_counter()
1192 us = list(h.list_all_mapupdate_statuses())[0]
1193 self._update_status = us.get_counter()
1195 # and that there are some upload- and download- status pages
1196 return self.GET("status/up-%d" % self._up_status)
1197 d.addCallback(_got_status)
# body of _got_up (its 'def' line is elided)
1199 return self.GET("status/down-%d" % self._down_status)
1200 d.addCallback(_got_up)
# body of _got_down (its 'def' line is elided)
1202 return self.GET("status/mapupdate-%d" % self._update_status)
1203 d.addCallback(_got_down)
1204 def _got_update(res):
1205 return self.GET("status/publish-%d" % self._publish_status)
1206 d.addCallback(_got_update)
1207 def _got_publish(res):
1208 return self.GET("status/retrieve-%d" % self._retrieve_status)
1209 d.addCallback(_got_publish)
1211 # check that the helper status page exists
1212 d.addCallback(lambda res:
1213 self.GET("helper_status", followRedirect=True))
1214 def _got_helper_status(res):
1215 self.failUnless("Bytes Fetched:" in res)
1216 # touch a couple of files in the helper's working directory to
1217 # exercise more code paths
1218 workdir = os.path.join(self.getdir("client0"), "helper")
1219 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1220 f = open(incfile, "wb")
1221 f.write("small file")
# 'now' is bound on an elided line (presumably now = time.time()); the
# files are backdated 3 days so the "old size" JSON counters below see them
1223 then = time.time() - 86400*3
1225 os.utime(incfile, (now, then))
1226 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1227 f = open(encfile, "wb")
1228 f.write("less small file")
1230 os.utime(encfile, (now, then))
1231 d.addCallback(_got_helper_status)
1232 # and that the json form exists
1233 d.addCallback(lambda res:
1234 self.GET("helper_status?t=json", followRedirect=True))
1235 def _got_helper_status_json(res):
1236 data = simplejson.loads(res)
# sizes 10 and 15 match the two "spurious" files written just above
1237 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1239 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1240 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1241 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1243 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1244 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1245 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1247 d.addCallback(_got_helper_status_json)
1249 # and check that client[3] (which uses a helper but does not run one
1250 # itself) doesn't explode when you ask for its status
1251 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1252 def _got_non_helper_status(res):
1253 self.failUnless("Upload and Download Status" in res)
1254 d.addCallback(_got_non_helper_status)
1256 # or for helper status with t=json
1257 d.addCallback(lambda res:
1258 getPage(self.helper_webish_url + "helper_status?t=json"))
1259 def _got_non_helper_status_json(res):
1260 data = simplejson.loads(res)
1261 self.failUnlessEqual(data, {})
1262 d.addCallback(_got_non_helper_status_json)
1264 # see if the statistics page exists
1265 d.addCallback(lambda res: self.GET("statistics"))
1266 def _got_stats(res):
1267 self.failUnless("Node Statistics" in res)
1268 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1269 d.addCallback(_got_stats)
1270 d.addCallback(lambda res: self.GET("statistics?t=json"))
1271 def _got_stats_json(res):
1272 data = simplejson.loads(res)
1273 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1274 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1275 d.addCallback(_got_stats_json)
1277 # TODO: mangle the second segment of a file, to test errors that
1278 # occur after we've already sent some good data, which uses a
1279 # different error path.
1281 # TODO: download a URI with a form
1282 # TODO: create a directory by using a form
1283 # TODO: upload by using a form on the directory page
1284 # url = base + "somedir/subdir1/freeform_post!!upload"
1285 # TODO: delete a file by using a button on the directory page
# _test_runner: exercise the "tahoe debug" CLI diagnostics against the
# shares written by the earlier upload.  Walks the storage directories to
# find one CHK share file, then runs dump-share (checking its printed
# fields), find-shares (expects one share per client, 10 total), and
# catalog-shares (expects 3 lines per client, 30 total).
1289 def _test_runner(self, res):
1290 # exercise some of the diagnostic tools in runner.py
1293 for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1294 if "storage" not in dirpath:
1298 pieces = dirpath.split(os.sep)
1299 if (len(pieces) >= 4
1300 and pieces[-4] == "storage"
1301 and pieces[-3] == "shares"):
1302 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1303 # are sharefiles here
1304 filename = os.path.join(dirpath, filenames[0])
1305 # peek at the magic to see if it is a chk share
1306 magic = open(filename, "rb").read(4)
# immutable CHK shares start with a 4-byte version field of 1; the loop's
# 'continue'/'break' statements are on lines elided from this excerpt
1307 if magic == '\x00\x00\x00\x01':
1310 self.fail("unable to find any uri_extension files in %r"
1312 log.msg("test_system.SystemTest._test_runner using %r" % filename)
1314 out,err = StringIO(), StringIO()
1315 rc = runner.runner(["debug", "dump-share", "--offsets",
1316 unicode_to_argv(filename)],
1317 stdout=out, stderr=err)
1318 output = out.getvalue()
1319 self.failUnlessEqual(rc, 0)
1321 # we only upload a single file, so we can assert some things about
1322 # its size and shares.
1323 self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1324 self.failUnlessIn("size: %d\n" % len(self.data), output)
1325 self.failUnlessIn("num_segments: 1\n", output)
1326 # segment_size is always a multiple of needed_shares
1327 self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1328 self.failUnlessIn("total_shares: 10\n", output)
1329 # keys which are supposed to be present
1330 for key in ("size", "num_segments", "segment_size",
1331 "needed_shares", "total_shares",
1332 "codec_name", "codec_params", "tail_codec_params",
1333 #"plaintext_hash", "plaintext_root_hash",
1334 "crypttext_hash", "crypttext_root_hash",
1335 "share_root_hash", "UEB_hash"):
1336 self.failUnlessIn("%s: " % key, output)
1337 self.failUnlessIn(" verify-cap: URI:CHK-Verifier:", output)
1339 # now use its storage index to find the other shares using the
1340 # 'find-shares' tool
1341 sharedir, shnum = os.path.split(filename)
1342 storagedir, storage_index_s = os.path.split(sharedir)
1343 storage_index_s = str(storage_index_s)
1344 out,err = StringIO(), StringIO()
1345 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1346 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1347 rc = runner.runner(cmd, stdout=out, stderr=err)
1348 self.failUnlessEqual(rc, 0)
# (the StringIO is rewound on an elided line before readlines())
1350 sharefiles = [sfn.strip() for sfn in out.readlines()]
1351 self.failUnlessEqual(len(sharefiles), 10)
1353 # also exercise the 'catalog-shares' tool
1354 out,err = StringIO(), StringIO()
1355 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1356 cmd = ["debug", "catalog-shares"] + nodedirs
1357 rc = runner.runner(cmd, stdout=out, stderr=err)
1358 self.failUnlessEqual(rc, 0)
1360 descriptions = [sfn.strip() for sfn in out.readlines()]
1361 self.failUnlessEqual(len(descriptions), 30)
# tail of a list comprehension whose opening line ('matching = [line') is
# elided: only the CHK lines for our storage index are counted
1363 for line in descriptions
1364 if line.startswith("CHK %s " % storage_index_s)]
1365 self.failUnlessEqual(len(matching), 10)
# _test_control: connect to client0's control port (a foolscap FURL stored
# in private/control.furl) and hand the remote reference to _test_control2.
# NOTE(review): the file object from open() is never explicitly closed;
# CPython's refcounting closes it promptly, but a 'with'/close() would be
# cleaner.
1367 def _test_control(self, res):
1368 # exercise the remote-control-the-client foolscap interfaces in
1369 # allmydata.control (mostly used for performance tests)
1370 c0 = self.clients[0]
1371 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1372 control_furl = open(control_furl_file, "r").read().strip()
1373 # it doesn't really matter which Tub we use to connect to the client,
1374 # so let's just use our IntroducerNode's
1375 d = self.introducer.tub.getReference(control_furl)
1376 d.addCallback(self._test_control2, control_furl_file)
# _test_control2: drive the control-port remote methods — upload the FURL
# file itself, download it back, compare contents, then run speed_test and
# (on Linux) get_memory_usage, plus measure_peer_response_time.
#   rref     -- foolscap RemoteReference to the client's control object
#   filename -- path of the local file to round-trip through the grid
1378 def _test_control2(self, rref, filename):
1379 d = rref.callRemote("upload_from_file_to_uri",
1380 filename.encode(get_filesystem_encoding()), convergence=None)
1381 downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1382 d.addCallback(lambda uri:
1383 rref.callRemote("download_from_uri_to_file",
# body of a _check(res) callback (its 'def' line is elided): the remote
# call returns the path it wrote, and the bytes must round-trip unchanged
1386 self.failUnlessEqual(res, downfile)
1387 data = open(downfile, "r").read()
1388 expected_data = open(filename, "r").read()
1389 self.failUnlessEqual(data, expected_data)
1390 d.addCallback(_check)
1391 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
# get_memory_usage reads /proc and is only meaningful on Linux
1392 if sys.platform == "linux2":
1393 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1394 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
# _test_cli: drive the "tahoe" command-line tool (via _run_cli, in a
# thread) against client0: alias handling (root_dir.cap compatibility,
# add-alias/list-aliases), put/get (file, stdin, mutable), ls variants
# (-l, --uri, --readonly-uri), rm, mv, ln, and cp in every direction
# (tahoe<->disk, recursive, --caps-only, overwrite cases).  Many helper
# 'def' lines and setup statements fall on lines elided from this excerpt
# (e.g. nodeargs, the files/datas fixture loop, f.close() calls).
1397 def _test_cli(self, res):
1398 # run various CLI commands (in a thread, since they use blocking
1401 private_uri = self._private_node.get_uri()
1402 client0_basedir = self.getdir("client0")
# tail of the nodeargs list (its opening line is elided); every CLI call
# below points at client0's node directory
1405 "--node-directory", client0_basedir,
1408 d = defer.succeed(None)
1410 # for compatibility with earlier versions, private/root_dir.cap is
1411 # supposed to be treated as an alias named "tahoe:". Start by making
1412 # sure that works, before we add other aliases.
1414 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1415 f = open(root_file, "w")
1416 f.write(private_uri)
# run(ignored, verb, *args): invoke one CLI command; 'ignored' lets it be
# chained directly as a Deferred callback
1419 def run(ignored, verb, *args, **kwargs):
1420 stdin = kwargs.get("stdin", "")
1421 newargs = [verb] + nodeargs + list(args)
1422 return self._run_cli(newargs, stdin=stdin)
1424 def _check_ls((out,err), expected_children, unexpected_children=[]):
1425 self.failUnlessEqual(err, "")
1426 for s in expected_children:
1427 self.failUnless(s in out, (s,out))
1428 for s in unexpected_children:
1429 self.failIf(s in out, (s,out))
1431 def _check_ls_root((out,err)):
1432 self.failUnless("personal" in out)
1433 self.failUnless("s2-ro" in out)
1434 self.failUnless("s2-rw" in out)
1435 self.failUnlessEqual(err, "")
1437 # this should reference private_uri
1438 d.addCallback(run, "ls")
1439 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1441 d.addCallback(run, "list-aliases")
1442 def _check_aliases_1((out,err)):
1443 self.failUnlessEqual(err, "")
1444 self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1445 d.addCallback(_check_aliases_1)
1447 # now that that's out of the way, remove root_dir.cap and work with
1449 d.addCallback(lambda res: os.unlink(root_file))
1450 d.addCallback(run, "list-aliases")
1451 def _check_aliases_2((out,err)):
1452 self.failUnlessEqual(err, "")
1453 self.failUnlessEqual(out, "")
1454 d.addCallback(_check_aliases_2)
1456 d.addCallback(run, "mkdir")
1457 def _got_dir( (out,err) ):
1458 self.failUnless(uri.from_string_dirnode(out.strip()))
1460 d.addCallback(_got_dir)
1461 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1463 d.addCallback(run, "list-aliases")
1464 def _check_aliases_3((out,err)):
1465 self.failUnlessEqual(err, "")
1466 self.failUnless("tahoe: " in out)
1467 d.addCallback(_check_aliases_3)
1469 def _check_empty_dir((out,err)):
1470 self.failUnlessEqual(out, "")
1471 self.failUnlessEqual(err, "")
1472 d.addCallback(run, "ls")
1473 d.addCallback(_check_empty_dir)
1475 def _check_missing_dir((out,err)):
1476 # TODO: check that rc==2
1477 self.failUnlessEqual(out, "")
1478 self.failUnlessEqual(err, "No such file or directory\n")
1479 d.addCallback(run, "ls", "bogus")
1480 d.addCallback(_check_missing_dir)
# body of the fixture loop (its 'for i in range(...)' line and the
# files/datas list initializers are elided): creates local file0..fileN
# with known contents for the put/get/cp checks below
1485 fn = os.path.join(self.basedir, "file%d" % i)
1487 data = "data to be uploaded: file%d\n" % i
1489 open(fn,"wb").write(data)
1491 def _check_stdout_against((out,err), filenum=None, data=None):
1492 self.failUnlessEqual(err, "")
1493 if filenum is not None:
1494 self.failUnlessEqual(out, datas[filenum])
1495 if data is not None:
1496 self.failUnlessEqual(out, data)
1498 # test all both forms of put: from a file, and from stdin
1500 d.addCallback(run, "put", files[0], "tahoe-file0")
1501 def _put_out((out,err)):
1502 self.failUnless("URI:LIT:" in out, out)
1503 self.failUnless("201 Created" in err, err)
# uri0 is bound from 'out' on an elided line, then fetched back by cap
1505 return run(None, "get", uri0)
1506 d.addCallback(_put_out)
1507 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1509 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1510 # tahoe put bar tahoe:FOO
1511 d.addCallback(run, "put", files[2], "tahoe:file2")
1512 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1513 def _check_put_mutable((out,err)):
1514 self._mutable_file3_uri = out.strip()
1515 d.addCallback(_check_put_mutable)
1516 d.addCallback(run, "get", "tahoe:file3")
1517 d.addCallback(_check_stdout_against, 3)
1520 STDIN_DATA = "This is the file to upload from stdin."
1521 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1522 # tahoe put tahoe:FOO
1523 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1524 stdin="Other file from stdin.")
1526 d.addCallback(run, "ls")
1527 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1528 "tahoe-file-stdin", "from-stdin"])
1529 d.addCallback(run, "ls", "subdir")
1530 d.addCallback(_check_ls, ["tahoe-file1"])
1533 d.addCallback(run, "mkdir", "subdir2")
1534 d.addCallback(run, "ls")
1535 # TODO: extract the URI, set an alias with it
1536 d.addCallback(_check_ls, ["subdir2"])
1538 # tahoe get: (to stdin and to a file)
1539 d.addCallback(run, "get", "tahoe-file0")
1540 d.addCallback(_check_stdout_against, 0)
1541 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1542 d.addCallback(_check_stdout_against, 1)
1543 outfile0 = os.path.join(self.basedir, "outfile0")
1544 d.addCallback(run, "get", "file2", outfile0)
1545 def _check_outfile0((out,err)):
1546 data = open(outfile0,"rb").read()
1547 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1548 d.addCallback(_check_outfile0)
# NOTE(review): outfile1 reuses the "outfile0" filename — probably meant
# to be "outfile1".  The check below still passes because it reads the
# same path it wrote, but it silently clobbers the previous download.
1549 outfile1 = os.path.join(self.basedir, "outfile0")
1550 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1551 def _check_outfile1((out,err)):
1552 data = open(outfile1,"rb").read()
1553 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1554 d.addCallback(_check_outfile1)
1556 d.addCallback(run, "rm", "tahoe-file0")
1557 d.addCallback(run, "rm", "tahoe:file2")
1558 d.addCallback(run, "ls")
1559 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1561 d.addCallback(run, "ls", "-l")
1562 def _check_ls_l((out,err)):
1563 lines = out.split("\n")
# loop header over 'lines' is elided; immutable entries show "-r-- ",
# mutable ones "-rw- "
1565 if "tahoe-file-stdin" in l:
1566 self.failUnless(l.startswith("-r-- "), l)
1567 self.failUnless(" %d " % len(STDIN_DATA) in l)
1569 self.failUnless(l.startswith("-rw- "), l) # mutable
1570 d.addCallback(_check_ls_l)
1572 d.addCallback(run, "ls", "--uri")
1573 def _check_ls_uri((out,err)):
1574 lines = out.split("\n")
1577 self.failUnless(self._mutable_file3_uri in l)
1578 d.addCallback(_check_ls_uri)
1580 d.addCallback(run, "ls", "--readonly-uri")
1581 def _check_ls_rouri((out,err)):
1582 lines = out.split("\n")
# derive file3's read-only cap from the stored read-write cap and make
# sure --readonly-uri printed it
1585 rw_uri = self._mutable_file3_uri
1586 u = uri.from_string_mutable_filenode(rw_uri)
1587 ro_uri = u.get_readonly().to_string()
1588 self.failUnless(ro_uri in l)
1589 d.addCallback(_check_ls_rouri)
1592 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1593 d.addCallback(run, "ls")
1594 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1596 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1597 d.addCallback(run, "ls")
1598 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1600 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1601 d.addCallback(run, "ls")
1602 d.addCallback(_check_ls, ["file3", "file3-copy"])
1603 d.addCallback(run, "get", "tahoe:file3-copy")
1604 d.addCallback(_check_stdout_against, 3)
1606 # copy from disk into tahoe
1607 d.addCallback(run, "cp", files[4], "tahoe:file4")
1608 d.addCallback(run, "ls")
1609 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1610 d.addCallback(run, "get", "tahoe:file4")
1611 d.addCallback(_check_stdout_against, 4)
1613 # copy from tahoe into disk
1614 target_filename = os.path.join(self.basedir, "file-out")
1615 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1616 def _check_cp_out((out,err)):
1617 self.failUnless(os.path.exists(target_filename))
1618 got = open(target_filename,"rb").read()
1619 self.failUnlessEqual(got, datas[4])
1620 d.addCallback(_check_cp_out)
1622 # copy from disk to disk (silly case)
1623 target2_filename = os.path.join(self.basedir, "file-out-copy")
1624 d.addCallback(run, "cp", target_filename, target2_filename)
1625 def _check_cp_out2((out,err)):
1626 self.failUnless(os.path.exists(target2_filename))
1627 got = open(target2_filename,"rb").read()
1628 self.failUnlessEqual(got, datas[4])
1629 d.addCallback(_check_cp_out2)
1631 # copy from tahoe into disk, overwriting an existing file
1632 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1633 def _check_cp_out3((out,err)):
1634 self.failUnless(os.path.exists(target_filename))
1635 got = open(target_filename,"rb").read()
1636 self.failUnlessEqual(got, datas[3])
1637 d.addCallback(_check_cp_out3)
1639 # copy from disk into tahoe, overwriting an existing immutable file
1640 d.addCallback(run, "cp", files[5], "tahoe:file4")
1641 d.addCallback(run, "ls")
1642 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1643 d.addCallback(run, "get", "tahoe:file4")
1644 d.addCallback(_check_stdout_against, 5)
1646 # copy from disk into tahoe, overwriting an existing mutable file
1647 d.addCallback(run, "cp", files[5], "tahoe:file3")
1648 d.addCallback(run, "ls")
1649 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1650 d.addCallback(run, "get", "tahoe:file3")
1651 d.addCallback(_check_stdout_against, 5)
1653 # recursive copy: setup
1654 dn = os.path.join(self.basedir, "dir1")
# (the os.makedirs calls for dn and sdn2 are on elided lines)
1656 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1657 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1658 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1659 sdn2 = os.path.join(dn, "subdir2")
1661 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1662 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1664 # from disk into tahoe
1665 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1666 d.addCallback(run, "ls")
1667 d.addCallback(_check_ls, ["dir1"])
1668 d.addCallback(run, "ls", "dir1")
1669 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1670 ["rfile4", "rfile5"])
1671 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1672 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1673 ["rfile1", "rfile2", "rfile3"])
1674 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1675 d.addCallback(_check_stdout_against, data="rfile4")
1677 # and back out again
1678 dn_copy = os.path.join(self.basedir, "dir1-copy")
1679 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1680 def _check_cp_r_out((out,err)):
# body of an inner _cmp(name) helper (its 'def' line is elided): each
# copied-out file must byte-match the original on disk
1682 old = open(os.path.join(dn, name), "rb").read()
1683 newfn = os.path.join(dn_copy, name)
1684 self.failUnless(os.path.exists(newfn))
1685 new = open(newfn, "rb").read()
1686 self.failUnlessEqual(old, new)
1690 _cmp(os.path.join("subdir2", "rfile4"))
1691 _cmp(os.path.join("subdir2", "rfile5"))
1692 d.addCallback(_check_cp_r_out)
1694 # and copy it a second time, which ought to overwrite the same files
1695 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1697 # and again, only writing filecaps
1698 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1699 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1700 def _check_capsonly((out,err)):
1701 # these should all be LITs
1702 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1703 y = uri.from_string_filenode(x)
1704 self.failUnlessEqual(y.data, "rfile4")
1705 d.addCallback(_check_capsonly)
1707 # and tahoe-to-tahoe
1708 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1709 d.addCallback(run, "ls")
1710 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1711 d.addCallback(run, "ls", "dir1-copy")
1712 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1713 ["rfile4", "rfile5"])
1714 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1715 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1716 ["rfile1", "rfile2", "rfile3"])
1717 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1718 d.addCallback(_check_stdout_against, data="rfile4")
1720 # and copy it a second time, which ought to overwrite the same files
1721 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1723 # tahoe_ls doesn't currently handle the error correctly: it tries to
1724 # JSON-parse a traceback.
1725 ## def _ls_missing(res):
1726 ## argv = ["ls"] + nodeargs + ["bogus"]
1727 ## return self._run_cli(argv)
1728 ## d.addCallback(_ls_missing)
1729 ## def _check_ls_missing((out,err)):
1732 ## self.failUnlessEqual(err, "")
1733 ## d.addCallback(_check_ls_missing)
# _run_cli: run the tahoe CLI entry point (runner.runner) in a worker
# thread — the CLI performs blocking HTTP calls, so it must not run on the
# reactor thread.  Returns a Deferred firing with (stdout_text, stderr_text).
#   argv  -- full CLI argument list, e.g. ["ls", "--node-directory", dir]
#   stdin -- text fed to the command's stdin
1737 def _run_cli(self, argv, stdin=""):
1739 stdout, stderr = StringIO(), StringIO()
1740 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1741 stdin=StringIO(stdin),
1742 stdout=stdout, stderr=stderr)
# body of a _done(res) callback (its 'def' line is elided): collect the
# captured output once the threaded call completes
1744 return stdout.getvalue(), stderr.getvalue()
1745 d.addCallback(_done)
1748 def _test_checker(self, res):
1749 ut = upload.Data("too big to be literal" * 200, convergence=None)
1750 d = self._personal_node.add_file(u"big file", ut)
1752 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1753 def _check_dirnode_results(r):
1754 self.failUnless(r.is_healthy())
1755 d.addCallback(_check_dirnode_results)
1756 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1757 d.addCallback(_check_dirnode_results)
1759 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1760 def _got_chk_filenode(n):
1761 self.failUnless(isinstance(n, ImmutableFileNode))
1762 d = n.check(Monitor())
1763 def _check_filenode_results(r):
1764 self.failUnless(r.is_healthy())
1765 d.addCallback(_check_filenode_results)
1766 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1767 d.addCallback(_check_filenode_results)
1769 d.addCallback(_got_chk_filenode)
1771 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1772 def _got_lit_filenode(n):
1773 self.failUnless(isinstance(n, LiteralFileNode))
1774 d = n.check(Monitor())
1775 def _check_lit_filenode_results(r):
1776 self.failUnlessEqual(r, None)
1777 d.addCallback(_check_lit_filenode_results)
1778 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1779 d.addCallback(_check_lit_filenode_results)
1781 d.addCallback(_got_lit_filenode)