1 from base64 import b32encode
2 import os, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, filenode, offloaded, upload
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.scripts import runner
18 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
19 NoSuchChildError, NotEnoughSharesError
20 from allmydata.monitor import Monitor
21 from allmydata.mutable.common import NotMutableError
22 from allmydata.mutable import layout as mutable_layout
23 from foolscap.api import DeadReferenceError
24 from twisted.python.failure import Failure
25 from twisted.web.client import getPage
26 from twisted.web.error import Error
28 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
32 This is some data to publish to the virtual drive, which needs to be large
33 enough to not fit inside a LIT uri.
36 class CountingDataUploadable(upload.Data):
# Uploadable that counts how many bytes the encoder has read from it, and
# fires a one-shot Deferred once a threshold is crossed. Tests use this to
# interrupt an in-flight upload (e.g. by bouncing the helper) partway through.
# NOTE(review): relies on a `bytes_read` counter being initialized to 0 as a
# class attribute (original line 37, elided from this listing) -- confirm.
38 interrupt_after = None
39 interrupt_after_d = None
# read(length): tally the bytes handed out; if the tally has just crossed
# interrupt_after, disarm the trigger (set it to None so it fires only once)
# and fire interrupt_after_d, then delegate to the normal upload.Data.read.
41 def read(self, length):
42 self.bytes_read += length
43 if self.interrupt_after is not None:
44 if self.bytes_read > self.interrupt_after:
45 self.interrupt_after = None
46 self.interrupt_after_d.callback(self)
47 return upload.Data.read(self, length)
49 class GrabEverythingConsumer:
# Minimal IConsumer used with download.ConsumerAdapter: it accepts a push
# producer and accumulates everything written to it.
# NOTE(review): the attribute setup (e.g. a `contents` buffer, per the test
# that reads `consumer.contents`) and the write/unregister bodies are elided
# from this listing -- confirm against the full source.
55 def registerProducer(self, producer, streaming):
# only streaming (push) producers are expected here
57 assert IPushProducer.providedBy(producer)
# write(data): body elided -- presumably appends `data` to self.contents.
59 def write(self, data):
# unregisterProducer: body elided -- presumably a no-op.
62 def unregisterProducer(self):
65 class SystemTest(SystemTestMixin, unittest.TestCase):
67 def test_connections(self):
# Sanity check of grid formation: start the nodes, add one extra node,
# then verify that every client learns about all numclients+1 peers, both
# via get_all_peerids() and via the permuted storage-peer list.
68 self.basedir = "system/SystemTest/test_connections"
69 d = self.set_up_nodes()
70 self.extra_node = None
71 d.addCallback(lambda res: self.add_extra_node(self.numclients))
72 def _check(extra_node):
73 self.extra_node = extra_node
74 for c in self.clients:
75 all_peerids = list(c.get_all_peerids())
# +1 accounts for the extra node added above
76 self.failUnlessEqual(len(all_peerids), self.numclients+1)
77 permuted_peers = list(c.get_permuted_peers("storage", "a"))
78 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
# always stop the extra node, whether the checks passed or failed
81 def _shutdown_extra_node(res):
83 return self.extra_node.stopService()
85 d.addBoth(_shutdown_extra_node)
87 test_connections.timeout = 300
88 # test_connections is subsumed by test_upload_and_download, and takes
89 # quite a while to run on a slow machine (because of all the TLS
90 # connections that must be established). If we ever rework the introducer
91 # code to such an extent that we're not sure if it works anymore, we can
92 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Exercise the full upload/download cycle with a random encryption key.

    Passing convergence=None tells the uploader to pick a fresh random key
    rather than deriving one from the plaintext (convergent encryption).
    """
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    return self._test_upload_and_download(convergence=None)
test_upload_and_download_random_key.timeout = 4800
def test_upload_and_download_convergent(self):
    """Exercise the full upload/download cycle using convergent encryption.

    A non-None convergence secret makes the encryption key a deterministic
    function of the plaintext, so duplicate uploads can be short-circuited.
    """
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    return self._test_upload_and_download(convergence="some convergence string")
test_upload_and_download_convergent.timeout = 4800
105 def _test_upload_and_download(self, convergence):
# End-to-end immutable-file exercise, parameterized by `convergence` (None
# for a random key, a string for convergent encryption). It uploads DATA,
# downloads it back via every API (to_data / to_filename / to_filehandle /
# consumer / filenode.read), checks error paths (corrupted key, nonexistent
# URI), then exercises helper-assisted upload, duplicate-upload
# short-circuiting, interrupted-upload resumption, and the remote stats
# interface. NOTE(review): this listing is elided -- several original lines
# (e.g. the _do_upload definition around line 130) are missing.
106 # we use 4000 bytes of data, which will result in about 400k written
107 # to disk among all our simulated nodes
108 DATA = "Some data to upload\n" * 200
109 d = self.set_up_nodes()
110 def _check_connections(res):
111 for c in self.clients:
112 all_peerids = list(c.get_all_peerids())
113 self.failUnlessEqual(len(all_peerids), self.numclients)
114 permuted_peers = list(c.get_permuted_peers("storage", "a"))
115 self.failUnlessEqual(len(permuted_peers), self.numclients)
116 d.addCallback(_check_connections)
120 u = self.clients[0].getServiceNamed("uploader")
122 # we crank the max segsize down to 1024b for the duration of this
123 # test, so we can exercise multiple segments. It is important
124 # that this is not a multiple of the segment size, so that the
125 # tail segment is not the same length as the others. This actualy
126 # gets rounded up to 1025 to be a multiple of the number of
127 # required shares (since we use 25 out of 100 FEC).
128 up = upload.Data(DATA, convergence=convergence)
129 up.max_segment_size = 1024
132 d.addCallback(_do_upload)
133 def _upload_done(results):
135 log.msg("upload finished: uri is %s" % (theuri,))
137 assert isinstance(self.uri, str), self.uri
138 dl = self.clients[1].getServiceNamed("downloader")
140 d.addCallback(_upload_done)
142 def _upload_again(res):
143 # Upload again. If using convergent encryption then this ought to be
144 # short-circuited, however with the way we currently generate URIs
145 # (i.e. because they include the roothash), we have to do all of the
146 # encoding work, and only get to save on the upload part.
147 log.msg("UPLOADING AGAIN")
148 up = upload.Data(DATA, convergence=convergence)
149 up.max_segment_size = 1024
150 d1 = self.uploader.upload(up)
151 d.addCallback(_upload_again)
# --- download the file back through each of the download APIs ---
153 def _download_to_data(res):
154 log.msg("DOWNLOADING")
155 return self.downloader.download_to_data(self.uri)
156 d.addCallback(_download_to_data)
157 def _download_to_data_done(data):
158 log.msg("download finished")
159 self.failUnlessEqual(data, DATA)
160 d.addCallback(_download_to_data_done)
162 target_filename = os.path.join(self.basedir, "download.target")
163 def _download_to_filename(res):
164 return self.downloader.download_to_filename(self.uri,
166 d.addCallback(_download_to_filename)
167 def _download_to_filename_done(res):
168 newdata = open(target_filename, "rb").read()
169 self.failUnlessEqual(newdata, DATA)
170 d.addCallback(_download_to_filename_done)
172 target_filename2 = os.path.join(self.basedir, "download.target2")
173 def _download_to_filehandle(res):
174 fh = open(target_filename2, "wb")
175 return self.downloader.download_to_filehandle(self.uri, fh)
176 d.addCallback(_download_to_filehandle)
177 def _download_to_filehandle_done(fh):
179 newdata = open(target_filename2, "rb").read()
180 self.failUnlessEqual(newdata, DATA)
181 d.addCallback(_download_to_filehandle_done)
183 consumer = GrabEverythingConsumer()
184 ct = download.ConsumerAdapter(consumer)
185 d.addCallback(lambda res:
186 self.downloader.download(self.uri, ct))
187 def _download_to_consumer_done(ign):
188 self.failUnlessEqual(consumer.contents, DATA)
189 d.addCallback(_download_to_consumer_done)
# --- filenode.read() API: whole file, a slice, a tail, and over-read ---
192 n = self.clients[1].create_node_from_uri(self.uri)
193 d = download_to_data(n)
194 def _read_done(data):
195 self.failUnlessEqual(data, DATA)
196 d.addCallback(_read_done)
197 d.addCallback(lambda ign:
198 n.read(MemoryConsumer(), offset=1, size=4))
199 def _read_portion_done(mc):
200 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
201 d.addCallback(_read_portion_done)
202 d.addCallback(lambda ign:
203 n.read(MemoryConsumer(), offset=2, size=None))
204 def _read_tail_done(mc):
205 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
206 d.addCallback(_read_tail_done)
207 d.addCallback(lambda ign:
208 n.read(MemoryConsumer(), size=len(DATA)+1000))
209 def _read_too_much(mc):
210 self.failUnlessEqual("".join(mc.chunks), DATA)
211 d.addCallback(_read_too_much)
214 d.addCallback(_test_read)
# --- error paths: flipped key bit, then a URI nobody holds shares for ---
216 def _test_bad_read(res):
217 bad_u = uri.from_string_filenode(self.uri)
218 bad_u.key = self.flip_bit(bad_u.key)
219 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
220 # this should cause an error during download
222 d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
224 bad_n.read, MemoryConsumer(), offset=2)
226 d.addCallback(_test_bad_read)
228 def _download_nonexistent_uri(res):
229 baduri = self.mangle_uri(self.uri)
230 log.msg("about to download non-existent URI", level=log.UNUSUAL,
231 facility="tahoe.tests")
232 d1 = self.downloader.download_to_data(baduri)
233 def _baduri_should_fail(res):
234 log.msg("finished downloading non-existend URI",
235 level=log.UNUSUAL, facility="tahoe.tests")
236 self.failUnless(isinstance(res, Failure))
237 self.failUnless(res.check(NotEnoughSharesError),
238 "expected NotEnoughSharesError, got %s" % res)
239 # TODO: files that have zero peers should get a special kind
240 # of NotEnoughSharesError, which can be used to suggest that
241 # the URI might be wrong or that they've never uploaded the
242 # file in the first place.
243 d1.addBoth(_baduri_should_fail)
245 d.addCallback(_download_nonexistent_uri)
247 # add a new node, which doesn't accept shares, and only uses the
249 d.addCallback(lambda res: self.add_extra_node(self.numclients,
251 add_to_sparent=True))
252 def _added(extra_node):
253 self.extra_node = extra_node
254 d.addCallback(_added)
# --- helper-assisted upload, then duplicate-upload short-circuiting ---
256 HELPER_DATA = "Data that needs help to upload" * 1000
257 def _upload_with_helper(res):
258 u = upload.Data(HELPER_DATA, convergence=convergence)
259 d = self.extra_node.upload(u)
260 def _uploaded(results):
262 return self.downloader.download_to_data(uri)
263 d.addCallback(_uploaded)
265 self.failUnlessEqual(newdata, HELPER_DATA)
266 d.addCallback(_check)
268 d.addCallback(_upload_with_helper)
270 def _upload_duplicate_with_helper(res):
271 u = upload.Data(HELPER_DATA, convergence=convergence)
272 u.debug_stash_RemoteEncryptedUploadable = True
273 d = self.extra_node.upload(u)
274 def _uploaded(results):
276 return self.downloader.download_to_data(uri)
277 d.addCallback(_uploaded)
279 self.failUnlessEqual(newdata, HELPER_DATA)
280 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
281 "uploadable started uploading, should have been avoided")
282 d.addCallback(_check)
# duplicate detection only works with convergent encryption
284 if convergence is not None:
285 d.addCallback(_upload_duplicate_with_helper)
287 def _upload_resumable(res):
288 DATA = "Data that needs help to upload and gets interrupted" * 1000
289 u1 = CountingDataUploadable(DATA, convergence=convergence)
290 u2 = CountingDataUploadable(DATA, convergence=convergence)
292 # we interrupt the connection after about 5kB by shutting down
293 # the helper, then restartingit.
294 u1.interrupt_after = 5000
295 u1.interrupt_after_d = defer.Deferred()
296 u1.interrupt_after_d.addCallback(lambda res:
297 self.bounce_client(0))
299 # sneak into the helper and reduce its chunk size, so that our
300 # debug_interrupt will sever the connection on about the fifth
301 # chunk fetched. This makes sure that we've started to write the
302 # new shares before we abandon them, which exercises the
303 # abort/delete-partial-share code. TODO: find a cleaner way to do
304 # this. I know that this will affect later uses of the helper in
305 # this same test run, but I'm not currently worried about it.
306 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
308 d = self.extra_node.upload(u1)
310 def _should_not_finish(res):
311 self.fail("interrupted upload should have failed, not finished"
312 " with result %s" % (res,))
314 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
316 # make sure we actually interrupted it before finishing the
318 self.failUnless(u1.bytes_read < len(DATA),
319 "read %d out of %d total" % (u1.bytes_read,
322 log.msg("waiting for reconnect", level=log.NOISY,
323 facility="tahoe.test.test_system")
324 # now, we need to give the nodes a chance to notice that this
325 # connection has gone away. When this happens, the storage
326 # servers will be told to abort their uploads, removing the
327 # partial shares. Unfortunately this involves TCP messages
328 # going through the loopback interface, and we can't easily
329 # predict how long that will take. If it were all local, we
330 # could use fireEventually() to stall. Since we don't have
331 # the right introduction hooks, the best we can do is use a
332 # fixed delay. TODO: this is fragile.
333 u1.interrupt_after_d.addCallback(self.stall, 2.0)
334 return u1.interrupt_after_d
335 d.addCallbacks(_should_not_finish, _interrupted)
337 def _disconnected(res):
338 # check to make sure the storage servers aren't still hanging
339 # on to the partial share: their incoming/ directories should
341 log.msg("disconnected", level=log.NOISY,
342 facility="tahoe.test.test_system")
343 for i in range(self.numclients):
344 incdir = os.path.join(self.getdir("client%d" % i),
345 "storage", "shares", "incoming")
346 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
347 d.addCallback(_disconnected)
349 # then we need to give the reconnector a chance to
350 # reestablish the connection to the helper.
351 d.addCallback(lambda res:
352 log.msg("wait_for_connections", level=log.NOISY,
353 facility="tahoe.test.test_system"))
354 d.addCallback(lambda res: self.wait_for_connections())
357 d.addCallback(lambda res:
358 log.msg("uploading again", level=log.NOISY,
359 facility="tahoe.test.test_system"))
360 d.addCallback(lambda res: self.extra_node.upload(u2))
362 def _uploaded(results):
364 log.msg("Second upload complete", level=log.NOISY,
365 facility="tahoe.test.test_system")
367 # this is really bytes received rather than sent, but it's
368 # convenient and basically measures the same thing
369 bytes_sent = results.ciphertext_fetched
371 # We currently don't support resumption of upload if the data is
372 # encrypted with a random key. (Because that would require us
373 # to store the key locally and re-use it on the next upload of
374 # this file, which isn't a bad thing to do, but we currently
376 if convergence is not None:
377 # Make sure we did not have to read the whole file the
378 # second time around .
379 self.failUnless(bytes_sent < len(DATA),
380 "resumption didn't save us any work:"
381 " read %d bytes out of %d total" %
382 (bytes_sent, len(DATA)))
384 # Make sure we did have to read the whole file the second
385 # time around -- because the one that we partially uploaded
386 # earlier was encrypted with a different random key.
387 self.failIf(bytes_sent < len(DATA),
388 "resumption saved us some work even though we were using random keys:"
389 " read %d bytes out of %d total" %
390 (bytes_sent, len(DATA)))
391 return self.downloader.download_to_data(uri)
392 d.addCallback(_uploaded)
395 self.failUnlessEqual(newdata, DATA)
396 # If using convergent encryption, then also check that the
397 # helper has removed the temp file from its directories.
398 if convergence is not None:
399 basedir = os.path.join(self.getdir("client0"), "helper")
400 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
401 self.failUnlessEqual(files, [])
402 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
403 self.failUnlessEqual(files, [])
404 d.addCallback(_check)
406 d.addCallback(_upload_resumable)
408 def _grab_stats(ignored):
409 # the StatsProvider doesn't normally publish a FURL:
410 # instead it passes a live reference to the StatsGatherer
411 # (if and when it connects). To exercise the remote stats
412 # interface, we manually publish client0's StatsProvider
413 # and use client1 to query it.
414 sp = self.clients[0].stats_provider
415 sp_furl = self.clients[0].tub.registerReference(sp)
416 d = self.clients[1].tub.getReference(sp_furl)
417 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
418 def _got_stats(stats):
420 #from pprint import pprint
423 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
424 c = stats["counters"]
425 self.failUnless("storage_server.allocate" in c)
426 d.addCallback(_got_stats)
428 d.addCallback(_grab_stats)
432 def _find_shares(self, basedir):
# Walk `basedir` looking for storage-share files laid out as
# .../clientN/storage/shares/$START/$SINDEX/<shnum>, and collect
# (client_num, storage_index, filename, shnum) tuples. Fails the test if
# no share files are found at all.
# NOTE(review): the accumulator list and the `if ... ` header around the
# pieces[-4]/pieces[-3] checks are elided from this listing.
434 for (dirpath, dirnames, filenames) in os.walk(basedir):
435 if "storage" not in dirpath:
439 pieces = dirpath.split(os.sep)
441 and pieces[-4] == "storage"
442 and pieces[-3] == "shares"):
443 # we're sitting in .../storage/shares/$START/$SINDEX , and there
444 # are sharefiles here
445 assert pieces[-5].startswith("client")
# single trailing digit => supports at most 10 simulated clients
446 client_num = int(pieces[-5][-1])
447 storage_index_s = pieces[-1]
448 storage_index = si_a2b(storage_index_s)
449 for sharename in filenames:
450 shnum = int(sharename)
451 filename = os.path.join(dirpath, sharename)
452 data = (client_num, storage_index, filename, shnum)
455 self.fail("unable to find any share files in %s" % basedir)
458 def _corrupt_mutable_share(self, filename, which):
# Read the mutable share stored in `filename`, unpack it, flip a bit in
# (or otherwise mangle) the field named by `which`, re-pack it, and write
# it back in place. Used to exercise the downloader's corruption handling.
459 msf = MutableShareFile(filename)
460 datav = msf.readv([ (0, 1000000) ])
461 final_share = datav[0]
462 assert len(final_share) < 1000000 # ought to be truncated
463 pieces = mutable_layout.unpack_share(final_share)
464 (seqnum, root_hash, IV, k, N, segsize, datalen,
465 verification_key, signature, share_hash_chain, block_hash_tree,
466 share_data, enc_privkey) = pieces
# dispatch on the field to corrupt. NOTE(review): the branches for
# "seqnum" and "R"/"IV" headers are partially elided from this listing.
468 if which == "seqnum":
471 root_hash = self.flip_bit(root_hash)
473 IV = self.flip_bit(IV)
474 elif which == "segsize":
475 segsize = segsize + 15
476 elif which == "pubkey":
477 verification_key = self.flip_bit(verification_key)
478 elif which == "signature":
479 signature = self.flip_bit(signature)
480 elif which == "share_hash_chain":
481 nodenum = share_hash_chain.keys()[0]
482 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
483 elif which == "block_hash_tree":
484 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
485 elif which == "share_data":
486 share_data = self.flip_bit(share_data)
487 elif which == "encprivkey":
488 enc_privkey = self.flip_bit(enc_privkey)
# re-pack with the (possibly corrupted) fields and overwrite in place
490 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
492 final_share = mutable_layout.pack_share(prefix,
499 msf.writev( [(0, final_share)], None)
502 def test_mutable(self):
# End-to-end mutable-file (SDMF) exercise: create a mutable file, inspect
# a share with the "debug dump-share" CLI, read it back through new nodes
# on several clients, overwrite it twice, corrupt 7 of the 10 shares and
# confirm a 3-of-10 download still succeeds, check empty files and
# dirnodes, and verify the key-generator pool shrinks as keys are used.
# NOTE(review): this listing is elided -- several original lines are
# missing throughout.
503 self.basedir = "system/SystemTest/test_mutable"
504 DATA = "initial contents go here." # 25 bytes % 3 != 0
505 NEWDATA = "new contents yay"
506 NEWERDATA = "this is getting old"
508 d = self.set_up_nodes(use_key_generator=True)
510 def _create_mutable(res):
512 log.msg("starting create_mutable_file")
513 d1 = c.create_mutable_file(DATA)
515 log.msg("DONE: %s" % (res,))
516 self._mutable_node_1 = res
518 d1.addCallback(_done)
520 d.addCallback(_create_mutable)
522 def _test_debug(res):
523 # find a share. It is important to run this while there is only
524 # one slot in the grid.
525 shares = self._find_shares(self.basedir)
526 (client_num, storage_index, filename, shnum) = shares[0]
527 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
529 log.msg(" for clients[%d]" % client_num)
531 out,err = StringIO(), StringIO()
532 rc = runner.runner(["debug", "dump-share", "--offsets",
534 stdout=out, stderr=err)
535 output = out.getvalue()
536 self.failUnlessEqual(rc, 0)
# verify the dump-share output describes the SDMF slot we just wrote
538 self.failUnless("Mutable slot found:\n" in output)
539 self.failUnless("share_type: SDMF\n" in output)
540 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
541 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
542 self.failUnless(" num_extra_leases: 0\n" in output)
543 # the pubkey size can vary by a byte, so the container might
544 # be a bit larger on some runs.
545 m = re.search(r'^ container_size: (\d+)$', output, re.M)
547 container_size = int(m.group(1))
548 self.failUnless(2037 <= container_size <= 2049, container_size)
549 m = re.search(r'^ data_length: (\d+)$', output, re.M)
551 data_length = int(m.group(1))
552 self.failUnless(2037 <= data_length <= 2049, data_length)
553 self.failUnless(" secrets are for nodeid: %s\n" % peerid
555 self.failUnless(" SDMF contents:\n" in output)
556 self.failUnless(" seqnum: 1\n" in output)
557 self.failUnless(" required_shares: 3\n" in output)
558 self.failUnless(" total_shares: 10\n" in output)
559 self.failUnless(" segsize: 27\n" in output, (output, filename))
560 self.failUnless(" datalen: 25\n" in output)
561 # the exact share_hash_chain nodes depends upon the sharenum,
562 # and is more of a hassle to compute than I want to deal with
564 self.failUnless(" share_hash_chain: " in output)
565 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
566 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
567 base32.b2a(storage_index))
568 self.failUnless(expected in output)
569 except unittest.FailTest:
571 print "dump-share output was:"
574 d.addCallback(_test_debug)
578 # first, let's see if we can use the existing node to retrieve the
579 # contents. This allows it to use the cached pubkey and maybe the
580 # latest-known sharemap.
582 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
583 def _check_download_1(res):
584 self.failUnlessEqual(res, DATA)
585 # now we see if we can retrieve the data from a new node,
586 # constructed using the URI of the original one. We do this test
587 # on the same client that uploaded the data.
588 uri = self._mutable_node_1.get_uri()
589 log.msg("starting retrieve1")
590 newnode = self.clients[0].create_node_from_uri(uri)
591 newnode_2 = self.clients[0].create_node_from_uri(uri)
# the node cache should hand back the same object for the same URI
592 self.failUnlessIdentical(newnode, newnode_2)
593 return newnode.download_best_version()
594 d.addCallback(_check_download_1)
596 def _check_download_2(res):
597 self.failUnlessEqual(res, DATA)
598 # same thing, but with a different client
599 uri = self._mutable_node_1.get_uri()
600 newnode = self.clients[1].create_node_from_uri(uri)
601 log.msg("starting retrieve2")
602 d1 = newnode.download_best_version()
603 d1.addCallback(lambda res: (res, newnode))
605 d.addCallback(_check_download_2)
607 def _check_download_3((res, newnode)):
608 self.failUnlessEqual(res, DATA)
610 log.msg("starting replace1")
611 d1 = newnode.overwrite(NEWDATA)
612 d1.addCallback(lambda res: newnode.download_best_version())
614 d.addCallback(_check_download_3)
616 def _check_download_4(res):
617 self.failUnlessEqual(res, NEWDATA)
618 # now create an even newer node and replace the data on it. This
619 # new node has never been used for download before.
620 uri = self._mutable_node_1.get_uri()
621 newnode1 = self.clients[2].create_node_from_uri(uri)
622 newnode2 = self.clients[3].create_node_from_uri(uri)
623 self._newnode3 = self.clients[3].create_node_from_uri(uri)
624 log.msg("starting replace2")
625 d1 = newnode1.overwrite(NEWERDATA)
626 d1.addCallback(lambda res: newnode2.download_best_version())
628 d.addCallback(_check_download_4)
630 def _check_download_5(res):
631 log.msg("finished replace2")
632 self.failUnlessEqual(res, NEWERDATA)
633 d.addCallback(_check_download_5)
635 def _corrupt_shares(res):
636 # run around and flip bits in all but k of the shares, to test
638 shares = self._find_shares(self.basedir)
639 ## sort by share number
640 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
641 where = dict([ (shnum, filename)
642 for (client_num, storage_index, filename, shnum)
644 assert len(where) == 10 # this test is designed for 3-of-10
645 for shnum, filename in where.items():
646 # shares 7,8,9 are left alone. read will check
647 # (share_hash_chain, block_hash_tree, share_data). New
648 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
649 # segsize, signature).
651 # read: this will trigger "pubkey doesn't match
653 self._corrupt_mutable_share(filename, "pubkey")
654 self._corrupt_mutable_share(filename, "encprivkey")
656 # triggers "signature is invalid"
657 self._corrupt_mutable_share(filename, "seqnum")
659 # triggers "signature is invalid"
660 self._corrupt_mutable_share(filename, "R")
662 # triggers "signature is invalid"
663 self._corrupt_mutable_share(filename, "segsize")
665 self._corrupt_mutable_share(filename, "share_hash_chain")
667 self._corrupt_mutable_share(filename, "block_hash_tree")
669 self._corrupt_mutable_share(filename, "share_data")
670 # other things to correct: IV, signature
671 # 7,8,9 are left alone
673 # note that initial_query_count=5 means that we'll hit the
674 # first 5 servers in effectively random order (based upon
675 # response time), so we won't necessarily ever get a "pubkey
676 # doesn't match fingerprint" error (if we hit shnum>=1 before
677 # shnum=0, we pull the pubkey from there). To get repeatable
678 # specific failures, we need to set initial_query_count=1,
679 # but of course that will change the sequencing behavior of
680 # the retrieval process. TODO: find a reasonable way to make
681 # this a parameter, probably when we expand this test to test
682 # for one failure mode at a time.
684 # when we retrieve this, we should get three signature
685 # failures (where we've mangled seqnum, R, and segsize). The
687 d.addCallback(_corrupt_shares)
# with 7 of 10 shares corrupted, a 3-of-10 download must still succeed
689 d.addCallback(lambda res: self._newnode3.download_best_version())
690 d.addCallback(_check_download_5)
692 def _check_empty_file(res):
693 # make sure we can create empty files, this usually screws up the
695 d1 = self.clients[2].create_mutable_file("")
696 d1.addCallback(lambda newnode: newnode.download_best_version())
697 d1.addCallback(lambda res: self.failUnlessEqual("", res))
699 d.addCallback(_check_empty_file)
701 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
702 def _created_dirnode(dnode):
703 log.msg("_created_dirnode(%s)" % (dnode,))
705 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
706 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
707 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
708 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
709 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
710 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
711 d1.addCallback(lambda res: dnode.build_manifest().when_done())
712 d1.addCallback(lambda res:
713 self.failUnlessEqual(len(res["manifest"]), 1))
715 d.addCallback(_created_dirnode)
# wait until client 3 has connected to the key generator service
717 def wait_for_c3_kg_conn():
718 return self.clients[3]._key_generator is not None
719 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
# each key consumed from the pool shows up as a (temporary) deficit of
# one against the generator's target pool_size
721 def check_kg_poolsize(junk, size_delta):
722 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
723 self.key_generator_svc.key_generator.pool_size + size_delta)
725 d.addCallback(check_kg_poolsize, 0)
726 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
727 d.addCallback(check_kg_poolsize, -1)
728 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
729 d.addCallback(check_kg_poolsize, -2)
730 # use_helper induces use of clients[3], which is the using-key_gen client
731 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
732 d.addCallback(check_kg_poolsize, -3)
735 # The default 120 second timeout went off when running it under valgrind
736 # on my old Windows laptop, so I'm bumping up the timeout.
737 test_mutable.timeout = 240
def flip_bit(self, good):
    """Return `good` with the least-significant bit of its final byte inverted.

    The result differs from the input in exactly one bit, and applying
    flip_bit twice restores the original string. Tests use it to corrupt
    keys, hashes, and share fields by a single bit.
    """
    flipped_last = chr(ord(good[-1]) ^ 0x01)
    return good[:-1] + flipped_last
742 def mangle_uri(self, gooduri):
# Return a syntactically valid CHK URI whose key has one bit flipped.
# Because the storage index is derived from the key, the mangled URI
# points at a file no server holds any shares for.
743 # change the key, which changes the storage index, which means we'll
744 # be asking about the wrong file, so nobody will have any shares
745 u = IFileURI(gooduri)
746 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
747 uri_extension_hash=u.uri_extension_hash,
748 needed_shares=u.needed_shares,
749 total_shares=u.total_shares,
# NOTE(review): one constructor argument (original line 750, presumably
# size=u.size) is elided from this listing -- confirm.
751 return u2.to_string()
753 # TODO: add a test which mangles the uri_extension_hash instead, and
754 # should fail due to not being able to get a valid uri_extension block.
755 # Also a test which sneakily mangles the uri_extension block to change
756 # some of the validation data, so it will fail in the post-download phase
757 # when the file's crypttext integrity check fails. Do the same thing for
758 # the key, which should cause the download to fail the post-download
759 # plaintext_hash check.
761 def test_vdrive(self):
# Driver for the virtual-drive suite: builds a directory tree, bounces a
# client, and then runs the publish/check/web/CLI/control/checker helpers
# in sequence over the same grid. The comments below sketch the tree
# shape at each stage.
762 self.basedir = "system/SystemTest/test_vdrive"
763 self.data = LARGE_DATA
764 d = self.set_up_nodes(use_stats_gatherer=True)
765 d.addCallback(self._test_introweb)
766 d.addCallback(self.log, "starting publish")
767 d.addCallback(self._do_publish1)
768 d.addCallback(self._test_runner)
769 d.addCallback(self._do_publish2)
770 # at this point, we have the following filesystem (where "R" denotes
771 # self._root_directory_uri):
774 # R/subdir1/mydata567
776 # R/subdir1/subdir2/mydata992
# bounce client0 to prove the published data survives a node restart
778 d.addCallback(lambda res: self.bounce_client(0))
779 d.addCallback(self.log, "bounced client0")
781 d.addCallback(self._check_publish1)
782 d.addCallback(self.log, "did _check_publish1")
783 d.addCallback(self._check_publish2)
784 d.addCallback(self.log, "did _check_publish2")
785 d.addCallback(self._do_publish_private)
786 d.addCallback(self.log, "did _do_publish_private")
787 # now we also have (where "P" denotes a new dir):
788 # P/personal/sekrit data
789 # P/s2-rw -> /subdir1/subdir2/
790 # P/s2-ro -> /subdir1/subdir2/ (read-only)
791 d.addCallback(self._check_publish_private)
792 d.addCallback(self.log, "did _check_publish_private")
793 d.addCallback(self._test_web)
794 d.addCallback(self._test_control)
795 d.addCallback(self._test_cli)
796 # P now has four top-level children:
797 # P/personal/sekrit data
800 # P/test_put/ (empty)
801 d.addCallback(self._test_checker)
803 test_vdrive.timeout = 1100
805 def _test_introweb(self, res):
# Fetch the introducer's web status page (HTML, then ?t=json) and verify
# the version banner and the announcement/subscription summaries for the
# 5-node grid. On assertion failure, dump the page for debugging.
# NOTE(review): the _check function header and several lines are elided
# from this listing; `allmydata` is referenced but its import is not
# visible here.
806 d = getPage(self.introweb_url, method="GET", followRedirect=True)
809 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
811 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
812 self.failUnless("Subscription Summary: storage: 5" in res)
813 except unittest.FailTest:
815 print "GET %s output was:" % self.introweb_url
818 d.addCallback(_check)
819 d.addCallback(lambda res:
820 getPage(self.introweb_url + "?t=json",
821 method="GET", followRedirect=True))
822 def _check_json(res):
823 data = simplejson.loads(res)
825 self.failUnlessEqual(data["subscription_summary"],
827 self.failUnlessEqual(data["announcement_summary"],
828 {"storage": 5, "stub_client": 5})
# all simulated nodes run on one host, hence a single distinct host
829 self.failUnlessEqual(data["announcement_distinct_hosts"],
830 {"storage": 1, "stub_client": 1})
831 except unittest.FailTest:
833 print "GET %s?t=json output was:" % self.introweb_url
836 d.addCallback(_check_json)
839 def _do_publish1(self, res):
# Build the first stage of the test tree: create the root directory,
# stash its URI in self._root_directory_uri, make R/subdir1, and upload
# self.data as R/subdir1/mydata567 (stashing its URI in self.uri).
840 ut = upload.Data(self.data, convergence=None)
# NOTE(review): original line 841 (presumably `c0 = self.clients[0]`) is
# elided from this listing.
842 d = c0.create_empty_dirnode()
843 def _made_root(new_dirnode):
844 self._root_directory_uri = new_dirnode.get_uri()
# re-fetch the root through the node cache so later stages share it
845 return c0.create_node_from_uri(self._root_directory_uri)
846 d.addCallback(_made_root)
847 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
848 def _made_subdir1(subdir1_node):
849 self._subdir1_node = subdir1_node
850 d1 = subdir1_node.add_file(u"mydata567", ut)
851 d1.addCallback(self.log, "publish finished")
852 def _stash_uri(filenode):
853 self.uri = filenode.get_uri()
854 assert isinstance(self.uri, str), (self.uri, filenode)
855 d1.addCallback(_stash_uri)
857 d.addCallback(_made_subdir1)
860 def _do_publish2(self, res):
# Second publish stage: create R/subdir1/subdir2 and upload self.data
# again as R/subdir1/subdir2/mydata992.
861 ut = upload.Data(self.data, convergence=None)
862 d = self._subdir1_node.create_empty_directory(u"subdir2")
863 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
866 def log(self, res, *args, **kwargs):
# Deferred-chain-friendly logger: emit a log message, presumably passing
# `res` through unchanged (the return, original line 869, is elided).
867 # print "MSG: %s RES: %s" % (msg, args)
868 log.msg(*args, **kwargs)
871 def _do_publish_private(self, res):
# Build the private tree "P": P/personal/sekrit data (small file), plus
# P/s2-rw and P/s2-ro linking to R/subdir1/subdir2 via its read-write
# and read-only URIs respectively. The callback chain ultimately yields
# the private root node for _check_publish_private.
872 self.smalldata = "sssh, very secret stuff"
873 ut = upload.Data(self.smalldata, convergence=None)
874 d = self.clients[0].create_empty_dirnode()
875 d.addCallback(self.log, "GOT private directory")
876 def _got_new_dir(privnode):
877 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
878 d1 = privnode.create_empty_directory(u"personal")
879 d1.addCallback(self.log, "made P/personal")
880 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
881 d1.addCallback(self.log, "made P/personal/sekrit data")
882 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
# NOTE(review): the _got_s2 function header (original line 883) is
# elided; s2node below is presumably its argument.
884 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
885 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
887 d1.addCallback(_got_s2)
# hand the private root to the next stage of the chain
888 d1.addCallback(lambda res: privnode)
890 d.addCallback(_got_new_dir)
893 def _check_publish1(self, res):
# Verify R/subdir1/mydata567 via the iterative API (get() one child at a
# time from a different client) and compare the download against
# self.data.
894 # this one uses the iterative API
# NOTE(review): original line 895 (presumably `c1 = self.clients[1]`) is
# elided from this listing.
896 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
897 d.addCallback(self.log, "check_publish1 got /")
898 d.addCallback(lambda root: root.get(u"subdir1"))
899 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
900 d.addCallback(lambda filenode: filenode.download_to_data())
901 d.addCallback(self.log, "get finished")
# NOTE(review): the _get_done function header (original line 902) is
# elided; `data` below is presumably its argument.
903 self.failUnlessEqual(data, self.data)
904 d.addCallback(_get_done)
907 def _check_publish2(self, res):
908 # this one uses the path-based API
909 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
910 d = rootnode.get_child_at_path(u"subdir1")
911 d.addCallback(lambda dirnode:
912 self.failUnless(IDirectoryNode.providedBy(dirnode)))
913 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
914 d.addCallback(lambda filenode: filenode.download_to_data())
915 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
917 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
918 def _got_filenode(filenode):
919 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
920 assert fnode == filenode
921 d.addCallback(_got_filenode)
924 def _check_publish_private(self, resnode):
925 # this one uses the path-based API
926 self._private_node = resnode
928 d = self._private_node.get_child_at_path(u"personal")
929 def _got_personal(personal):
930 self._personal_node = personal
932 d.addCallback(_got_personal)
934 d.addCallback(lambda dirnode:
935 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
937 return self._private_node.get_child_at_path(path)
939 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
940 d.addCallback(lambda filenode: filenode.download_to_data())
941 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
942 d.addCallback(lambda res: get_path(u"s2-rw"))
943 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
944 d.addCallback(lambda res: get_path(u"s2-ro"))
945 def _got_s2ro(dirnode):
946 self.failUnless(dirnode.is_mutable(), dirnode)
947 self.failUnless(dirnode.is_readonly(), dirnode)
948 d1 = defer.succeed(None)
949 d1.addCallback(lambda res: dirnode.list())
950 d1.addCallback(self.log, "dirnode.list")
952 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
954 d1.addCallback(self.log, "doing add_file(ro)")
955 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
956 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
958 d1.addCallback(self.log, "doing get(ro)")
959 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
960 d1.addCallback(lambda filenode:
961 self.failUnless(IFileNode.providedBy(filenode)))
963 d1.addCallback(self.log, "doing delete(ro)")
964 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
966 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
968 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
970 personal = self._personal_node
971 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
973 d1.addCallback(self.log, "doing move_child_to(ro)2")
974 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
976 d1.addCallback(self.log, "finished with _got_s2ro")
978 d.addCallback(_got_s2ro)
979 def _got_home(dummy):
980 home = self._private_node
981 personal = self._personal_node
982 d1 = defer.succeed(None)
983 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
984 d1.addCallback(lambda res:
985 personal.move_child_to(u"sekrit data",home,u"sekrit"))
987 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
988 d1.addCallback(lambda res:
989 home.move_child_to(u"sekrit", home, u"sekrit data"))
991 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
992 d1.addCallback(lambda res:
993 home.move_child_to(u"sekrit data", personal))
995 d1.addCallback(lambda res: home.build_manifest().when_done())
996 d1.addCallback(self.log, "manifest")
1000 # P/personal/sekrit data
1001 # P/s2-rw (same as P/s2-ro)
1002 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
1003 d1.addCallback(lambda res:
1004 self.failUnlessEqual(len(res["manifest"]), 5))
1005 d1.addCallback(lambda res: home.start_deep_stats().when_done())
1006 def _check_stats(stats):
1007 expected = {"count-immutable-files": 1,
1008 "count-mutable-files": 0,
1009 "count-literal-files": 1,
1011 "count-directories": 3,
1012 "size-immutable-files": 112,
1013 "size-literal-files": 23,
1014 #"size-directories": 616, # varies
1015 #"largest-directory": 616,
1016 "largest-directory-children": 3,
1017 "largest-immutable-file": 112,
1019 for k,v in expected.iteritems():
1020 self.failUnlessEqual(stats[k], v,
1021 "stats[%s] was %s, not %s" %
1023 self.failUnless(stats["size-directories"] > 1300,
1024 stats["size-directories"])
1025 self.failUnless(stats["largest-directory"] > 800,
1026 stats["largest-directory"])
1027 self.failUnlessEqual(stats["size-files-histogram"],
1028 [ (11, 31, 1), (101, 316, 1) ])
1029 d1.addCallback(_check_stats)
1031 d.addCallback(_got_home)
1034 def shouldFail(self, res, expected_failure, which, substring=None):
1035 if isinstance(res, Failure):
1036 res.trap(expected_failure)
1038 self.failUnless(substring in str(res),
1039 "substring '%s' not in '%s'"
1040 % (substring, str(res)))
1042 self.fail("%s was supposed to raise %s, not get '%s'" %
1043 (which, expected_failure, res))
1045 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1046 assert substring is None or isinstance(substring, str)
1047 d = defer.maybeDeferred(callable, *args, **kwargs)
1049 if isinstance(res, Failure):
1050 res.trap(expected_failure)
1052 self.failUnless(substring in str(res),
1053 "substring '%s' not in '%s'"
1054 % (substring, str(res)))
1056 self.fail("%s was supposed to raise %s, not get '%s'" %
1057 (which, expected_failure, res))
1061 def PUT(self, urlpath, data):
1062 url = self.webish_url + urlpath
1063 return getPage(url, method="PUT", postdata=data)
1065 def GET(self, urlpath, followRedirect=False):
1066 url = self.webish_url + urlpath
1067 return getPage(url, method="GET", followRedirect=followRedirect)
1069 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1071 url = self.helper_webish_url + urlpath
1073 url = self.webish_url + urlpath
1074 sepbase = "boogabooga"
1075 sep = "--" + sepbase
1078 form.append('Content-Disposition: form-data; name="_charset"')
1080 form.append('UTF-8')
1082 for name, value in fields.iteritems():
1083 if isinstance(value, tuple):
1084 filename, value = value
1085 form.append('Content-Disposition: form-data; name="%s"; '
1086 'filename="%s"' % (name, filename.encode("utf-8")))
1088 form.append('Content-Disposition: form-data; name="%s"' % name)
1090 form.append(str(value))
1093 body = "\r\n".join(form) + "\r\n"
1094 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1096 return getPage(url, method="POST", postdata=body,
1097 headers=headers, followRedirect=followRedirect)
1099 def _test_web(self, res):
1100 base = self.webish_url
1101 public = "uri/" + self._root_directory_uri
1103 def _got_welcome(page):
1104 # XXX This test is oversensitive to formatting
1105 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1106 self.failUnless(expected in page,
1107 "I didn't see the right 'connected storage servers'"
1108 " message in: %s" % page
1110 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1111 self.failUnless(expected in page,
1112 "I didn't see the right 'My nodeid' message "
1114 self.failUnless("Helper: 0 active uploads" in page)
1115 d.addCallback(_got_welcome)
1116 d.addCallback(self.log, "done with _got_welcome")
1118 # get the welcome page from the node that uses the helper too
1119 d.addCallback(lambda res: getPage(self.helper_webish_url))
1120 def _got_welcome_helper(page):
1121 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1123 self.failUnless("Not running helper" in page)
1124 d.addCallback(_got_welcome_helper)
1126 d.addCallback(lambda res: getPage(base + public))
1127 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1128 def _got_subdir1(page):
1129 # there ought to be an href for our file
1130 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1131 self.failUnless(">mydata567</a>" in page)
1132 d.addCallback(_got_subdir1)
1133 d.addCallback(self.log, "done with _got_subdir1")
1134 d.addCallback(lambda res:
1135 getPage(base + public + "/subdir1/mydata567"))
1136 def _got_data(page):
1137 self.failUnlessEqual(page, self.data)
1138 d.addCallback(_got_data)
1140 # download from a URI embedded in a URL
1141 d.addCallback(self.log, "_get_from_uri")
1142 def _get_from_uri(res):
1143 return getPage(base + "uri/%s?filename=%s"
1144 % (self.uri, "mydata567"))
1145 d.addCallback(_get_from_uri)
1146 def _got_from_uri(page):
1147 self.failUnlessEqual(page, self.data)
1148 d.addCallback(_got_from_uri)
1150 # download from a URI embedded in a URL, second form
1151 d.addCallback(self.log, "_get_from_uri2")
1152 def _get_from_uri2(res):
1153 return getPage(base + "uri?uri=%s" % (self.uri,))
1154 d.addCallback(_get_from_uri2)
1155 d.addCallback(_got_from_uri)
1157 # download from a bogus URI, make sure we get a reasonable error
1158 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1159 def _get_from_bogus_uri(res):
1160 d1 = getPage(base + "uri/%s?filename=%s"
1161 % (self.mangle_uri(self.uri), "mydata567"))
1162 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1165 d.addCallback(_get_from_bogus_uri)
1166 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1168 # upload a file with PUT
1169 d.addCallback(self.log, "about to try PUT")
1170 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1171 "new.txt contents"))
1172 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1173 d.addCallback(self.failUnlessEqual, "new.txt contents")
1174 # and again with something large enough to use multiple segments,
1175 # and hopefully trigger pauseProducing too
1176 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1177 "big" * 500000)) # 1.5MB
1178 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1179 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1181 # can we replace files in place?
1182 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1184 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1185 d.addCallback(self.failUnlessEqual, "NEWER contents")
1187 # test unlinked POST
1188 d.addCallback(lambda res: self.POST("uri", t="upload",
1189 file=("new.txt", "data" * 10000)))
1190 # and again using the helper, which exercises different upload-status
1192 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1193 file=("foo.txt", "data2" * 10000)))
1195 # check that the status page exists
1196 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1197 def _got_status(res):
1198 # find an interesting upload and download to look at. LIT files
1199 # are not interesting.
1200 for ds in self.clients[0].list_all_download_statuses():
1201 if ds.get_size() > 200:
1202 self._down_status = ds.get_counter()
1203 for us in self.clients[0].list_all_upload_statuses():
1204 if us.get_size() > 200:
1205 self._up_status = us.get_counter()
1206 rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1207 self._retrieve_status = rs.get_counter()
1208 ps = list(self.clients[0].list_all_publish_statuses())[0]
1209 self._publish_status = ps.get_counter()
1210 us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1211 self._update_status = us.get_counter()
1213 # and that there are some upload- and download- status pages
1214 return self.GET("status/up-%d" % self._up_status)
1215 d.addCallback(_got_status)
1217 return self.GET("status/down-%d" % self._down_status)
1218 d.addCallback(_got_up)
1220 return self.GET("status/mapupdate-%d" % self._update_status)
1221 d.addCallback(_got_down)
1222 def _got_update(res):
1223 return self.GET("status/publish-%d" % self._publish_status)
1224 d.addCallback(_got_update)
1225 def _got_publish(res):
1226 return self.GET("status/retrieve-%d" % self._retrieve_status)
1227 d.addCallback(_got_publish)
1229 # check that the helper status page exists
1230 d.addCallback(lambda res:
1231 self.GET("helper_status", followRedirect=True))
1232 def _got_helper_status(res):
1233 self.failUnless("Bytes Fetched:" in res)
1234 # touch a couple of files in the helper's working directory to
1235 # exercise more code paths
1236 workdir = os.path.join(self.getdir("client0"), "helper")
1237 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1238 f = open(incfile, "wb")
1239 f.write("small file")
1241 then = time.time() - 86400*3
1243 os.utime(incfile, (now, then))
1244 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1245 f = open(encfile, "wb")
1246 f.write("less small file")
1248 os.utime(encfile, (now, then))
1249 d.addCallback(_got_helper_status)
1250 # and that the json form exists
1251 d.addCallback(lambda res:
1252 self.GET("helper_status?t=json", followRedirect=True))
1253 def _got_helper_status_json(res):
1254 data = simplejson.loads(res)
1255 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1257 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1258 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1259 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1261 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1262 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1263 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1265 d.addCallback(_got_helper_status_json)
1267 # and check that client[3] (which uses a helper but does not run one
1268 # itself) doesn't explode when you ask for its status
1269 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1270 def _got_non_helper_status(res):
1271 self.failUnless("Upload and Download Status" in res)
1272 d.addCallback(_got_non_helper_status)
1274 # or for helper status with t=json
1275 d.addCallback(lambda res:
1276 getPage(self.helper_webish_url + "helper_status?t=json"))
1277 def _got_non_helper_status_json(res):
1278 data = simplejson.loads(res)
1279 self.failUnlessEqual(data, {})
1280 d.addCallback(_got_non_helper_status_json)
1282 # see if the statistics page exists
1283 d.addCallback(lambda res: self.GET("statistics"))
1284 def _got_stats(res):
1285 self.failUnless("Node Statistics" in res)
1286 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1287 d.addCallback(_got_stats)
1288 d.addCallback(lambda res: self.GET("statistics?t=json"))
1289 def _got_stats_json(res):
1290 data = simplejson.loads(res)
1291 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1292 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1293 d.addCallback(_got_stats_json)
1295 # TODO: mangle the second segment of a file, to test errors that
1296 # occur after we've already sent some good data, which uses a
1297 # different error path.
1299 # TODO: download a URI with a form
1300 # TODO: create a directory by using a form
1301 # TODO: upload by using a form on the directory page
1302 # url = base + "somedir/subdir1/freeform_post!!upload"
1303 # TODO: delete a file by using a button on the directory page
1307 def _test_runner(self, res):
1308 # exercise some of the diagnostic tools in runner.py
1311 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1312 if "storage" not in dirpath:
1316 pieces = dirpath.split(os.sep)
1317 if (len(pieces) >= 4
1318 and pieces[-4] == "storage"
1319 and pieces[-3] == "shares"):
1320 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1321 # are sharefiles here
1322 filename = os.path.join(dirpath, filenames[0])
1323 # peek at the magic to see if it is a chk share
1324 magic = open(filename, "rb").read(4)
1325 if magic == '\x00\x00\x00\x01':
1328 self.fail("unable to find any uri_extension files in %s"
1330 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1332 out,err = StringIO(), StringIO()
1333 rc = runner.runner(["debug", "dump-share", "--offsets",
1335 stdout=out, stderr=err)
1336 output = out.getvalue()
1337 self.failUnlessEqual(rc, 0)
1339 # we only upload a single file, so we can assert some things about
1340 # its size and shares.
1341 self.failUnless(("share filename: %s" % filename) in output)
1342 self.failUnless("size: %d\n" % len(self.data) in output)
1343 self.failUnless("num_segments: 1\n" in output)
1344 # segment_size is always a multiple of needed_shares
1345 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1346 self.failUnless("total_shares: 10\n" in output)
1347 # keys which are supposed to be present
1348 for key in ("size", "num_segments", "segment_size",
1349 "needed_shares", "total_shares",
1350 "codec_name", "codec_params", "tail_codec_params",
1351 #"plaintext_hash", "plaintext_root_hash",
1352 "crypttext_hash", "crypttext_root_hash",
1353 "share_root_hash", "UEB_hash"):
1354 self.failUnless("%s: " % key in output, key)
1355 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1357 # now use its storage index to find the other shares using the
1358 # 'find-shares' tool
1359 sharedir, shnum = os.path.split(filename)
1360 storagedir, storage_index_s = os.path.split(sharedir)
1361 out,err = StringIO(), StringIO()
1362 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1363 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1364 rc = runner.runner(cmd, stdout=out, stderr=err)
1365 self.failUnlessEqual(rc, 0)
1367 sharefiles = [sfn.strip() for sfn in out.readlines()]
1368 self.failUnlessEqual(len(sharefiles), 10)
1370 # also exercise the 'catalog-shares' tool
1371 out,err = StringIO(), StringIO()
1372 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1373 cmd = ["debug", "catalog-shares"] + nodedirs
1374 rc = runner.runner(cmd, stdout=out, stderr=err)
1375 self.failUnlessEqual(rc, 0)
1377 descriptions = [sfn.strip() for sfn in out.readlines()]
1378 self.failUnlessEqual(len(descriptions), 30)
1380 for line in descriptions
1381 if line.startswith("CHK %s " % storage_index_s)]
1382 self.failUnlessEqual(len(matching), 10)
1384 def _test_control(self, res):
1385 # exercise the remote-control-the-client foolscap interfaces in
1386 # allmydata.control (mostly used for performance tests)
1387 c0 = self.clients[0]
1388 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1389 control_furl = open(control_furl_file, "r").read().strip()
1390 # it doesn't really matter which Tub we use to connect to the client,
1391 # so let's just use our IntroducerNode's
1392 d = self.introducer.tub.getReference(control_furl)
1393 d.addCallback(self._test_control2, control_furl_file)
1395 def _test_control2(self, rref, filename):
1396 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1397 downfile = os.path.join(self.basedir, "control.downfile")
1398 d.addCallback(lambda uri:
1399 rref.callRemote("download_from_uri_to_file",
1402 self.failUnlessEqual(res, downfile)
1403 data = open(downfile, "r").read()
1404 expected_data = open(filename, "r").read()
1405 self.failUnlessEqual(data, expected_data)
1406 d.addCallback(_check)
1407 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1408 if sys.platform == "linux2":
1409 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1410 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1413 def _test_cli(self, res):
1414 # run various CLI commands (in a thread, since they use blocking
1417 private_uri = self._private_node.get_uri()
1418 some_uri = self._root_directory_uri
1419 client0_basedir = self.getdir("client0")
1422 "--node-directory", client0_basedir,
1424 TESTDATA = "I will not write the same thing over and over.\n" * 100
1426 d = defer.succeed(None)
1428 # for compatibility with earlier versions, private/root_dir.cap is
1429 # supposed to be treated as an alias named "tahoe:". Start by making
1430 # sure that works, before we add other aliases.
1432 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1433 f = open(root_file, "w")
1434 f.write(private_uri)
1437 def run(ignored, verb, *args, **kwargs):
1438 stdin = kwargs.get("stdin", "")
1439 newargs = [verb] + nodeargs + list(args)
1440 return self._run_cli(newargs, stdin=stdin)
1442 def _check_ls((out,err), expected_children, unexpected_children=[]):
1443 self.failUnlessEqual(err, "")
1444 for s in expected_children:
1445 self.failUnless(s in out, (s,out))
1446 for s in unexpected_children:
1447 self.failIf(s in out, (s,out))
1449 def _check_ls_root((out,err)):
1450 self.failUnless("personal" in out)
1451 self.failUnless("s2-ro" in out)
1452 self.failUnless("s2-rw" in out)
1453 self.failUnlessEqual(err, "")
1455 # this should reference private_uri
1456 d.addCallback(run, "ls")
1457 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1459 d.addCallback(run, "list-aliases")
1460 def _check_aliases_1((out,err)):
1461 self.failUnlessEqual(err, "")
1462 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1463 d.addCallback(_check_aliases_1)
1465 # now that that's out of the way, remove root_dir.cap and work with
1467 d.addCallback(lambda res: os.unlink(root_file))
1468 d.addCallback(run, "list-aliases")
1469 def _check_aliases_2((out,err)):
1470 self.failUnlessEqual(err, "")
1471 self.failUnlessEqual(out, "")
1472 d.addCallback(_check_aliases_2)
1474 d.addCallback(run, "mkdir")
1475 def _got_dir( (out,err) ):
1476 self.failUnless(uri.from_string_dirnode(out.strip()))
1478 d.addCallback(_got_dir)
1479 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1481 d.addCallback(run, "list-aliases")
1482 def _check_aliases_3((out,err)):
1483 self.failUnlessEqual(err, "")
1484 self.failUnless("tahoe: " in out)
1485 d.addCallback(_check_aliases_3)
1487 def _check_empty_dir((out,err)):
1488 self.failUnlessEqual(out, "")
1489 self.failUnlessEqual(err, "")
1490 d.addCallback(run, "ls")
1491 d.addCallback(_check_empty_dir)
1493 def _check_missing_dir((out,err)):
1494 # TODO: check that rc==2
1495 self.failUnlessEqual(out, "")
1496 self.failUnlessEqual(err, "No such file or directory\n")
1497 d.addCallback(run, "ls", "bogus")
1498 d.addCallback(_check_missing_dir)
1503 fn = os.path.join(self.basedir, "file%d" % i)
1505 data = "data to be uploaded: file%d\n" % i
1507 open(fn,"wb").write(data)
1509 def _check_stdout_against((out,err), filenum=None, data=None):
1510 self.failUnlessEqual(err, "")
1511 if filenum is not None:
1512 self.failUnlessEqual(out, datas[filenum])
1513 if data is not None:
1514 self.failUnlessEqual(out, data)
1516 # test all both forms of put: from a file, and from stdin
1518 d.addCallback(run, "put", files[0], "tahoe-file0")
1519 def _put_out((out,err)):
1520 self.failUnless("URI:LIT:" in out, out)
1521 self.failUnless("201 Created" in err, err)
1523 return run(None, "get", uri0)
1524 d.addCallback(_put_out)
1525 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1527 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1528 # tahoe put bar tahoe:FOO
1529 d.addCallback(run, "put", files[2], "tahoe:file2")
1530 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1531 def _check_put_mutable((out,err)):
1532 self._mutable_file3_uri = out.strip()
1533 d.addCallback(_check_put_mutable)
1534 d.addCallback(run, "get", "tahoe:file3")
1535 d.addCallback(_check_stdout_against, 3)
1538 STDIN_DATA = "This is the file to upload from stdin."
1539 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1540 # tahoe put tahoe:FOO
1541 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1542 stdin="Other file from stdin.")
1544 d.addCallback(run, "ls")
1545 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1546 "tahoe-file-stdin", "from-stdin"])
1547 d.addCallback(run, "ls", "subdir")
1548 d.addCallback(_check_ls, ["tahoe-file1"])
1551 d.addCallback(run, "mkdir", "subdir2")
1552 d.addCallback(run, "ls")
1553 # TODO: extract the URI, set an alias with it
1554 d.addCallback(_check_ls, ["subdir2"])
1556 # tahoe get: (to stdin and to a file)
1557 d.addCallback(run, "get", "tahoe-file0")
1558 d.addCallback(_check_stdout_against, 0)
1559 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1560 d.addCallback(_check_stdout_against, 1)
1561 outfile0 = os.path.join(self.basedir, "outfile0")
1562 d.addCallback(run, "get", "file2", outfile0)
1563 def _check_outfile0((out,err)):
1564 data = open(outfile0,"rb").read()
1565 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1566 d.addCallback(_check_outfile0)
1567 outfile1 = os.path.join(self.basedir, "outfile0")
1568 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1569 def _check_outfile1((out,err)):
1570 data = open(outfile1,"rb").read()
1571 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1572 d.addCallback(_check_outfile1)
1574 d.addCallback(run, "rm", "tahoe-file0")
1575 d.addCallback(run, "rm", "tahoe:file2")
1576 d.addCallback(run, "ls")
1577 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1579 d.addCallback(run, "ls", "-l")
1580 def _check_ls_l((out,err)):
1581 lines = out.split("\n")
1583 if "tahoe-file-stdin" in l:
1584 self.failUnless(l.startswith("-r-- "), l)
1585 self.failUnless(" %d " % len(STDIN_DATA) in l)
1587 self.failUnless(l.startswith("-rw- "), l) # mutable
1588 d.addCallback(_check_ls_l)
1590 d.addCallback(run, "ls", "--uri")
1591 def _check_ls_uri((out,err)):
1592 lines = out.split("\n")
1595 self.failUnless(self._mutable_file3_uri in l)
1596 d.addCallback(_check_ls_uri)
1598 d.addCallback(run, "ls", "--readonly-uri")
1599 def _check_ls_rouri((out,err)):
1600 lines = out.split("\n")
1603 rw_uri = self._mutable_file3_uri
1604 u = uri.from_string_mutable_filenode(rw_uri)
1605 ro_uri = u.get_readonly().to_string()
1606 self.failUnless(ro_uri in l)
1607 d.addCallback(_check_ls_rouri)
1610 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1611 d.addCallback(run, "ls")
1612 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1614 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1615 d.addCallback(run, "ls")
1616 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1618 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1619 d.addCallback(run, "ls")
1620 d.addCallback(_check_ls, ["file3", "file3-copy"])
1621 d.addCallback(run, "get", "tahoe:file3-copy")
1622 d.addCallback(_check_stdout_against, 3)
1624 # copy from disk into tahoe
1625 d.addCallback(run, "cp", files[4], "tahoe:file4")
1626 d.addCallback(run, "ls")
1627 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1628 d.addCallback(run, "get", "tahoe:file4")
1629 d.addCallback(_check_stdout_against, 4)
1631 # copy from tahoe into disk
1632 target_filename = os.path.join(self.basedir, "file-out")
1633 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1634 def _check_cp_out((out,err)):
1635 self.failUnless(os.path.exists(target_filename))
1636 got = open(target_filename,"rb").read()
1637 self.failUnlessEqual(got, datas[4])
1638 d.addCallback(_check_cp_out)
1640 # copy from disk to disk (silly case)
1641 target2_filename = os.path.join(self.basedir, "file-out-copy")
1642 d.addCallback(run, "cp", target_filename, target2_filename)
1643 def _check_cp_out2((out,err)):
1644 self.failUnless(os.path.exists(target2_filename))
1645 got = open(target2_filename,"rb").read()
1646 self.failUnlessEqual(got, datas[4])
1647 d.addCallback(_check_cp_out2)
1649 # copy from tahoe into disk, overwriting an existing file
1650 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1651 def _check_cp_out3((out,err)):
1652 self.failUnless(os.path.exists(target_filename))
1653 got = open(target_filename,"rb").read()
1654 self.failUnlessEqual(got, datas[3])
1655 d.addCallback(_check_cp_out3)
1657 # copy from disk into tahoe, overwriting an existing immutable file
1658 d.addCallback(run, "cp", files[5], "tahoe:file4")
1659 d.addCallback(run, "ls")
1660 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1661 d.addCallback(run, "get", "tahoe:file4")
1662 d.addCallback(_check_stdout_against, 5)
1664 # copy from disk into tahoe, overwriting an existing mutable file
1665 d.addCallback(run, "cp", files[5], "tahoe:file3")
1666 d.addCallback(run, "ls")
1667 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1668 d.addCallback(run, "get", "tahoe:file3")
1669 d.addCallback(_check_stdout_against, 5)
1671 # recursive copy: setup
1672 dn = os.path.join(self.basedir, "dir1")
1674 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1675 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1676 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1677 sdn2 = os.path.join(dn, "subdir2")
1679 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1680 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1682 # from disk into tahoe
1683 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1684 d.addCallback(run, "ls")
1685 d.addCallback(_check_ls, ["dir1"])
1686 d.addCallback(run, "ls", "dir1")
1687 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1688 ["rfile4", "rfile5"])
1689 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1690 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1691 ["rfile1", "rfile2", "rfile3"])
1692 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1693 d.addCallback(_check_stdout_against, data="rfile4")
1695 # and back out again
1696 dn_copy = os.path.join(self.basedir, "dir1-copy")
1697 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1698 def _check_cp_r_out((out,err)):
1700 old = open(os.path.join(dn, name), "rb").read()
1701 newfn = os.path.join(dn_copy, name)
1702 self.failUnless(os.path.exists(newfn))
1703 new = open(newfn, "rb").read()
1704 self.failUnlessEqual(old, new)
1708 _cmp(os.path.join("subdir2", "rfile4"))
1709 _cmp(os.path.join("subdir2", "rfile5"))
1710 d.addCallback(_check_cp_r_out)
1712 # and copy it a second time, which ought to overwrite the same files
1713 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1715 # and again, only writing filecaps
1716 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1717 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1718 def _check_capsonly((out,err)):
1719 # these should all be LITs
1720 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1721 y = uri.from_string_filenode(x)
1722 self.failUnlessEqual(y.data, "rfile4")
1723 d.addCallback(_check_capsonly)
1725 # and tahoe-to-tahoe
1726 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1727 d.addCallback(run, "ls")
1728 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1729 d.addCallback(run, "ls", "dir1-copy")
1730 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1731 ["rfile4", "rfile5"])
1732 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1733 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1734 ["rfile1", "rfile2", "rfile3"])
1735 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1736 d.addCallback(_check_stdout_against, data="rfile4")
1738 # and copy it a second time, which ought to overwrite the same files
1739 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1741 # tahoe_ls doesn't currently handle the error correctly: it tries to
1742 # JSON-parse a traceback.
1743 ## def _ls_missing(res):
1744 ## argv = ["ls"] + nodeargs + ["bogus"]
1745 ## return self._run_cli(argv)
1746 ## d.addCallback(_ls_missing)
1747 ## def _check_ls_missing((out,err)):
1750 ## self.failUnlessEqual(err, "")
1751 ## d.addCallback(_check_ls_missing)
1755 def _run_cli(self, argv, stdin=""):
1757 stdout, stderr = StringIO(), StringIO()
1758 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1759 stdin=StringIO(stdin),
1760 stdout=stdout, stderr=stderr)
1762 return stdout.getvalue(), stderr.getvalue()
1763 d.addCallback(_done)
1766 def _test_checker(self, res):
1767 ut = upload.Data("too big to be literal" * 200, convergence=None)
1768 d = self._personal_node.add_file(u"big file", ut)
1770 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1771 def _check_dirnode_results(r):
1772 self.failUnless(r.is_healthy())
1773 d.addCallback(_check_dirnode_results)
1774 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1775 d.addCallback(_check_dirnode_results)
1777 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1778 def _got_chk_filenode(n):
1779 self.failUnless(isinstance(n, filenode.FileNode))
1780 d = n.check(Monitor())
1781 def _check_filenode_results(r):
1782 self.failUnless(r.is_healthy())
1783 d.addCallback(_check_filenode_results)
1784 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1785 d.addCallback(_check_filenode_results)
1787 d.addCallback(_got_chk_filenode)
1789 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1790 def _got_lit_filenode(n):
1791 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1792 d = n.check(Monitor())
1793 def _check_lit_filenode_results(r):
1794 self.failUnlessEqual(r, None)
1795 d.addCallback(_check_lit_filenode_results)
1796 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1797 d.addCallback(_check_lit_filenode_results)
1799 d.addCallback(_got_lit_filenode)