2 import os, re, sys, time, simplejson
3 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
10 from allmydata import uri
11 from allmydata.storage.mutable import MutableShareFile
12 from allmydata.storage.server import si_a2b
13 from allmydata.immutable import offloaded, upload
14 from allmydata.immutable.literal import LiteralFileNode
15 from allmydata.immutable.filenode import ImmutableFileNode
16 from allmydata.util import idlib, mathutil
17 from allmydata.util import log, base32
18 from allmydata.util.verlib import NormalizedVersion
19 from allmydata.util.encodingutil import quote_output, unicode_to_argv
20 from allmydata.util.fileutil import abspath_expanduser_unicode
21 from allmydata.util.consumer import MemoryConsumer, download_to_data
22 from allmydata.scripts import runner
23 from allmydata.interfaces import IDirectoryNode, IFileNode, \
24 NoSuchChildError, NoSharesError
25 from allmydata.monitor import Monitor
26 from allmydata.mutable.common import NotWriteableError
27 from allmydata.mutable import layout as mutable_layout
28 from allmydata.mutable.publish import MutableData
31 from foolscap.api import DeadReferenceError, fireEventually
32 from twisted.python.failure import Failure
33 from twisted.web.client import getPage
34 from twisted.web.error import Error
36 from allmydata.test.common import SystemTestMixin
38 # TODO: move this to common or common_util
39 from allmydata.test.test_runner import RunBinTahoeMixin
42 This is some data to publish to the remote grid.., which needs to be large
43 enough to not fit inside a LIT uri.
46 class CountingDataUploadable(upload.Data):
48 interrupt_after = None
49 interrupt_after_d = None
51 def read(self, length):
52 self.bytes_read += length
53 if self.interrupt_after is not None:
54 if self.bytes_read > self.interrupt_after:
55 self.interrupt_after = None
56 self.interrupt_after_d.callback(self)
57 return upload.Data.read(self, length)
59 class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
60 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
62 def test_connections(self):
63 self.basedir = "system/SystemTest/test_connections"
64 d = self.set_up_nodes()
65 self.extra_node = None
66 d.addCallback(lambda res: self.add_extra_node(self.numclients))
67 def _check(extra_node):
68 self.extra_node = extra_node
69 for c in self.clients:
70 all_peerids = c.get_storage_broker().get_all_serverids()
71 self.failUnlessEqual(len(all_peerids), self.numclients+1)
73 permuted_peers = sb.get_servers_for_psi("a")
74 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
77 def _shutdown_extra_node(res):
79 return self.extra_node.stopService()
81 d.addBoth(_shutdown_extra_node)
83 # test_connections is subsumed by test_upload_and_download, and takes
84 # quite a while to run on a slow machine (because of all the TLS
85 # connections that must be established). If we ever rework the introducer
86 # code to such an extent that we're not sure if it works anymore, we can
87 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Run the shared upload/download scenario without convergence.

    Sets the per-test base directory, then delegates to
    ``_test_upload_and_download`` with ``convergence=None`` and hands
    back whatever that helper returns.
    """
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    outcome = self._test_upload_and_download(convergence=None)
    return outcome
def test_upload_and_download_convergent(self):
    """Run the shared upload/download scenario with a convergence string.

    Sets the per-test base directory, then delegates to
    ``_test_upload_and_download`` with a fixed convergence value and
    hands back whatever that helper returns.
    """
    convergence_secret = "some convergence string"
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    return self._test_upload_and_download(convergence=convergence_secret)
98 def _test_upload_and_download(self, convergence):
99 # we use 4000 bytes of data, which will result in about 400k written
100 # to disk among all our simulated nodes
101 DATA = "Some data to upload\n" * 200
102 d = self.set_up_nodes()
103 def _check_connections(res):
104 for c in self.clients:
105 c.encoding_params['happy'] = 5
106 all_peerids = c.get_storage_broker().get_all_serverids()
107 self.failUnlessEqual(len(all_peerids), self.numclients)
108 sb = c.storage_broker
109 permuted_peers = sb.get_servers_for_psi("a")
110 self.failUnlessEqual(len(permuted_peers), self.numclients)
111 d.addCallback(_check_connections)
115 u = self.clients[0].getServiceNamed("uploader")
117 # we crank the max segsize down to 1024b for the duration of this
118 # test, so we can exercise multiple segments. It is important
119 # that this is not a multiple of the segment size, so that the
# tail segment is not the same length as the others. This actually
121 # gets rounded up to 1025 to be a multiple of the number of
122 # required shares (since we use 25 out of 100 FEC).
123 up = upload.Data(DATA, convergence=convergence)
124 up.max_segment_size = 1024
127 d.addCallback(_do_upload)
128 def _upload_done(results):
129 theuri = results.get_uri()
130 log.msg("upload finished: uri is %s" % (theuri,))
132 assert isinstance(self.uri, str), self.uri
133 self.cap = uri.from_string(self.uri)
134 self.n = self.clients[1].create_node_from_uri(self.uri)
135 d.addCallback(_upload_done)
137 def _upload_again(res):
138 # Upload again. If using convergent encryption then this ought to be
139 # short-circuited, however with the way we currently generate URIs
140 # (i.e. because they include the roothash), we have to do all of the
141 # encoding work, and only get to save on the upload part.
142 log.msg("UPLOADING AGAIN")
143 up = upload.Data(DATA, convergence=convergence)
144 up.max_segment_size = 1024
145 return self.uploader.upload(up)
146 d.addCallback(_upload_again)
148 def _download_to_data(res):
149 log.msg("DOWNLOADING")
150 return download_to_data(self.n)
151 d.addCallback(_download_to_data)
152 def _download_to_data_done(data):
153 log.msg("download finished")
154 self.failUnlessEqual(data, DATA)
155 d.addCallback(_download_to_data_done)
158 n = self.clients[1].create_node_from_uri(self.uri)
159 d = download_to_data(n)
160 def _read_done(data):
161 self.failUnlessEqual(data, DATA)
162 d.addCallback(_read_done)
163 d.addCallback(lambda ign:
164 n.read(MemoryConsumer(), offset=1, size=4))
165 def _read_portion_done(mc):
166 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
167 d.addCallback(_read_portion_done)
168 d.addCallback(lambda ign:
169 n.read(MemoryConsumer(), offset=2, size=None))
170 def _read_tail_done(mc):
171 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
172 d.addCallback(_read_tail_done)
173 d.addCallback(lambda ign:
174 n.read(MemoryConsumer(), size=len(DATA)+1000))
175 def _read_too_much(mc):
176 self.failUnlessEqual("".join(mc.chunks), DATA)
177 d.addCallback(_read_too_much)
180 d.addCallback(_test_read)
182 def _test_bad_read(res):
183 bad_u = uri.from_string_filenode(self.uri)
184 bad_u.key = self.flip_bit(bad_u.key)
185 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
186 # this should cause an error during download
188 d = self.shouldFail2(NoSharesError, "'download bad node'",
190 bad_n.read, MemoryConsumer(), offset=2)
192 d.addCallback(_test_bad_read)
194 def _download_nonexistent_uri(res):
195 baduri = self.mangle_uri(self.uri)
196 badnode = self.clients[1].create_node_from_uri(baduri)
197 log.msg("about to download non-existent URI", level=log.UNUSUAL,
198 facility="tahoe.tests")
199 d1 = download_to_data(badnode)
200 def _baduri_should_fail(res):
201 log.msg("finished downloading non-existent URI",
202 level=log.UNUSUAL, facility="tahoe.tests")
203 self.failUnless(isinstance(res, Failure))
204 self.failUnless(res.check(NoSharesError),
205 "expected NoSharesError, got %s" % res)
206 d1.addBoth(_baduri_should_fail)
208 d.addCallback(_download_nonexistent_uri)
210 # add a new node, which doesn't accept shares, and only uses the
212 d.addCallback(lambda res: self.add_extra_node(self.numclients,
214 add_to_sparent=True))
215 def _added(extra_node):
216 self.extra_node = extra_node
217 self.extra_node.encoding_params['happy'] = 5
218 d.addCallback(_added)
221 uploader = self.extra_node.getServiceNamed("uploader")
222 furl, connected = uploader.get_helper_info()
224 d.addCallback(lambda ign: self.poll(_has_helper))
226 HELPER_DATA = "Data that needs help to upload" * 1000
227 def _upload_with_helper(res):
228 u = upload.Data(HELPER_DATA, convergence=convergence)
229 d = self.extra_node.upload(u)
230 def _uploaded(results):
231 n = self.clients[1].create_node_from_uri(results.get_uri())
232 return download_to_data(n)
233 d.addCallback(_uploaded)
235 self.failUnlessEqual(newdata, HELPER_DATA)
236 d.addCallback(_check)
238 d.addCallback(_upload_with_helper)
240 def _upload_duplicate_with_helper(res):
241 u = upload.Data(HELPER_DATA, convergence=convergence)
242 u.debug_stash_RemoteEncryptedUploadable = True
243 d = self.extra_node.upload(u)
244 def _uploaded(results):
245 n = self.clients[1].create_node_from_uri(results.get_uri())
246 return download_to_data(n)
247 d.addCallback(_uploaded)
249 self.failUnlessEqual(newdata, HELPER_DATA)
250 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
251 "uploadable started uploading, should have been avoided")
252 d.addCallback(_check)
254 if convergence is not None:
255 d.addCallback(_upload_duplicate_with_helper)
257 d.addCallback(fireEventually)
259 def _upload_resumable(res):
260 DATA = "Data that needs help to upload and gets interrupted" * 1000
261 u1 = CountingDataUploadable(DATA, convergence=convergence)
262 u2 = CountingDataUploadable(DATA, convergence=convergence)
264 # we interrupt the connection after about 5kB by shutting down
265 # the helper, then restarting it.
266 u1.interrupt_after = 5000
267 u1.interrupt_after_d = defer.Deferred()
268 bounced_d = defer.Deferred()
270 d = self.bounce_client(0)
271 d.addBoth(bounced_d.callback)
272 u1.interrupt_after_d.addCallback(_do_bounce)
274 # sneak into the helper and reduce its chunk size, so that our
275 # debug_interrupt will sever the connection on about the fifth
276 # chunk fetched. This makes sure that we've started to write the
277 # new shares before we abandon them, which exercises the
278 # abort/delete-partial-share code. TODO: find a cleaner way to do
279 # this. I know that this will affect later uses of the helper in
280 # this same test run, but I'm not currently worried about it.
281 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
283 upload_d = self.extra_node.upload(u1)
284 # The upload will start, and bounce_client() will be called after
285 # about 5kB. bounced_d will fire after bounce_client() finishes
286 # shutting down and restarting the node.
289 # By this point, the upload should have failed because of the
290 # interruption. upload_d will fire in a moment
291 def _should_not_finish(res):
292 self.fail("interrupted upload should have failed, not"
293 " finished with result %s" % (res,))
295 f.trap(DeadReferenceError)
296 # make sure we actually interrupted it before finishing
298 self.failUnless(u1.bytes_read < len(DATA),
299 "read %d out of %d total" %
300 (u1.bytes_read, len(DATA)))
301 upload_d.addCallbacks(_should_not_finish, _interrupted)
303 d.addCallback(_bounced)
305 def _disconnected(res):
306 # check to make sure the storage servers aren't still hanging
307 # on to the partial share: their incoming/ directories should
309 log.msg("disconnected", level=log.NOISY,
310 facility="tahoe.test.test_system")
311 for i in range(self.numclients):
312 incdir = os.path.join(self.getdir("client%d" % i),
313 "storage", "shares", "incoming")
314 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
315 d.addCallback(_disconnected)
317 d.addCallback(lambda res:
318 log.msg("wait_for_helper", level=log.NOISY,
319 facility="tahoe.test.test_system"))
320 # then we need to wait for the extra node to reestablish its
321 # connection to the helper.
322 d.addCallback(lambda ign: self.poll(_has_helper))
324 d.addCallback(lambda res:
325 log.msg("uploading again", level=log.NOISY,
326 facility="tahoe.test.test_system"))
327 d.addCallback(lambda res: self.extra_node.upload(u2))
329 def _uploaded(results):
330 cap = results.get_uri()
331 log.msg("Second upload complete", level=log.NOISY,
332 facility="tahoe.test.test_system")
334 # this is really bytes received rather than sent, but it's
335 # convenient and basically measures the same thing
336 bytes_sent = results.get_ciphertext_fetched()
337 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
339 # We currently don't support resumption of upload if the data is
340 # encrypted with a random key. (Because that would require us
341 # to store the key locally and re-use it on the next upload of
342 # this file, which isn't a bad thing to do, but we currently
344 if convergence is not None:
345 # Make sure we did not have to read the whole file the
# second time around.
347 self.failUnless(bytes_sent < len(DATA),
348 "resumption didn't save us any work:"
349 " read %r bytes out of %r total" %
350 (bytes_sent, len(DATA)))
352 # Make sure we did have to read the whole file the second
353 # time around -- because the one that we partially uploaded
354 # earlier was encrypted with a different random key.
355 self.failIf(bytes_sent < len(DATA),
356 "resumption saved us some work even though we were using random keys:"
357 " read %r bytes out of %r total" %
358 (bytes_sent, len(DATA)))
359 n = self.clients[1].create_node_from_uri(cap)
360 return download_to_data(n)
361 d.addCallback(_uploaded)
364 self.failUnlessEqual(newdata, DATA)
365 # If using convergent encryption, then also check that the
366 # helper has removed the temp file from its directories.
367 if convergence is not None:
368 basedir = os.path.join(self.getdir("client0"), "helper")
369 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
370 self.failUnlessEqual(files, [])
371 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
372 self.failUnlessEqual(files, [])
373 d.addCallback(_check)
375 d.addCallback(_upload_resumable)
377 def _grab_stats(ignored):
378 # the StatsProvider doesn't normally publish a FURL:
379 # instead it passes a live reference to the StatsGatherer
380 # (if and when it connects). To exercise the remote stats
381 # interface, we manually publish client0's StatsProvider
382 # and use client1 to query it.
383 sp = self.clients[0].stats_provider
384 sp_furl = self.clients[0].tub.registerReference(sp)
385 d = self.clients[1].tub.getReference(sp_furl)
386 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
387 def _got_stats(stats):
389 #from pprint import pprint
392 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
393 c = stats["counters"]
394 self.failUnless("storage_server.allocate" in c)
395 d.addCallback(_got_stats)
397 d.addCallback(_grab_stats)
401 def _find_all_shares(self, basedir):
403 for (dirpath, dirnames, filenames) in os.walk(basedir):
404 if "storage" not in dirpath:
408 pieces = dirpath.split(os.sep)
410 and pieces[-4] == "storage"
411 and pieces[-3] == "shares"):
412 # we're sitting in .../storage/shares/$START/$SINDEX , and there
413 # are sharefiles here
414 assert pieces[-5].startswith("client")
415 client_num = int(pieces[-5][-1])
416 storage_index_s = pieces[-1]
417 storage_index = si_a2b(storage_index_s)
418 for sharename in filenames:
419 shnum = int(sharename)
420 filename = os.path.join(dirpath, sharename)
421 data = (client_num, storage_index, filename, shnum)
424 self.fail("unable to find any share files in %s" % basedir)
427 def _corrupt_mutable_share(self, filename, which):
428 msf = MutableShareFile(filename)
429 datav = msf.readv([ (0, 1000000) ])
430 final_share = datav[0]
431 assert len(final_share) < 1000000 # ought to be truncated
432 pieces = mutable_layout.unpack_share(final_share)
433 (seqnum, root_hash, IV, k, N, segsize, datalen,
434 verification_key, signature, share_hash_chain, block_hash_tree,
435 share_data, enc_privkey) = pieces
437 if which == "seqnum":
440 root_hash = self.flip_bit(root_hash)
442 IV = self.flip_bit(IV)
443 elif which == "segsize":
444 segsize = segsize + 15
445 elif which == "pubkey":
446 verification_key = self.flip_bit(verification_key)
447 elif which == "signature":
448 signature = self.flip_bit(signature)
449 elif which == "share_hash_chain":
450 nodenum = share_hash_chain.keys()[0]
451 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
452 elif which == "block_hash_tree":
453 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
454 elif which == "share_data":
455 share_data = self.flip_bit(share_data)
456 elif which == "encprivkey":
457 enc_privkey = self.flip_bit(enc_privkey)
459 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
461 final_share = mutable_layout.pack_share(prefix,
468 msf.writev( [(0, final_share)], None)
471 def test_mutable(self):
472 self.basedir = "system/SystemTest/test_mutable"
473 DATA = "initial contents go here." # 25 bytes % 3 != 0
474 DATA_uploadable = MutableData(DATA)
475 NEWDATA = "new contents yay"
476 NEWDATA_uploadable = MutableData(NEWDATA)
477 NEWERDATA = "this is getting old"
478 NEWERDATA_uploadable = MutableData(NEWERDATA)
480 d = self.set_up_nodes(use_key_generator=True)
482 def _create_mutable(res):
484 log.msg("starting create_mutable_file")
485 d1 = c.create_mutable_file(DATA_uploadable)
487 log.msg("DONE: %s" % (res,))
488 self._mutable_node_1 = res
489 d1.addCallback(_done)
491 d.addCallback(_create_mutable)
493 def _test_debug(res):
494 # find a share. It is important to run this while there is only
495 # one slot in the grid.
496 shares = self._find_all_shares(self.basedir)
497 (client_num, storage_index, filename, shnum) = shares[0]
498 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
500 log.msg(" for clients[%d]" % client_num)
502 out,err = StringIO(), StringIO()
503 rc = runner.runner(["debug", "dump-share", "--offsets",
505 stdout=out, stderr=err)
506 output = out.getvalue()
507 self.failUnlessEqual(rc, 0)
509 self.failUnless("Mutable slot found:\n" in output)
510 self.failUnless("share_type: SDMF\n" in output)
511 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
512 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
513 self.failUnless(" num_extra_leases: 0\n" in output)
514 self.failUnless(" secrets are for nodeid: %s\n" % peerid
516 self.failUnless(" SDMF contents:\n" in output)
517 self.failUnless(" seqnum: 1\n" in output)
518 self.failUnless(" required_shares: 3\n" in output)
519 self.failUnless(" total_shares: 10\n" in output)
520 self.failUnless(" segsize: 27\n" in output, (output, filename))
521 self.failUnless(" datalen: 25\n" in output)
522 # the exact share_hash_chain nodes depends upon the sharenum,
523 # and is more of a hassle to compute than I want to deal with
525 self.failUnless(" share_hash_chain: " in output)
526 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
527 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
528 base32.b2a(storage_index))
529 self.failUnless(expected in output)
530 except unittest.FailTest:
532 print "dump-share output was:"
535 d.addCallback(_test_debug)
539 # first, let's see if we can use the existing node to retrieve the
540 # contents. This allows it to use the cached pubkey and maybe the
541 # latest-known sharemap.
543 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
544 def _check_download_1(res):
545 self.failUnlessEqual(res, DATA)
546 # now we see if we can retrieve the data from a new node,
547 # constructed using the URI of the original one. We do this test
548 # on the same client that uploaded the data.
549 uri = self._mutable_node_1.get_uri()
550 log.msg("starting retrieve1")
551 newnode = self.clients[0].create_node_from_uri(uri)
552 newnode_2 = self.clients[0].create_node_from_uri(uri)
553 self.failUnlessIdentical(newnode, newnode_2)
554 return newnode.download_best_version()
555 d.addCallback(_check_download_1)
557 def _check_download_2(res):
558 self.failUnlessEqual(res, DATA)
559 # same thing, but with a different client
560 uri = self._mutable_node_1.get_uri()
561 newnode = self.clients[1].create_node_from_uri(uri)
562 log.msg("starting retrieve2")
563 d1 = newnode.download_best_version()
564 d1.addCallback(lambda res: (res, newnode))
566 d.addCallback(_check_download_2)
568 def _check_download_3((res, newnode)):
569 self.failUnlessEqual(res, DATA)
571 log.msg("starting replace1")
572 d1 = newnode.overwrite(NEWDATA_uploadable)
573 d1.addCallback(lambda res: newnode.download_best_version())
575 d.addCallback(_check_download_3)
577 def _check_download_4(res):
578 self.failUnlessEqual(res, NEWDATA)
579 # now create an even newer node and replace the data on it. This
580 # new node has never been used for download before.
581 uri = self._mutable_node_1.get_uri()
582 newnode1 = self.clients[2].create_node_from_uri(uri)
583 newnode2 = self.clients[3].create_node_from_uri(uri)
584 self._newnode3 = self.clients[3].create_node_from_uri(uri)
585 log.msg("starting replace2")
586 d1 = newnode1.overwrite(NEWERDATA_uploadable)
587 d1.addCallback(lambda res: newnode2.download_best_version())
589 d.addCallback(_check_download_4)
591 def _check_download_5(res):
592 log.msg("finished replace2")
593 self.failUnlessEqual(res, NEWERDATA)
594 d.addCallback(_check_download_5)
596 def _corrupt_shares(res):
597 # run around and flip bits in all but k of the shares, to test
599 shares = self._find_all_shares(self.basedir)
600 ## sort by share number
601 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
602 where = dict([ (shnum, filename)
603 for (client_num, storage_index, filename, shnum)
605 assert len(where) == 10 # this test is designed for 3-of-10
606 for shnum, filename in where.items():
607 # shares 7,8,9 are left alone. read will check
608 # (share_hash_chain, block_hash_tree, share_data). New
609 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
610 # segsize, signature).
612 # read: this will trigger "pubkey doesn't match
614 self._corrupt_mutable_share(filename, "pubkey")
615 self._corrupt_mutable_share(filename, "encprivkey")
617 # triggers "signature is invalid"
618 self._corrupt_mutable_share(filename, "seqnum")
620 # triggers "signature is invalid"
621 self._corrupt_mutable_share(filename, "R")
623 # triggers "signature is invalid"
624 self._corrupt_mutable_share(filename, "segsize")
626 self._corrupt_mutable_share(filename, "share_hash_chain")
628 self._corrupt_mutable_share(filename, "block_hash_tree")
630 self._corrupt_mutable_share(filename, "share_data")
# other things to corrupt: IV, signature
632 # 7,8,9 are left alone
634 # note that initial_query_count=5 means that we'll hit the
635 # first 5 servers in effectively random order (based upon
636 # response time), so we won't necessarily ever get a "pubkey
637 # doesn't match fingerprint" error (if we hit shnum>=1 before
638 # shnum=0, we pull the pubkey from there). To get repeatable
639 # specific failures, we need to set initial_query_count=1,
640 # but of course that will change the sequencing behavior of
641 # the retrieval process. TODO: find a reasonable way to make
642 # this a parameter, probably when we expand this test to test
643 # for one failure mode at a time.
645 # when we retrieve this, we should get three signature
646 # failures (where we've mangled seqnum, R, and segsize). The
648 d.addCallback(_corrupt_shares)
650 d.addCallback(lambda res: self._newnode3.download_best_version())
651 d.addCallback(_check_download_5)
653 def _check_empty_file(res):
654 # make sure we can create empty files, this usually screws up the
656 d1 = self.clients[2].create_mutable_file(MutableData(""))
657 d1.addCallback(lambda newnode: newnode.download_best_version())
658 d1.addCallback(lambda res: self.failUnlessEqual("", res))
660 d.addCallback(_check_empty_file)
662 d.addCallback(lambda res: self.clients[0].create_dirnode())
663 def _created_dirnode(dnode):
664 log.msg("_created_dirnode(%s)" % (dnode,))
666 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
667 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
668 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
669 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
670 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
671 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
672 d1.addCallback(lambda res: dnode.build_manifest().when_done())
673 d1.addCallback(lambda res:
674 self.failUnlessEqual(len(res["manifest"]), 1))
676 d.addCallback(_created_dirnode)
678 def wait_for_c3_kg_conn():
679 return self.clients[3]._key_generator is not None
680 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
682 def check_kg_poolsize(junk, size_delta):
683 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
684 self.key_generator_svc.key_generator.pool_size + size_delta)
686 d.addCallback(check_kg_poolsize, 0)
687 d.addCallback(lambda junk:
688 self.clients[3].create_mutable_file(MutableData('hello, world')))
689 d.addCallback(check_kg_poolsize, -1)
690 d.addCallback(lambda junk: self.clients[3].create_dirnode())
691 d.addCallback(check_kg_poolsize, -2)
692 # use_helper induces use of clients[3], which is the using-key_gen client
693 d.addCallback(lambda junk:
694 self.POST("uri?t=mkdir&name=george", use_helper=True))
695 d.addCallback(check_kg_poolsize, -3)
def flip_bit(self, good):
    """Return a copy of ``good`` whose final character has its lowest bit toggled.

    Everything except the last character is kept unchanged. ``self`` is
    not consulted. An empty ``good`` raises IndexError, because there is
    no final character to modify.
    """
    untouched, last_char = good[:-1], good[-1]
    # XOR with 0x01 inverts only the least-significant bit of the ordinal.
    flipped = chr(ord(last_char) ^ 0x01)
    return untouched + flipped
702 def mangle_uri(self, gooduri):
703 # change the key, which changes the storage index, which means we'll
704 # be asking about the wrong file, so nobody will have any shares
705 u = uri.from_string(gooduri)
706 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
707 uri_extension_hash=u.uri_extension_hash,
708 needed_shares=u.needed_shares,
709 total_shares=u.total_shares,
711 return u2.to_string()
713 # TODO: add a test which mangles the uri_extension_hash instead, and
714 # should fail due to not being able to get a valid uri_extension block.
715 # Also a test which sneakily mangles the uri_extension block to change
716 # some of the validation data, so it will fail in the post-download phase
717 # when the file's crypttext integrity check fails. Do the same thing for
718 # the key, which should cause the download to fail the post-download
719 # plaintext_hash check.
721 def test_filesystem(self):
722 self.basedir = "system/SystemTest/test_filesystem"
723 self.data = LARGE_DATA
724 d = self.set_up_nodes(use_stats_gatherer=True)
725 def _new_happy_semantics(ign):
726 for c in self.clients:
727 c.encoding_params['happy'] = 1
728 d.addCallback(_new_happy_semantics)
729 d.addCallback(self._test_introweb)
730 d.addCallback(self.log, "starting publish")
731 d.addCallback(self._do_publish1)
732 d.addCallback(self._test_runner)
733 d.addCallback(self._do_publish2)
734 # at this point, we have the following filesystem (where "R" denotes
735 # self._root_directory_uri):
738 # R/subdir1/mydata567
740 # R/subdir1/subdir2/mydata992
742 d.addCallback(lambda res: self.bounce_client(0))
743 d.addCallback(self.log, "bounced client0")
745 d.addCallback(self._check_publish1)
746 d.addCallback(self.log, "did _check_publish1")
747 d.addCallback(self._check_publish2)
748 d.addCallback(self.log, "did _check_publish2")
749 d.addCallback(self._do_publish_private)
750 d.addCallback(self.log, "did _do_publish_private")
751 # now we also have (where "P" denotes a new dir):
752 # P/personal/sekrit data
753 # P/s2-rw -> /subdir1/subdir2/
754 # P/s2-ro -> /subdir1/subdir2/ (read-only)
755 d.addCallback(self._check_publish_private)
756 d.addCallback(self.log, "did _check_publish_private")
757 d.addCallback(self._test_web)
758 d.addCallback(self._test_control)
759 d.addCallback(self._test_cli)
760 # P now has four top-level children:
761 # P/personal/sekrit data
764 # P/test_put/ (empty)
765 d.addCallback(self._test_checker)
768 def _test_introweb(self, res):
769 d = getPage(self.introweb_url, method="GET", followRedirect=True)
772 self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
773 verstr = str(allmydata.__version__)
775 # The Python "rational version numbering" convention
776 # disallows "-r$REV" but allows ".post$REV"
777 # instead. Eventually we'll probably move to
778 # that. When we do, this test won't go red:
779 ix = verstr.rfind('-r')
781 altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
783 ix = verstr.rfind('.post')
785 altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
789 appverstr = "%s: %s" % (allmydata.__appname__, verstr)
790 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
792 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
793 self.failUnless("Announcement Summary: storage: 5" in res)
794 self.failUnless("Subscription Summary: storage: 5" in res)
795 self.failUnless("tahoe.css" in res)
796 except unittest.FailTest:
798 print "GET %s output was:" % self.introweb_url
801 d.addCallback(_check)
802 # make sure it serves the CSS too
803 d.addCallback(lambda res:
804 getPage(self.introweb_url+"tahoe.css", method="GET"))
805 d.addCallback(lambda res:
806 getPage(self.introweb_url + "?t=json",
807 method="GET", followRedirect=True))
808 def _check_json(res):
809 data = simplejson.loads(res)
811 self.failUnlessEqual(data["subscription_summary"],
813 self.failUnlessEqual(data["announcement_summary"],
815 except unittest.FailTest:
817 print "GET %s?t=json output was:" % self.introweb_url
820 d.addCallback(_check_json)
823 def _do_publish1(self, res):
824 ut = upload.Data(self.data, convergence=None)
826 d = c0.create_dirnode()
827 def _made_root(new_dirnode):
828 self._root_directory_uri = new_dirnode.get_uri()
829 return c0.create_node_from_uri(self._root_directory_uri)
830 d.addCallback(_made_root)
831 d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
832 def _made_subdir1(subdir1_node):
833 self._subdir1_node = subdir1_node
834 d1 = subdir1_node.add_file(u"mydata567", ut)
835 d1.addCallback(self.log, "publish finished")
836 def _stash_uri(filenode):
837 self.uri = filenode.get_uri()
838 assert isinstance(self.uri, str), (self.uri, filenode)
839 d1.addCallback(_stash_uri)
841 d.addCallback(_made_subdir1)
844 def _do_publish2(self, res):
845 ut = upload.Data(self.data, convergence=None)
846 d = self._subdir1_node.create_subdirectory(u"subdir2")
847 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
850 def log(self, res, *args, **kwargs):
851 # print "MSG: %s RES: %s" % (msg, args)
852 log.msg(*args, **kwargs)
855 def _do_publish_private(self, res):
856 self.smalldata = "sssh, very secret stuff"
857 ut = upload.Data(self.smalldata, convergence=None)
858 d = self.clients[0].create_dirnode()
859 d.addCallback(self.log, "GOT private directory")
860 def _got_new_dir(privnode):
861 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
862 d1 = privnode.create_subdirectory(u"personal")
863 d1.addCallback(self.log, "made P/personal")
864 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
865 d1.addCallback(self.log, "made P/personal/sekrit data")
866 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
868 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
869 s2node.get_readonly_uri())
870 d2.addCallback(lambda node:
871 privnode.set_uri(u"s2-ro",
872 s2node.get_readonly_uri(),
873 s2node.get_readonly_uri()))
875 d1.addCallback(_got_s2)
876 d1.addCallback(lambda res: privnode)
878 d.addCallback(_got_new_dir)
881 def _check_publish1(self, res):
882 # this one uses the iterative API
884 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
885 d.addCallback(self.log, "check_publish1 got /")
886 d.addCallback(lambda root: root.get(u"subdir1"))
887 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
888 d.addCallback(lambda filenode: download_to_data(filenode))
889 d.addCallback(self.log, "get finished")
891 self.failUnlessEqual(data, self.data)
892 d.addCallback(_get_done)
# Verify the published file through clients[1] using get_child_at_path():
#  * "subdir1" must provide IDirectoryNode,
#  * "subdir1/mydata567" must download to self.data,
#  * a node re-created from that file's URI must compare equal to the node
#    returned by the path lookup.
# NOTE(review): no return statement is visible in this excerpt -- confirm.
895 def _check_publish2(self, res):
896 # this one uses the path-based API
897 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
898 d = rootnode.get_child_at_path(u"subdir1")
899 d.addCallback(lambda dirnode:
900 self.failUnless(IDirectoryNode.providedBy(dirnode)))
901 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
902 d.addCallback(lambda filenode: download_to_data(filenode))
903 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
905 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
906 def _got_filenode(filenode):
907 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
# NOTE(review): a bare `assert` is skipped when Python runs with -O; the rest
# of this class uses the failUnless* helpers instead.
908 assert fnode == filenode
909 d.addCallback(_got_filenode)
# Verify the private tree built by _do_publish_private. `resnode` is the
# private directory node; it is kept on self._private_node and its "personal"
# child on self._personal_node. The checks run in three phases:
#  1. content of "personal/sekrit data" and mutability of "s2-rw",
#  2. _got_s2ro: on the "s2-ro" link, create_subdirectory, add_file, delete,
#     set_uri and move_child_to (in either direction) must each fail with
#     NotWriteableError, while get() of an existing child still works,
#  3. _got_home: move "sekrit data" out of and back into "personal", then
#     check the manifest length and the deep-stats values.
# NOTE(review): several lines are not visible in this excerpt (the header of
# the `get_path` helper that takes `path`, the return statements, and the
# start of the manifest comment) -- confirm against the full file.
912 def _check_publish_private(self, resnode):
913 # this one uses the path-based API
914 self._private_node = resnode
916 d = self._private_node.get_child_at_path(u"personal")
917 def _got_personal(personal):
918 self._personal_node = personal
920 d.addCallback(_got_personal)
922 d.addCallback(lambda dirnode:
923 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
925 return self._private_node.get_child_at_path(path)
927 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
928 d.addCallback(lambda filenode: download_to_data(filenode))
929 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
930 d.addCallback(lambda res: get_path(u"s2-rw"))
931 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
932 d.addCallback(lambda res: get_path(u"s2-ro"))
933 def _got_s2ro(dirnode):
# The read-only link still reports is_mutable(): the directory itself is
# mutable, only this reference to it is read-only.
934 self.failUnless(dirnode.is_mutable(), dirnode)
935 self.failUnless(dirnode.is_readonly(), dirnode)
936 d1 = defer.succeed(None)
937 d1.addCallback(lambda res: dirnode.list())
938 d1.addCallback(self.log, "dirnode.list")
940 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
942 d1.addCallback(self.log, "doing add_file(ro)")
943 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
944 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
946 d1.addCallback(self.log, "doing get(ro)")
947 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
948 d1.addCallback(lambda filenode:
949 self.failUnless(IFileNode.providedBy(filenode)))
951 d1.addCallback(self.log, "doing delete(ro)")
952 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
954 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
# A missing child is a different error: NoSuchChildError whose text must
# contain "missing".
956 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
958 personal = self._personal_node
959 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
961 d1.addCallback(self.log, "doing move_child_to(ro)2")
962 d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
964 d1.addCallback(self.log, "finished with _got_s2ro")
966 d.addCallback(_got_s2ro)
967 def _got_home(dummy):
968 home = self._private_node
969 personal = self._personal_node
970 d1 = defer.succeed(None)
971 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
972 d1.addCallback(lambda res:
973 personal.move_child_to(u"sekrit data",home,u"sekrit"))
975 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
976 d1.addCallback(lambda res:
977 home.move_child_to(u"sekrit", home, u"sekrit data"))
979 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
# No new name is given here, so the child is moved with only the source name
# and target directory supplied.
980 d1.addCallback(lambda res:
981 home.move_child_to(u"sekrit data", personal))
983 d1.addCallback(lambda res: home.build_manifest().when_done())
984 d1.addCallback(self.log, "manifest")
988 # P/personal/sekrit data
989 # P/s2-rw (same as P/s2-ro)
990 # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
991 d1.addCallback(lambda res:
992 self.failUnlessEqual(len(res["manifest"]), 5))
993 d1.addCallback(lambda res: home.start_deep_stats().when_done())
994 def _check_stats(stats):
# 23 is len(self.smalldata) ("sssh, very secret stuff").
# NOTE(review): 112 is presumably len(self.data) -- confirm.
995 expected = {"count-immutable-files": 1,
996 "count-mutable-files": 0,
997 "count-literal-files": 1,
999 "count-directories": 3,
1000 "size-immutable-files": 112,
1001 "size-literal-files": 23,
1002 #"size-directories": 616, # varies
1003 #"largest-directory": 616,
1004 "largest-directory-children": 3,
1005 "largest-immutable-file": 112,
1007 for k,v in expected.iteritems():
1008 self.failUnlessEqual(stats[k], v,
1009 "stats[%s] was %s, not %s" %
# Directory sizes vary from run to run, so only lower bounds are asserted.
1011 self.failUnless(stats["size-directories"] > 1300,
1012 stats["size-directories"])
1013 self.failUnless(stats["largest-directory"] > 800,
1014 stats["largest-directory"])
1015 self.failUnlessEqual(stats["size-files-histogram"],
1016 [ (11, 31, 1), (101, 316, 1) ])
1017 d1.addCallback(_check_stats)
1019 d.addCallback(_got_home)
# Assertion helper meant to be attached with addBoth(): `res` is whatever the
# Deferred produced. If it is a Failure it must wrap `expected_failure`
# (res.trap re-raises anything else), and `substring` is checked against
# str(res). A non-Failure result is reported through self.fail(), using
# `which` to describe the operation.
# NOTE(review): the lines that guard the substring check and introduce the
# self.fail() branch are not visible in this excerpt -- confirm the exact
# conditions against the full file.
1022 def shouldFail(self, res, expected_failure, which, substring=None):
1023 if isinstance(res, Failure):
1024 res.trap(expected_failure)
1026 self.failUnless(substring in str(res),
1027 "substring '%s' not in '%s'"
1028 % (substring, str(res)))
1030 self.fail("%s was supposed to raise %s, not get '%s'" %
1031 (which, expected_failure, res))
# Call `callable(*args, **kwargs)` and require that it fails with
# `expected_failure`. defer.maybeDeferred() turns both a synchronous raise
# and an errback into a Failure, so the same check covers both. `substring`
# must be None or a str; the visible check looks for it in str(res).
# `which` names the operation in the failure message.
# NOTE(review): the inner callback's `def` line, the guard around the
# substring check, and the lines that attach the callback and return `d` are
# not visible in this excerpt -- confirm against the full file.
1033 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1034 assert substring is None or isinstance(substring, str)
1035 d = defer.maybeDeferred(callable, *args, **kwargs)
1037 if isinstance(res, Failure):
1038 res.trap(expected_failure)
1040 self.failUnless(substring in str(res),
1041 "substring '%s' not in '%s'"
1042 % (substring, str(res)))
1044 self.fail("%s was supposed to raise %s, not get '%s'" %
1045 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """Send `data` with an HTTP PUT to `urlpath`, relative to self.webish_url.

    Returns whatever getPage() returns for the request.
    """
    target = self.webish_url + urlpath
    request = getPage(target, postdata=data, method="PUT")
    return request
def GET(self, urlpath, followRedirect=False):
    """Fetch `urlpath`, relative to self.webish_url, with an HTTP GET.

    `followRedirect` is passed straight through to getPage(). Returns
    whatever getPage() returns for the request.
    """
    target = self.webish_url + urlpath
    request = getPage(target, followRedirect=followRedirect, method="GET")
    return request
# Build a multipart/form-data request body from **fields and send it through
# POST2(). Each keyword becomes one form part; a tuple value is unpacked as
# (filename, contents) and emitted with a filename= attribute, the filename
# being UTF-8 encoded. A "_charset" part with the value UTF-8 is always
# included. Parts are joined with CRLF and the boundary "boogabooga".
# NOTE(review): the initialisation of `form` and `headers` and the lines that
# append the separators are not visible in this excerpt -- confirm against
# the full file.
1057 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1058 sepbase = "boogabooga"
# Inside the body each boundary line is "--" followed by the boundary string.
1059 sep = "--" + sepbase
1062 form.append('Content-Disposition: form-data; name="_charset"')
1064 form.append('UTF-8')
1066 for name, value in fields.iteritems():
1067 if isinstance(value, tuple):
1068 filename, value = value
1069 form.append('Content-Disposition: form-data; name="%s"; '
1070 'filename="%s"' % (name, filename.encode("utf-8")))
1072 form.append('Content-Disposition: form-data; name="%s"' % name)
1074 form.append(str(value))
1080 body = "\r\n".join(form) + "\r\n"
1081 headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1082 return self.POST2(urlpath, body, headers, followRedirect, use_helper)
# Send a pre-built `body` with an HTTP POST via getPage(). The URL is built
# from either self.helper_webish_url or self.webish_url plus `urlpath`.
# NOTE(review): the rest of the signature and the condition that chooses
# between the two base URLs are not visible in this excerpt; POST() passes a
# fifth positional argument (use_helper), which presumably selects the
# helper URL -- confirm against the full file.
# NOTE(review): `headers={}` is a mutable default argument. The visible lines
# never mutate it, but a None default would be the safer idiom.
1084 def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1087 url = self.helper_webish_url + urlpath
1089 url = self.webish_url + urlpath
1090 return getPage(url, method="POST", postdata=body, headers=headers,
1091 followRedirect=followRedirect)
# Exercise the node's web interface as one long callback chain:
#  * welcome pages of the normal node and of the helper-using node,
#  * directory listing and file download under "uri/<root directory uri>",
#  * download by URI (two URL forms) and a mangled URI that must fail,
#  * PUT uploads (small, 1.5MB, and replace-in-place) and unlinked POSTs,
#  * the status, helper_status and statistics pages (HTML and JSON).
# NOTE(review): several lines are not visible in this excerpt, including the
# creation of `d`, the `def` lines of `_got_up` and `_got_down`, the
# assignment of `now`, and the f.close() calls -- confirm against the full
# file before relying on the exact ordering below.
1093 def _test_web(self, res):
1094 base = self.webish_url
1095 public = "uri/" + self._root_directory_uri
1097 def _got_welcome(page):
# Newlines are flattened so the regexps below can match across line breaks.
1098 html = page.replace('\n', ' ')
1099 connected_re = r'Connected to <span>%d</span>\s*of <span>%d</span> known storage servers' % (self.numclients, self.numclients)
1100 self.failUnless(re.search(connected_re, html),
1101 "I didn't see the right '%s' message in:\n%s" % (connected_re, page))
1102 # nodeids/tubids don't have any regexp-special characters
1103 nodeid_re = r'<th>Node ID:</th>\s*<td title="TubID: %s">%s</td>' % (
1104 self.clients[0].get_long_tubid(), self.clients[0].get_long_nodeid())
1105 self.failUnless(re.search(nodeid_re, html),
1106 "I didn't see the right '%s' message in:\n%s" % (nodeid_re, page))
1107 self.failUnless("Helper: 0 active uploads" in page)
1108 d.addCallback(_got_welcome)
1109 d.addCallback(self.log, "done with _got_welcome")
1111 # get the welcome page from the node that uses the helper too
1112 d.addCallback(lambda res: getPage(self.helper_webish_url))
1113 def _got_welcome_helper(page):
1114 html = page.replace('\n', ' ')
1115 self.failUnless(re.search('<img src="img/connected-yes.png" alt="Connected" />', html), page)
1116 self.failUnlessIn("Not running helper", page)
1117 d.addCallback(_got_welcome_helper)
1119 d.addCallback(lambda res: getPage(base + public))
1120 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1121 def _got_subdir1(page):
1122 # there ought to be an href for our file
1123 self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1124 self.failUnless(">mydata567</a>" in page)
1125 d.addCallback(_got_subdir1)
1126 d.addCallback(self.log, "done with _got_subdir1")
1127 d.addCallback(lambda res:
1128 getPage(base + public + "/subdir1/mydata567"))
1129 def _got_data(page):
1130 self.failUnlessEqual(page, self.data)
1131 d.addCallback(_got_data)
1133 # download from a URI embedded in a URL
1134 d.addCallback(self.log, "_get_from_uri")
1135 def _get_from_uri(res):
1136 return getPage(base + "uri/%s?filename=%s"
1137 % (self.uri, "mydata567"))
1138 d.addCallback(_get_from_uri)
1139 def _got_from_uri(page):
1140 self.failUnlessEqual(page, self.data)
1141 d.addCallback(_got_from_uri)
1143 # download from a URI embedded in a URL, second form
1144 d.addCallback(self.log, "_get_from_uri2")
1145 def _get_from_uri2(res):
1146 return getPage(base + "uri?uri=%s" % (self.uri,))
1147 d.addCallback(_get_from_uri2)
# The same content check is reused for the second URL form.
1148 d.addCallback(_got_from_uri)
1150 # download from a bogus URI, make sure we get a reasonable error
1151 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1152 def _get_from_bogus_uri(res):
1153 d1 = getPage(base + "uri/%s?filename=%s"
1154 % (self.mangle_uri(self.uri), "mydata567"))
# addBoth, so shouldFail sees a success as well as a failure and can reject
# the former.
1155 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1158 d.addCallback(_get_from_bogus_uri)
1159 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1161 # upload a file with PUT
1162 d.addCallback(self.log, "about to try PUT")
1163 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1164 "new.txt contents"))
1165 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1166 d.addCallback(self.failUnlessEqual, "new.txt contents")
1167 # and again with something large enough to use multiple segments,
1168 # and hopefully trigger pauseProducing too
1169 def _new_happy_semantics(ign):
1170 for c in self.clients:
1171 # these get reset somewhere? Whatever.
1172 c.encoding_params['happy'] = 1
1173 d.addCallback(_new_happy_semantics)
1174 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1175 "big" * 500000)) # 1.5MB
1176 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1177 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1179 # can we replace files in place?
1180 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1182 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1183 d.addCallback(self.failUnlessEqual, "NEWER contents")
1185 # test unlinked POST
1186 d.addCallback(lambda res: self.POST("uri", t="upload",
1187 file=("new.txt", "data" * 10000)))
1188 # and again using the helper, which exercises different upload-status
1190 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1191 file=("foo.txt", "data2" * 10000)))
1193 # check that the status page exists
1194 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1195 def _got_status(res):
1196 # find an interesting upload and download to look at. LIT files
1197 # are not interesting.
1198 h = self.clients[0].get_history()
# The loops do not break, so the last status larger than 200 is the one kept.
1199 for ds in h.list_all_download_statuses():
1200 if ds.get_size() > 200:
1201 self._down_status = ds.get_counter()
1202 for us in h.list_all_upload_statuses():
1203 if us.get_size() > 200:
1204 self._up_status = us.get_counter()
1205 rs = list(h.list_all_retrieve_statuses())[0]
1206 self._retrieve_status = rs.get_counter()
1207 ps = list(h.list_all_publish_statuses())[0]
1208 self._publish_status = ps.get_counter()
1209 us = list(h.list_all_mapupdate_statuses())[0]
1210 self._update_status = us.get_counter()
1212 # and that there are some upload- and download- status pages
1213 return self.GET("status/up-%d" % self._up_status)
1214 d.addCallback(_got_status)
1216 return self.GET("status/down-%d" % self._down_status)
1217 d.addCallback(_got_up)
1219 return self.GET("status/mapupdate-%d" % self._update_status)
1220 d.addCallback(_got_down)
1221 def _got_update(res):
1222 return self.GET("status/publish-%d" % self._publish_status)
1223 d.addCallback(_got_update)
1224 def _got_publish(res):
1225 self.failUnlessIn("Publish Results", res)
1226 return self.GET("status/retrieve-%d" % self._retrieve_status)
1227 d.addCallback(_got_publish)
1228 def _got_retrieve(res):
1229 self.failUnlessIn("Retrieve Results", res)
1230 d.addCallback(_got_retrieve)
1232 # check that the helper status page exists
1233 d.addCallback(lambda res:
1234 self.GET("helper_status", followRedirect=True))
1235 def _got_helper_status(res):
1236 self.failUnless("Bytes Fetched:" in res)
1237 # touch a couple of files in the helper's working directory to
1238 # exercise more code paths
1239 workdir = os.path.join(self.getdir("client0"), "helper")
1240 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1241 f = open(incfile, "wb")
1242 f.write("small file")
# Three days ago, in seconds.
1244 then = time.time() - 86400*3
# os.utime takes (atime, mtime): the modification time is back-dated to
# `then`. The 10- and 15-byte sizes written here are what the
# incoming_size / encoding_size assertions in the JSON check expect.
1246 os.utime(incfile, (now, then))
1247 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1248 f = open(encfile, "wb")
1249 f.write("less small file")
1251 os.utime(encfile, (now, then))
1252 d.addCallback(_got_helper_status)
1253 # and that the json form exists
1254 d.addCallback(lambda res:
1255 self.GET("helper_status?t=json", followRedirect=True))
1256 def _got_helper_status_json(res):
1257 data = simplejson.loads(res)
1258 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1260 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1261 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1262 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1264 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1265 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1266 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1268 d.addCallback(_got_helper_status_json)
1270 # and check that client[3] (which uses a helper but does not run one
1271 # itself) doesn't explode when you ask for its status
1272 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1273 def _got_non_helper_status(res):
1274 self.failUnlessIn("Recent and Active Operations", res)
1275 d.addCallback(_got_non_helper_status)
1277 # or for helper status with t=json
1278 d.addCallback(lambda res:
1279 getPage(self.helper_webish_url + "helper_status?t=json"))
1280 def _got_non_helper_status_json(res):
1281 data = simplejson.loads(res)
1282 self.failUnlessEqual(data, {})
1283 d.addCallback(_got_non_helper_status_json)
1285 # see if the statistics page exists
1286 d.addCallback(lambda res: self.GET("statistics"))
1287 def _got_stats(res):
1288 self.failUnlessIn("Operational Statistics", res)
1289 self.failUnlessIn(" 'downloader.files_downloaded': 5,", res)
1290 d.addCallback(_got_stats)
1291 d.addCallback(lambda res: self.GET("statistics?t=json"))
1292 def _got_stats_json(res):
1293 data = simplejson.loads(res)
1294 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1295 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1296 d.addCallback(_got_stats_json)
1298 # TODO: mangle the second segment of a file, to test errors that
1299 # occur after we've already sent some good data, which uses a
1300 # different error path.
1302 # TODO: download a URI with a form
1303 # TODO: create a directory by using a form
1304 # TODO: upload by using a form on the directory page
1305 # url = base + "somedir/subdir1/freeform_post!!upload"
1306 # TODO: delete a file by using a button on the directory page
# Exercise the `debug` sub-commands by calling runner.runner() in-process
# with captured stdout/stderr:
#  1. walk self.basedir for a share file under .../storage/shares/ whose
#     first four bytes mark it as a CHK share,
#  2. run "debug dump-share --offsets" on it and check the reported fields,
#  3. run "debug find-shares" with the share's storage index across every
#     client directory and expect 10 share files,
#  4. run "debug catalog-shares" and expect 30 lines in total, 10 of them
#     CHK lines for that storage index.
# NOTE(review): several lines are not visible in this excerpt (the loop's
# continue/break handling, any rewind of `out` before readlines(), and the
# start of the `matching` list expression) -- confirm against the full file.
1310 def _test_runner(self, res):
1311 # exercise some of the diagnostic tools in runner.py
1314 for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1315 if "storage" not in dirpath:
1319 pieces = dirpath.split(os.sep)
1320 if (len(pieces) >= 4
1321 and pieces[-4] == "storage"
1322 and pieces[-3] == "shares"):
1323 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1324 # are sharefiles here
1325 filename = os.path.join(dirpath, filenames[0])
1326 # peek at the magic to see if it is a chk share
# NOTE(review): the file object is never explicitly closed here.
1327 magic = open(filename, "rb").read(4)
1328 if magic == '\x00\x00\x00\x01':
1331 self.fail("unable to find any uri_extension files in %r"
1333 log.msg("test_system.SystemTest._test_runner using %r" % filename)
1335 out,err = StringIO(), StringIO()
1336 rc = runner.runner(["debug", "dump-share", "--offsets",
1337 unicode_to_argv(filename)],
1338 stdout=out, stderr=err)
1339 output = out.getvalue()
1340 self.failUnlessEqual(rc, 0)
1342 # we only upload a single file, so we can assert some things about
1343 # its size and shares.
1344 self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1345 self.failUnlessIn("size: %d\n" % len(self.data), output)
1346 self.failUnlessIn("num_segments: 1\n", output)
1347 # segment_size is always a multiple of needed_shares
1348 self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1349 self.failUnlessIn("total_shares: 10\n", output)
1350 # keys which are supposed to be present
1351 for key in ("size", "num_segments", "segment_size",
1352 "needed_shares", "total_shares",
1353 "codec_name", "codec_params", "tail_codec_params",
1354 #"plaintext_hash", "plaintext_root_hash",
1355 "crypttext_hash", "crypttext_root_hash",
1356 "share_root_hash", "UEB_hash"):
1357 self.failUnlessIn("%s: " % key, output)
1358 self.failUnlessIn(" verify-cap: URI:CHK-Verifier:", output)
1360 # now use its storage index to find the other shares using the
1361 # 'find-shares' tool
# The share's parent directory is named after the storage index.
1362 sharedir, shnum = os.path.split(filename)
1363 storagedir, storage_index_s = os.path.split(sharedir)
# `filename` came from walking a unicode path; convert to a plain str
# before using it as a command-line argument.
1364 storage_index_s = str(storage_index_s)
1365 out,err = StringIO(), StringIO()
1366 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1367 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1368 rc = runner.runner(cmd, stdout=out, stderr=err)
1369 self.failUnlessEqual(rc, 0)
1371 sharefiles = [sfn.strip() for sfn in out.readlines()]
1372 self.failUnlessEqual(len(sharefiles), 10)
1374 # also exercise the 'catalog-shares' tool
1375 out,err = StringIO(), StringIO()
1376 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1377 cmd = ["debug", "catalog-shares"] + nodedirs
1378 rc = runner.runner(cmd, stdout=out, stderr=err)
1379 self.failUnlessEqual(rc, 0)
1381 descriptions = [sfn.strip() for sfn in out.readlines()]
1382 self.failUnlessEqual(len(descriptions), 30)
1384 for line in descriptions
1385 if line.startswith("CHK %s " % storage_index_s)]
1386 self.failUnlessEqual(len(matching), 10)
# Read clients[0]'s private/control.furl, ask the introducer's Tub for a
# remote reference to it, and pass that reference (plus the furl file path)
# on to _test_control2.
# NOTE(review): no `return d` is visible in this excerpt -- confirm.
1388 def _test_control(self, res):
1389 # exercise the remote-control-the-client foolscap interfaces in
1390 # allmydata.control (mostly used for performance tests)
1391 c0 = self.clients[0]
1392 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
# NOTE(review): the file object is never explicitly closed here.
1393 control_furl = open(control_furl_file, "r").read().strip()
1394 # it doesn't really matter which Tub we use to connect to the client,
1395 # so let's just use our IntroducerNode's
1396 d = self.introducer.tub.getReference(control_furl)
1397 d.addCallback(self._test_control2, control_furl_file)
# Invoke the remote control methods on `rref` one after another:
# "speed_test", "get_memory_usage" (only when sys.platform is linux2 or
# linux3), and "measure_peer_response_time". Their results are not checked
# in the visible lines. `filename` is not used in the visible lines.
# NOTE(review): no `return d` is visible in this excerpt -- confirm.
1399 def _test_control2(self, rref, filename):
1400 d = defer.succeed(None)
1401 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1402 if sys.platform in ("linux2", "linux3"):
1403 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1404 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
# Drive the command-line verbs against client0 through self._run_cli. Every
# step is a `run` callback that puts `nodeargs` in front of the verb and its
# arguments, followed by a check of the (out, err) pair it produced.
# Covered, in order: the legacy private/root_dir.cap alias, list-aliases,
# mkdir/add-alias, ls, put (file and stdin), get (stdout and file), rm,
# ls -l / --uri / --readonly-uri, mv, ln, cp (file, overwrite, recursive,
# --caps-only, tahoe-to-tahoe).
# The nested helpers use Python 2 tuple parameters such as `(out,err)`.
# NOTE(review): several lines are not visible in this excerpt (the start of
# the `nodeargs` list, the loop that fills `files` and `datas`, the
# assignment of `uri0`, the `for` headers in the ls checks, the `_cmp`
# helper's `def`, the os.mkdir calls, the value returned by `_got_dir`, and
# the return statement) -- confirm against the full file.
1407 def _test_cli(self, res):
1408 # run various CLI commands (in a thread, since they use blocking
1411 private_uri = self._private_node.get_uri()
1412 client0_basedir = self.getdir("client0")
1415 "--node-directory", client0_basedir,
1418 d = defer.succeed(None)
1420 # for compatibility with earlier versions, private/root_dir.cap is
1421 # supposed to be treated as an alias named "tahoe:". Start by making
1422 # sure that works, before we add other aliases.
1424 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1425 f = open(root_file, "w")
1426 f.write(private_uri)
# `ignored` absorbs the previous callback result so `run` can be chained
# directly with d.addCallback(run, verb, ...).
1429 def run(ignored, verb, *args, **kwargs):
1430 stdin = kwargs.get("stdin", "")
1431 newargs = nodeargs + [verb] + list(args)
1432 return self._run_cli(newargs, stdin=stdin)
# NOTE(review): `unexpected_children=[]` is a mutable default; it is only
# iterated in the visible lines, never modified.
1434 def _check_ls((out,err), expected_children, unexpected_children=[]):
1435 self.failUnlessEqual(err, "")
1436 for s in expected_children:
1437 self.failUnless(s in out, (s,out))
1438 for s in unexpected_children:
1439 self.failIf(s in out, (s,out))
1441 def _check_ls_root((out,err)):
1442 self.failUnless("personal" in out)
1443 self.failUnless("s2-ro" in out)
1444 self.failUnless("s2-rw" in out)
1445 self.failUnlessEqual(err, "")
1447 # this should reference private_uri
1448 d.addCallback(run, "ls")
1449 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1451 d.addCallback(run, "list-aliases")
1452 def _check_aliases_1((out,err)):
1453 self.failUnlessEqual(err, "")
1454 self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1455 d.addCallback(_check_aliases_1)
1457 # now that that's out of the way, remove root_dir.cap and work with
1459 d.addCallback(lambda res: os.unlink(root_file))
1460 d.addCallback(run, "list-aliases")
1461 def _check_aliases_2((out,err)):
1462 self.failUnlessEqual(err, "")
1463 self.failUnlessEqual(out, "")
1464 d.addCallback(_check_aliases_2)
1466 d.addCallback(run, "mkdir")
1467 def _got_dir( (out,err) ):
1468 self.failUnless(uri.from_string_dirnode(out.strip()))
1470 d.addCallback(_got_dir)
1471 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1473 d.addCallback(run, "list-aliases")
1474 def _check_aliases_3((out,err)):
1475 self.failUnlessEqual(err, "")
1476 self.failUnless("tahoe: " in out)
1477 d.addCallback(_check_aliases_3)
1479 def _check_empty_dir((out,err)):
1480 self.failUnlessEqual(out, "")
1481 self.failUnlessEqual(err, "")
1482 d.addCallback(run, "ls")
1483 d.addCallback(_check_empty_dir)
1485 def _check_missing_dir((out,err)):
1486 # TODO: check that rc==2
1487 self.failUnlessEqual(out, "")
1488 self.failUnlessEqual(err, "No such file or directory\n")
1489 d.addCallback(run, "ls", "bogus")
1490 d.addCallback(_check_missing_dir)
1495 fn = os.path.join(self.basedir, "file%d" % i)
1497 data = "data to be uploaded: file%d\n" % i
1499 open(fn,"wb").write(data)
# Compare stdout with datas[filenum] and/or a literal `data` string; either
# check is skipped when its argument is None.
1501 def _check_stdout_against((out,err), filenum=None, data=None):
1502 self.failUnlessEqual(err, "")
1503 if filenum is not None:
1504 self.failUnlessEqual(out, datas[filenum])
1505 if data is not None:
1506 self.failUnlessEqual(out, data)
1508 # test both forms of put: from a file, and from stdin
1510 d.addCallback(run, "put", files[0], "tahoe-file0")
1511 def _put_out((out,err)):
1512 self.failUnless("URI:LIT:" in out, out)
1513 self.failUnless("201 Created" in err, err)
1515 return run(None, "get", uri0)
1516 d.addCallback(_put_out)
1517 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1519 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1520 # tahoe put bar tahoe:FOO
1521 d.addCallback(run, "put", files[2], "tahoe:file2")
1522 d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1523 def _check_put_mutable((out,err)):
# Saved for the later `ls --uri` and `ls --readonly-uri` checks.
1524 self._mutable_file3_uri = out.strip()
1525 d.addCallback(_check_put_mutable)
1526 d.addCallback(run, "get", "tahoe:file3")
1527 d.addCallback(_check_stdout_against, 3)
1530 STDIN_DATA = "This is the file to upload from stdin."
1531 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1532 # tahoe put tahoe:FOO
1533 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1534 stdin="Other file from stdin.")
1536 d.addCallback(run, "ls")
1537 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1538 "tahoe-file-stdin", "from-stdin"])
1539 d.addCallback(run, "ls", "subdir")
1540 d.addCallback(_check_ls, ["tahoe-file1"])
1543 d.addCallback(run, "mkdir", "subdir2")
1544 d.addCallback(run, "ls")
1545 # TODO: extract the URI, set an alias with it
1546 d.addCallback(_check_ls, ["subdir2"])
1548 # tahoe get: (to stdout and to a file)
1549 d.addCallback(run, "get", "tahoe-file0")
1550 d.addCallback(_check_stdout_against, 0)
1551 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1552 d.addCallback(_check_stdout_against, 1)
1553 outfile0 = os.path.join(self.basedir, "outfile0")
1554 d.addCallback(run, "get", "file2", outfile0)
1555 def _check_outfile0((out,err)):
1556 data = open(outfile0,"rb").read()
1557 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1558 d.addCallback(_check_outfile0)
# NOTE(review): `outfile1` is built from the name "outfile0", so it is the
# same path as `outfile0` and this `get` writes over the file checked just
# above. The steps run one after another so the check still holds, but
# "outfile1" was presumably intended -- confirm.
1559 outfile1 = os.path.join(self.basedir, "outfile0")
1560 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1561 def _check_outfile1((out,err)):
1562 data = open(outfile1,"rb").read()
1563 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1564 d.addCallback(_check_outfile1)
1566 d.addCallback(run, "rm", "tahoe-file0")
1567 d.addCallback(run, "rm", "tahoe:file2")
1568 d.addCallback(run, "ls")
1569 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1571 d.addCallback(run, "ls", "-l")
1572 def _check_ls_l((out,err)):
1573 lines = out.split("\n")
1575 if "tahoe-file-stdin" in l:
1576 self.failUnless(l.startswith("-r-- "), l)
1577 self.failUnless(" %d " % len(STDIN_DATA) in l)
1579 self.failUnless(l.startswith("-rw- "), l) # mutable
1580 d.addCallback(_check_ls_l)
1582 d.addCallback(run, "ls", "--uri")
1583 def _check_ls_uri((out,err)):
1584 lines = out.split("\n")
1587 self.failUnless(self._mutable_file3_uri in l)
1588 d.addCallback(_check_ls_uri)
1590 d.addCallback(run, "ls", "--readonly-uri")
1591 def _check_ls_rouri((out,err)):
1592 lines = out.split("\n")
# Derive the read-only cap from the stored write cap and expect it in the
# listing line.
1595 rw_uri = self._mutable_file3_uri
1596 u = uri.from_string_mutable_filenode(rw_uri)
1597 ro_uri = u.get_readonly().to_string()
1598 self.failUnless(ro_uri in l)
1599 d.addCallback(_check_ls_rouri)
1602 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1603 d.addCallback(run, "ls")
1604 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1606 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1607 d.addCallback(run, "ls")
1608 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1610 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1611 d.addCallback(run, "ls")
1612 d.addCallback(_check_ls, ["file3", "file3-copy"])
1613 d.addCallback(run, "get", "tahoe:file3-copy")
1614 d.addCallback(_check_stdout_against, 3)
1616 # copy from disk into tahoe
1617 d.addCallback(run, "cp", files[4], "tahoe:file4")
1618 d.addCallback(run, "ls")
1619 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1620 d.addCallback(run, "get", "tahoe:file4")
1621 d.addCallback(_check_stdout_against, 4)
1623 # copy from tahoe into disk
1624 target_filename = os.path.join(self.basedir, "file-out")
1625 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1626 def _check_cp_out((out,err)):
1627 self.failUnless(os.path.exists(target_filename))
1628 got = open(target_filename,"rb").read()
1629 self.failUnlessEqual(got, datas[4])
1630 d.addCallback(_check_cp_out)
1632 # copy from disk to disk (silly case)
1633 target2_filename = os.path.join(self.basedir, "file-out-copy")
1634 d.addCallback(run, "cp", target_filename, target2_filename)
1635 def _check_cp_out2((out,err)):
1636 self.failUnless(os.path.exists(target2_filename))
1637 got = open(target2_filename,"rb").read()
1638 self.failUnlessEqual(got, datas[4])
1639 d.addCallback(_check_cp_out2)
1641 # copy from tahoe into disk, overwriting an existing file
1642 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1643 def _check_cp_out3((out,err)):
1644 self.failUnless(os.path.exists(target_filename))
1645 got = open(target_filename,"rb").read()
1646 self.failUnlessEqual(got, datas[3])
1647 d.addCallback(_check_cp_out3)
1649 # copy from disk into tahoe, overwriting an existing immutable file
1650 d.addCallback(run, "cp", files[5], "tahoe:file4")
1651 d.addCallback(run, "ls")
1652 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1653 d.addCallback(run, "get", "tahoe:file4")
1654 d.addCallback(_check_stdout_against, 5)
1656 # copy from disk into tahoe, overwriting an existing mutable file
1657 d.addCallback(run, "cp", files[5], "tahoe:file3")
1658 d.addCallback(run, "ls")
1659 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1660 d.addCallback(run, "get", "tahoe:file3")
1661 d.addCallback(_check_stdout_against, 5)
1663 # recursive copy: setup
1664 dn = os.path.join(self.basedir, "dir1")
1666 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1667 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1668 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1669 sdn2 = os.path.join(dn, "subdir2")
1671 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1672 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1674 # from disk into tahoe
1675 d.addCallback(run, "cp", "-r", dn, "tahoe:")
1676 d.addCallback(run, "ls")
1677 d.addCallback(_check_ls, ["dir1"])
1678 d.addCallback(run, "ls", "dir1")
1679 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1680 ["rfile4", "rfile5"])
1681 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1682 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1683 ["rfile1", "rfile2", "rfile3"])
1684 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1685 d.addCallback(_check_stdout_against, data="rfile4")
1687 # and back out again
1688 dn_copy = os.path.join(self.basedir, "dir1-copy")
1689 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1690 def _check_cp_r_out((out,err)):
# Each copied file is expected at <dn_copy>/dir1/<name> with the same bytes
# as the original under `dn`.
1692 old = open(os.path.join(dn, name), "rb").read()
1693 newfn = os.path.join(dn_copy, "dir1", name)
1694 self.failUnless(os.path.exists(newfn))
1695 new = open(newfn, "rb").read()
1696 self.failUnlessEqual(old, new)
1700 _cmp(os.path.join("subdir2", "rfile4"))
1701 _cmp(os.path.join("subdir2", "rfile5"))
1702 d.addCallback(_check_cp_r_out)
1704 # and copy it a second time, which ought to overwrite the same files
1705 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1707 # and again, only writing filecaps
1708 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1709 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1710 def _check_capsonly((out,err)):
1711 # these should all be LITs
1712 x = open(os.path.join(dn_copy2, "dir1", "subdir2", "rfile4")).read()
1713 y = uri.from_string_filenode(x)
1714 self.failUnlessEqual(y.data, "rfile4")
1715 d.addCallback(_check_capsonly)
1717 # and tahoe-to-tahoe
1718 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1719 d.addCallback(run, "ls")
1720 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1721 d.addCallback(run, "ls", "dir1-copy/dir1")
1722 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1723 ["rfile4", "rfile5"])
1724 d.addCallback(run, "ls", "tahoe:dir1-copy/dir1/subdir2")
1725 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1726 ["rfile1", "rfile2", "rfile3"])
1727 d.addCallback(run, "get", "dir1-copy/dir1/subdir2/rfile4")
1728 d.addCallback(_check_stdout_against, data="rfile4")
1730 # and copy it a second time, which ought to overwrite the same files
1731 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1733 # tahoe_ls doesn't currently handle the error correctly: it tries to
1734 # JSON-parse a traceback.
1735 ## def _ls_missing(res):
1736 ## argv = nodeargs + ["ls", "bogus"]
1737 ## return self._run_cli(argv)
1738 ## d.addCallback(_ls_missing)
1739 ## def _check_ls_missing((out,err)):
1742 ## self.failUnlessEqual(err, "")
1743 ## d.addCallback(_check_ls_missing)
def test_filesystem_with_cli_in_subprocess(self):
    """Exercise create-alias, put, mv and ls through a real bin/tahoe process.

    Each CLI verb is run in a subprocess (via ``self.run_bintahoe``) against
    the ``client0`` node directory, and the exit status / stderr / stdout of
    every step is checked.

    Returns the Deferred driving the whole sequence so that Trial waits for
    it and reports any failure raised along the chain.
    """
    # We do this in a separate test so that test_filesystem doesn't skip if
    # we can't run bin/tahoe.
    self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
    d = self.set_up_nodes()

    def _new_happy_semantics(ign):
        # Lower the 'happy' encoding parameter to 1 on every client.
        for c in self.clients:
            c.encoding_params['happy'] = 1
    d.addCallback(_new_happy_semantics)

    def _run_in_subprocess(ignored, verb, *args, **kwargs):
        # 'ignored' is the previous callback's result; only the optional
        # 'stdin' and 'env' keywords are forwarded to run_bintahoe.
        stdin = kwargs.get("stdin")
        env = kwargs.get("env")
        newargs = ["--node-directory", self.getdir("client0"), verb] + list(args)
        return self.run_bintahoe(newargs, stdin=stdin, env=env)

    def _check_succeeded(res, check_stderr=True):
        out, err, rc_or_sig = res
        self.failUnlessEqual(rc_or_sig, 0, str(res))
        # Callers pass check_stderr=False for commands that are allowed to
        # write to stderr even when they succeed.
        if check_stderr:
            self.failUnlessEqual(err, "")

    d.addCallback(_run_in_subprocess, "create-alias", "newalias")
    d.addCallback(_check_succeeded)

    STDIN_DATA = "This is the file to upload from stdin."
    d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
    d.addCallback(_check_succeeded, check_stderr=False)

    def _mv_with_http_proxy(ign):
        # Work on a copy so the bogus proxy setting only reaches the child
        # process and never leaks into this test process's environment.
        env = dict(os.environ)
        env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
        return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
    d.addCallback(_mv_with_http_proxy)
    # The mv must still succeed, i.e. the proxy variables must not break it.
    d.addCallback(_check_succeeded)

    d.addCallback(_run_in_subprocess, "ls", "newalias:")
    def _check_ls(res):
        out, err, rc_or_sig = res
        self.failUnlessEqual(rc_or_sig, 0, str(res))
        self.failUnlessEqual(err, "", str(res))
        # After the move only the new name may be listed.
        self.failUnlessIn("tahoe-moved", out)
        self.failIfIn("tahoe-file", out)
    d.addCallback(_check_ls)
    return d
def test_debug_trial(self):
    """Check the output and exit code of ``tahoe debug trial``.

    First runs the verbose reporter over ``allmydata.test.trialtest`` and
    expects exit code 1 with per-test SKIPPED/TODO/FAIL/ERROR lines and an
    overall FAILED outcome; then runs only ``trialtest.Success`` (with
    ``--quiet``) and expects exit code 0 and an overall PASSED outcome.

    Returns the Deferred for the whole sequence.
    """
    def _check_for_line(lines, result, test):
        # Pass as soon as one output line mentions both the result marker
        # and the test name; otherwise fail with the full output attached.
        for line in lines:
            if result in line and test in line:
                return
        self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
                  % (result, test, "\n## ".join(lines)))

    def _check_for_outcome(lines, out, outcome):
        self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
                                        % (outcome, "\n## ".join(lines)))

    d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
                           'allmydata.test.trialtest'])
    def _check_failure(res):
        out, err, rc = res
        self.failUnlessEqual(rc, 1)
        lines = out.split('\n')
        _check_for_line(lines, "[SKIPPED]", "test_skip")
        _check_for_line(lines, "[TODO]", "test_todo")
        _check_for_line(lines, "[FAIL]", "test_fail")
        _check_for_line(lines, "[ERROR]", "test_deferred_error")
        _check_for_line(lines, "[ERROR]", "test_error")
        _check_for_outcome(lines, out, "FAILED")
    d.addCallback(_check_failure)

    # the --quiet argument regression-tests a problem in finding which
    # arguments to pass to trial
    d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
                                                 'allmydata.test.trialtest.Success']))
    def _check_success(res):
        out, err, rc = res
        self.failUnlessEqual(rc, 0)
        lines = out.split('\n')
        _check_for_line(lines, "[SKIPPED]", "test_skip")
        _check_for_line(lines, "[TODO]", "test_todo")
        _check_for_outcome(lines, out, "PASSED")
    d.addCallback(_check_success)
    return d
def _run_cli(self, argv, stdin=""):
    """Run the tahoe CLI in-process, on a worker thread.

    ``runner.runner`` is invoked through ``threads.deferToThread`` with
    ``argv`` and with in-memory stdin/stdout/stderr streams; ``stdin`` is
    the text offered to the command on its standard input.

    Returns a Deferred that fires with a ``(stdout, stderr)`` tuple of the
    text the command wrote.
    """
    stdout, stderr = StringIO(), StringIO()
    d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                              stdin=StringIO(stdin),
                              stdout=stdout, stderr=stderr)
    def _done(res):
        # Read the buffers only once the worker thread has finished, so the
        # captured output is complete; the runner's own result is dropped.
        return stdout.getvalue(), stderr.getvalue()
    d.addCallback(_done)
    return d
def _test_checker(self, res):
    """Run the checker (plain and verifying) over a directory and two files.

    Uploads a file large enough not to be stored as a literal, then checks
    the personal directory node, the new immutable file node, and the
    existing u"sekrit data" child (expected to be a literal file node).

    ``res`` is the previous callback's result and is not used. Returns the
    Deferred for the whole sequence.
    """
    ut = upload.Data("too big to be literal" * 200, convergence=None)
    d = self._personal_node.add_file(u"big file", ut)

    d.addCallback(lambda res: self._personal_node.check(Monitor()))
    def _check_dirnode_results(r):
        self.failUnless(r.is_healthy())
    d.addCallback(_check_dirnode_results)
    d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
    d.addCallback(_check_dirnode_results)

    d.addCallback(lambda res: self._personal_node.get(u"big file"))
    def _got_chk_filenode(n):
        self.failUnless(isinstance(n, ImmutableFileNode))
        d2 = n.check(Monitor())
        def _check_filenode_results(r):
            self.failUnless(r.is_healthy())
        d2.addCallback(_check_filenode_results)
        d2.addCallback(lambda res: n.check(Monitor(), verify=True))
        d2.addCallback(_check_filenode_results)
        # Return the inner Deferred so the outer chain waits for these
        # checks and inherits any failure they raise.
        return d2
    d.addCallback(_got_chk_filenode)

    # NOTE(review): u"sekrit data" is presumably added to the personal
    # directory by an earlier step of the enclosing test -- confirm.
    d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
    def _got_lit_filenode(n):
        self.failUnless(isinstance(n, LiteralFileNode))
        d2 = n.check(Monitor())
        def _check_lit_filenode_results(r):
            # Checking a literal file node is expected to yield None.
            self.failUnlessEqual(r, None)
        d2.addCallback(_check_lit_filenode_results)
        d2.addCallback(lambda res: n.check(Monitor(), verify=True))
        d2.addCallback(_check_lit_filenode_results)
        return d2
    d.addCallback(_got_lit_filenode)
    return d
class Connections(SystemTestMixin, unittest.TestCase):
    """System test for the remote reference a client keeps for a server."""

    def test_rref(self):
        """A server's rref stays available and unchanged after it goes down.

        Starts two nodes, records the remote reference client 0 holds for
        the other node, shuts that node down, waits until client 0 sees
        fewer than two connected servers, and then checks that the server
        reports itself disconnected while get_rref() still returns the very
        same object.

        Raises unittest.SkipTest on foolscap versions older than 0.6.4.
        Returns the Deferred for the whole sequence.
        """
        if NormalizedVersion(foolscap.__version__) < NormalizedVersion('0.6.4'):
            raise unittest.SkipTest("skipped due to http://foolscap.lothar.com/trac/ticket/196 "
                                    "(which does not affect normal usage of Tahoe-LAFS)")

        self.basedir = "system/Connections/rref"
        d = self.set_up_nodes(2)

        def _start(ign):
            self.c0 = self.clients[0]
            nonclients = [s for s in self.c0.storage_broker.get_connected_servers()
                          if s.get_serverid() != self.c0.nodeid]
            self.failUnlessEqual(len(nonclients), 1)

            self.s1 = nonclients[0]  # s1 is the server, not c0
            self.s1_rref = self.s1.get_rref()
            self.failIfEqual(self.s1_rref, None)
            self.failUnless(self.s1.is_connected())
        d.addCallback(_start)

        # now shut down the server
        d.addCallback(lambda ign: self.clients[1].disownServiceParent())
        # and wait for the client to notice
        def _poll():
            return len(self.c0.storage_broker.get_connected_servers()) < 2
        d.addCallback(lambda ign: self.poll(_poll))

        def _down(ign):
            self.failIf(self.s1.is_connected())
            rref = self.s1.get_rref()
            # The reference must survive the disconnect and be the same
            # object that was handed out while the server was up.
            self.failUnless(rref)
            self.failUnlessIdentical(rref, self.s1_rref)
        d.addCallback(_down)
        return d