1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, offloaded, upload
15 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
16 from allmydata.util import idlib, mathutil
17 from allmydata.util import log, base32
18 from allmydata.scripts import runner
19 from allmydata.interfaces import IDirectoryNode, IFileNode, \
20 NoSuchChildError, NoSharesError
21 from allmydata.monitor import Monitor
22 from allmydata.mutable.common import NotMutableError
23 from allmydata.mutable import layout as mutable_layout
24 from foolscap.api import DeadReferenceError
25 from twisted.python.failure import Failure
26 from twisted.web.client import getPage
27 from twisted.web.error import Error
29 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
33 This is some data to publish to the virtual drive, which needs to be large
34 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """An upload.Data subclass that counts how many bytes have been read
    from it, and can optionally fire a Deferred once reads pass a
    threshold (used by the resumable-upload test to interrupt an upload
    mid-flight).

    NOTE(review): the ``bytes_read = 0`` class attribute appears to be
    defined on a line elided from this view -- confirm against the full
    source.
    """
    interrupt_after = None    # byte threshold; None disables the trigger
    interrupt_after_d = None  # Deferred fired once bytes_read > interrupt_after

    def read(self, length):
        # Tally consumption before delegating to the real reader.
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # Fire exactly once: clear the threshold first.
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class GrabEverythingConsumer:
    """A minimal consumer that accepts a push producer and accumulates
    everything written to it (presumably into a ``contents`` attribute
    initialized on a line elided from this view -- TODO confirm).

    NOTE(review): the bodies of write()/unregisterProducer() are elided
    in this view.
    """

    def registerProducer(self, producer, streaming):
        # Only push producers are supported by this consumer.
        assert IPushProducer.providedBy(producer)

    def write(self, data):

    def unregisterProducer(self):
