1 from base64 import b32encode
2 import os, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, filenode, offloaded, upload
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.scripts import runner
18 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
19 NoSuchChildError, NoSharesError
20 from allmydata.monitor import Monitor
21 from allmydata.mutable.common import NotMutableError
22 from allmydata.mutable import layout as mutable_layout
23 from foolscap.api import DeadReferenceError
24 from twisted.python.failure import Failure
25 from twisted.web.client import getPage
26 from twisted.web.error import Error
28 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
32 This is some data to publish to the virtual drive, which needs to be large
33 enough to not fit inside a LIT uri.
36 class CountingDataUploadable(upload.Data):
38 interrupt_after = None
39 interrupt_after_d = None
41 def read(self, length):
42 self.bytes_read += length
43 if self.interrupt_after is not None:
44 if self.bytes_read > self.interrupt_after:
45 self.interrupt_after = None
46 self.interrupt_after_d.callback(self)
47 return upload.Data.read(self, length)
49 class GrabEverythingConsumer:
55 def registerProducer(self, producer, streaming):
57 assert IPushProducer.providedBy(producer)
59 def write(self, data):
62 def unregisterProducer(self):
65 class SystemTest(SystemTestMixin, unittest.TestCase):
66 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
68 def test_connections(self):
69 self.basedir = "system/SystemTest/test_connections"
70 d = self.set_up_nodes()
71 self.extra_node = None
72 d.addCallback(lambda res: self.add_extra_node(self.numclients))
73 def _check(extra_node):
74 self.extra_node = extra_node
75 for c in self.clients:
76 all_peerids = c.get_storage_broker().get_all_serverids()
77 self.failUnlessEqual(len(all_peerids), self.numclients+1)
79 permuted_peers = sb.get_servers_for_index("a")
80 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
83 def _shutdown_extra_node(res):
85 return self.extra_node.stopService()
87 d.addBoth(_shutdown_extra_node)
89 # test_connections is subsumed by test_upload_and_download, and takes
90 # quite a while to run on a slow machine (because of all the TLS
91 # connections that must be established). If we ever rework the introducer
92 # code to such an extent that we're not sure if it works anymore, we can
93 # reinstate this test until it does.
96 def test_upload_and_download_random_key(self):
97 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
98 return self._test_upload_and_download(convergence=None)
100 def test_upload_and_download_convergent(self):
101 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
102 return self._test_upload_and_download(convergence="some convergence string")
104 def _test_upload_and_download(self, convergence):
105 # we use 4000 bytes of data, which will result in about 400k written
106 # to disk among all our simulated nodes
107 DATA = "Some data to upload\n" * 200
108 d = self.set_up_nodes()
109 def _check_connections(res):
110 for c in self.clients:
111 all_peerids = c.get_storage_broker().get_all_serverids()
112 self.failUnlessEqual(len(all_peerids), self.numclients)
113 sb = c.storage_broker
114 permuted_peers = sb.get_servers_for_index("a")
115 self.failUnlessEqual(len(permuted_peers), self.numclients)
116 d.addCallback(_check_connections)
120 u = self.clients[0].getServiceNamed("uploader")
122 # we crank the max segsize down to 1024b for the duration of this
123 # test, so we can exercise multiple segments. It is important
124 # that this is not a multiple of the segment size, so that the
125 # tail segment is not the same length as the others. This actually
126 # gets rounded up to 1025 to be a multiple of the number of
127 # required shares (since we use 25 out of 100 FEC).
128 up = upload.Data(DATA, convergence=convergence)
129 up.max_segment_size = 1024
132 d.addCallback(_do_upload)
133 def _upload_done(results):
135 log.msg("upload finished: uri is %s" % (theuri,))
137 assert isinstance(self.uri, str), self.uri
138 dl = self.clients[1].getServiceNamed("downloader")
140 d.addCallback(_upload_done)
142 def _upload_again(res):
143 # Upload again. If using convergent encryption then this ought to be
144 # short-circuited, however with the way we currently generate URIs
145 # (i.e. because they include the roothash), we have to do all of the
146 # encoding work, and only get to save on the upload part.
147 log.msg("UPLOADING AGAIN")
148 up = upload.Data(DATA, convergence=convergence)
149 up.max_segment_size = 1024
150 d1 = self.uploader.upload(up)
151 d.addCallback(_upload_again)
153 def _download_to_data(res):
154 log.msg("DOWNLOADING")
155 return self.downloader.download_to_data(self.uri)
156 d.addCallback(_download_to_data)
157 def _download_to_data_done(data):
158 log.msg("download finished")
159 self.failUnlessEqual(data, DATA)
160 d.addCallback(_download_to_data_done)
162 target_filename = os.path.join(self.basedir, "download.target")
163 def _download_to_filename(res):
164 return self.downloader.download_to_filename(self.uri,
166 d.addCallback(_download_to_filename)
167 def _download_to_filename_done(res):
168 newdata = open(target_filename, "rb").read()
169 self.failUnlessEqual(newdata, DATA)
170 d.addCallback(_download_to_filename_done)
172 target_filename2 = os.path.join(self.basedir, "download.target2")
173 def _download_to_filehandle(res):
174 fh = open(target_filename2, "wb")
175 return self.downloader.download_to_filehandle(self.uri, fh)
176 d.addCallback(_download_to_filehandle)
177 def _download_to_filehandle_done(fh):
179 newdata = open(target_filename2, "rb").read()
180 self.failUnlessEqual(newdata, DATA)
181 d.addCallback(_download_to_filehandle_done)
183 consumer = GrabEverythingConsumer()
184 ct = download.ConsumerAdapter(consumer)
185 d.addCallback(lambda res:
186 self.downloader.download(self.uri, ct))
187 def _download_to_consumer_done(ign):
188 self.failUnlessEqual(consumer.contents, DATA)
189 d.addCallback(_download_to_consumer_done)
192 n = self.clients[1].create_node_from_uri(self.uri)
193 d = download_to_data(n)
194 def _read_done(data):
195 self.failUnlessEqual(data, DATA)
196 d.addCallback(_read_done)
197 d.addCallback(lambda ign:
198 n.read(MemoryConsumer(), offset=1, size=4))
199 def _read_portion_done(mc):
200 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
201 d.addCallback(_read_portion_done)
202 d.addCallback(lambda ign:
203 n.read(MemoryConsumer(), offset=2, size=None))
204 def _read_tail_done(mc):
205 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
206 d.addCallback(_read_tail_done)
207 d.addCallback(lambda ign:
208 n.read(MemoryConsumer(), size=len(DATA)+1000))
209 def _read_too_much(mc):
210 self.failUnlessEqual("".join(mc.chunks), DATA)
211 d.addCallback(_read_too_much)
214 d.addCallback(_test_read)
216 def _test_bad_read(res):
217 bad_u = uri.from_string_filenode(self.uri)
218 bad_u.key = self.flip_bit(bad_u.key)
219 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
220 # this should cause an error during download
222 d = self.shouldFail2(NoSharesError, "'download bad node'",
224 bad_n.read, MemoryConsumer(), offset=2)
226 d.addCallback(_test_bad_read)
228 def _download_nonexistent_uri(res):
229 baduri = self.mangle_uri(self.uri)
230 log.msg("about to download non-existent URI", level=log.UNUSUAL,
231 facility="tahoe.tests")
232 d1 = self.downloader.download_to_data(baduri)
233 def _baduri_should_fail(res):
234 log.msg("finished downloading non-existend URI",
235 level=log.UNUSUAL, facility="tahoe.tests")
236 self.failUnless(isinstance(res, Failure))
237 self.failUnless(res.check(NoSharesError),
238 "expected NoSharesError, got %s" % res)
239 d1.addBoth(_baduri_should_fail)
241 d.addCallback(_download_nonexistent_uri)
243 # add a new node, which doesn't accept shares, and only uses the
245 d.addCallback(lambda res: self.add_extra_node(self.numclients,
247 add_to_sparent=True))
248 def _added(extra_node):
249 self.extra_node = extra_node
250 d.addCallback(_added)
252 HELPER_DATA = "Data that needs help to upload" * 1000
253 def _upload_with_helper(res):
254 u = upload.Data(HELPER_DATA, convergence=convergence)
255 d = self.extra_node.upload(u)
256 def _uploaded(results):
258 return self.downloader.download_to_data(uri)
259 d.addCallback(_uploaded)
261 self.failUnlessEqual(newdata, HELPER_DATA)
262 d.addCallback(_check)
264 d.addCallback(_upload_with_helper)
266 def _upload_duplicate_with_helper(res):
267 u = upload.Data(HELPER_DATA, convergence=convergence)
268 u.debug_stash_RemoteEncryptedUploadable = True
269 d = self.extra_node.upload(u)
270 def _uploaded(results):
272 return self.downloader.download_to_data(uri)
273 d.addCallback(_uploaded)
275 self.failUnlessEqual(newdata, HELPER_DATA)
276 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
277 "uploadable started uploading, should have been avoided")
278 d.addCallback(_check)
280 if convergence is not None:
281 d.addCallback(_upload_duplicate_with_helper)
283 def _upload_resumable(res):
284 DATA = "Data that needs help to upload and gets interrupted" * 1000
285 u1 = CountingDataUploadable(DATA, convergence=convergence)
286 u2 = CountingDataUploadable(DATA, convergence=convergence)
288 # we interrupt the connection after about 5kB by shutting down
289 # the helper, then restarting it.
290 u1.interrupt_after = 5000
291 u1.interrupt_after_d = defer.Deferred()
292 u1.interrupt_after_d.addCallback(lambda res:
293 self.bounce_client(0))
295 # sneak into the helper and reduce its chunk size, so that our
296 # debug_interrupt will sever the connection on about the fifth
297 # chunk fetched. This makes sure that we've started to write the
298 # new shares before we abandon them, which exercises the
299 # abort/delete-partial-share code. TODO: find a cleaner way to do
300 # this. I know that this will affect later uses of the helper in
301 # this same test run, but I'm not currently worried about it.
302 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
304 d = self.extra_node.upload(u1)
306 def _should_not_finish(res):
307 self.fail("interrupted upload should have failed, not finished"
308 " with result %s" % (res,))
310 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
312 # make sure we actually interrupted it before finishing the
314 self.failUnless(u1.bytes_read < len(DATA),
315 "read %d out of %d total" % (u1.bytes_read,
318 log.msg("waiting for reconnect", level=log.NOISY,
319 facility="tahoe.test.test_system")
320 # now, we need to give the nodes a chance to notice that this
321 # connection has gone away. When this happens, the storage
322 # servers will be told to abort their uploads, removing the
323 # partial shares. Unfortunately this involves TCP messages
324 # going through the loopback interface, and we can't easily
325 # predict how long that will take. If it were all local, we
326 # could use fireEventually() to stall. Since we don't have
327 # the right introduction hooks, the best we can do is use a
328 # fixed delay. TODO: this is fragile.
329 u1.interrupt_after_d.addCallback(self.stall, 2.0)
330 return u1.interrupt_after_d
331 d.addCallbacks(_should_not_finish, _interrupted)
333 def _disconnected(res):
334 # check to make sure the storage servers aren't still hanging
335 # on to the partial share: their incoming/ directories should
337 log.msg("disconnected", level=log.NOISY,
338 facility="tahoe.test.test_system")
339 for i in range(self.numclients):
340 incdir = os.path.join(self.getdir("client%d" % i),
341 "storage", "shares", "incoming")
342 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
343 d.addCallback(_disconnected)
345 # then we need to give the reconnector a chance to
346 # reestablish the connection to the helper.
347 d.addCallback(lambda res:
348 log.msg("wait_for_connections", level=log.NOISY,
349 facility="tahoe.test.test_system"))
350 d.addCallback(lambda res: self.wait_for_connections())
353 d.addCallback(lambda res:
354 log.msg("uploading again", level=log.NOISY,
355 facility="tahoe.test.test_system"))
356 d.addCallback(lambda res: self.extra_node.upload(u2))
358 def _uploaded(results):
360 log.msg("Second upload complete", level=log.NOISY,
361 facility="tahoe.test.test_system")
363 # this is really bytes received rather than sent, but it's
364 # convenient and basically measures the same thing
365 bytes_sent = results.ciphertext_fetched
367 # We currently don't support resumption of upload if the data is
368 # encrypted with a random key. (Because that would require us
369 # to store the key locally and re-use it on the next upload of
370 # this file, which isn't a bad thing to do, but we currently
372 if convergence is not None:
373 # Make sure we did not have to read the whole file the
374 # second time around .
375 self.failUnless(bytes_sent < len(DATA),
376 "resumption didn't save us any work:"
377 " read %d bytes out of %d total" %
378 (bytes_sent, len(DATA)))
380 # Make sure we did have to read the whole file the second
381 # time around -- because the one that we partially uploaded
382 # earlier was encrypted with a different random key.
383 self.failIf(bytes_sent < len(DATA),
384 "resumption saved us some work even though we were using random keys:"
385 " read %d bytes out of %d total" %
386 (bytes_sent, len(DATA)))
387 return self.downloader.download_to_data(uri)
388 d.addCallback(_uploaded)
391 self.failUnlessEqual(newdata, DATA)
392 # If using convergent encryption, then also check that the
393 # helper has removed the temp file from its directories.
394 if convergence is not None:
395 basedir = os.path.join(self.getdir("client0"), "helper")
396 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
397 self.failUnlessEqual(files, [])
398 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
399 self.failUnlessEqual(files, [])
400 d.addCallback(_check)
402 d.addCallback(_upload_resumable)
404 def _grab_stats(ignored):
405 # the StatsProvider doesn't normally publish a FURL:
406 # instead it passes a live reference to the StatsGatherer
407 # (if and when it connects). To exercise the remote stats
408 # interface, we manually publish client0's StatsProvider
409 # and use client1 to query it.
410 sp = self.clients[0].stats_provider
411 sp_furl = self.clients[0].tub.registerReference(sp)
412 d = self.clients[1].tub.getReference(sp_furl)
413 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
414 def _got_stats(stats):
416 #from pprint import pprint
419 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
420 c = stats["counters"]
421 self.failUnless("storage_server.allocate" in c)
422 d.addCallback(_got_stats)
424 d.addCallback(_grab_stats)
428 def _find_shares(self, basedir):
430 for (dirpath, dirnames, filenames) in os.walk(basedir):
431 if "storage" not in dirpath:
435 pieces = dirpath.split(os.sep)
437 and pieces[-4] == "storage"
438 and pieces[-3] == "shares"):
439 # we're sitting in .../storage/shares/$START/$SINDEX , and there
440 # are sharefiles here
441 assert pieces[-5].startswith("client")
442 client_num = int(pieces[-5][-1])
443 storage_index_s = pieces[-1]
444 storage_index = si_a2b(storage_index_s)
445 for sharename in filenames:
446 shnum = int(sharename)
447 filename = os.path.join(dirpath, sharename)
448 data = (client_num, storage_index, filename, shnum)
451 self.fail("unable to find any share files in %s" % basedir)
454 def _corrupt_mutable_share(self, filename, which):
455 msf = MutableShareFile(filename)
456 datav = msf.readv([ (0, 1000000) ])
457 final_share = datav[0]
458 assert len(final_share) < 1000000 # ought to be truncated
459 pieces = mutable_layout.unpack_share(final_share)
460 (seqnum, root_hash, IV, k, N, segsize, datalen,
461 verification_key, signature, share_hash_chain, block_hash_tree,
462 share_data, enc_privkey) = pieces
464 if which == "seqnum":
467 root_hash = self.flip_bit(root_hash)
469 IV = self.flip_bit(IV)
470 elif which == "segsize":
471 segsize = segsize + 15
472 elif which == "pubkey":
473 verification_key = self.flip_bit(verification_key)
474 elif which == "signature":
475 signature = self.flip_bit(signature)
476 elif which == "share_hash_chain":
477 nodenum = share_hash_chain.keys()[0]
478 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
479 elif which == "block_hash_tree":
480 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
481 elif which == "share_data":
482 share_data = self.flip_bit(share_data)
483 elif which == "encprivkey":
484 enc_privkey = self.flip_bit(enc_privkey)
486 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
488 final_share = mutable_layout.pack_share(prefix,
495 msf.writev( [(0, final_share)], None)
498 def test_mutable(self):
499 self.basedir = "system/SystemTest/test_mutable"
500 DATA = "initial contents go here." # 25 bytes % 3 != 0
501 NEWDATA = "new contents yay"
502 NEWERDATA = "this is getting old"
504 d = self.set_up_nodes(use_key_generator=True)
506 def _create_mutable(res):
508 log.msg("starting create_mutable_file")
509 d1 = c.create_mutable_file(DATA)
511 log.msg("DONE: %s" % (res,))
512 self._mutable_node_1 = res
514 d1.addCallback(_done)
516 d.addCallback(_create_mutable)
518 def _test_debug(res):
519 # find a share. It is important to run this while there is only
520 # one slot in the grid.
521 shares = self._find_shares(self.basedir)
522 (client_num, storage_index, filename, shnum) = shares[0]
523 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
525 log.msg(" for clients[%d]" % client_num)
527 out,err = StringIO(), StringIO()
528 rc = runner.runner(["debug", "dump-share", "--offsets",
530 stdout=out, stderr=err)
531 output = out.getvalue()
532 self.failUnlessEqual(rc, 0)
534 self.failUnless("Mutable slot found:\n" in output)
535 self.failUnless("share_type: SDMF\n" in output)
536 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
537 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
538 self.failUnless(" num_extra_leases: 0\n" in output)
539 # the pubkey size can vary by a byte, so the container might
540 # be a bit larger on some runs.
541 m = re.search(r'^ container_size: (\d+)$', output, re.M)
543 container_size = int(m.group(1))
544 self.failUnless(2037 <= container_size <= 2049, container_size)
545 m = re.search(r'^ data_length: (\d+)$', output, re.M)
547 data_length = int(m.group(1))
548 self.failUnless(2037 <= data_length <= 2049, data_length)
549 self.failUnless(" secrets are for nodeid: %s\n" % peerid
551 self.failUnless(" SDMF contents:\n" in output)
552 self.failUnless(" seqnum: 1\n" in output)
553 self.failUnless(" required_shares: 3\n" in output)
554 self.failUnless(" total_shares: 10\n" in output)
555 self.failUnless(" segsize: 27\n" in output, (output, filename))
556 self.failUnless(" datalen: 25\n" in output)
557 # the exact share_hash_chain nodes depends upon the sharenum,
558 # and is more of a hassle to compute than I want to deal with
560 self.failUnless(" share_hash_chain: " in output)
561 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
562 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
563 base32.b2a(storage_index))
564 self.failUnless(expected in output)
565 except unittest.FailTest:
567 print "dump-share output was:"
570 d.addCallback(_test_debug)
574 # first, let's see if we can use the existing node to retrieve the
575 # contents. This allows it to use the cached pubkey and maybe the
576 # latest-known sharemap.
578 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
579 def _check_download_1(res):
580 self.failUnlessEqual(res, DATA)
581 # now we see if we can retrieve the data from a new node,
582 # constructed using the URI of the original one. We do this test
583 # on the same client that uploaded the data.
584 uri = self._mutable_node_1.get_uri()
585 log.msg("starting retrieve1")
586 newnode = self.clients[0].create_node_from_uri(uri)
587 newnode_2 = self.clients[0].create_node_from_uri(uri)
588 self.failUnlessIdentical(newnode, newnode_2)
589 return newnode.download_best_version()
590 d.addCallback(_check_download_1)
592 def _check_download_2(res):
593 self.failUnlessEqual(res, DATA)
594 # same thing, but with a different client
595 uri = self._mutable_node_1.get_uri()
596 newnode = self.clients[1].create_node_from_uri(uri)
597 log.msg("starting retrieve2")
598 d1 = newnode.download_best_version()
599 d1.addCallback(lambda res: (res, newnode))
601 d.addCallback(_check_download_2)
603 def _check_download_3((res, newnode)):
604 self.failUnlessEqual(res, DATA)
606 log.msg("starting replace1")
607 d1 = newnode.overwrite(NEWDATA)
608 d1.addCallback(lambda res: newnode.download_best_version())
610 d.addCallback(_check_download_3)
612 def _check_download_4(res):
613 self.failUnlessEqual(res, NEWDATA)
614 # now create an even newer node and replace the data on it. This
615 # new node has never been used for download before.
616 uri = self._mutable_node_1.get_uri()
617 newnode1 = self.clients[2].create_node_from_uri(uri)
618 newnode2 = self.clients[3].create_node_from_uri(uri)
619 self._newnode3 = self.clients[3].create_node_from_uri(uri)
620 log.msg("starting replace2")
621 d1 = newnode1.overwrite(NEWERDATA)
622 d1.addCallback(lambda res: newnode2.download_best_version())
624 d.addCallback(_check_download_4)
626 def _check_download_5(res):
627 log.msg("finished replace2")
628 self.failUnlessEqual(res, NEWERDATA)
629 d.addCallback(_check_download_5)
631 def _corrupt_shares(res):
632 # run around and flip bits in all but k of the shares, to test
634 shares = self._find_shares(self.basedir)
635 ## sort by share number
636 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
637 where = dict([ (shnum, filename)
638 for (client_num, storage_index, filename, shnum)
640 assert len(where) == 10 # this test is designed for 3-of-10
641 for shnum, filename in where.items():
642 # shares 7,8,9 are left alone. read will check
643 # (share_hash_chain, block_hash_tree, share_data). New
644 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
645 # segsize, signature).
647 # read: this will trigger "pubkey doesn't match
649 self._corrupt_mutable_share(filename, "pubkey")
650 self._corrupt_mutable_share(filename, "encprivkey")
652 # triggers "signature is invalid"
653 self._corrupt_mutable_share(filename, "seqnum")
655 # triggers "signature is invalid"
656 self._corrupt_mutable_share(filename, "R")
658 # triggers "signature is invalid"
659 self._corrupt_mutable_share(filename, "segsize")
661 self._corrupt_mutable_share(filename, "share_hash_chain")
663 self._corrupt_mutable_share(filename, "block_hash_tree")
665 self._corrupt_mutable_share(filename, "share_data")
666 # other things to correct: IV, signature
667 # 7,8,9 are left alone
669 # note that initial_query_count=5 means that we'll hit the
670 # first 5 servers in effectively random order (based upon
671 # response time), so we won't necessarily ever get a "pubkey
672 # doesn't match fingerprint" error (if we hit shnum>=1 before
673 # shnum=0, we pull the pubkey from there). To get repeatable
674 # specific failures, we need to set initial_query_count=1,
675 # but of course that will change the sequencing behavior of
676 # the retrieval process. TODO: find a reasonable way to make
677 # this a parameter, probably when we expand this test to test
678 # for one failure mode at a time.
680 # when we retrieve this, we should get three signature
681 # failures (where we've mangled seqnum, R, and segsize). The
683 d.addCallback(_corrupt_shares)
685 d.addCallback(lambda res: self._newnode3.download_best_version())
686 d.addCallback(_check_download_5)
688 def _check_empty_file(res):
689 # make sure we can create empty files, this usually screws up the
691 d1 = self.clients[2].create_mutable_file("")
692 d1.addCallback(lambda newnode: newnode.download_best_version())
693 d1.addCallback(lambda res: self.failUnlessEqual("", res))
695 d.addCallback(_check_empty_file)
697 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
698 def _created_dirnode(dnode):
699 log.msg("_created_dirnode(%s)" % (dnode,))
701 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
702 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
703 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
704 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
705 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
706 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
707 d1.addCallback(lambda res: dnode.build_manifest().when_done())
708 d1.addCallback(lambda res:
709 self.failUnlessEqual(len(res["manifest"]), 1))
711 d.addCallback(_created_dirnode)
713 def wait_for_c3_kg_conn():
714 return self.clients[3]._key_generator is not None
715 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
717 def check_kg_poolsize(junk, size_delta):
718 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
719 self.key_generator_svc.key_generator.pool_size + size_delta)
721 d.addCallback(check_kg_poolsize, 0)
722 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
723 d.addCallback(check_kg_poolsize, -1)
724 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
725 d.addCallback(check_kg_poolsize, -2)
726 # use_helper induces use of clients[3], which is the using-key_gen client
727 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
728 d.addCallback(check_kg_poolsize, -3)
732 def flip_bit(self, good):
733 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
735 def mangle_uri(self, gooduri):
736 # change the key, which changes the storage index, which means we'll
737 # be asking about the wrong file, so nobody will have any shares
738 u = IFileURI(gooduri)
739 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
740 uri_extension_hash=u.uri_extension_hash,
741 needed_shares=u.needed_shares,
742 total_shares=u.total_shares,
744 return u2.to_string()
746 # TODO: add a test which mangles the uri_extension_hash instead, and
747 # should fail due to not being able to get a valid uri_extension block.
748 # Also a test which sneakily mangles the uri_extension block to change
749 # some of the validation data, so it will fail in the post-download phase
750 # when the file's crypttext integrity check fails. Do the same thing for
751 # the key, which should cause the download to fail the post-download
752 # plaintext_hash check.
754 def test_vdrive(self):
755 self.basedir = "system/SystemTest/test_vdrive"
756 self.data = LARGE_DATA
757 d = self.set_up_nodes(use_stats_gatherer=True)
758 d.addCallback(self._test_introweb)
759 d.addCallback(self.log, "starting publish")
760 d.addCallback(self._do_publish1)
761 d.addCallback(self._test_runner)
762 d.addCallback(self._do_publish2)
763 # at this point, we have the following filesystem (where "R" denotes
764 # self._root_directory_uri):
767 # R/subdir1/mydata567
769 # R/subdir1/subdir2/mydata992
771 d.addCallback(lambda res: self.bounce_client(0))
772 d.addCallback(self.log, "bounced client0")
774 d.addCallback(self._check_publish1)
775 d.addCallback(self.log, "did _check_publish1")
776 d.addCallback(self._check_publish2)
777 d.addCallback(self.log, "did _check_publish2")
778 d.addCallback(self._do_publish_private)
779 d.addCallback(self.log, "did _do_publish_private")
780 # now we also have (where "P" denotes a new dir):
781 # P/personal/sekrit data
782 # P/s2-rw -> /subdir1/subdir2/
783 # P/s2-ro -> /subdir1/subdir2/ (read-only)
784 d.addCallback(self._check_publish_private)
785 d.addCallback(self.log, "did _check_publish_private")
786 d.addCallback(self._test_web)
787 d.addCallback(self._test_control)
788 d.addCallback(self._test_cli)
789 # P now has four top-level children:
790 # P/personal/sekrit data
793 # P/test_put/ (empty)
794 d.addCallback(self._test_checker)
797 def _test_introweb(self, res):
798 d = getPage(self.introweb_url, method="GET", followRedirect=True)
801 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
803 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
804 self.failUnless("Subscription Summary: storage: 5" in res)
805 except unittest.FailTest:
807 print "GET %s output was:" % self.introweb_url
810 d.addCallback(_check)
811 d.addCallback(lambda res:
812 getPage(self.introweb_url + "?t=json",
813 method="GET", followRedirect=True))
814 def _check_json(res):
815 data = simplejson.loads(res)
817 self.failUnlessEqual(data["subscription_summary"],
819 self.failUnlessEqual(data["announcement_summary"],
820 {"storage": 5, "stub_client": 5})
821 self.failUnlessEqual(data["announcement_distinct_hosts"],
822 {"storage": 1, "stub_client": 1})
823 except unittest.FailTest:
825 print "GET %s?t=json output was:" % self.introweb_url
828 d.addCallback(_check_json)
831 def _do_publish1(self, res):
832 ut = upload.Data(self.data, convergence=None)
834 d = c0.create_empty_dirnode()
835 def _made_root(new_dirnode):
836 self._root_directory_uri = new_dirnode.get_uri()
837 return c0.create_node_from_uri(self._root_directory_uri)
838 d.addCallback(_made_root)
839 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
840 def _made_subdir1(subdir1_node):
841 self._subdir1_node = subdir1_node
842 d1 = subdir1_node.add_file(u"mydata567", ut)
843 d1.addCallback(self.log, "publish finished")
844 def _stash_uri(filenode):
845 self.uri = filenode.get_uri()
846 assert isinstance(self.uri, str), (self.uri, filenode)
847 d1.addCallback(_stash_uri)
849 d.addCallback(_made_subdir1)
852 def _do_publish2(self, res):
853 ut = upload.Data(self.data, convergence=None)
854 d = self._subdir1_node.create_empty_directory(u"subdir2")
855 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
858 def log(self, res, *args, **kwargs):
859 # print "MSG: %s RES: %s" % (msg, args)
860 log.msg(*args, **kwargs)
863 def _do_publish_private(self, res):
864 self.smalldata = "sssh, very secret stuff"
865 ut = upload.Data(self.smalldata, convergence=None)
866 d = self.clients[0].create_empty_dirnode()
867 d.addCallback(self.log, "GOT private directory")
868 def _got_new_dir(privnode):
869 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
870 d1 = privnode.create_empty_directory(u"personal")
871 d1.addCallback(self.log, "made P/personal")
872 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
873 d1.addCallback(self.log, "made P/personal/sekrit data")
874 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
876 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
877 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
879 d1.addCallback(_got_s2)
880 d1.addCallback(lambda res: privnode)
882 d.addCallback(_got_new_dir)
885 def _check_publish1(self, res):
886 # this one uses the iterative API
888 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
889 d.addCallback(self.log, "check_publish1 got /")
890 d.addCallback(lambda root: root.get(u"subdir1"))
891 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
892 d.addCallback(lambda filenode: filenode.download_to_data())
893 d.addCallback(self.log, "get finished")
895 self.failUnlessEqual(data, self.data)
896 d.addCallback(_get_done)
# Verify the same tree via the path-based API (get_child_at_path with a
# "subdir1/mydata567" style path), and check that re-creating a node from a
# filenode's URI compares equal to the original node.
# NOTE(review): listing is missing original lines 908 and 914 (presumably a
# blank/section line and the trailing `return d`); recover from VCS.
899 def _check_publish2(self, res):
900 # this one uses the path-based API
901 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
902 d = rootnode.get_child_at_path(u"subdir1")
903 d.addCallback(lambda dirnode:
904 self.failUnless(IDirectoryNode.providedBy(dirnode)))
905 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
906 d.addCallback(lambda filenode: filenode.download_to_data())
907 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
909 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
910 def _got_filenode(filenode):
911 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
912 assert fnode == filenode
913 d.addCallback(_got_filenode)
# Exercise the private tree published by _do_publish_private:
#  - read back P/personal/"sekrit data" and compare to self.smalldata;
#  - confirm s2-rw is mutable and s2-ro is mutable-but-readonly;
#  - confirm every mutating operation (mkdir/add_file/delete/set_uri/mv) on
#    the read-only cap fails with NotMutableError, and get() of a missing
#    child fails with NoSuchChildError;
#  - shuffle "sekrit data" around with move_child_to, then check the deep
#    manifest size and start_deep_stats() counters for the whole tree.
# NOTE(review): this listing has gaps (original lines 919, 923, 925, 928, 930,
# 943, 945, 949, 954, 957, 959, 961, 964, 967, 969, 978, 982, 986, 989-991,
# 1002, 1010, 1014, 1022, 1024-1025 are missing) — among them the `def
# get_path(path):` header that line 929 belongs to, the closing `}` of the
# `expected` dict at 1011, and the trailing `return d`. Code left
# byte-identical; recover the missing lines from version control.
916 def _check_publish_private(self, resnode):
917 # this one uses the path-based API
918 self._private_node = resnode
920 d = self._private_node.get_child_at_path(u"personal")
921 def _got_personal(personal):
922 self._personal_node = personal
924 d.addCallback(_got_personal)
926 d.addCallback(lambda dirnode:
927 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
929 return self._private_node.get_child_at_path(path)
931 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
932 d.addCallback(lambda filenode: filenode.download_to_data())
933 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
934 d.addCallback(lambda res: get_path(u"s2-rw"))
935 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
936 d.addCallback(lambda res: get_path(u"s2-ro"))
937 def _got_s2ro(dirnode):
938 self.failUnless(dirnode.is_mutable(), dirnode)
939 self.failUnless(dirnode.is_readonly(), dirnode)
940 d1 = defer.succeed(None)
941 d1.addCallback(lambda res: dirnode.list())
942 d1.addCallback(self.log, "dirnode.list")
944 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
946 d1.addCallback(self.log, "doing add_file(ro)")
947 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
948 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
950 d1.addCallback(self.log, "doing get(ro)")
951 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
952 d1.addCallback(lambda filenode:
953 self.failUnless(IFileNode.providedBy(filenode)))
955 d1.addCallback(self.log, "doing delete(ro)")
956 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
958 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
960 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
962 personal = self._personal_node
963 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
965 d1.addCallback(self.log, "doing move_child_to(ro)2")
966 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
968 d1.addCallback(self.log, "finished with _got_s2ro")
970 d.addCallback(_got_s2ro)
971 def _got_home(dummy):
972 home = self._private_node
973 personal = self._personal_node
974 d1 = defer.succeed(None)
975 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
976 d1.addCallback(lambda res:
977 personal.move_child_to(u"sekrit data",home,u"sekrit"))
979 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
980 d1.addCallback(lambda res:
981 home.move_child_to(u"sekrit", home, u"sekrit data"))
983 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
984 d1.addCallback(lambda res:
985 home.move_child_to(u"sekrit data", personal))
987 d1.addCallback(lambda res: home.build_manifest().when_done())
988 d1.addCallback(self.log, "manifest")
992 # P/personal/sekrit data
993 # P/s2-rw (same as P/s2-ro)
994 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
995 d1.addCallback(lambda res:
996 self.failUnlessEqual(len(res["manifest"]), 5))
997 d1.addCallback(lambda res: home.start_deep_stats().when_done())
998 def _check_stats(stats):
999 expected = {"count-immutable-files": 1,
1000 "count-mutable-files": 0,
1001 "count-literal-files": 1,
1003 "count-directories": 3,
1004 "size-immutable-files": 112,
1005 "size-literal-files": 23,
1006 #"size-directories": 616, # varies
1007 #"largest-directory": 616,
1008 "largest-directory-children": 3,
1009 "largest-immutable-file": 112,
1011 for k,v in expected.iteritems():
1012 self.failUnlessEqual(stats[k], v,
1013 "stats[%s] was %s, not %s" %
1015 self.failUnless(stats["size-directories"] > 1300,
1016 stats["size-directories"])
1017 self.failUnless(stats["largest-directory"] > 800,
1018 stats["largest-directory"])
1019 self.failUnlessEqual(stats["size-files-histogram"],
1020 [ (11, 31, 1), (101, 316, 1) ])
1021 d1.addCallback(_check_stats)
1023 d.addCallback(_got_home)
# addBoth-style failure checker: `res` is either a Failure (trap it against
# expected_failure and optionally require `substring` in its str()) or a
# success value, in which case the test fails.
# NOTE(review): listing is missing original lines 1029 (presumably
# `if substring:`) and 1033 (presumably the `else:` before self.fail) —
# the visible shape only makes sense with those; recover from VCS.
1026 def shouldFail(self, res, expected_failure, which, substring=None):
1027 if isinstance(res, Failure):
1028 res.trap(expected_failure)
1030 self.failUnless(substring in str(res),
1031 "substring '%s' not in '%s'"
1032 % (substring, str(res)))
1034 self.fail("%s was supposed to raise %s, not get '%s'" %
1035 (which, expected_failure, res))
# Deferred-returning variant of shouldFail: invoke `callable(*args, **kwargs)`
# via maybeDeferred and assert it fails with expected_failure (optionally with
# `substring` in the failure text). Used heavily by _check_publish_private.
# NOTE(review): listing is missing original lines 1040 (presumably
# `def _done(res):`), 1043 (`if substring:`), 1047 (`else:`) and 1050-1051
# (the addBoth(_done) / `return d` tail); recover from VCS.
1037 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1038 assert substring is None or isinstance(substring, str)
1039 d = defer.maybeDeferred(callable, *args, **kwargs)
1041 if isinstance(res, Failure):
1042 res.trap(expected_failure)
1044 self.failUnless(substring in str(res),
1045 "substring '%s' not in '%s'"
1046 % (substring, str(res)))
1048 self.fail("%s was supposed to raise %s, not get '%s'" %
1049 (which, expected_failure, res))
# HTTP helper: PUT `data` to the test node's web port at `urlpath`.
# Returns the Deferred from twisted.web.client.getPage (fires with the
# response body). Lines are contiguous in the original here.
1053 def PUT(self, urlpath, data):
1054 url = self.webish_url + urlpath
1055 return getPage(url, method="PUT", postdata=data)
# HTTP helper: GET `urlpath` from the test node's web port; `followRedirect`
# is passed straight through to getPage. Returns a Deferred firing with the
# response body.
1057 def GET(self, urlpath, followRedirect=False):
1058 url = self.webish_url + urlpath
1059 return getPage(url, method="GET", followRedirect=followRedirect)
# HTTP helper: POST `fields` as a hand-rolled multipart/form-data body (with
# fixed boundary "boogabooga") to either the helper node's web port
# (use_helper=True) or the default node's. A tuple-valued field is treated as
# (filename, contents) and gets a filename= in its Content-Disposition.
# NOTE(review): listing is missing original lines 1062/1064 (the
# `if use_helper:`/`else:` around the two url assignments), 1068-1069
# (`form = []` and the first boundary append), 1071/1073 (blank form lines),
# 1079 (`else:`), 1081/1083-1084 (per-field separators and closing
# `--sepbase--`), and 1087 (the Content-Length header line that 1086's dict
# continues into); recover from VCS before touching this.
1061 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1063 url = self.helper_webish_url + urlpath
1065 url = self.webish_url + urlpath
1066 sepbase = "boogabooga"
1067 sep = "--" + sepbase
1070 form.append('Content-Disposition: form-data; name="_charset"')
1072 form.append('UTF-8')
1074 for name, value in fields.iteritems():
1075 if isinstance(value, tuple):
1076 filename, value = value
1077 form.append('Content-Disposition: form-data; name="%s"; '
1078 'filename="%s"' % (name, filename.encode("utf-8")))
1080 form.append('Content-Disposition: form-data; name="%s"' % name)
1082 form.append(str(value))
1085 body = "\r\n".join(form) + "\r\n"
1086 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1088 return getPage(url, method="POST", postdata=body,
1089 headers=headers, followRedirect=followRedirect)
# End-to-end exercise of the web gateway: welcome pages (plain node and
# helper), directory listings, file downloads (by path, by URI-in-URL in both
# forms, and a mangled URI that must produce an error), PUT uploads (small and
# a 1.5MB multi-segment file), in-place replacement, unlinked POST uploads
# (with and without the helper), the per-operation status pages, the helper
# status pages (HTML and JSON), and the statistics pages.
# NOTE(review): this listing has gaps throughout (e.g. original lines 1094,
# 1101, 1105, 1109, 1114, 1117, 1131, 1141, 1148, 1155-1156, 1159, 1172,
# 1175, 1178, 1183, 1186, 1204, 1208, 1211, 1220, 1232, 1234, 1239,
# 1248/1252/1256, 1258, 1265, 1273, 1286, 1290, 1296-1298 are missing) —
# among them the initial `d = getPage(base)` that _got_welcome chains from,
# the `def _got_up/_got_down` headers, f.close()/`now = time.time()` lines in
# _got_helper_status, and the trailing `return d`. Code left byte-identical;
# recover missing lines from version control before editing.
1091 def _test_web(self, res):
1092 base = self.webish_url
1093 public = "uri/" + self._root_directory_uri
1095 def _got_welcome(page):
1096 # XXX This test is oversensitive to formatting
1097 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1098 self.failUnless(expected in page,
1099 "I didn't see the right 'connected storage servers'"
1100 " message in: %s" % page
1102 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1103 self.failUnless(expected in page,
1104 "I didn't see the right 'My nodeid' message "
1106 self.failUnless("Helper: 0 active uploads" in page)
1107 d.addCallback(_got_welcome)
1108 d.addCallback(self.log, "done with _got_welcome")
1110 # get the welcome page from the node that uses the helper too
1111 d.addCallback(lambda res: getPage(self.helper_webish_url))
1112 def _got_welcome_helper(page):
1113 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1115 self.failUnless("Not running helper" in page)
1116 d.addCallback(_got_welcome_helper)
1118 d.addCallback(lambda res: getPage(base + public))
1119 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1120 def _got_subdir1(page):
1121 # there ought to be an href for our file
1122 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1123 self.failUnless(">mydata567</a>" in page)
1124 d.addCallback(_got_subdir1)
1125 d.addCallback(self.log, "done with _got_subdir1")
1126 d.addCallback(lambda res:
1127 getPage(base + public + "/subdir1/mydata567"))
1128 def _got_data(page):
1129 self.failUnlessEqual(page, self.data)
1130 d.addCallback(_got_data)
1132 # download from a URI embedded in a URL
1133 d.addCallback(self.log, "_get_from_uri")
1134 def _get_from_uri(res):
1135 return getPage(base + "uri/%s?filename=%s"
1136 % (self.uri, "mydata567"))
1137 d.addCallback(_get_from_uri)
1138 def _got_from_uri(page):
1139 self.failUnlessEqual(page, self.data)
1140 d.addCallback(_got_from_uri)
1142 # download from a URI embedded in a URL, second form
1143 d.addCallback(self.log, "_get_from_uri2")
1144 def _get_from_uri2(res):
1145 return getPage(base + "uri?uri=%s" % (self.uri,))
1146 d.addCallback(_get_from_uri2)
1147 d.addCallback(_got_from_uri)
1149 # download from a bogus URI, make sure we get a reasonable error
1150 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1151 def _get_from_bogus_uri(res):
1152 d1 = getPage(base + "uri/%s?filename=%s"
1153 % (self.mangle_uri(self.uri), "mydata567"))
1154 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1157 d.addCallback(_get_from_bogus_uri)
1158 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1160 # upload a file with PUT
1161 d.addCallback(self.log, "about to try PUT")
1162 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1163 "new.txt contents"))
1164 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1165 d.addCallback(self.failUnlessEqual, "new.txt contents")
1166 # and again with something large enough to use multiple segments,
1167 # and hopefully trigger pauseProducing too
1168 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1169 "big" * 500000)) # 1.5MB
1170 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1171 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1173 # can we replace files in place?
1174 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1176 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1177 d.addCallback(self.failUnlessEqual, "NEWER contents")
1179 # test unlinked POST
1180 d.addCallback(lambda res: self.POST("uri", t="upload",
1181 file=("new.txt", "data" * 10000)))
1182 # and again using the helper, which exercises different upload-status
1184 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1185 file=("foo.txt", "data2" * 10000)))
1187 # check that the status page exists
1188 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1189 def _got_status(res):
1190 # find an interesting upload and download to look at. LIT files
1191 # are not interesting.
1192 for ds in self.clients[0].list_all_download_statuses():
1193 if ds.get_size() > 200:
1194 self._down_status = ds.get_counter()
1195 for us in self.clients[0].list_all_upload_statuses():
1196 if us.get_size() > 200:
1197 self._up_status = us.get_counter()
1198 rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1199 self._retrieve_status = rs.get_counter()
1200 ps = list(self.clients[0].list_all_publish_statuses())[0]
1201 self._publish_status = ps.get_counter()
1202 us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1203 self._update_status = us.get_counter()
1205 # and that there are some upload- and download- status pages
1206 return self.GET("status/up-%d" % self._up_status)
1207 d.addCallback(_got_status)
1209 return self.GET("status/down-%d" % self._down_status)
1210 d.addCallback(_got_up)
1212 return self.GET("status/mapupdate-%d" % self._update_status)
1213 d.addCallback(_got_down)
1214 def _got_update(res):
1215 return self.GET("status/publish-%d" % self._publish_status)
1216 d.addCallback(_got_update)
1217 def _got_publish(res):
1218 return self.GET("status/retrieve-%d" % self._retrieve_status)
1219 d.addCallback(_got_publish)
1221 # check that the helper status page exists
1222 d.addCallback(lambda res:
1223 self.GET("helper_status", followRedirect=True))
1224 def _got_helper_status(res):
1225 self.failUnless("Bytes Fetched:" in res)
1226 # touch a couple of files in the helper's working directory to
1227 # exercise more code paths
1228 workdir = os.path.join(self.getdir("client0"), "helper")
1229 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1230 f = open(incfile, "wb")
1231 f.write("small file")
1233 then = time.time() - 86400*3
1235 os.utime(incfile, (now, then))
1236 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1237 f = open(encfile, "wb")
1238 f.write("less small file")
1240 os.utime(encfile, (now, then))
1241 d.addCallback(_got_helper_status)
1242 # and that the json form exists
1243 d.addCallback(lambda res:
1244 self.GET("helper_status?t=json", followRedirect=True))
1245 def _got_helper_status_json(res):
1246 data = simplejson.loads(res)
1247 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1249 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1250 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1251 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1253 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1254 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1255 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1257 d.addCallback(_got_helper_status_json)
1259 # and check that client[3] (which uses a helper but does not run one
1260 # itself) doesn't explode when you ask for its status
1261 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1262 def _got_non_helper_status(res):
1263 self.failUnless("Upload and Download Status" in res)
1264 d.addCallback(_got_non_helper_status)
1266 # or for helper status with t=json
1267 d.addCallback(lambda res:
1268 getPage(self.helper_webish_url + "helper_status?t=json"))
1269 def _got_non_helper_status_json(res):
1270 data = simplejson.loads(res)
1271 self.failUnlessEqual(data, {})
1272 d.addCallback(_got_non_helper_status_json)
1274 # see if the statistics page exists
1275 d.addCallback(lambda res: self.GET("statistics"))
1276 def _got_stats(res):
1277 self.failUnless("Node Statistics" in res)
1278 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1279 d.addCallback(_got_stats)
1280 d.addCallback(lambda res: self.GET("statistics?t=json"))
1281 def _got_stats_json(res):
1282 data = simplejson.loads(res)
1283 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1284 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1285 d.addCallback(_got_stats_json)
1287 # TODO: mangle the second segment of a file, to test errors that
1288 # occur after we've already sent some good data, which uses a
1289 # different error path.
1291 # TODO: download a URI with a form
1292 # TODO: create a directory by using a form
1293 # TODO: upload by using a form on the directory page
1294 # url = base + "somedir/subdir1/freeform_post!!upload"
1295 # TODO: delete a file by using a button on the directory page
# Exercise the `tahoe debug` CLI diagnostics against on-disk share files:
# walk self.basedir for a CHK share (magic \x00\x00\x00\x01), then run
# "debug dump-share --offsets" and assert on its output fields, "debug
# find-shares" (expects all 10 shares), and "debug catalog-shares" (expects
# 30 share descriptions across all node dirs, 10 matching this CHK).
# NOTE(review): this listing has gaps (original lines 1301-1302, 1305-1307,
# 1318-1319, 1321, 1323, 1326, 1330, 1348, 1358, 1361, 1368, 1371, 1375 are
# missing) — among them the `continue`/`break` bodies of the walk loop, the
# `filename` argument to dump-share, `out.seek(0)` before the readlines()
# calls, and the opening of the `matching = [line` list comprehension that
# lines 1372-1373 continue. Code left byte-identical; recover from VCS.
1299 def _test_runner(self, res):
1300 # exercise some of the diagnostic tools in runner.py
1303 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1304 if "storage" not in dirpath:
1308 pieces = dirpath.split(os.sep)
1309 if (len(pieces) >= 4
1310 and pieces[-4] == "storage"
1311 and pieces[-3] == "shares"):
1312 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1313 # are sharefiles here
1314 filename = os.path.join(dirpath, filenames[0])
1315 # peek at the magic to see if it is a chk share
1316 magic = open(filename, "rb").read(4)
1317 if magic == '\x00\x00\x00\x01':
1320 self.fail("unable to find any uri_extension files in %s"
1322 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1324 out,err = StringIO(), StringIO()
1325 rc = runner.runner(["debug", "dump-share", "--offsets",
1327 stdout=out, stderr=err)
1328 output = out.getvalue()
1329 self.failUnlessEqual(rc, 0)
1331 # we only upload a single file, so we can assert some things about
1332 # its size and shares.
1333 self.failUnless(("share filename: %s" % filename) in output)
1334 self.failUnless("size: %d\n" % len(self.data) in output)
1335 self.failUnless("num_segments: 1\n" in output)
1336 # segment_size is always a multiple of needed_shares
1337 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1338 self.failUnless("total_shares: 10\n" in output)
1339 # keys which are supposed to be present
1340 for key in ("size", "num_segments", "segment_size",
1341 "needed_shares", "total_shares",
1342 "codec_name", "codec_params", "tail_codec_params",
1343 #"plaintext_hash", "plaintext_root_hash",
1344 "crypttext_hash", "crypttext_root_hash",
1345 "share_root_hash", "UEB_hash"):
1346 self.failUnless("%s: " % key in output, key)
1347 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1349 # now use its storage index to find the other shares using the
1350 # 'find-shares' tool
1351 sharedir, shnum = os.path.split(filename)
1352 storagedir, storage_index_s = os.path.split(sharedir)
1353 out,err = StringIO(), StringIO()
1354 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1355 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1356 rc = runner.runner(cmd, stdout=out, stderr=err)
1357 self.failUnlessEqual(rc, 0)
1359 sharefiles = [sfn.strip() for sfn in out.readlines()]
1360 self.failUnlessEqual(len(sharefiles), 10)
1362 # also exercise the 'catalog-shares' tool
1363 out,err = StringIO(), StringIO()
1364 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1365 cmd = ["debug", "catalog-shares"] + nodedirs
1366 rc = runner.runner(cmd, stdout=out, stderr=err)
1367 self.failUnlessEqual(rc, 0)
1369 descriptions = [sfn.strip() for sfn in out.readlines()]
1370 self.failUnlessEqual(len(descriptions), 30)
1372 for line in descriptions
1373 if line.startswith("CHK %s " % storage_index_s)]
1374 self.failUnlessEqual(len(matching), 10)
# Connect to client 0's remote-control foolscap interface: read its
# private/control.furl, get a reference via the introducer's Tub, and hand
# it to _test_control2 (with the furl file path reused as an upload fixture).
# NOTE(review): listing is missing original line 1386 — presumably the
# trailing `return d`; confirm against version control.
1376 def _test_control(self, res):
1377 # exercise the remote-control-the-client foolscap interfaces in
1378 # allmydata.control (mostly used for performance tests)
1379 c0 = self.clients[0]
1380 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1381 control_furl = open(control_furl_file, "r").read().strip()
1382 # it doesn't really matter which Tub we use to connect to the client,
1383 # so let's just use our IntroducerNode's
1384 d = self.introducer.tub.getReference(control_furl)
1385 d.addCallback(self._test_control2, control_furl_file)
# Drive the control interface: upload `filename` by path, download the
# resulting URI back to control.downfile, verify round-trip byte equality,
# then run speed_test and (on Linux) the memory/response-time probes.
# NOTE(review): listing is missing original lines 1392-1393 (the
# `uri, downfile))` argument close and the `def _check(res):` header that
# lines 1394-1397 belong to) and 1403-1404 (presumably `return d`);
# recover from VCS. Also note `sys.platform == "linux2"` is a Python-2-era
# check (Python 3 reports "linux") — fine for this codebase's vintage.
1387 def _test_control2(self, rref, filename):
1388 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1389 downfile = os.path.join(self.basedir, "control.downfile")
1390 d.addCallback(lambda uri:
1391 rref.callRemote("download_from_uri_to_file",
1394 self.failUnlessEqual(res, downfile)
1395 data = open(downfile, "r").read()
1396 expected_data = open(filename, "r").read()
1397 self.failUnlessEqual(data, expected_data)
1398 d.addCallback(_check)
1399 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1400 if sys.platform == "linux2":
1401 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1402 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
# End-to-end exercise of the `tahoe` CLI against client 0: legacy
# root_dir.cap-as-"tahoe:" alias, list-aliases/add-alias, ls (plain, -l,
# --uri, --readonly-uri), put (file/stdin/mutable), get (stdout/file),
# rm, mv, ln, cp (tahoe<->tahoe, disk<->tahoe, overwrite immutable and
# mutable), and recursive cp -r including --caps-only.
# NOTE(review): this listing has many gaps (e.g. original lines 1407-1408,
# 1412-1413, 1415, 1417, 1423, 1427-1428, 1433, 1440, 1446, 1458, 1469,
# 1472, 1478, 1484, 1491-1494, 1496, 1498, 1500, 1507, 1509, 1514, 1518,
# 1528-1529, 1535, 1541-1542, 1547, 1565, 1570, 1574, 1578, 1581,
# 1585-1586, 1589, 1593-1594, 1600-1601, 1605, 1609, 1615, 1622, 1631,
# 1640, 1648, 1655, 1662, 1665, 1670, 1673, 1686, 1691, 1697-1699, 1703,
# 1706, 1716, 1729, 1732, 1740-1741, 1744-1746 are missing) — among them
# the `nodeargs = [` list opener that 1414 continues, the `files`/`datas`
# fixture loop around 1495-1499, the `uri0 = out.strip()` before line 1515,
# `f.close()` after 1426, the `else:` branches in _check_ls_l, the
# `for l in lines:` headers in the ls checkers, `os.mkdir(dn)`/`os.mkdir(sdn2)`
# in the recursive-copy setup, the `def _cmp(name):` header in
# _check_cp_r_out, and the trailing `return d`. Code left byte-identical;
# recover the missing lines from version control before editing.
1405 def _test_cli(self, res):
1406 # run various CLI commands (in a thread, since they use blocking
1409 private_uri = self._private_node.get_uri()
1410 some_uri = self._root_directory_uri
1411 client0_basedir = self.getdir("client0")
1414 "--node-directory", client0_basedir,
1416 TESTDATA = "I will not write the same thing over and over.\n" * 100
1418 d = defer.succeed(None)
1420 # for compatibility with earlier versions, private/root_dir.cap is
1421 # supposed to be treated as an alias named "tahoe:". Start by making
1422 # sure that works, before we add other aliases.
1424 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1425 f = open(root_file, "w")
1426 f.write(private_uri)
1429 def run(ignored, verb, *args, **kwargs):
1430 stdin = kwargs.get("stdin", "")
1431 newargs = [verb] + nodeargs + list(args)
1432 return self._run_cli(newargs, stdin=stdin)
1434 def _check_ls((out,err), expected_children, unexpected_children=[]):
1435 self.failUnlessEqual(err, "")
1436 for s in expected_children:
1437 self.failUnless(s in out, (s,out))
1438 for s in unexpected_children:
1439 self.failIf(s in out, (s,out))
1441 def _check_ls_root((out,err)):
1442 self.failUnless("personal" in out)
1443 self.failUnless("s2-ro" in out)
1444 self.failUnless("s2-rw" in out)
1445 self.failUnlessEqual(err, "")
1447 # this should reference private_uri
1448 d.addCallback(run, "ls")
1449 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1451 d.addCallback(run, "list-aliases")
1452 def _check_aliases_1((out,err)):
1453 self.failUnlessEqual(err, "")
1454 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1455 d.addCallback(_check_aliases_1)
1457 # now that that's out of the way, remove root_dir.cap and work with
1459 d.addCallback(lambda res: os.unlink(root_file))
1460 d.addCallback(run, "list-aliases")
1461 def _check_aliases_2((out,err)):
1462 self.failUnlessEqual(err, "")
1463 self.failUnlessEqual(out, "")
1464 d.addCallback(_check_aliases_2)
1466 d.addCallback(run, "mkdir")
1467 def _got_dir( (out,err) ):
1468 self.failUnless(uri.from_string_dirnode(out.strip()))
1470 d.addCallback(_got_dir)
1471 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1473 d.addCallback(run, "list-aliases")
1474 def _check_aliases_3((out,err)):
1475 self.failUnlessEqual(err, "")
1476 self.failUnless("tahoe: " in out)
1477 d.addCallback(_check_aliases_3)
1479 def _check_empty_dir((out,err)):
1480 self.failUnlessEqual(out, "")
1481 self.failUnlessEqual(err, "")
1482 d.addCallback(run, "ls")
1483 d.addCallback(_check_empty_dir)
1485 def _check_missing_dir((out,err)):
1486 # TODO: check that rc==2
1487 self.failUnlessEqual(out, "")
1488 self.failUnlessEqual(err, "No such file or directory\n")
1489 d.addCallback(run, "ls", "bogus")
1490 d.addCallback(_check_missing_dir)
1495 fn = os.path.join(self.basedir, "file%d" % i)
1497 data = "data to be uploaded: file%d\n" % i
1499 open(fn,"wb").write(data)
1501 def _check_stdout_against((out,err), filenum=None, data=None):
1502 self.failUnlessEqual(err, "")
1503 if filenum is not None:
1504 self.failUnlessEqual(out, datas[filenum])
1505 if data is not None:
1506 self.failUnlessEqual(out, data)
1508 # test all both forms of put: from a file, and from stdin
1510 d.addCallback(run, "put", files[0], "tahoe-file0")
1511 def _put_out((out,err)):
1512 self.failUnless("URI:LIT:" in out, out)
1513 self.failUnless("201 Created" in err, err)
1515 return run(None, "get", uri0)
1516 d.addCallback(_put_out)
1517 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1519 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1520 # tahoe put bar tahoe:FOO
1521 d.addCallback(run, "put", files[2], "tahoe:file2")
1522 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1523 def _check_put_mutable((out,err)):
1524 self._mutable_file3_uri = out.strip()
1525 d.addCallback(_check_put_mutable)
1526 d.addCallback(run, "get", "tahoe:file3")
1527 d.addCallback(_check_stdout_against, 3)
1530 STDIN_DATA = "This is the file to upload from stdin."
1531 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1532 # tahoe put tahoe:FOO
1533 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1534 stdin="Other file from stdin.")
1536 d.addCallback(run, "ls")
1537 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1538 "tahoe-file-stdin", "from-stdin"])
1539 d.addCallback(run, "ls", "subdir")
1540 d.addCallback(_check_ls, ["tahoe-file1"])
1543 d.addCallback(run, "mkdir", "subdir2")
1544 d.addCallback(run, "ls")
1545 # TODO: extract the URI, set an alias with it
1546 d.addCallback(_check_ls, ["subdir2"])
1548 # tahoe get: (to stdin and to a file)
1549 d.addCallback(run, "get", "tahoe-file0")
1550 d.addCallback(_check_stdout_against, 0)
1551 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1552 d.addCallback(_check_stdout_against, 1)
1553 outfile0 = os.path.join(self.basedir, "outfile0")
1554 d.addCallback(run, "get", "file2", outfile0)
1555 def _check_outfile0((out,err)):
1556 data = open(outfile0,"rb").read()
1557 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1558 d.addCallback(_check_outfile0)
1559 outfile1 = os.path.join(self.basedir, "outfile0")
1560 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1561 def _check_outfile1((out,err)):
1562 data = open(outfile1,"rb").read()
1563 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1564 d.addCallback(_check_outfile1)
1566 d.addCallback(run, "rm", "tahoe-file0")
1567 d.addCallback(run, "rm", "tahoe:file2")
1568 d.addCallback(run, "ls")
1569 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1571 d.addCallback(run, "ls", "-l")
1572 def _check_ls_l((out,err)):
1573 lines = out.split("\n")
1575 if "tahoe-file-stdin" in l:
1576 self.failUnless(l.startswith("-r-- "), l)
1577 self.failUnless(" %d " % len(STDIN_DATA) in l)
1579 self.failUnless(l.startswith("-rw- "), l) # mutable
1580 d.addCallback(_check_ls_l)
1582 d.addCallback(run, "ls", "--uri")
1583 def _check_ls_uri((out,err)):
1584 lines = out.split("\n")
1587 self.failUnless(self._mutable_file3_uri in l)
1588 d.addCallback(_check_ls_uri)
1590 d.addCallback(run, "ls", "--readonly-uri")
1591 def _check_ls_rouri((out,err)):
1592 lines = out.split("\n")
1595 rw_uri = self._mutable_file3_uri
1596 u = uri.from_string_mutable_filenode(rw_uri)
1597 ro_uri = u.get_readonly().to_string()
1598 self.failUnless(ro_uri in l)
1599 d.addCallback(_check_ls_rouri)
1602 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1603 d.addCallback(run, "ls")
1604 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1606 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1607 d.addCallback(run, "ls")
1608 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1610 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1611 d.addCallback(run, "ls")
1612 d.addCallback(_check_ls, ["file3", "file3-copy"])
1613 d.addCallback(run, "get", "tahoe:file3-copy")
1614 d.addCallback(_check_stdout_against, 3)
1616 # copy from disk into tahoe
1617 d.addCallback(run, "cp", files[4], "tahoe:file4")
1618 d.addCallback(run, "ls")
1619 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1620 d.addCallback(run, "get", "tahoe:file4")
1621 d.addCallback(_check_stdout_against, 4)
1623 # copy from tahoe into disk
1624 target_filename = os.path.join(self.basedir, "file-out")
1625 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1626 def _check_cp_out((out,err)):
1627 self.failUnless(os.path.exists(target_filename))
1628 got = open(target_filename,"rb").read()
1629 self.failUnlessEqual(got, datas[4])
1630 d.addCallback(_check_cp_out)
1632 # copy from disk to disk (silly case)
1633 target2_filename = os.path.join(self.basedir, "file-out-copy")
1634 d.addCallback(run, "cp", target_filename, target2_filename)
1635 def _check_cp_out2((out,err)):
1636 self.failUnless(os.path.exists(target2_filename))
1637 got = open(target2_filename,"rb").read()
1638 self.failUnlessEqual(got, datas[4])
1639 d.addCallback(_check_cp_out2)
1641 # copy from tahoe into disk, overwriting an existing file
1642 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1643 def _check_cp_out3((out,err)):
1644 self.failUnless(os.path.exists(target_filename))
1645 got = open(target_filename,"rb").read()
1646 self.failUnlessEqual(got, datas[3])
1647 d.addCallback(_check_cp_out3)
1649 # copy from disk into tahoe, overwriting an existing immutable file
1650 d.addCallback(run, "cp", files[5], "tahoe:file4")
1651 d.addCallback(run, "ls")
1652 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1653 d.addCallback(run, "get", "tahoe:file4")
1654 d.addCallback(_check_stdout_against, 5)
1656 # copy from disk into tahoe, overwriting an existing mutable file
1657 d.addCallback(run, "cp", files[5], "tahoe:file3")
1658 d.addCallback(run, "ls")
1659 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1660 d.addCallback(run, "get", "tahoe:file3")
1661 d.addCallback(_check_stdout_against, 5)
1663 # recursive copy: setup
1664 dn = os.path.join(self.basedir, "dir1")
1666 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1667 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1668 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1669 sdn2 = os.path.join(dn, "subdir2")
1671 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1672 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1674 # from disk into tahoe
1675 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1676 d.addCallback(run, "ls")
1677 d.addCallback(_check_ls, ["dir1"])
1678 d.addCallback(run, "ls", "dir1")
1679 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1680 ["rfile4", "rfile5"])
1681 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1682 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1683 ["rfile1", "rfile2", "rfile3"])
1684 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1685 d.addCallback(_check_stdout_against, data="rfile4")
1687 # and back out again
1688 dn_copy = os.path.join(self.basedir, "dir1-copy")
1689 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1690 def _check_cp_r_out((out,err)):
1692 old = open(os.path.join(dn, name), "rb").read()
1693 newfn = os.path.join(dn_copy, name)
1694 self.failUnless(os.path.exists(newfn))
1695 new = open(newfn, "rb").read()
1696 self.failUnlessEqual(old, new)
1700 _cmp(os.path.join("subdir2", "rfile4"))
1701 _cmp(os.path.join("subdir2", "rfile5"))
1702 d.addCallback(_check_cp_r_out)
1704 # and copy it a second time, which ought to overwrite the same files
1705 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1707 # and again, only writing filecaps
1708 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1709 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1710 def _check_capsonly((out,err)):
1711 # these should all be LITs
1712 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1713 y = uri.from_string_filenode(x)
1714 self.failUnlessEqual(y.data, "rfile4")
1715 d.addCallback(_check_capsonly)
1717 # and tahoe-to-tahoe
1718 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1719 d.addCallback(run, "ls")
1720 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1721 d.addCallback(run, "ls", "dir1-copy")
1722 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1723 ["rfile4", "rfile5"])
1724 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1725 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1726 ["rfile1", "rfile2", "rfile3"])
1727 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1728 d.addCallback(_check_stdout_against, data="rfile4")
1730 # and copy it a second time, which ought to overwrite the same files
1731 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1733 # tahoe_ls doesn't currently handle the error correctly: it tries to
1734 # JSON-parse a traceback.
1735 ## def _ls_missing(res):
1736 ## argv = ["ls"] + nodeargs + ["bogus"]
1737 ## return self._run_cli(argv)
1738 ## d.addCallback(_ls_missing)
1739 ## def _check_ls_missing((out,err)):
1742 ## self.failUnlessEqual(err, "")
1743 ## d.addCallback(_check_ls_missing)
# Run the tahoe CLI (runner.runner) in a worker thread (it does blocking
# network I/O) with captured stdin/stdout/stderr; the Deferred fires with
# the (stdout, stderr) text pair.
# NOTE(review): listing is missing original lines 1748 (likely a log.msg),
# 1753 (the `def _done(res):` header that line 1754 belongs to) and 1756
# (`return d`); recover from version control.
1747 def _run_cli(self, argv, stdin=""):
1749 stdout, stderr = StringIO(), StringIO()
1750 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1751 stdin=StringIO(stdin),
1752 stdout=stdout, stderr=stderr)
1754 return stdout.getvalue(), stderr.getvalue()
1755 d.addCallback(_done)
# Exercise the checker API: upload a CHK-sized file into P/personal, then
# check() (and check(verify=True)) the directory node, the CHK filenode, and
# the LIT filenode ("sekrit data") — directory and CHK checks must report
# healthy; LIT checks return None (nothing to check).
# NOTE(review): the method's tail is beyond this chunk (original lines 1792+
# are not shown — presumably `return d`), and lines 1761, 1768, 1778, 1780
# and 1790 (blank lines and the `return d` inside the two nested helpers)
# are missing from the listing; recover from version control.
1758 def _test_checker(self, res):
1759 ut = upload.Data("too big to be literal" * 200, convergence=None)
1760 d = self._personal_node.add_file(u"big file", ut)
1762 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1763 def _check_dirnode_results(r):
1764 self.failUnless(r.is_healthy())
1765 d.addCallback(_check_dirnode_results)
1766 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1767 d.addCallback(_check_dirnode_results)
1769 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1770 def _got_chk_filenode(n):
1771 self.failUnless(isinstance(n, filenode.FileNode))
1772 d = n.check(Monitor())
1773 def _check_filenode_results(r):
1774 self.failUnless(r.is_healthy())
1775 d.addCallback(_check_filenode_results)
1776 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1777 d.addCallback(_check_filenode_results)
1779 d.addCallback(_got_chk_filenode)
1781 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1782 def _got_lit_filenode(n):
1783 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1784 d = n.check(Monitor())
1785 def _check_lit_filenode_results(r):
1786 self.failUnlessEqual(r, None)
1787 d.addCallback(_check_lit_filenode_results)
1788 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1789 d.addCallback(_check_lit_filenode_results)
1791 d.addCallback(_got_lit_filenode)