1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
9 from allmydata import uri
10 from allmydata.storage.mutable import MutableShareFile
11 from allmydata.storage.server import si_a2b
12 from allmydata.immutable import offloaded, upload
13 from allmydata.immutable.literal import LiteralFileNode
14 from allmydata.immutable.filenode import ImmutableFileNode
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
18 from allmydata.util.fileutil import abspath_expanduser_unicode
19 from allmydata.util.consumer import MemoryConsumer, download_to_data
20 from allmydata.scripts import runner
21 from allmydata.interfaces import IDirectoryNode, IFileNode, \
22 NoSuchChildError, NoSharesError
23 from allmydata.monitor import Monitor
24 from allmydata.mutable.common import NotWriteableError
25 from allmydata.mutable import layout as mutable_layout
26 from allmydata.mutable.publish import MutableData
27 from foolscap.api import DeadReferenceError, fireEventually
28 from twisted.python.failure import Failure
29 from twisted.web.client import getPage
30 from twisted.web.error import Error
32 from allmydata.test.common import SystemTestMixin
34 # TODO: move this to common or common_util
35 from allmydata.test.test_runner import RunBinTahoeMixin
38 This is some data to publish to the remote grid.., which needs to be large
39 enough to not fit inside a LIT uri.
42 class CountingDataUploadable(upload.Data):
44 interrupt_after = None
45 interrupt_after_d = None
47 def read(self, length):
48 self.bytes_read += length
49 if self.interrupt_after is not None:
50 if self.bytes_read > self.interrupt_after:
51 self.interrupt_after = None
52 self.interrupt_after_d.callback(self)
53 return upload.Data.read(self, length)
55 class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
56 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
58 def test_connections(self):
59 self.basedir = "system/SystemTest/test_connections"
60 d = self.set_up_nodes()
61 self.extra_node = None
62 d.addCallback(lambda res: self.add_extra_node(self.numclients))
63 def _check(extra_node):
64 self.extra_node = extra_node
65 for c in self.clients:
66 all_peerids = c.get_storage_broker().get_all_serverids()
67 self.failUnlessEqual(len(all_peerids), self.numclients+1)
69 permuted_peers = sb.get_servers_for_psi("a")
70 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
73 def _shutdown_extra_node(res):
75 return self.extra_node.stopService()
77 d.addBoth(_shutdown_extra_node)
79 # test_connections is subsumed by test_upload_and_download, and takes
80 # quite a while to run on a slow machine (because of all the TLS
81 # connections that must be established). If we ever rework the introducer
82 # code to such an extent that we're not sure if it works anymore, we can
83 # reinstate this test until it does.
86 def test_upload_and_download_random_key(self):
87 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
88 return self._test_upload_and_download(convergence=None)
90 def test_upload_and_download_convergent(self):
91 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
92 return self._test_upload_and_download(convergence="some convergence string")
94 def _test_upload_and_download(self, convergence):
95 # we use 4000 bytes of data, which will result in about 400k written
96 # to disk among all our simulated nodes
97 DATA = "Some data to upload\n" * 200
98 d = self.set_up_nodes()
99 def _check_connections(res):
100 for c in self.clients:
101 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
102 all_peerids = c.get_storage_broker().get_all_serverids()
103 self.failUnlessEqual(len(all_peerids), self.numclients)
104 sb = c.storage_broker
105 permuted_peers = sb.get_servers_for_psi("a")
106 self.failUnlessEqual(len(permuted_peers), self.numclients)
107 d.addCallback(_check_connections)
111 u = self.clients[0].getServiceNamed("uploader")
113 # we crank the max segsize down to 1024b for the duration of this
114 # test, so we can exercise multiple segments. It is important
115 # that this is not a multiple of the segment size, so that the
116 # tail segment is not the same length as the others. This actualy
117 # gets rounded up to 1025 to be a multiple of the number of
118 # required shares (since we use 25 out of 100 FEC).
119 up = upload.Data(DATA, convergence=convergence)
120 up.max_segment_size = 1024
123 d.addCallback(_do_upload)
124 def _upload_done(results):
125 theuri = results.get_uri()
126 log.msg("upload finished: uri is %s" % (theuri,))
128 assert isinstance(self.uri, str), self.uri
129 self.cap = uri.from_string(self.uri)
130 self.n = self.clients[1].create_node_from_uri(self.uri)
131 d.addCallback(_upload_done)
133 def _upload_again(res):
134 # Upload again. If using convergent encryption then this ought to be
135 # short-circuited, however with the way we currently generate URIs
136 # (i.e. because they include the roothash), we have to do all of the
137 # encoding work, and only get to save on the upload part.
138 log.msg("UPLOADING AGAIN")
139 up = upload.Data(DATA, convergence=convergence)
140 up.max_segment_size = 1024
141 return self.uploader.upload(up)
142 d.addCallback(_upload_again)
144 def _download_to_data(res):
145 log.msg("DOWNLOADING")
146 return download_to_data(self.n)
147 d.addCallback(_download_to_data)
148 def _download_to_data_done(data):
149 log.msg("download finished")
150 self.failUnlessEqual(data, DATA)
151 d.addCallback(_download_to_data_done)
154 n = self.clients[1].create_node_from_uri(self.uri)
155 d = download_to_data(n)
156 def _read_done(data):
157 self.failUnlessEqual(data, DATA)
158 d.addCallback(_read_done)
159 d.addCallback(lambda ign:
160 n.read(MemoryConsumer(), offset=1, size=4))
161 def _read_portion_done(mc):
162 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
163 d.addCallback(_read_portion_done)
164 d.addCallback(lambda ign:
165 n.read(MemoryConsumer(), offset=2, size=None))
166 def _read_tail_done(mc):
167 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
168 d.addCallback(_read_tail_done)
169 d.addCallback(lambda ign:
170 n.read(MemoryConsumer(), size=len(DATA)+1000))
171 def _read_too_much(mc):
172 self.failUnlessEqual("".join(mc.chunks), DATA)
173 d.addCallback(_read_too_much)
176 d.addCallback(_test_read)
178 def _test_bad_read(res):
179 bad_u = uri.from_string_filenode(self.uri)
180 bad_u.key = self.flip_bit(bad_u.key)
181 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
182 # this should cause an error during download
184 d = self.shouldFail2(NoSharesError, "'download bad node'",
186 bad_n.read, MemoryConsumer(), offset=2)
188 d.addCallback(_test_bad_read)
190 def _download_nonexistent_uri(res):
191 baduri = self.mangle_uri(self.uri)
192 badnode = self.clients[1].create_node_from_uri(baduri)
193 log.msg("about to download non-existent URI", level=log.UNUSUAL,
194 facility="tahoe.tests")
195 d1 = download_to_data(badnode)
196 def _baduri_should_fail(res):
197 log.msg("finished downloading non-existent URI",
198 level=log.UNUSUAL, facility="tahoe.tests")
199 self.failUnless(isinstance(res, Failure))
200 self.failUnless(res.check(NoSharesError),
201 "expected NoSharesError, got %s" % res)
202 d1.addBoth(_baduri_should_fail)
204 d.addCallback(_download_nonexistent_uri)
206 # add a new node, which doesn't accept shares, and only uses the
208 d.addCallback(lambda res: self.add_extra_node(self.numclients,
210 add_to_sparent=True))
211 def _added(extra_node):
212 self.extra_node = extra_node
213 self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
214 d.addCallback(_added)
217 uploader = self.extra_node.getServiceNamed("uploader")
218 furl, connected = uploader.get_helper_info()
220 d.addCallback(lambda ign: self.poll(_has_helper))
222 HELPER_DATA = "Data that needs help to upload" * 1000
223 def _upload_with_helper(res):
224 u = upload.Data(HELPER_DATA, convergence=convergence)
225 d = self.extra_node.upload(u)
226 def _uploaded(results):
227 n = self.clients[1].create_node_from_uri(results.get_uri())
228 return download_to_data(n)
229 d.addCallback(_uploaded)
231 self.failUnlessEqual(newdata, HELPER_DATA)
232 d.addCallback(_check)
234 d.addCallback(_upload_with_helper)
236 def _upload_duplicate_with_helper(res):
237 u = upload.Data(HELPER_DATA, convergence=convergence)
238 u.debug_stash_RemoteEncryptedUploadable = True
239 d = self.extra_node.upload(u)
240 def _uploaded(results):
241 n = self.clients[1].create_node_from_uri(results.get_uri())
242 return download_to_data(n)
243 d.addCallback(_uploaded)
245 self.failUnlessEqual(newdata, HELPER_DATA)
246 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
247 "uploadable started uploading, should have been avoided")
248 d.addCallback(_check)
250 if convergence is not None:
251 d.addCallback(_upload_duplicate_with_helper)
253 d.addCallback(fireEventually)
255 def _upload_resumable(res):
256 DATA = "Data that needs help to upload and gets interrupted" * 1000
257 u1 = CountingDataUploadable(DATA, convergence=convergence)
258 u2 = CountingDataUploadable(DATA, convergence=convergence)
260 # we interrupt the connection after about 5kB by shutting down
261 # the helper, then restarting it.
262 u1.interrupt_after = 5000
263 u1.interrupt_after_d = defer.Deferred()
264 bounced_d = defer.Deferred()
266 d = self.bounce_client(0)
267 d.addBoth(bounced_d.callback)
268 u1.interrupt_after_d.addCallback(_do_bounce)
270 # sneak into the helper and reduce its chunk size, so that our
271 # debug_interrupt will sever the connection on about the fifth
272 # chunk fetched. This makes sure that we've started to write the
273 # new shares before we abandon them, which exercises the
274 # abort/delete-partial-share code. TODO: find a cleaner way to do
275 # this. I know that this will affect later uses of the helper in
276 # this same test run, but I'm not currently worried about it.
277 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
279 upload_d = self.extra_node.upload(u1)
280 # The upload will start, and bounce_client() will be called after
281 # about 5kB. bounced_d will fire after bounce_client() finishes
282 # shutting down and restarting the node.
285 # By this point, the upload should have failed because of the
286 # interruption. upload_d will fire in a moment
287 def _should_not_finish(res):
288 self.fail("interrupted upload should have failed, not"
289 " finished with result %s" % (res,))
291 f.trap(DeadReferenceError)
292 # make sure we actually interrupted it before finishing
294 self.failUnless(u1.bytes_read < len(DATA),
295 "read %d out of %d total" %
296 (u1.bytes_read, len(DATA)))
297 upload_d.addCallbacks(_should_not_finish, _interrupted)
299 d.addCallback(_bounced)
301 def _disconnected(res):
302 # check to make sure the storage servers aren't still hanging
303 # on to the partial share: their incoming/ directories should
305 log.msg("disconnected", level=log.NOISY,
306 facility="tahoe.test.test_system")
307 for i in range(self.numclients):
308 incdir = os.path.join(self.getdir("client%d" % i),
309 "storage", "shares", "incoming")
310 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
311 d.addCallback(_disconnected)
313 d.addCallback(lambda res:
314 log.msg("wait_for_helper", level=log.NOISY,
315 facility="tahoe.test.test_system"))
316 # then we need to wait for the extra node to reestablish its
317 # connection to the helper.
318 d.addCallback(lambda ign: self.poll(_has_helper))
320 d.addCallback(lambda res:
321 log.msg("uploading again", level=log.NOISY,
322 facility="tahoe.test.test_system"))
323 d.addCallback(lambda res: self.extra_node.upload(u2))
325 def _uploaded(results):
326 cap = results.get_uri()
327 log.msg("Second upload complete", level=log.NOISY,
328 facility="tahoe.test.test_system")
330 # this is really bytes received rather than sent, but it's
331 # convenient and basically measures the same thing
332 bytes_sent = results.get_ciphertext_fetched()
333 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
335 # We currently don't support resumption of upload if the data is
336 # encrypted with a random key. (Because that would require us
337 # to store the key locally and re-use it on the next upload of
338 # this file, which isn't a bad thing to do, but we currently
340 if convergence is not None:
341 # Make sure we did not have to read the whole file the
342 # second time around .
343 self.failUnless(bytes_sent < len(DATA),
344 "resumption didn't save us any work:"
345 " read %r bytes out of %r total" %
346 (bytes_sent, len(DATA)))
348 # Make sure we did have to read the whole file the second
349 # time around -- because the one that we partially uploaded
350 # earlier was encrypted with a different random key.
351 self.failIf(bytes_sent < len(DATA),
352 "resumption saved us some work even though we were using random keys:"
353 " read %r bytes out of %r total" %
354 (bytes_sent, len(DATA)))
355 n = self.clients[1].create_node_from_uri(cap)
356 return download_to_data(n)
357 d.addCallback(_uploaded)
360 self.failUnlessEqual(newdata, DATA)
361 # If using convergent encryption, then also check that the
362 # helper has removed the temp file from its directories.
363 if convergence is not None:
364 basedir = os.path.join(self.getdir("client0"), "helper")
365 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
366 self.failUnlessEqual(files, [])
367 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
368 self.failUnlessEqual(files, [])
369 d.addCallback(_check)
371 d.addCallback(_upload_resumable)
373 def _grab_stats(ignored):
374 # the StatsProvider doesn't normally publish a FURL:
375 # instead it passes a live reference to the StatsGatherer
376 # (if and when it connects). To exercise the remote stats
377 # interface, we manually publish client0's StatsProvider
378 # and use client1 to query it.
379 sp = self.clients[0].stats_provider
380 sp_furl = self.clients[0].tub.registerReference(sp)
381 d = self.clients[1].tub.getReference(sp_furl)
382 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
383 def _got_stats(stats):
385 #from pprint import pprint
388 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
389 c = stats["counters"]
390 self.failUnless("storage_server.allocate" in c)
391 d.addCallback(_got_stats)
393 d.addCallback(_grab_stats)
397 def _find_all_shares(self, basedir):
399 for (dirpath, dirnames, filenames) in os.walk(basedir):
400 if "storage" not in dirpath:
404 pieces = dirpath.split(os.sep)
406 and pieces[-4] == "storage"
407 and pieces[-3] == "shares"):
408 # we're sitting in .../storage/shares/$START/$SINDEX , and there
409 # are sharefiles here
410 assert pieces[-5].startswith("client")
411 client_num = int(pieces[-5][-1])
412 storage_index_s = pieces[-1]
413 storage_index = si_a2b(storage_index_s)
414 for sharename in filenames:
415 shnum = int(sharename)
416 filename = os.path.join(dirpath, sharename)
417 data = (client_num, storage_index, filename, shnum)
420 self.fail("unable to find any share files in %s" % basedir)
423 def _corrupt_mutable_share(self, filename, which):
424 msf = MutableShareFile(filename)
425 datav = msf.readv([ (0, 1000000) ])
426 final_share = datav[0]
427 assert len(final_share) < 1000000 # ought to be truncated
428 pieces = mutable_layout.unpack_share(final_share)
429 (seqnum, root_hash, IV, k, N, segsize, datalen,
430 verification_key, signature, share_hash_chain, block_hash_tree,
431 share_data, enc_privkey) = pieces
433 if which == "seqnum":
436 root_hash = self.flip_bit(root_hash)
438 IV = self.flip_bit(IV)
439 elif which == "segsize":
440 segsize = segsize + 15
441 elif which == "pubkey":
442 verification_key = self.flip_bit(verification_key)
443 elif which == "signature":
444 signature = self.flip_bit(signature)
445 elif which == "share_hash_chain":
446 nodenum = share_hash_chain.keys()[0]
447 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
448 elif which == "block_hash_tree":
449 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
450 elif which == "share_data":
451 share_data = self.flip_bit(share_data)
452 elif which == "encprivkey":
453 enc_privkey = self.flip_bit(enc_privkey)
455 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
457 final_share = mutable_layout.pack_share(prefix,
464 msf.writev( [(0, final_share)], None)
467 def test_mutable(self):
468 self.basedir = "system/SystemTest/test_mutable"
469 DATA = "initial contents go here." # 25 bytes % 3 != 0
470 DATA_uploadable = MutableData(DATA)
471 NEWDATA = "new contents yay"
472 NEWDATA_uploadable = MutableData(NEWDATA)
473 NEWERDATA = "this is getting old"
474 NEWERDATA_uploadable = MutableData(NEWERDATA)
476 d = self.set_up_nodes(use_key_generator=True)
478 def _create_mutable(res):
480 log.msg("starting create_mutable_file")
481 d1 = c.create_mutable_file(DATA_uploadable)
483 log.msg("DONE: %s" % (res,))
484 self._mutable_node_1 = res
485 d1.addCallback(_done)
487 d.addCallback(_create_mutable)
489 def _test_debug(res):
490 # find a share. It is important to run this while there is only
491 # one slot in the grid.
492 shares = self._find_all_shares(self.basedir)
493 (client_num, storage_index, filename, shnum) = shares[0]
494 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
496 log.msg(" for clients[%d]" % client_num)
498 out,err = StringIO(), StringIO()
499 rc = runner.runner(["debug", "dump-share", "--offsets",
501 stdout=out, stderr=err)
502 output = out.getvalue()
503 self.failUnlessEqual(rc, 0)
505 self.failUnless("Mutable slot found:\n" in output)
506 self.failUnless("share_type: SDMF\n" in output)
507 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
508 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
509 self.failUnless(" num_extra_leases: 0\n" in output)
510 self.failUnless(" secrets are for nodeid: %s\n" % peerid
512 self.failUnless(" SDMF contents:\n" in output)
513 self.failUnless(" seqnum: 1\n" in output)
514 self.failUnless(" required_shares: 3\n" in output)
515 self.failUnless(" total_shares: 10\n" in output)
516 self.failUnless(" segsize: 27\n" in output, (output, filename))
517 self.failUnless(" datalen: 25\n" in output)
518 # the exact share_hash_chain nodes depends upon the sharenum,
519 # and is more of a hassle to compute than I want to deal with
521 self.failUnless(" share_hash_chain: " in output)
522 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
523 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
524 base32.b2a(storage_index))
525 self.failUnless(expected in output)
526 except unittest.FailTest:
528 print "dump-share output was:"
531 d.addCallback(_test_debug)
535 # first, let's see if we can use the existing node to retrieve the
536 # contents. This allows it to use the cached pubkey and maybe the
537 # latest-known sharemap.
539 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
540 def _check_download_1(res):
541 self.failUnlessEqual(res, DATA)
542 # now we see if we can retrieve the data from a new node,
543 # constructed using the URI of the original one. We do this test
544 # on the same client that uploaded the data.
545 uri = self._mutable_node_1.get_uri()
546 log.msg("starting retrieve1")
547 newnode = self.clients[0].create_node_from_uri(uri)
548 newnode_2 = self.clients[0].create_node_from_uri(uri)
549 self.failUnlessIdentical(newnode, newnode_2)
550 return newnode.download_best_version()
551 d.addCallback(_check_download_1)
553 def _check_download_2(res):
554 self.failUnlessEqual(res, DATA)
555 # same thing, but with a different client
556 uri = self._mutable_node_1.get_uri()
557 newnode = self.clients[1].create_node_from_uri(uri)
558 log.msg("starting retrieve2")
559 d1 = newnode.download_best_version()
560 d1.addCallback(lambda res: (res, newnode))
562 d.addCallback(_check_download_2)
564 def _check_download_3((res, newnode)):
565 self.failUnlessEqual(res, DATA)
567 log.msg("starting replace1")
568 d1 = newnode.overwrite(NEWDATA_uploadable)
569 d1.addCallback(lambda res: newnode.download_best_version())
571 d.addCallback(_check_download_3)
573 def _check_download_4(res):
574 self.failUnlessEqual(res, NEWDATA)
575 # now create an even newer node and replace the data on it. This
576 # new node has never been used for download before.
577 uri = self._mutable_node_1.get_uri()
578 newnode1 = self.clients[2].create_node_from_uri(uri)
579 newnode2 = self.clients[3].create_node_from_uri(uri)
580 self._newnode3 = self.clients[3].create_node_from_uri(uri)
581 log.msg("starting replace2")
582 d1 = newnode1.overwrite(NEWERDATA_uploadable)
583 d1.addCallback(lambda res: newnode2.download_best_version())
585 d.addCallback(_check_download_4)
587 def _check_download_5(res):
588 log.msg("finished replace2")
589 self.failUnlessEqual(res, NEWERDATA)
590 d.addCallback(_check_download_5)
592 def _corrupt_shares(res):
593 # run around and flip bits in all but k of the shares, to test
595 shares = self._find_all_shares(self.basedir)
596 ## sort by share number
597 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
598 where = dict([ (shnum, filename)
599 for (client_num, storage_index, filename, shnum)
601 assert len(where) == 10 # this test is designed for 3-of-10
602 for shnum, filename in where.items():
603 # shares 7,8,9 are left alone. read will check
604 # (share_hash_chain, block_hash_tree, share_data). New
605 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
606 # segsize, signature).
608 # read: this will trigger "pubkey doesn't match
610 self._corrupt_mutable_share(filename, "pubkey")
611 self._corrupt_mutable_share(filename, "encprivkey")
613 # triggers "signature is invalid"
614 self._corrupt_mutable_share(filename, "seqnum")
616 # triggers "signature is invalid"
617 self._corrupt_mutable_share(filename, "R")
619 # triggers "signature is invalid"
620 self._corrupt_mutable_share(filename, "segsize")
622 self._corrupt_mutable_share(filename, "share_hash_chain")
624 self._corrupt_mutable_share(filename, "block_hash_tree")
626 self._corrupt_mutable_share(filename, "share_data")
627 # other things to correct: IV, signature
628 # 7,8,9 are left alone
630 # note that initial_query_count=5 means that we'll hit the
631 # first 5 servers in effectively random order (based upon
632 # response time), so we won't necessarily ever get a "pubkey
633 # doesn't match fingerprint" error (if we hit shnum>=1 before
634 # shnum=0, we pull the pubkey from there). To get repeatable
635 # specific failures, we need to set initial_query_count=1,
636 # but of course that will change the sequencing behavior of
637 # the retrieval process. TODO: find a reasonable way to make
638 # this a parameter, probably when we expand this test to test
639 # for one failure mode at a time.
641 # when we retrieve this, we should get three signature
642 # failures (where we've mangled seqnum, R, and segsize). The
644 d.addCallback(_corrupt_shares)
646 d.addCallback(lambda res: self._newnode3.download_best_version())
647 d.addCallback(_check_download_5)
649 def _check_empty_file(res):
650 # make sure we can create empty files, this usually screws up the
652 d1 = self.clients[2].create_mutable_file(MutableData(""))
653 d1.addCallback(lambda newnode: newnode.download_best_version())
654 d1.addCallback(lambda res: self.failUnlessEqual("", res))
656 d.addCallback(_check_empty_file)
658 d.addCallback(lambda res: self.clients[0].create_dirnode())
659 def _created_dirnode(dnode):
660 log.msg("_created_dirnode(%s)" % (dnode,))
662 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
663 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
664 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
665 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
666 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
667 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
668 d1.addCallback(lambda res: dnode.build_manifest().when_done())
669 d1.addCallback(lambda res:
670 self.failUnlessEqual(len(res["manifest"]), 1))
672 d.addCallback(_created_dirnode)
674 def wait_for_c3_kg_conn():
675 return self.clients[3]._key_generator is not None
676 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
678 def check_kg_poolsize(junk, size_delta):
679 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
680 self.key_generator_svc.key_generator.pool_size + size_delta)
682 d.addCallback(check_kg_poolsize, 0)
683 d.addCallback(lambda junk:
684 self.clients[3].create_mutable_file(MutableData('hello, world')))
685 d.addCallback(check_kg_poolsize, -1)
686 d.addCallback(lambda junk: self.clients[3].create_dirnode())
687 d.addCallback(check_kg_poolsize, -2)
688 # use_helper induces use of clients[3], which is the using-key_gen client
689 d.addCallback(lambda junk:
690 self.POST("uri?t=mkdir&name=george", use_helper=True))
691 d.addCallback(check_kg_poolsize, -3)
695 def flip_bit(self, good):
696 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
698 def mangle_uri(self, gooduri):
699 # change the key, which changes the storage index, which means we'll
700 # be asking about the wrong file, so nobody will have any shares
701 u = uri.from_string(gooduri)
702 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
703 uri_extension_hash=u.uri_extension_hash,
704 needed_shares=u.needed_shares,
705 total_shares=u.total_shares,
707 return u2.to_string()
709 # TODO: add a test which mangles the uri_extension_hash instead, and
710 # should fail due to not being able to get a valid uri_extension block.
711 # Also a test which sneakily mangles the uri_extension block to change
712 # some of the validation data, so it will fail in the post-download phase
713 # when the file's crypttext integrity check fails. Do the same thing for
714 # the key, which should cause the download to fail the post-download
715 # plaintext_hash check.
717 def test_filesystem(self):
718 self.basedir = "system/SystemTest/test_filesystem"
719 self.data = LARGE_DATA
720 d = self.set_up_nodes(use_stats_gatherer=True)
721 def _new_happy_semantics(ign):
722 for c in self.clients:
723 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
724 d.addCallback(_new_happy_semantics)
725 d.addCallback(self._test_introweb)
726 d.addCallback(self.log, "starting publish")
727 d.addCallback(self._do_publish1)
728 d.addCallback(self._test_runner)
729 d.addCallback(self._do_publish2)
730 # at this point, we have the following filesystem (where "R" denotes
731 # self._root_directory_uri):
734 # R/subdir1/mydata567
736 # R/subdir1/subdir2/mydata992
738 d.addCallback(lambda res: self.bounce_client(0))
739 d.addCallback(self.log, "bounced client0")
741 d.addCallback(self._check_publish1)
742 d.addCallback(self.log, "did _check_publish1")
743 d.addCallback(self._check_publish2)
744 d.addCallback(self.log, "did _check_publish2")
745 d.addCallback(self._do_publish_private)
746 d.addCallback(self.log, "did _do_publish_private")
747 # now we also have (where "P" denotes a new dir):
748 # P/personal/sekrit data
749 # P/s2-rw -> /subdir1/subdir2/
750 # P/s2-ro -> /subdir1/subdir2/ (read-only)
751 d.addCallback(self._check_publish_private)
752 d.addCallback(self.log, "did _check_publish_private")
753 d.addCallback(self._test_web)
754 d.addCallback(self._test_control)
755 d.addCallback(self._test_cli)
756 # P now has four top-level children:
757 # P/personal/sekrit data
760 # P/test_put/ (empty)
761 d.addCallback(self._test_checker)
764 def _test_introweb(self, res):
765 d = getPage(self.introweb_url, method="GET", followRedirect=True)
768 self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
769 verstr = str(allmydata.__version__)
771 # The Python "rational version numbering" convention
772 # disallows "-r$REV" but allows ".post$REV"
773 # instead. Eventually we'll probably move to
774 # that. When we do, this test won't go red:
775 ix = verstr.rfind('-r')
777 altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
779 ix = verstr.rfind('.post')
781 altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
785 appverstr = "%s: %s" % (allmydata.__appname__, verstr)
786 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
788 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
789 self.failUnless("Announcement Summary: storage: 5" in res)
790 self.failUnless("Subscription Summary: storage: 5" in res)
791 self.failUnless("tahoe.css" in res)
792 except unittest.FailTest:
794 print "GET %s output was:" % self.introweb_url
797 d.addCallback(_check)
798 # make sure it serves the CSS too
799 d.addCallback(lambda res:
800 getPage(self.introweb_url+"tahoe.css", method="GET"))
801 d.addCallback(lambda res:
802 getPage(self.introweb_url + "?t=json",
803 method="GET", followRedirect=True))
804 def _check_json(res):
805 data = simplejson.loads(res)
807 self.failUnlessEqual(data["subscription_summary"],
809 self.failUnlessEqual(data["announcement_summary"],
811 self.failUnlessEqual(data["announcement_distinct_hosts"],
813 except unittest.FailTest:
815 print "GET %s?t=json output was:" % self.introweb_url
818 d.addCallback(_check_json)
821 def _do_publish1(self, res):
822 ut = upload.Data(self.data, convergence=None)
824 d = c0.create_dirnode()
825 def _made_root(new_dirnode):
826 self._root_directory_uri = new_dirnode.get_uri()
827 return c0.create_node_from_uri(self._root_directory_uri)
828 d.addCallback(_made_root)
829 d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
830 def _made_subdir1(subdir1_node):
831 self._subdir1_node = subdir1_node
832 d1 = subdir1_node.add_file(u"mydata567", ut)
833 d1.addCallback(self.log, "publish finished")
834 def _stash_uri(filenode):
835 self.uri = filenode.get_uri()
836 assert isinstance(self.uri, str), (self.uri, filenode)
837 d1.addCallback(_stash_uri)
839 d.addCallback(_made_subdir1)
842 def _do_publish2(self, res):
843 ut = upload.Data(self.data, convergence=None)
844 d = self._subdir1_node.create_subdirectory(u"subdir2")
845 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
848 def log(self, res, *args, **kwargs):
849 # print "MSG: %s RES: %s" % (msg, args)
850 log.msg(*args, **kwargs)
853 def _do_publish_private(self, res):
854 self.smalldata = "sssh, very secret stuff"
855 ut = upload.Data(self.smalldata, convergence=None)
856 d = self.clients[0].create_dirnode()
857 d.addCallback(self.log, "GOT private directory")
858 def _got_new_dir(privnode):
859 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
860 d1 = privnode.create_subdirectory(u"personal")
861 d1.addCallback(self.log, "made P/personal")
862 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
863 d1.addCallback(self.log, "made P/personal/sekrit data")
864 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
866 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
867 s2node.get_readonly_uri())
868 d2.addCallback(lambda node:
869 privnode.set_uri(u"s2-ro",
870 s2node.get_readonly_uri(),
871 s2node.get_readonly_uri()))
873 d1.addCallback(_got_s2)
874 d1.addCallback(lambda res: privnode)
876 d.addCallback(_got_new_dir)
879 def _check_publish1(self, res):
880 # this one uses the iterative API
882 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
883 d.addCallback(self.log, "check_publish1 got /")
884 d.addCallback(lambda root: root.get(u"subdir1"))
885 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
886 d.addCallback(lambda filenode: download_to_data(filenode))
887 d.addCallback(self.log, "get finished")
889 self.failUnlessEqual(data, self.data)
890 d.addCallback(_get_done)
893 def _check_publish2(self, res):
894 # this one uses the path-based API
895 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
896 d = rootnode.get_child_at_path(u"subdir1")
897 d.addCallback(lambda dirnode:
898 self.failUnless(IDirectoryNode.providedBy(dirnode)))
899 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
900 d.addCallback(lambda filenode: download_to_data(filenode))
901 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
903 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
904 def _got_filenode(filenode):
905 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
906 assert fnode == filenode
907 d.addCallback(_got_filenode)
910 def _check_publish_private(self, resnode):
911 # this one uses the path-based API
912 self._private_node = resnode
914 d = self._private_node.get_child_at_path(u"personal")
915 def _got_personal(personal):
916 self._personal_node = personal
918 d.addCallback(_got_personal)
920 d.addCallback(lambda dirnode:
921 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
923 return self._private_node.get_child_at_path(path)
925 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
926 d.addCallback(lambda filenode: download_to_data(filenode))
927 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
928 d.addCallback(lambda res: get_path(u"s2-rw"))
929 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
930 d.addCallback(lambda res: get_path(u"s2-ro"))
931 def _got_s2ro(dirnode):
932 self.failUnless(dirnode.is_mutable(), dirnode)
933 self.failUnless(dirnode.is_readonly(), dirnode)
934 d1 = defer.succeed(None)
935 d1.addCallback(lambda res: dirnode.list())
936 d1.addCallback(self.log, "dirnode.list")
938 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
940 d1.addCallback(self.log, "doing add_file(ro)")
941 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
942 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
944 d1.addCallback(self.log, "doing get(ro)")
945 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
946 d1.addCallback(lambda filenode:
947 self.failUnless(IFileNode.providedBy(filenode)))
949 d1.addCallback(self.log, "doing delete(ro)")
950 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
952 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
954 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
956 personal = self._personal_node
957 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
959 d1.addCallback(self.log, "doing move_child_to(ro)2")
960 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
962 d1.addCallback(self.log, "finished with _got_s2ro")
964 d.addCallback(_got_s2ro)
965 def _got_home(dummy):
966 home = self._private_node
967 personal = self._personal_node
968 d1 = defer.succeed(None)
969 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
970 d1.addCallback(lambda res:
971 personal.move_child_to(u"sekrit data",home,u"sekrit"))
973 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
974 d1.addCallback(lambda res:
975 home.move_child_to(u"sekrit", home, u"sekrit data"))
977 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
978 d1.addCallback(lambda res:
979 home.move_child_to(u"sekrit data", personal))
981 d1.addCallback(lambda res: home.build_manifest().when_done())
982 d1.addCallback(self.log, "manifest")
986 # P/personal/sekrit data
987 # P/s2-rw (same as P/s2-ro)
988 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
989 d1.addCallback(lambda res:
990 self.failUnlessEqual(len(res["manifest"]), 5))
991 d1.addCallback(lambda res: home.start_deep_stats().when_done())
992 def _check_stats(stats):
993 expected = {"count-immutable-files": 1,
994 "count-mutable-files": 0,
995 "count-literal-files": 1,
997 "count-directories": 3,
998 "size-immutable-files": 112,
999 "size-literal-files": 23,
1000 #"size-directories": 616, # varies
1001 #"largest-directory": 616,
1002 "largest-directory-children": 3,
1003 "largest-immutable-file": 112,
1005 for k,v in expected.iteritems():
1006 self.failUnlessEqual(stats[k], v,
1007 "stats[%s] was %s, not %s" %
1009 self.failUnless(stats["size-directories"] > 1300,
1010 stats["size-directories"])
1011 self.failUnless(stats["largest-directory"] > 800,
1012 stats["largest-directory"])
1013 self.failUnlessEqual(stats["size-files-histogram"],
1014 [ (11, 31, 1), (101, 316, 1) ])
1015 d1.addCallback(_check_stats)
1017 d.addCallback(_got_home)
1020 def shouldFail(self, res, expected_failure, which, substring=None):
1021 if isinstance(res, Failure):
1022 res.trap(expected_failure)
1024 self.failUnless(substring in str(res),
1025 "substring '%s' not in '%s'"
1026 % (substring, str(res)))
1028 self.fail("%s was supposed to raise %s, not get '%s'" %
1029 (which, expected_failure, res))
1031 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1032 assert substring is None or isinstance(substring, str)
1033 d = defer.maybeDeferred(callable, *args, **kwargs)
1035 if isinstance(res, Failure):
1036 res.trap(expected_failure)
1038 self.failUnless(substring in str(res),
1039 "substring '%s' not in '%s'"
1040 % (substring, str(res)))
1042 self.fail("%s was supposed to raise %s, not get '%s'" %
1043 (which, expected_failure, res))
1047 def PUT(self, urlpath, data):
1048 url = self.webish_url + urlpath
1049 return getPage(url, method="PUT", postdata=data)
1051 def GET(self, urlpath, followRedirect=False):
1052 url = self.webish_url + urlpath
1053 return getPage(url, method="GET", followRedirect=followRedirect)
1055 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1056 sepbase = "boogabooga"
1057 sep = "--" + sepbase
1060 form.append('Content-Disposition: form-data; name="_charset"')
1062 form.append('UTF-8')
1064 for name, value in fields.iteritems():
1065 if isinstance(value, tuple):
1066 filename, value = value
1067 form.append('Content-Disposition: form-data; name="%s"; '
1068 'filename="%s"' % (name, filename.encode("utf-8")))
1070 form.append('Content-Disposition: form-data; name="%s"' % name)
1072 form.append(str(value))
1078 body = "\r\n".join(form) + "\r\n"
1079 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1080 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1082 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1085 url = self.helper_webish_url + urlpath
1087 url = self.webish_url + urlpath
1088 return getPage(url, method="POST", postdata=body, headers=headers,
1089 followRedirect=followRedirect)
1091 def _test_web(self, res):
1092 base = self.webish_url
1093 public = "uri/" + self._root_directory_uri
1095 def _got_welcome(page):
1096 # XXX This test is oversensitive to formatting
1097 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1098 self.failUnless(expected in page,
1099 "I didn't see the right 'connected storage servers'"
1100 " message in: %s" % page
1102 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1103 self.failUnless(expected in page,
1104 "I didn't see the right 'My nodeid' message "
1106 self.failUnless("Helper: 0 active uploads" in page)
1107 d.addCallback(_got_welcome)
1108 d.addCallback(self.log, "done with _got_welcome")
1110 # get the welcome page from the node that uses the helper too
1111 d.addCallback(lambda res: getPage(self.helper_webish_url))
1112 def _got_welcome_helper(page):
1113 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1115 self.failUnless("Not running helper" in page)
1116 d.addCallback(_got_welcome_helper)
1118 d.addCallback(lambda res: getPage(base + public))
1119 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1120 def _got_subdir1(page):
1121 # there ought to be an href for our file
1122 self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1123 self.failUnless(">mydata567</a>" in page)
1124 d.addCallback(_got_subdir1)
1125 d.addCallback(self.log, "done with _got_subdir1")
1126 d.addCallback(lambda res:
1127 getPage(base + public + "/subdir1/mydata567"))
1128 def _got_data(page):
1129 self.failUnlessEqual(page, self.data)
1130 d.addCallback(_got_data)
1132 # download from a URI embedded in a URL
1133 d.addCallback(self.log, "_get_from_uri")
1134 def _get_from_uri(res):
1135 return getPage(base + "uri/%s?filename=%s"
1136 % (self.uri, "mydata567"))
1137 d.addCallback(_get_from_uri)
1138 def _got_from_uri(page):
1139 self.failUnlessEqual(page, self.data)
1140 d.addCallback(_got_from_uri)
1142 # download from a URI embedded in a URL, second form
1143 d.addCallback(self.log, "_get_from_uri2")
1144 def _get_from_uri2(res):
1145 return getPage(base + "uri?uri=%s" % (self.uri,))
1146 d.addCallback(_get_from_uri2)
1147 d.addCallback(_got_from_uri)
1149 # download from a bogus URI, make sure we get a reasonable error
1150 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1151 def _get_from_bogus_uri(res):
1152 d1 = getPage(base + "uri/%s?filename=%s"
1153 % (self.mangle_uri(self.uri), "mydata567"))
1154 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1157 d.addCallback(_get_from_bogus_uri)
1158 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1160 # upload a file with PUT
1161 d.addCallback(self.log, "about to try PUT")
1162 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1163 "new.txt contents"))
1164 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1165 d.addCallback(self.failUnlessEqual, "new.txt contents")
1166 # and again with something large enough to use multiple segments,
1167 # and hopefully trigger pauseProducing too
1168 def _new_happy_semantics(ign):
1169 for c in self.clients:
1170 # these get reset somewhere? Whatever.
1171 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1172 d.addCallback(_new_happy_semantics)
1173 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1174 "big" * 500000)) # 1.5MB
1175 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1176 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1178 # can we replace files in place?
1179 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1181 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1182 d.addCallback(self.failUnlessEqual, "NEWER contents")
1184 # test unlinked POST
1185 d.addCallback(lambda res: self.POST("uri", t="upload",
1186 file=("new.txt", "data" * 10000)))
1187 # and again using the helper, which exercises different upload-status
1189 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1190 file=("foo.txt", "data2" * 10000)))
1192 # check that the status page exists
1193 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1194 def _got_status(res):
1195 # find an interesting upload and download to look at. LIT files
1196 # are not interesting.
1197 h = self.clients[0].get_history()
1198 for ds in h.list_all_download_statuses():
1199 if ds.get_size() > 200:
1200 self._down_status = ds.get_counter()
1201 for us in h.list_all_upload_statuses():
1202 if us.get_size() > 200:
1203 self._up_status = us.get_counter()
1204 rs = list(h.list_all_retrieve_statuses())[0]
1205 self._retrieve_status = rs.get_counter()
1206 ps = list(h.list_all_publish_statuses())[0]
1207 self._publish_status = ps.get_counter()
1208 us = list(h.list_all_mapupdate_statuses())[0]
1209 self._update_status = us.get_counter()
1211 # and that there are some upload- and download- status pages
1212 return self.GET("status/up-%d" % self._up_status)
1213 d.addCallback(_got_status)
1215 return self.GET("status/down-%d" % self._down_status)
1216 d.addCallback(_got_up)
1218 return self.GET("status/mapupdate-%d" % self._update_status)
1219 d.addCallback(_got_down)
1220 def _got_update(res):
1221 return self.GET("status/publish-%d" % self._publish_status)
1222 d.addCallback(_got_update)
1223 def _got_publish(res):
1224 self.failUnlessIn("Publish Results", res)
1225 return self.GET("status/retrieve-%d" % self._retrieve_status)
1226 d.addCallback(_got_publish)
1227 def _got_retrieve(res):
1228 self.failUnlessIn("Retrieve Results", res)
1229 d.addCallback(_got_retrieve)
1231 # check that the helper status page exists
1232 d.addCallback(lambda res:
1233 self.GET("helper_status", followRedirect=True))
1234 def _got_helper_status(res):
1235 self.failUnless("Bytes Fetched:" in res)
1236 # touch a couple of files in the helper's working directory to
1237 # exercise more code paths
1238 workdir = os.path.join(self.getdir("client0"), "helper")
1239 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1240 f = open(incfile, "wb")
1241 f.write("small file")
1243 then = time.time() - 86400*3
1245 os.utime(incfile, (now, then))
1246 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1247 f = open(encfile, "wb")
1248 f.write("less small file")
1250 os.utime(encfile, (now, then))
1251 d.addCallback(_got_helper_status)
1252 # and that the json form exists
1253 d.addCallback(lambda res:
1254 self.GET("helper_status?t=json", followRedirect=True))
1255 def _got_helper_status_json(res):
1256 data = simplejson.loads(res)
1257 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1259 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1260 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1261 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1263 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1264 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1265 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1267 d.addCallback(_got_helper_status_json)
1269 # and check that client[3] (which uses a helper but does not run one
1270 # itself) doesn't explode when you ask for its status
1271 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1272 def _got_non_helper_status(res):
1273 self.failUnless("Upload and Download Status" in res)
1274 d.addCallback(_got_non_helper_status)
1276 # or for helper status with t=json
1277 d.addCallback(lambda res:
1278 getPage(self.helper_webish_url + "helper_status?t=json"))
1279 def _got_non_helper_status_json(res):
1280 data = simplejson.loads(res)
1281 self.failUnlessEqual(data, {})
1282 d.addCallback(_got_non_helper_status_json)
1284 # see if the statistics page exists
1285 d.addCallback(lambda res: self.GET("statistics"))
1286 def _got_stats(res):
1287 self.failUnless("Node Statistics" in res)
1288 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1289 d.addCallback(_got_stats)
1290 d.addCallback(lambda res: self.GET("statistics?t=json"))
1291 def _got_stats_json(res):
1292 data = simplejson.loads(res)
1293 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1294 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1295 d.addCallback(_got_stats_json)
1297 # TODO: mangle the second segment of a file, to test errors that
1298 # occur after we've already sent some good data, which uses a
1299 # different error path.
1301 # TODO: download a URI with a form
1302 # TODO: create a directory by using a form
1303 # TODO: upload by using a form on the directory page
1304 # url = base + "somedir/subdir1/freeform_post!!upload"
1305 # TODO: delete a file by using a button on the directory page
1309 def _test_runner(self, res):
1310 # exercise some of the diagnostic tools in runner.py
1313 for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1314 if "storage" not in dirpath:
1318 pieces = dirpath.split(os.sep)
1319 if (len(pieces) >= 4
1320 and pieces[-4] == "storage"
1321 and pieces[-3] == "shares"):
1322 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1323 # are sharefiles here
1324 filename = os.path.join(dirpath, filenames[0])
1325 # peek at the magic to see if it is a chk share
1326 magic = open(filename, "rb").read(4)
1327 if magic == '\x00\x00\x00\x01':
1330 self.fail("unable to find any uri_extension files in %r"
1332 log.msg("test_system.SystemTest._test_runner using %r" % filename)
1334 out,err = StringIO(), StringIO()
1335 rc = runner.runner(["debug", "dump-share", "--offsets",
1336 unicode_to_argv(filename)],
1337 stdout=out, stderr=err)
1338 output = out.getvalue()
1339 self.failUnlessEqual(rc, 0)
1341 # we only upload a single file, so we can assert some things about
1342 # its size and shares.
1343 self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1344 self.failUnlessIn("size: %d\n" % len(self.data), output)
1345 self.failUnlessIn("num_segments: 1\n", output)
1346 # segment_size is always a multiple of needed_shares
1347 self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1348 self.failUnlessIn("total_shares: 10\n", output)
1349 # keys which are supposed to be present
1350 for key in ("size", "num_segments", "segment_size",
1351 "needed_shares", "total_shares",
1352 "codec_name", "codec_params", "tail_codec_params",
1353 #"plaintext_hash", "plaintext_root_hash",
1354 "crypttext_hash", "crypttext_root_hash",
1355 "share_root_hash", "UEB_hash"):
1356 self.failUnlessIn("%s: " % key, output)
1357 self.failUnlessIn(" verify-cap: URI:CHK-Verifier:", output)
1359 # now use its storage index to find the other shares using the
1360 # 'find-shares' tool
1361 sharedir, shnum = os.path.split(filename)
1362 storagedir, storage_index_s = os.path.split(sharedir)
1363 storage_index_s = str(storage_index_s)
1364 out,err = StringIO(), StringIO()
1365 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1366 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1367 rc = runner.runner(cmd, stdout=out, stderr=err)
1368 self.failUnlessEqual(rc, 0)
1370 sharefiles = [sfn.strip() for sfn in out.readlines()]
1371 self.failUnlessEqual(len(sharefiles), 10)
1373 # also exercise the 'catalog-shares' tool
1374 out,err = StringIO(), StringIO()
1375 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1376 cmd = ["debug", "catalog-shares"] + nodedirs
1377 rc = runner.runner(cmd, stdout=out, stderr=err)
1378 self.failUnlessEqual(rc, 0)
1380 descriptions = [sfn.strip() for sfn in out.readlines()]
1381 self.failUnlessEqual(len(descriptions), 30)
1383 for line in descriptions
1384 if line.startswith("CHK %s " % storage_index_s)]
1385 self.failUnlessEqual(len(matching), 10)
1387 def _test_control(self, res):
1388 # exercise the remote-control-the-client foolscap interfaces in
1389 # allmydata.control (mostly used for performance tests)
1390 c0 = self.clients[0]
1391 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1392 control_furl = open(control_furl_file, "r").read().strip()
1393 # it doesn't really matter which Tub we use to connect to the client,
1394 # so let's just use our IntroducerNode's
1395 d = self.introducer.tub.getReference(control_furl)
1396 d.addCallback(self._test_control2, control_furl_file)
1398 def _test_control2(self, rref, filename):
1399 d = rref.callRemote("upload_from_file_to_uri",
1400 filename.encode(get_filesystem_encoding()), convergence=None)
1401 downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1402 d.addCallback(lambda uri:
1403 rref.callRemote("download_from_uri_to_file",
1406 self.failUnlessEqual(res, downfile)
1407 data = open(downfile, "r").read()
1408 expected_data = open(filename, "r").read()
1409 self.failUnlessEqual(data, expected_data)
1410 d.addCallback(_check)
1411 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1412 if sys.platform in ("linux2", "linux3"):
1413 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1414 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1417 def _test_cli(self, res):
1418 # run various CLI commands (in a thread, since they use blocking
1421 private_uri = self._private_node.get_uri()
1422 client0_basedir = self.getdir("client0")
1425 "--node-directory", client0_basedir,
1428 d = defer.succeed(None)
1430 # for compatibility with earlier versions, private/root_dir.cap is
1431 # supposed to be treated as an alias named "tahoe:". Start by making
1432 # sure that works, before we add other aliases.
1434 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1435 f = open(root_file, "w")
1436 f.write(private_uri)
1439 def run(ignored, verb, *args, **kwargs):
1440 stdin = kwargs.get("stdin", "")
1441 newargs = [verb] + nodeargs + list(args)
1442 return self._run_cli(newargs, stdin=stdin)
1444 def _check_ls((out,err), expected_children, unexpected_children=[]):
1445 self.failUnlessEqual(err, "")
1446 for s in expected_children:
1447 self.failUnless(s in out, (s,out))
1448 for s in unexpected_children:
1449 self.failIf(s in out, (s,out))
1451 def _check_ls_root((out,err)):
1452 self.failUnless("personal" in out)
1453 self.failUnless("s2-ro" in out)
1454 self.failUnless("s2-rw" in out)
1455 self.failUnlessEqual(err, "")
1457 # this should reference private_uri
1458 d.addCallback(run, "ls")
1459 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1461 d.addCallback(run, "list-aliases")
1462 def _check_aliases_1((out,err)):
1463 self.failUnlessEqual(err, "")
1464 self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1465 d.addCallback(_check_aliases_1)
1467 # now that that's out of the way, remove root_dir.cap and work with
1469 d.addCallback(lambda res: os.unlink(root_file))
1470 d.addCallback(run, "list-aliases")
1471 def _check_aliases_2((out,err)):
1472 self.failUnlessEqual(err, "")
1473 self.failUnlessEqual(out, "")
1474 d.addCallback(_check_aliases_2)
1476 d.addCallback(run, "mkdir")
1477 def _got_dir( (out,err) ):
1478 self.failUnless(uri.from_string_dirnode(out.strip()))
1480 d.addCallback(_got_dir)
1481 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1483 d.addCallback(run, "list-aliases")
1484 def _check_aliases_3((out,err)):
1485 self.failUnlessEqual(err, "")
1486 self.failUnless("tahoe: " in out)
1487 d.addCallback(_check_aliases_3)
1489 def _check_empty_dir((out,err)):
1490 self.failUnlessEqual(out, "")
1491 self.failUnlessEqual(err, "")
1492 d.addCallback(run, "ls")
1493 d.addCallback(_check_empty_dir)
1495 def _check_missing_dir((out,err)):
1496 # TODO: check that rc==2
1497 self.failUnlessEqual(out, "")
1498 self.failUnlessEqual(err, "No such file or directory\n")
1499 d.addCallback(run, "ls", "bogus")
1500 d.addCallback(_check_missing_dir)
1505 fn = os.path.join(self.basedir, "file%d" % i)
1507 data = "data to be uploaded: file%d\n" % i
1509 open(fn,"wb").write(data)
1511 def _check_stdout_against((out,err), filenum=None, data=None):
1512 self.failUnlessEqual(err, "")
1513 if filenum is not None:
1514 self.failUnlessEqual(out, datas[filenum])
1515 if data is not None:
1516 self.failUnlessEqual(out, data)
1518 # test all both forms of put: from a file, and from stdin
1520 d.addCallback(run, "put", files[0], "tahoe-file0")
1521 def _put_out((out,err)):
1522 self.failUnless("URI:LIT:" in out, out)
1523 self.failUnless("201 Created" in err, err)
1525 return run(None, "get", uri0)
1526 d.addCallback(_put_out)
1527 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1529 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1530 # tahoe put bar tahoe:FOO
1531 d.addCallback(run, "put", files[2], "tahoe:file2")
1532 d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1533 def _check_put_mutable((out,err)):
1534 self._mutable_file3_uri = out.strip()
1535 d.addCallback(_check_put_mutable)
1536 d.addCallback(run, "get", "tahoe:file3")
1537 d.addCallback(_check_stdout_against, 3)
1540 STDIN_DATA = "This is the file to upload from stdin."
1541 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1542 # tahoe put tahoe:FOO
1543 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1544 stdin="Other file from stdin.")
1546 d.addCallback(run, "ls")
1547 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1548 "tahoe-file-stdin", "from-stdin"])
1549 d.addCallback(run, "ls", "subdir")
1550 d.addCallback(_check_ls, ["tahoe-file1"])
1553 d.addCallback(run, "mkdir", "subdir2")
1554 d.addCallback(run, "ls")
1555 # TODO: extract the URI, set an alias with it
1556 d.addCallback(_check_ls, ["subdir2"])
1558 # tahoe get: (to stdin and to a file)
1559 d.addCallback(run, "get", "tahoe-file0")
1560 d.addCallback(_check_stdout_against, 0)
1561 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1562 d.addCallback(_check_stdout_against, 1)
1563 outfile0 = os.path.join(self.basedir, "outfile0")
1564 d.addCallback(run, "get", "file2", outfile0)
1565 def _check_outfile0((out,err)):
1566 data = open(outfile0,"rb").read()
1567 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1568 d.addCallback(_check_outfile0)
1569 outfile1 = os.path.join(self.basedir, "outfile0")
1570 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1571 def _check_outfile1((out,err)):
1572 data = open(outfile1,"rb").read()
1573 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1574 d.addCallback(_check_outfile1)
1576 d.addCallback(run, "rm", "tahoe-file0")
1577 d.addCallback(run, "rm", "tahoe:file2")
1578 d.addCallback(run, "ls")
1579 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1581 d.addCallback(run, "ls", "-l")
1582 def _check_ls_l((out,err)):
1583 lines = out.split("\n")
1585 if "tahoe-file-stdin" in l:
1586 self.failUnless(l.startswith("-r-- "), l)
1587 self.failUnless(" %d " % len(STDIN_DATA) in l)
1589 self.failUnless(l.startswith("-rw- "), l) # mutable
1590 d.addCallback(_check_ls_l)
1592 d.addCallback(run, "ls", "--uri")
1593 def _check_ls_uri((out,err)):
1594 lines = out.split("\n")
1597 self.failUnless(self._mutable_file3_uri in l)
1598 d.addCallback(_check_ls_uri)
1600 d.addCallback(run, "ls", "--readonly-uri")
1601 def _check_ls_rouri((out,err)):
1602 lines = out.split("\n")
1605 rw_uri = self._mutable_file3_uri
1606 u = uri.from_string_mutable_filenode(rw_uri)
1607 ro_uri = u.get_readonly().to_string()
1608 self.failUnless(ro_uri in l)
1609 d.addCallback(_check_ls_rouri)
1612 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1613 d.addCallback(run, "ls")
1614 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1616 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1617 d.addCallback(run, "ls")
1618 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1620 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1621 d.addCallback(run, "ls")
1622 d.addCallback(_check_ls, ["file3", "file3-copy"])
1623 d.addCallback(run, "get", "tahoe:file3-copy")
1624 d.addCallback(_check_stdout_against, 3)
1626 # copy from disk into tahoe
1627 d.addCallback(run, "cp", files[4], "tahoe:file4")
1628 d.addCallback(run, "ls")
1629 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1630 d.addCallback(run, "get", "tahoe:file4")
1631 d.addCallback(_check_stdout_against, 4)
1633 # copy from tahoe into disk
1634 target_filename = os.path.join(self.basedir, "file-out")
1635 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1636 def _check_cp_out((out,err)):
1637 self.failUnless(os.path.exists(target_filename))
1638 got = open(target_filename,"rb").read()
1639 self.failUnlessEqual(got, datas[4])
1640 d.addCallback(_check_cp_out)
1642 # copy from disk to disk (silly case)
1643 target2_filename = os.path.join(self.basedir, "file-out-copy")
1644 d.addCallback(run, "cp", target_filename, target2_filename)
1645 def _check_cp_out2((out,err)):
1646 self.failUnless(os.path.exists(target2_filename))
1647 got = open(target2_filename,"rb").read()
1648 self.failUnlessEqual(got, datas[4])
1649 d.addCallback(_check_cp_out2)
1651 # copy from tahoe into disk, overwriting an existing file
1652 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1653 def _check_cp_out3((out,err)):
1654 self.failUnless(os.path.exists(target_filename))
1655 got = open(target_filename,"rb").read()
1656 self.failUnlessEqual(got, datas[3])
1657 d.addCallback(_check_cp_out3)
1659 # copy from disk into tahoe, overwriting an existing immutable file
1660 d.addCallback(run, "cp", files[5], "tahoe:file4")
1661 d.addCallback(run, "ls")
1662 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1663 d.addCallback(run, "get", "tahoe:file4")
1664 d.addCallback(_check_stdout_against, 5)
1666 # copy from disk into tahoe, overwriting an existing mutable file
1667 d.addCallback(run, "cp", files[5], "tahoe:file3")
1668 d.addCallback(run, "ls")
1669 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1670 d.addCallback(run, "get", "tahoe:file3")
1671 d.addCallback(_check_stdout_against, 5)
1673 # recursive copy: setup
1674 dn = os.path.join(self.basedir, "dir1")
1676 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1677 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1678 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1679 sdn2 = os.path.join(dn, "subdir2")
1681 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1682 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1684 # from disk into tahoe
1685 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1686 d.addCallback(run, "ls")
1687 d.addCallback(_check_ls, ["dir1"])
1688 d.addCallback(run, "ls", "dir1")
1689 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1690 ["rfile4", "rfile5"])
1691 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1692 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1693 ["rfile1", "rfile2", "rfile3"])
1694 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1695 d.addCallback(_check_stdout_against, data="rfile4")
1697 # and back out again
1698 dn_copy = os.path.join(self.basedir, "dir1-copy")
1699 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1700 def _check_cp_r_out((out,err)):
1702 old = open(os.path.join(dn, name), "rb").read()
1703 newfn = os.path.join(dn_copy, name)
1704 self.failUnless(os.path.exists(newfn))
1705 new = open(newfn, "rb").read()
1706 self.failUnlessEqual(old, new)
1710 _cmp(os.path.join("subdir2", "rfile4"))
1711 _cmp(os.path.join("subdir2", "rfile5"))
1712 d.addCallback(_check_cp_r_out)
1714 # and copy it a second time, which ought to overwrite the same files
1715 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1717 # and again, only writing filecaps
1718 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1719 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1720 def _check_capsonly((out,err)):
1721 # these should all be LITs
1722 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1723 y = uri.from_string_filenode(x)
1724 self.failUnlessEqual(y.data, "rfile4")
1725 d.addCallback(_check_capsonly)
1727 # and tahoe-to-tahoe
1728 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1729 d.addCallback(run, "ls")
1730 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1731 d.addCallback(run, "ls", "dir1-copy")
1732 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1733 ["rfile4", "rfile5"])
1734 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1735 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1736 ["rfile1", "rfile2", "rfile3"])
1737 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1738 d.addCallback(_check_stdout_against, data="rfile4")
1740 # and copy it a second time, which ought to overwrite the same files
1741 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1743 # tahoe_ls doesn't currently handle the error correctly: it tries to
1744 # JSON-parse a traceback.
1745 ## def _ls_missing(res):
1746 ## argv = ["ls"] + nodeargs + ["bogus"]
1747 ## return self._run_cli(argv)
1748 ## d.addCallback(_ls_missing)
1749 ## def _check_ls_missing((out,err)):
1752 ## self.failUnlessEqual(err, "")
1753 ## d.addCallback(_check_ls_missing)
1757 def test_filesystem_with_cli_in_subprocess(self):
1758 # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1760 self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1761 d = self.set_up_nodes()
1762 def _new_happy_semantics(ign):
1763 for c in self.clients:
1764 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1765 d.addCallback(_new_happy_semantics)
1767 def _run_in_subprocess(ignored, verb, *args, **kwargs):
1768 stdin = kwargs.get("stdin")
1769 env = kwargs.get("env")
1770 newargs = [verb, "--node-directory", self.getdir("client0")] + list(args)
1771 return self.run_bintahoe(newargs, stdin=stdin, env=env)
1773 def _check_succeeded(res, check_stderr=True):
1774 out, err, rc_or_sig = res
1775 self.failUnlessEqual(rc_or_sig, 0, str(res))
1777 self.failUnlessEqual(err, "")
1779 d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1780 d.addCallback(_check_succeeded)
1782 STDIN_DATA = "This is the file to upload from stdin."
1783 d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1784 d.addCallback(_check_succeeded, check_stderr=False)
1786 def _mv_with_http_proxy(ign):
1788 env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345" # invalid address
1789 return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1790 d.addCallback(_mv_with_http_proxy)
1791 d.addCallback(_check_succeeded)
1793 d.addCallback(_run_in_subprocess, "ls", "newalias:")
1795 out, err, rc_or_sig = res
1796 self.failUnlessEqual(rc_or_sig, 0, str(res))
1797 self.failUnlessEqual(err, "", str(res))
1798 self.failUnlessIn("tahoe-moved", out)
1799 self.failIfIn("tahoe-file", out)
1800 d.addCallback(_check_ls)
1803 def test_debug_trial(self):
1804 def _check_for_line(lines, result, test):
1806 if result in l and test in l:
1808 self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1809 % (result, test, "\n## ".join(lines)))
1811 def _check_for_outcome(lines, out, outcome):
1812 self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1813 % (outcome, "\n## ".join(lines)))
1815 d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1816 'allmydata.test.trialtest'])
1817 def _check_failure( (out, err, rc) ):
1818 self.failUnlessEqual(rc, 1)
1819 lines = out.split('\n')
1820 _check_for_line(lines, "[SKIPPED]", "test_skip")
1821 _check_for_line(lines, "[TODO]", "test_todo")
1822 _check_for_line(lines, "[FAIL]", "test_fail")
1823 _check_for_line(lines, "[ERROR]", "test_deferred_error")
1824 _check_for_line(lines, "[ERROR]", "test_error")
1825 _check_for_outcome(lines, out, "FAILED")
1826 d.addCallback(_check_failure)
1828 # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1829 d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1830 'allmydata.test.trialtest.Success']))
1831 def _check_success( (out, err, rc) ):
1832 self.failUnlessEqual(rc, 0)
1833 lines = out.split('\n')
1834 _check_for_line(lines, "[SKIPPED]", "test_skip")
1835 _check_for_line(lines, "[TODO]", "test_todo")
1836 _check_for_outcome(lines, out, "PASSED")
1837 d.addCallback(_check_success)
1840 def _run_cli(self, argv, stdin=""):
1842 stdout, stderr = StringIO(), StringIO()
1843 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1844 stdin=StringIO(stdin),
1845 stdout=stdout, stderr=stderr)
1847 return stdout.getvalue(), stderr.getvalue()
1848 d.addCallback(_done)
1851 def _test_checker(self, res):
1852 ut = upload.Data("too big to be literal" * 200, convergence=None)
1853 d = self._personal_node.add_file(u"big file", ut)
1855 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1856 def _check_dirnode_results(r):
1857 self.failUnless(r.is_healthy())
1858 d.addCallback(_check_dirnode_results)
1859 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1860 d.addCallback(_check_dirnode_results)
1862 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1863 def _got_chk_filenode(n):
1864 self.failUnless(isinstance(n, ImmutableFileNode))
1865 d = n.check(Monitor())
1866 def _check_filenode_results(r):
1867 self.failUnless(r.is_healthy())
1868 d.addCallback(_check_filenode_results)
1869 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1870 d.addCallback(_check_filenode_results)
1872 d.addCallback(_got_chk_filenode)
1874 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1875 def _got_lit_filenode(n):
1876 self.failUnless(isinstance(n, LiteralFileNode))
1877 d = n.check(Monitor())
1878 def _check_lit_filenode_results(r):
1879 self.failUnlessEqual(r, None)
1880 d.addCallback(_check_lit_filenode_results)
1881 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1882 d.addCallback(_check_lit_filenode_results)
1884 d.addCallback(_got_lit_filenode)
1887 class Connections(SystemTestMixin, unittest.TestCase):
1888 def test_rref(self):
1889 self.basedir = "system/Connections/rref"
1890 d = self.set_up_nodes(2)
1892 self.c0 = self.clients[0]
1893 for s in self.c0.storage_broker.get_connected_servers():
1894 if "pub-"+s.get_longname() != self.c0.node_key_s:
1896 self.s1 = s # s1 is the server, not c0
1897 self.s1_rref = s.get_rref()
1898 self.failIfEqual(self.s1_rref, None)
1899 self.failUnless(self.s1.is_connected())
1900 d.addCallback(_start)
1902 # now shut down the server
1903 d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1904 # and wait for the client to notice
1906 return len(self.c0.storage_broker.get_connected_servers()) < 2
1907 d.addCallback(lambda ign: self.poll(_poll))
1910 self.failIf(self.s1.is_connected())
1911 rref = self.s1.get_rref()
1912 self.failUnless(rref)
1913 self.failUnlessIdentical(rref, self.s1_rref)
1914 d.addCallback(_down)