66 class SystemTest(SystemTestMixin, unittest.TestCase):
67 timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
    def test_connections(self):
        """Bring up the grid plus one extra node and verify that every
        client sees numclients+1 storage servers, both in the full
        serverid list and in the permuted server list; the extra node is
        always shut down afterwards (addBoth).

        NOTE(review): the binding of ``sb`` (presumably
        ``sb = c.storage_broker``), the ``d.addCallback(_check)`` hookup
        and the final ``return d`` appear to be on lines elided from
        this view.
        """
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # runs on success or failure, so the extra node never leaks
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
90 # test_connections is subsumed by test_upload_and_download, and takes
91 # quite a while to run on a slow machine (because of all the TLS
92 # connections that must be established). If we ever rework the introducer
93 # code to such an extent that we're not sure if it works anymore, we can
94 # reinstate this test until it does.
97 def test_upload_and_download_random_key(self):
98 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
99 return self._test_upload_and_download(convergence=None)
101 def test_upload_and_download_convergent(self):
102 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
103 return self._test_upload_and_download(convergence="some convergence string")
    def _test_upload_and_download(self, convergence):
        """Upload DATA with the given convergence secret (or a random key
        when convergence is None), then exercise every download API
        (to_data, to_filename, to_filehandle, consumer, filenode.read),
        error paths (corrupted key, nonexistent URI), the upload helper,
        duplicate-upload short-circuiting, interrupted-upload resumption,
        and the remote stats interface.

        NOTE(review): numerous source lines are elided from this view, so
        several names below (``_do_upload``, ``theuri``, ``self.uri``,
        ``_test_read``, ``_check``, ``_interrupted``, ``fh``, ``newdata``,
        ``s``, and some call continuations) are bound on lines not shown
        here -- confirm against the full source.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # every client should see the full set of storage servers
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        # NOTE(review): the ``def _do_upload(res):`` line is elided here.
        u = self.clients[0].getServiceNamed("uploader")
        # we crank the max segsize down to 1024b for the duration of this
        # test, so we can exercise multiple segments. It is important
        # that this is not a multiple of the segment size, so that the
        # tail segment is not the same length as the others. This actually
        # gets rounded up to 1025 to be a multiple of the number of
        # required shares (since we use 25 out of 100 FEC).
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            # ``theuri`` / ``self.uri`` are bound on elided lines
            # (presumably from results.uri)
            log.msg("upload finished: uri is %s" % (theuri,))
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            # download from a *different* client than the uploader
            self.downloader = self.clients[1].downloader
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.cap)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            # NOTE(review): call continuation (target_filename arg) elided
            return self.downloader.download_to_filename(self.cap,
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.cap, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        # download via an IConsumer adapter
        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.cap, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        # NOTE(review): the enclosing ``def _test_read(res):`` line is
        # elided here; the reads below exercise filenode.read() with
        # various offset/size combinations.
        n = self.clients[1].create_node_from_uri(self.uri)
        d = download_to_data(n)
        def _read_done(data):
            self.failUnlessEqual(data, DATA)
        d.addCallback(_read_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=1, size=4))
        def _read_portion_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
        d.addCallback(_read_portion_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=2, size=None))
        def _read_tail_done(mc):
            # size=None means "read to the end"
            self.failUnlessEqual("".join(mc.chunks), DATA[2:])
        d.addCallback(_read_tail_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), size=len(DATA)+1000))
        def _read_too_much(mc):
            # over-reading must be truncated to the real length
            self.failUnlessEqual("".join(mc.chunks), DATA)
        d.addCallback(_read_too_much)
        d.addCallback(_test_read)

        def _test_bad_read(res):
            # flipping a bit in the readkey yields a cap for a file nobody has
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(uri.from_string(baduri))
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                cap = uri.from_string(results.uri)
                return self.downloader.download_to_data(cap)
            d.addCallback(_uploaded)
            # NOTE(review): ``def _check(newdata):`` elided here
            self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                cap = uri.from_string(results.uri)
                return self.downloader.download_to_data(cap)
            d.addCallback(_uploaded)
            # NOTE(review): ``def _check(newdata):`` elided here
            self.failUnlessEqual(newdata, HELPER_DATA)
            self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                        "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        # duplicate-detection only applies with convergent encryption
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            # NOTE(review): ``def _interrupted(f):`` elided here
            f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
            # make sure we actually interrupted it before finishing the
            self.failUnless(u1.bytes_read < len(DATA),
                            "read %d out of %d total" % (u1.bytes_read,
            log.msg("waiting for reconnect", level=log.NOISY,
                    facility="tahoe.test.test_system")
            # now, we need to give the nodes a chance to notice that this
            # connection has gone away. When this happens, the storage
            # servers will be told to abort their uploads, removing the
            # partial shares. Unfortunately this involves TCP messages
            # going through the loopback interface, and we can't easily
            # predict how long that will take. If it were all local, we
            # could use fireEventually() to stall. Since we don't have
            # the right introduction hooks, the best we can do is use a
            # fixed delay. TODO: this is fragile.
            u1.interrupt_after_d.addCallback(self.stall, 2.0)
            return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = uri.from_string(results.uri)
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around .
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(cap)
            d.addCallback(_uploaded)

            # NOTE(review): ``def _check(newdata):`` elided here
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #from pprint import pprint
                # NOTE(review): ``s`` is bound on an elided line
                # (presumably s = stats["stats"])
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
        d.addCallback(_grab_stats)
    def _find_shares(self, basedir):
        """Walk ``basedir`` and collect (client_num, storage_index,
        filename, shnum) tuples for every share file found under a
        .../storage/shares/$START/$SINDEX directory; fail the test if
        none are found.

        NOTE(review): the accumulator list, the ``continue`` for
        non-storage dirs, the full ``if`` condition head, the append of
        ``data``, and the ``return`` are on lines elided from this view.
        """
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            and pieces[-4] == "storage"
            and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
        self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Read the SDMF mutable share at ``filename``, mangle the single
        field named by ``which`` (mostly by flipping one bit), re-pack it
        and write it back in place.

        NOTE(review): the ``seqnum``/``R``/``IV`` elif branches and the
        continuation arguments of pack_prefix/pack_share are on lines
        elided from this view.
        """
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            # NOTE(review): the seqnum mutation and the "R"/"IV" elif
            # headers are elided; the next two lines presumably belong to
            # those branches.
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack with the (possibly mangled) fields and overwrite in place
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """End-to-end SDMF mutable-file exercise: create, dump-share via
        the debug CLI, download from cached and fresh nodes on several
        clients, overwrite twice, corrupt all but the last three shares
        and verify download still succeeds, create empty files and
        dirnodes, and check the key-generator pool bookkeeping.

        NOTE(review): several lines are elided from this view (the
        ``c = self.clients[0]`` binding, ``def _done``, two ``try:``
        headers paired with the visible ``except unittest.FailTest:``
        clauses, some call continuations, and the final ``return d``).
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            # NOTE(review): ``def _done(res):`` elided here
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)
            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            # NOTE(review): a ``try:`` header is elided before these checks
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                        base32.b2a(storage_index))
            self.failUnless(expected in output)
            except unittest.FailTest:
                print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.
        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            # nodemaker must hand back the same cached node instance
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                # read: this will trigger "pubkey doesn't match
                self._corrupt_mutable_share(filename, "pubkey")
                self._corrupt_mutable_share(filename, "encprivkey")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "seqnum")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "R")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "segsize")
                self._corrupt_mutable_share(filename, "share_hash_chain")
                self._corrupt_mutable_share(filename, "block_hash_tree")
                self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # NOTE(review): the binding of ``d1`` (presumably
            # d1 = dnode.list()) is elided here
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            # poll predicate: true once client3 is attached to the key generator
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # each key consumed from the pool shifts its size by size_delta
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)
724 def flip_bit(self, good):
725 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a CHK URI string whose readkey (and therefore storage
        index) differs from ``gooduri`` by one bit, so no server will
        hold any shares for it.

        NOTE(review): the ``size=u.size`` continuation line of the
        CHKFileURI call appears to be elided from this view.
        """
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        return u2.to_string()
738 # TODO: add a test which mangles the uri_extension_hash instead, and
739 # should fail due to not being able to get a valid uri_extension block.
740 # Also a test which sneakily mangles the uri_extension block to change
741 # some of the validation data, so it will fail in the post-download phase
742 # when the file's crypttext integrity check fails. Do the same thing for
743 # the key, which should cause the download to fail the post-download
744 # plaintext_hash check.
    def test_vdrive(self):
        """Driver for the virtual-drive scenario: publish a small tree,
        bounce client0, re-check the published data from other clients,
        publish a private directory, then exercise the web UI, control
        port, CLI and checker against the result.

        NOTE(review): the final ``return d`` appears to be on a line
        elided from this view.
        """
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")
        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
    def _test_introweb(self, res):
        """Fetch the introducer's web page (HTML and ?t=json) and verify
        the version banner plus the announcement/subscription summaries.

        NOTE(review): the ``def _check(res):`` header and two ``try:``
        headers paired with the visible ``except unittest.FailTest:``
        clauses are on lines elided from this view, as is the
        ``allmydata`` import and the final ``return d``.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
        self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
        self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
            print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["subscription_summary"],
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
            self.failUnlessEqual(data["announcement_distinct_hosts"],
                                 {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        """Create the root directory R, a subdir1 beneath it, and upload
        self.data as R/subdir1/mydata567, stashing the root URI and the
        file URI for later checks.

        NOTE(review): the ``c0 = self.clients[0]`` binding, the inner
        ``return d1`` and the final ``return d`` appear to be on lines
        elided from this view.
        """
        ut = upload.Data(self.data, convergence=None)
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
    def _do_publish2(self, res):
        """Create subdir2 under subdir1 and upload self.data as
        subdir2/mydata992.

        NOTE(review): the final ``return d`` appears to be on a line
        elided from this view.
        """
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    def log(self, res, *args, **kwargs):
        """Deferred-callback-friendly logger: logs *args/**kwargs and is
        intended to pass ``res`` through unchanged.

        NOTE(review): the ``return res`` pass-through appears to be on a
        line elided from this view.
        """
        # print "MSG: %s RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        """Create the private directory P with P/personal/sekrit data,
        plus P/s2-rw and P/s2-ro links to R/subdir1/subdir2 (read-write
        and read-only respectively); the callback chain ends by yielding
        the private node itself.

        NOTE(review): the ``def _got_s2(s2node):`` header, the inner
        ``return d2``/``return d1`` lines and the final ``return d``
        appear to be elided from this view.
        """
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                  s2node.get_readonly_uri())
            d2.addCallback(lambda node:
                           privnode.set_uri(u"s2-ro",
                                            s2node.get_readonly_uri(),
                                            s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 via the iterative get() API and
        compare the downloaded bytes to self.data.

        NOTE(review): the ``c1 = self.clients[1]`` binding, the
        ``def _get_done(data):`` header and the final ``return d``
        appear to be on lines elided from this view.
        """
        # this one uses the iterative API
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 via the path-based
        get_child_at_path() API, and check that re-creating a node from
        the file's URI yields an equal node.

        NOTE(review): the final ``return d`` appears to be on a line
        elided from this view.
        """
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # node cache should yield an equal node for the same URI
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
    def _check_publish_private(self, resnode):
        """Exhaustively check the private tree built by
        _do_publish_private.

        Verifies the literal file's contents, that s2-rw is mutable
        while s2-ro is a read-only view (every mutating operation must
        raise NotMutableError), then exercises move_child_to,
        build_manifest, and start_deep_stats on the private root.

        NOTE(review): several lines appear to be elided from this
        excerpt (e.g. the `def get_path(path):` header, dict/format
        continuations, and trailing returns) -- confirm against the
        full source.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember the personal dirnode for the read-only mv checks
            self._personal_node = personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # [review: body of the elided get_path(path) helper]
        return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only view is still a mutable-format node, but
            # must report itself readonly
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            # every mutating operation on the read-only view must fail
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            # shuffle "sekrit data" around between P/ and P/personal/
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                # [review: some dict entries and the closing brace of
                # `expected` appear elided in this excerpt]
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                # directory sizes vary, so only lower bounds are checked
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
        d.addCallback(_got_home)
    def shouldFail(self, res, expected_failure, which, substring=None):
        """addBoth-style checker: `res` must be a Failure wrapping
        `expected_failure` (and, when given, containing `substring` in
        its string form); a non-Failure result means the expected error
        never happened, so the test fails. `which` labels the failure
        message.

        NOTE(review): the `if substring:` / `else:` framing lines appear
        to be elided from this excerpt -- confirm against the full
        source.
        """
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        # [review: this branch belongs under an elided `else:` -- it runs
        # when the operation unexpectedly succeeded]
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Invoke callable(*args, **kwargs) (via maybeDeferred) and
        require that it fail with `expected_failure`, optionally with
        `substring` appearing in the stringified Failure. `which` labels
        the failure message.

        NOTE(review): the `def done(res):` framing and the trailing
        addBoth/return appear to be elided from this excerpt -- confirm
        against the full source.
        """
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        # [review: this branch belongs under an elided `else:` -- it runs
        # when the operation unexpectedly succeeded]
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
1049 def PUT(self, urlpath, data):
1050 url = self.webish_url + urlpath
1051 return getPage(url, method="PUT", postdata=data)
1053 def GET(self, urlpath, followRedirect=False):
1054 url = self.webish_url + urlpath
1055 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        """POST a multipart/form-data body built from `fields` to
        `urlpath`, delegating the actual request to POST2. A
        tuple-valued field is treated as (filename, contents); any other
        value is stringified.

        NOTE(review): the `headers`/`form` initializations, the
        separator-append lines, and the non-file `else:` branch appear
        to be elided from this excerpt -- confirm against the full
        source.
        """
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('UTF-8')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                # file-style field: (filename, contents)
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            # [review: belongs under an elided `else:` for plain fields]
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        body = "\r\n".join(form) + "\r\n"
        headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)
    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
              # [review: the rest of the signature (a use_helper flag, per
              # the L813 call site in POST) and the if/else selecting
              # between the two url assignments appear elided]
        # NOTE(review): `headers={}` is a mutable default argument --
        # safe only if this dict is never mutated across calls; confirm.
        url = self.helper_webish_url + urlpath
        url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the webapi end-to-end: welcome pages (with and
        without helper), directory listings, file download via several
        URL forms, PUT/POST uploads, the status/helper_status/statistics
        pages (HTML and JSON), and a bogus-URI error path.

        NOTE(review): this excerpt has many elided lines (the initial
        `d = ...` binding, several string/paren continuations, two
        callback headers, and the trailing return) -- confirm against
        the full source.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        # [review: the binding of `d` (the initial getPage of the welcome
        # page) appears elided here; `d` is used below]
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
            # [review: the substring argument and `return d1` appear
            # elided here]
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        # [review: the replacement-contents argument appears elided; the
        # GET below expects "NEWER contents"]
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # [review: the `def _got_up(res):` header appears elided here;
        # `_got_up` is added as a callback below]
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        # [review: the `def _got_down(res):` header appears elided here;
        # `_got_down` is added as a callback below]
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # [review: f.close() and the binding of `now` appear elided]
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            # [review: the continuation lines carrying the expected
            # values for the three split assertions appear elided]
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' CLI diagnostics (dump-share,
        find-shares, catalog-shares) against the share files written to
        disk by the earlier upload.

        NOTE(review): several control-flow lines (loop continues/break,
        paren continuations, the `matching = [line` opener, and
        out.seek calls) appear to be elided from this excerpt -- confirm
        against the full source.
        """
        # exercise some of the diagnostic tools in runner.py

        # locate a CHK share file by walking the storage directories
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 4
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
        # runs only when no share was found above
        self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # 10 shares were placed, one line of output per share
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        # [review: the `matching = [line` opener of this list
        # comprehension appears elided]
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        """Connect to clients[0]'s control port (via its control.furl)
        and hand the remote reference to _test_control2.

        NOTE(review): the trailing `return` of the Deferred appears to
        be elided from this excerpt -- confirm against the full source.
        """
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
    def _test_control2(self, rref, filename):
        """Round-trip `filename` through the control interface: upload
        it, download it to control.downfile, compare the bytes, then run
        speed_test / get_memory_usage (Linux only) /
        measure_peer_response_time.

        NOTE(review): a call continuation, the `def _check(res):` header,
        and the trailing return appear to be elided from this excerpt --
        confirm against the full source.
        """
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
        # [review: body of the elided _check callback follows]
            self.failUnlessEqual(res, downfile)
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Drive the 'tahoe' CLI against client0: alias handling
        (root_dir.cap compatibility, add-alias/list-aliases), put/get
        (file, stdin, mutable), ls variants (-l, --uri,
        --readonly-uri), rm, mv, ln, and cp in all directions including
        recursive and --caps-only copies.

        NOTE(review): this excerpt has many elided lines (several
        variable initializations such as `nodeargs = [`, `files`/`datas`
        setup, loop headers, and returns) -- confirm against the full
        source.
        """
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        client0_basedir = self.getdir("client0")
        # [review: the `nodeargs = [` opener appears elided; `nodeargs`
        # is used by run() below]
            "--node-directory", client0_basedir,
        TESTDATA = "I will not write the same thing over and over.\n" * 100
        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.
        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)

        # small helper: run one CLI verb with the standard node args
        def run(ignored, verb, *args, **kwargs):
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
            # [review: the return of the new dircap appears elided; the
            # next callback receives it as `newcap`]
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

        # [review: the files/datas list setup and the `for i in ...:`
        # loop header appear elided; `files` and `datas` are used below]
            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            # [review: the binding of `uri0` appears elided]
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            # remember file3's write-cap for the ls --uri checks below
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        # NOTE(review): this reuses the "outfile0" path for outfile1 --
        # possibly intentional (overwrite), possibly a typo; confirm.
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
            # [review: the loop header and a second `if` appear elided]
                if "tahoe-file-stdin" in l:
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
            # [review: loop/condition headers appear elided]
                    self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
            # [review: loop/condition headers appear elided]
                    rw_uri = self._mutable_file3_uri
                    u = uri.from_string_mutable_filenode(rw_uri)
                    ro_uri = u.get_readonly().to_string()
                    self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        # [review: directory-creation call(s) appear elided here]
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
            # [review: the `def _cmp(name):` header and the first _cmp
            # calls appear elided here]
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and again, only writing filecaps
        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
        def _check_capsonly((out,err)):
            # these should all be LITs
            x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
            y = uri.from_string_filenode(x)
            self.failUnlessEqual(y.data, "rfile4")
        d.addCallback(_check_capsonly)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
        ## def _ls_missing(res):
        ##     argv = ["ls"] + nodeargs + ["bogus"]
        ##     return self._run_cli(argv)
        ## d.addCallback(_ls_missing)
        ## def _check_ls_missing((out,err)):
        ##     self.failUnlessEqual(err, "")
        ## d.addCallback(_check_ls_missing)
    def _run_cli(self, argv, stdin=""):
        """Run the tahoe CLI (runner.runner) in a worker thread, feeding
        it `stdin`; the Deferred fires with (stdout, stderr) as strings.
        deferToThread is used because the CLI makes blocking calls.

        NOTE(review): the `def _done(res):` header and the trailing
        `return` appear to be elided from this excerpt -- confirm
        against the full source.
        """
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdin=StringIO(stdin),
                                  stdout=stdout, stderr=stderr)
        # [review: body of the elided _done callback]
            return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
1761 def _test_checker(self, res):
1762 ut = upload.Data("too big to be literal" * 200, convergence=None)
1763 d = self._personal_node.add_file(u"big file", ut)
1765 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1766 def _check_dirnode_results(r):
1767 self.failUnless(r.is_healthy())
1768 d.addCallback(_check_dirnode_results)
1769 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1770 d.addCallback(_check_dirnode_results)
1772 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1773 def _got_chk_filenode(n):
1774 self.failUnless(isinstance(n, ImmutableFileNode))
1775 d = n.check(Monitor())
1776 def _check_filenode_results(r):
1777 self.failUnless(r.is_healthy())
1778 d.addCallback(_check_filenode_results)
1779 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1780 d.addCallback(_check_filenode_results)
1782 d.addCallback(_got_chk_filenode)
1784 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1785 def _got_lit_filenode(n):
1786 self.failUnless(isinstance(n, LiteralFileNode))
1787 d = n.check(Monitor())
1788 def _check_lit_filenode_results(r):
1789 self.failUnlessEqual(r, None)
1790 d.addCallback(_check_lit_filenode_results)
1791 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1792 d.addCallback(_check_lit_filenode_results)
1794 d.addCallback(_got_lit_filenode)