1 from base64 import b32encode
2 import os, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, filenode, offloaded, upload
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.scripts import runner
18 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
19 NoSuchChildError, NotEnoughSharesError
20 from allmydata.monitor import Monitor
21 from allmydata.mutable.common import NotMutableError
22 from allmydata.mutable import layout as mutable_layout
23 from foolscap.api import DeadReferenceError
24 from twisted.python.failure import Failure
25 from twisted.web.client import getPage
26 from twisted.web.error import Error
28 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
32 This is some data to publish to the virtual drive, which needs to be large
33 enough to not fit inside a LIT uri.
36 class CountingDataUploadable(upload.Data):
38 interrupt_after = None
39 interrupt_after_d = None
41 def read(self, length):
42 self.bytes_read += length
43 if self.interrupt_after is not None:
44 if self.bytes_read > self.interrupt_after:
45 self.interrupt_after = None
46 self.interrupt_after_d.callback(self)
47 return upload.Data.read(self, length)
49 class GrabEverythingConsumer:
55 def registerProducer(self, producer, streaming):
57 assert IPushProducer.providedBy(producer)
59 def write(self, data):
62 def unregisterProducer(self):
65 class SystemTest(SystemTestMixin, unittest.TestCase):
67 def test_connections(self):
68 self.basedir = "system/SystemTest/test_connections"
69 d = self.set_up_nodes()
70 self.extra_node = None
71 d.addCallback(lambda res: self.add_extra_node(self.numclients))
72 def _check(extra_node):
73 self.extra_node = extra_node
74 for c in self.clients:
75 all_peerids = list(c.get_all_serverids())
76 self.failUnlessEqual(len(all_peerids), self.numclients+1)
78 permuted_peers = list(sb.get_servers("a"))
79 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
82 def _shutdown_extra_node(res):
84 return self.extra_node.stopService()
86 d.addBoth(_shutdown_extra_node)
88 test_connections.timeout = 300
89 # test_connections is subsumed by test_upload_and_download, and takes
90 # quite a while to run on a slow machine (because of all the TLS
91 # connections that must be established). If we ever rework the introducer
92 # code to such an extent that we're not sure if it works anymore, we can
93 # reinstate this test until it does.
# convergence=None means each upload uses a fresh random encryption key,
# so no convergent-encryption deduplication can occur in this variant.
96 def test_upload_and_download_random_key(self):
97 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
98 return self._test_upload_and_download(convergence=None)
# generous timeout: this exercises a full multi-node upload/download cycle
99 test_upload_and_download_random_key.timeout = 4800
# A non-None convergence string enables convergent encryption, so a second
# upload of identical data can be short-circuited by the helper/uploader.
101 def test_upload_and_download_convergent(self):
102 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
103 return self._test_upload_and_download(convergence="some convergence string")
# generous timeout: this exercises a full multi-node upload/download cycle
104 test_upload_and_download_convergent.timeout = 4800
106 def _test_upload_and_download(self, convergence):
107 # we use 4000 bytes of data, which will result in about 400k written
108 # to disk among all our simulated nodes
109 DATA = "Some data to upload\n" * 200
110 d = self.set_up_nodes()
111 def _check_connections(res):
112 for c in self.clients:
113 all_peerids = list(c.get_all_serverids())
114 self.failUnlessEqual(len(all_peerids), self.numclients)
115 sb = c.storage_broker
116 permuted_peers = list(sb.get_servers("a"))
117 self.failUnlessEqual(len(permuted_peers), self.numclients)
118 d.addCallback(_check_connections)
122 u = self.clients[0].getServiceNamed("uploader")
124 # we crank the max segsize down to 1024b for the duration of this
125 # test, so we can exercise multiple segments. It is important
126 # that this is not a multiple of the segment size, so that the
127 # tail segment is not the same length as the others. This actually
128 # gets rounded up to 1025 to be a multiple of the number of
129 # required shares (since we use 25 out of 100 FEC).
130 up = upload.Data(DATA, convergence=convergence)
131 up.max_segment_size = 1024
134 d.addCallback(_do_upload)
135 def _upload_done(results):
137 log.msg("upload finished: uri is %s" % (theuri,))
139 assert isinstance(self.uri, str), self.uri
140 dl = self.clients[1].getServiceNamed("downloader")
142 d.addCallback(_upload_done)
144 def _upload_again(res):
145 # Upload again. If using convergent encryption then this ought to be
146 # short-circuited, however with the way we currently generate URIs
147 # (i.e. because they include the roothash), we have to do all of the
148 # encoding work, and only get to save on the upload part.
149 log.msg("UPLOADING AGAIN")
150 up = upload.Data(DATA, convergence=convergence)
151 up.max_segment_size = 1024
152 d1 = self.uploader.upload(up)
153 d.addCallback(_upload_again)
155 def _download_to_data(res):
156 log.msg("DOWNLOADING")
157 return self.downloader.download_to_data(self.uri)
158 d.addCallback(_download_to_data)
159 def _download_to_data_done(data):
160 log.msg("download finished")
161 self.failUnlessEqual(data, DATA)
162 d.addCallback(_download_to_data_done)
164 target_filename = os.path.join(self.basedir, "download.target")
165 def _download_to_filename(res):
166 return self.downloader.download_to_filename(self.uri,
168 d.addCallback(_download_to_filename)
169 def _download_to_filename_done(res):
170 newdata = open(target_filename, "rb").read()
171 self.failUnlessEqual(newdata, DATA)
172 d.addCallback(_download_to_filename_done)
174 target_filename2 = os.path.join(self.basedir, "download.target2")
175 def _download_to_filehandle(res):
176 fh = open(target_filename2, "wb")
177 return self.downloader.download_to_filehandle(self.uri, fh)
178 d.addCallback(_download_to_filehandle)
179 def _download_to_filehandle_done(fh):
181 newdata = open(target_filename2, "rb").read()
182 self.failUnlessEqual(newdata, DATA)
183 d.addCallback(_download_to_filehandle_done)
185 consumer = GrabEverythingConsumer()
186 ct = download.ConsumerAdapter(consumer)
187 d.addCallback(lambda res:
188 self.downloader.download(self.uri, ct))
189 def _download_to_consumer_done(ign):
190 self.failUnlessEqual(consumer.contents, DATA)
191 d.addCallback(_download_to_consumer_done)
194 n = self.clients[1].create_node_from_uri(self.uri)
195 d = download_to_data(n)
196 def _read_done(data):
197 self.failUnlessEqual(data, DATA)
198 d.addCallback(_read_done)
199 d.addCallback(lambda ign:
200 n.read(MemoryConsumer(), offset=1, size=4))
201 def _read_portion_done(mc):
202 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
203 d.addCallback(_read_portion_done)
204 d.addCallback(lambda ign:
205 n.read(MemoryConsumer(), offset=2, size=None))
206 def _read_tail_done(mc):
207 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
208 d.addCallback(_read_tail_done)
209 d.addCallback(lambda ign:
210 n.read(MemoryConsumer(), size=len(DATA)+1000))
211 def _read_too_much(mc):
212 self.failUnlessEqual("".join(mc.chunks), DATA)
213 d.addCallback(_read_too_much)
216 d.addCallback(_test_read)
218 def _test_bad_read(res):
219 bad_u = uri.from_string_filenode(self.uri)
220 bad_u.key = self.flip_bit(bad_u.key)
221 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
222 # this should cause an error during download
224 d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
226 bad_n.read, MemoryConsumer(), offset=2)
228 d.addCallback(_test_bad_read)
230 def _download_nonexistent_uri(res):
231 baduri = self.mangle_uri(self.uri)
232 log.msg("about to download non-existent URI", level=log.UNUSUAL,
233 facility="tahoe.tests")
234 d1 = self.downloader.download_to_data(baduri)
235 def _baduri_should_fail(res):
236 log.msg("finished downloading non-existend URI",
237 level=log.UNUSUAL, facility="tahoe.tests")
238 self.failUnless(isinstance(res, Failure))
239 self.failUnless(res.check(NotEnoughSharesError),
240 "expected NotEnoughSharesError, got %s" % res)
241 # TODO: files that have zero peers should get a special kind
242 # of NotEnoughSharesError, which can be used to suggest that
243 # the URI might be wrong or that they've never uploaded the
244 # file in the first place.
245 d1.addBoth(_baduri_should_fail)
247 d.addCallback(_download_nonexistent_uri)
249 # add a new node, which doesn't accept shares, and only uses the
251 d.addCallback(lambda res: self.add_extra_node(self.numclients,
253 add_to_sparent=True))
254 def _added(extra_node):
255 self.extra_node = extra_node
256 d.addCallback(_added)
258 HELPER_DATA = "Data that needs help to upload" * 1000
259 def _upload_with_helper(res):
260 u = upload.Data(HELPER_DATA, convergence=convergence)
261 d = self.extra_node.upload(u)
262 def _uploaded(results):
264 return self.downloader.download_to_data(uri)
265 d.addCallback(_uploaded)
267 self.failUnlessEqual(newdata, HELPER_DATA)
268 d.addCallback(_check)
270 d.addCallback(_upload_with_helper)
272 def _upload_duplicate_with_helper(res):
273 u = upload.Data(HELPER_DATA, convergence=convergence)
274 u.debug_stash_RemoteEncryptedUploadable = True
275 d = self.extra_node.upload(u)
276 def _uploaded(results):
278 return self.downloader.download_to_data(uri)
279 d.addCallback(_uploaded)
281 self.failUnlessEqual(newdata, HELPER_DATA)
282 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
283 "uploadable started uploading, should have been avoided")
284 d.addCallback(_check)
286 if convergence is not None:
287 d.addCallback(_upload_duplicate_with_helper)
289 def _upload_resumable(res):
290 DATA = "Data that needs help to upload and gets interrupted" * 1000
291 u1 = CountingDataUploadable(DATA, convergence=convergence)
292 u2 = CountingDataUploadable(DATA, convergence=convergence)
294 # we interrupt the connection after about 5kB by shutting down
295 # the helper, then restarting it.
296 u1.interrupt_after = 5000
297 u1.interrupt_after_d = defer.Deferred()
298 u1.interrupt_after_d.addCallback(lambda res:
299 self.bounce_client(0))
301 # sneak into the helper and reduce its chunk size, so that our
302 # debug_interrupt will sever the connection on about the fifth
303 # chunk fetched. This makes sure that we've started to write the
304 # new shares before we abandon them, which exercises the
305 # abort/delete-partial-share code. TODO: find a cleaner way to do
306 # this. I know that this will affect later uses of the helper in
307 # this same test run, but I'm not currently worried about it.
308 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
310 d = self.extra_node.upload(u1)
312 def _should_not_finish(res):
313 self.fail("interrupted upload should have failed, not finished"
314 " with result %s" % (res,))
316 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
318 # make sure we actually interrupted it before finishing the
320 self.failUnless(u1.bytes_read < len(DATA),
321 "read %d out of %d total" % (u1.bytes_read,
324 log.msg("waiting for reconnect", level=log.NOISY,
325 facility="tahoe.test.test_system")
326 # now, we need to give the nodes a chance to notice that this
327 # connection has gone away. When this happens, the storage
328 # servers will be told to abort their uploads, removing the
329 # partial shares. Unfortunately this involves TCP messages
330 # going through the loopback interface, and we can't easily
331 # predict how long that will take. If it were all local, we
332 # could use fireEventually() to stall. Since we don't have
333 # the right introduction hooks, the best we can do is use a
334 # fixed delay. TODO: this is fragile.
335 u1.interrupt_after_d.addCallback(self.stall, 2.0)
336 return u1.interrupt_after_d
337 d.addCallbacks(_should_not_finish, _interrupted)
339 def _disconnected(res):
340 # check to make sure the storage servers aren't still hanging
341 # on to the partial share: their incoming/ directories should
343 log.msg("disconnected", level=log.NOISY,
344 facility="tahoe.test.test_system")
345 for i in range(self.numclients):
346 incdir = os.path.join(self.getdir("client%d" % i),
347 "storage", "shares", "incoming")
348 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
349 d.addCallback(_disconnected)
351 # then we need to give the reconnector a chance to
352 # reestablish the connection to the helper.
353 d.addCallback(lambda res:
354 log.msg("wait_for_connections", level=log.NOISY,
355 facility="tahoe.test.test_system"))
356 d.addCallback(lambda res: self.wait_for_connections())
359 d.addCallback(lambda res:
360 log.msg("uploading again", level=log.NOISY,
361 facility="tahoe.test.test_system"))
362 d.addCallback(lambda res: self.extra_node.upload(u2))
364 def _uploaded(results):
366 log.msg("Second upload complete", level=log.NOISY,
367 facility="tahoe.test.test_system")
369 # this is really bytes received rather than sent, but it's
370 # convenient and basically measures the same thing
371 bytes_sent = results.ciphertext_fetched
373 # We currently don't support resumption of upload if the data is
374 # encrypted with a random key. (Because that would require us
375 # to store the key locally and re-use it on the next upload of
376 # this file, which isn't a bad thing to do, but we currently
378 if convergence is not None:
379 # Make sure we did not have to read the whole file the
380 # second time around.
381 self.failUnless(bytes_sent < len(DATA),
382 "resumption didn't save us any work:"
383 " read %d bytes out of %d total" %
384 (bytes_sent, len(DATA)))
386 # Make sure we did have to read the whole file the second
387 # time around -- because the one that we partially uploaded
388 # earlier was encrypted with a different random key.
389 self.failIf(bytes_sent < len(DATA),
390 "resumption saved us some work even though we were using random keys:"
391 " read %d bytes out of %d total" %
392 (bytes_sent, len(DATA)))
393 return self.downloader.download_to_data(uri)
394 d.addCallback(_uploaded)
397 self.failUnlessEqual(newdata, DATA)
398 # If using convergent encryption, then also check that the
399 # helper has removed the temp file from its directories.
400 if convergence is not None:
401 basedir = os.path.join(self.getdir("client0"), "helper")
402 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
403 self.failUnlessEqual(files, [])
404 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
405 self.failUnlessEqual(files, [])
406 d.addCallback(_check)
408 d.addCallback(_upload_resumable)
410 def _grab_stats(ignored):
411 # the StatsProvider doesn't normally publish a FURL:
412 # instead it passes a live reference to the StatsGatherer
413 # (if and when it connects). To exercise the remote stats
414 # interface, we manually publish client0's StatsProvider
415 # and use client1 to query it.
416 sp = self.clients[0].stats_provider
417 sp_furl = self.clients[0].tub.registerReference(sp)
418 d = self.clients[1].tub.getReference(sp_furl)
419 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
420 def _got_stats(stats):
422 #from pprint import pprint
425 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
426 c = stats["counters"]
427 self.failUnless("storage_server.allocate" in c)
428 d.addCallback(_got_stats)
430 d.addCallback(_grab_stats)
434 def _find_shares(self, basedir):
436 for (dirpath, dirnames, filenames) in os.walk(basedir):
437 if "storage" not in dirpath:
441 pieces = dirpath.split(os.sep)
443 and pieces[-4] == "storage"
444 and pieces[-3] == "shares"):
445 # we're sitting in .../storage/shares/$START/$SINDEX , and there
446 # are sharefiles here
447 assert pieces[-5].startswith("client")
448 client_num = int(pieces[-5][-1])
449 storage_index_s = pieces[-1]
450 storage_index = si_a2b(storage_index_s)
451 for sharename in filenames:
452 shnum = int(sharename)
453 filename = os.path.join(dirpath, sharename)
454 data = (client_num, storage_index, filename, shnum)
457 self.fail("unable to find any share files in %s" % basedir)
460 def _corrupt_mutable_share(self, filename, which):
461 msf = MutableShareFile(filename)
462 datav = msf.readv([ (0, 1000000) ])
463 final_share = datav[0]
464 assert len(final_share) < 1000000 # ought to be truncated
465 pieces = mutable_layout.unpack_share(final_share)
466 (seqnum, root_hash, IV, k, N, segsize, datalen,
467 verification_key, signature, share_hash_chain, block_hash_tree,
468 share_data, enc_privkey) = pieces
470 if which == "seqnum":
473 root_hash = self.flip_bit(root_hash)
475 IV = self.flip_bit(IV)
476 elif which == "segsize":
477 segsize = segsize + 15
478 elif which == "pubkey":
479 verification_key = self.flip_bit(verification_key)
480 elif which == "signature":
481 signature = self.flip_bit(signature)
482 elif which == "share_hash_chain":
483 nodenum = share_hash_chain.keys()[0]
484 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
485 elif which == "block_hash_tree":
486 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
487 elif which == "share_data":
488 share_data = self.flip_bit(share_data)
489 elif which == "encprivkey":
490 enc_privkey = self.flip_bit(enc_privkey)
492 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
494 final_share = mutable_layout.pack_share(prefix,
501 msf.writev( [(0, final_share)], None)
504 def test_mutable(self):
505 self.basedir = "system/SystemTest/test_mutable"
506 DATA = "initial contents go here." # 25 bytes % 3 != 0
507 NEWDATA = "new contents yay"
508 NEWERDATA = "this is getting old"
510 d = self.set_up_nodes(use_key_generator=True)
512 def _create_mutable(res):
514 log.msg("starting create_mutable_file")
515 d1 = c.create_mutable_file(DATA)
517 log.msg("DONE: %s" % (res,))
518 self._mutable_node_1 = res
520 d1.addCallback(_done)
522 d.addCallback(_create_mutable)
524 def _test_debug(res):
525 # find a share. It is important to run this while there is only
526 # one slot in the grid.
527 shares = self._find_shares(self.basedir)
528 (client_num, storage_index, filename, shnum) = shares[0]
529 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
531 log.msg(" for clients[%d]" % client_num)
533 out,err = StringIO(), StringIO()
534 rc = runner.runner(["debug", "dump-share", "--offsets",
536 stdout=out, stderr=err)
537 output = out.getvalue()
538 self.failUnlessEqual(rc, 0)
540 self.failUnless("Mutable slot found:\n" in output)
541 self.failUnless("share_type: SDMF\n" in output)
542 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
543 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
544 self.failUnless(" num_extra_leases: 0\n" in output)
545 # the pubkey size can vary by a byte, so the container might
546 # be a bit larger on some runs.
547 m = re.search(r'^ container_size: (\d+)$', output, re.M)
549 container_size = int(m.group(1))
550 self.failUnless(2037 <= container_size <= 2049, container_size)
551 m = re.search(r'^ data_length: (\d+)$', output, re.M)
553 data_length = int(m.group(1))
554 self.failUnless(2037 <= data_length <= 2049, data_length)
555 self.failUnless(" secrets are for nodeid: %s\n" % peerid
557 self.failUnless(" SDMF contents:\n" in output)
558 self.failUnless(" seqnum: 1\n" in output)
559 self.failUnless(" required_shares: 3\n" in output)
560 self.failUnless(" total_shares: 10\n" in output)
561 self.failUnless(" segsize: 27\n" in output, (output, filename))
562 self.failUnless(" datalen: 25\n" in output)
563 # the exact share_hash_chain nodes depends upon the sharenum,
564 # and is more of a hassle to compute than I want to deal with
566 self.failUnless(" share_hash_chain: " in output)
567 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
568 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
569 base32.b2a(storage_index))
570 self.failUnless(expected in output)
571 except unittest.FailTest:
573 print "dump-share output was:"
576 d.addCallback(_test_debug)
580 # first, let's see if we can use the existing node to retrieve the
581 # contents. This allows it to use the cached pubkey and maybe the
582 # latest-known sharemap.
584 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
585 def _check_download_1(res):
586 self.failUnlessEqual(res, DATA)
587 # now we see if we can retrieve the data from a new node,
588 # constructed using the URI of the original one. We do this test
589 # on the same client that uploaded the data.
590 uri = self._mutable_node_1.get_uri()
591 log.msg("starting retrieve1")
592 newnode = self.clients[0].create_node_from_uri(uri)
593 newnode_2 = self.clients[0].create_node_from_uri(uri)
594 self.failUnlessIdentical(newnode, newnode_2)
595 return newnode.download_best_version()
596 d.addCallback(_check_download_1)
598 def _check_download_2(res):
599 self.failUnlessEqual(res, DATA)
600 # same thing, but with a different client
601 uri = self._mutable_node_1.get_uri()
602 newnode = self.clients[1].create_node_from_uri(uri)
603 log.msg("starting retrieve2")
604 d1 = newnode.download_best_version()
605 d1.addCallback(lambda res: (res, newnode))
607 d.addCallback(_check_download_2)
609 def _check_download_3((res, newnode)):
610 self.failUnlessEqual(res, DATA)
612 log.msg("starting replace1")
613 d1 = newnode.overwrite(NEWDATA)
614 d1.addCallback(lambda res: newnode.download_best_version())
616 d.addCallback(_check_download_3)
618 def _check_download_4(res):
619 self.failUnlessEqual(res, NEWDATA)
620 # now create an even newer node and replace the data on it. This
621 # new node has never been used for download before.
622 uri = self._mutable_node_1.get_uri()
623 newnode1 = self.clients[2].create_node_from_uri(uri)
624 newnode2 = self.clients[3].create_node_from_uri(uri)
625 self._newnode3 = self.clients[3].create_node_from_uri(uri)
626 log.msg("starting replace2")
627 d1 = newnode1.overwrite(NEWERDATA)
628 d1.addCallback(lambda res: newnode2.download_best_version())
630 d.addCallback(_check_download_4)
632 def _check_download_5(res):
633 log.msg("finished replace2")
634 self.failUnlessEqual(res, NEWERDATA)
635 d.addCallback(_check_download_5)
637 def _corrupt_shares(res):
638 # run around and flip bits in all but k of the shares, to test
640 shares = self._find_shares(self.basedir)
641 ## sort by share number
642 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
643 where = dict([ (shnum, filename)
644 for (client_num, storage_index, filename, shnum)
646 assert len(where) == 10 # this test is designed for 3-of-10
647 for shnum, filename in where.items():
648 # shares 7,8,9 are left alone. read will check
649 # (share_hash_chain, block_hash_tree, share_data). New
650 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
651 # segsize, signature).
653 # read: this will trigger "pubkey doesn't match
655 self._corrupt_mutable_share(filename, "pubkey")
656 self._corrupt_mutable_share(filename, "encprivkey")
658 # triggers "signature is invalid"
659 self._corrupt_mutable_share(filename, "seqnum")
661 # triggers "signature is invalid"
662 self._corrupt_mutable_share(filename, "R")
664 # triggers "signature is invalid"
665 self._corrupt_mutable_share(filename, "segsize")
667 self._corrupt_mutable_share(filename, "share_hash_chain")
669 self._corrupt_mutable_share(filename, "block_hash_tree")
671 self._corrupt_mutable_share(filename, "share_data")
672 # other things to corrupt: IV, signature
673 # 7,8,9 are left alone
675 # note that initial_query_count=5 means that we'll hit the
676 # first 5 servers in effectively random order (based upon
677 # response time), so we won't necessarily ever get a "pubkey
678 # doesn't match fingerprint" error (if we hit shnum>=1 before
679 # shnum=0, we pull the pubkey from there). To get repeatable
680 # specific failures, we need to set initial_query_count=1,
681 # but of course that will change the sequencing behavior of
682 # the retrieval process. TODO: find a reasonable way to make
683 # this a parameter, probably when we expand this test to test
684 # for one failure mode at a time.
686 # when we retrieve this, we should get three signature
687 # failures (where we've mangled seqnum, R, and segsize). The
689 d.addCallback(_corrupt_shares)
691 d.addCallback(lambda res: self._newnode3.download_best_version())
692 d.addCallback(_check_download_5)
694 def _check_empty_file(res):
695 # make sure we can create empty files, this usually screws up the
697 d1 = self.clients[2].create_mutable_file("")
698 d1.addCallback(lambda newnode: newnode.download_best_version())
699 d1.addCallback(lambda res: self.failUnlessEqual("", res))
701 d.addCallback(_check_empty_file)
703 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
704 def _created_dirnode(dnode):
705 log.msg("_created_dirnode(%s)" % (dnode,))
707 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
708 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
709 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
710 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
711 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
712 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
713 d1.addCallback(lambda res: dnode.build_manifest().when_done())
714 d1.addCallback(lambda res:
715 self.failUnlessEqual(len(res["manifest"]), 1))
717 d.addCallback(_created_dirnode)
719 def wait_for_c3_kg_conn():
720 return self.clients[3]._key_generator is not None
721 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
723 def check_kg_poolsize(junk, size_delta):
724 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
725 self.key_generator_svc.key_generator.pool_size + size_delta)
727 d.addCallback(check_kg_poolsize, 0)
728 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
729 d.addCallback(check_kg_poolsize, -1)
730 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
731 d.addCallback(check_kg_poolsize, -2)
732 # use_helper induces use of clients[3], which is the using-key_gen client
733 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
734 d.addCallback(check_kg_poolsize, -3)
737 # The default 120 second timeout went off when running it under valgrind
738 # on my old Windows laptop, so I'm bumping up the timeout.
739 test_mutable.timeout = 240
def flip_bit(self, good):
    """Return a copy of *good* with the least-significant bit of its
    final byte inverted (used to corrupt keys/hashes in these tests).
    Applying it twice restores the original string."""
    prefix, last_char = good[:-1], good[-1]
    return prefix + chr(ord(last_char) ^ 0x01)
744 def mangle_uri(self, gooduri):
745 # change the key, which changes the storage index, which means we'll
746 # be asking about the wrong file, so nobody will have any shares
747 u = IFileURI(gooduri)
748 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
749 uri_extension_hash=u.uri_extension_hash,
750 needed_shares=u.needed_shares,
751 total_shares=u.total_shares,
753 return u2.to_string()
755 # TODO: add a test which mangles the uri_extension_hash instead, and
756 # should fail due to not being able to get a valid uri_extension block.
757 # Also a test which sneakily mangles the uri_extension block to change
758 # some of the validation data, so it will fail in the post-download phase
759 # when the file's crypttext integrity check fails. Do the same thing for
760 # the key, which should cause the download to fail the post-download
761 # plaintext_hash check.
763 def test_vdrive(self):
764 self.basedir = "system/SystemTest/test_vdrive"
765 self.data = LARGE_DATA
766 d = self.set_up_nodes(use_stats_gatherer=True)
767 d.addCallback(self._test_introweb)
768 d.addCallback(self.log, "starting publish")
769 d.addCallback(self._do_publish1)
770 d.addCallback(self._test_runner)
771 d.addCallback(self._do_publish2)
772 # at this point, we have the following filesystem (where "R" denotes
773 # self._root_directory_uri):
776 # R/subdir1/mydata567
778 # R/subdir1/subdir2/mydata992
780 d.addCallback(lambda res: self.bounce_client(0))
781 d.addCallback(self.log, "bounced client0")
783 d.addCallback(self._check_publish1)
784 d.addCallback(self.log, "did _check_publish1")
785 d.addCallback(self._check_publish2)
786 d.addCallback(self.log, "did _check_publish2")
787 d.addCallback(self._do_publish_private)
788 d.addCallback(self.log, "did _do_publish_private")
789 # now we also have (where "P" denotes a new dir):
790 # P/personal/sekrit data
791 # P/s2-rw -> /subdir1/subdir2/
792 # P/s2-ro -> /subdir1/subdir2/ (read-only)
793 d.addCallback(self._check_publish_private)
794 d.addCallback(self.log, "did _check_publish_private")
795 d.addCallback(self._test_web)
796 d.addCallback(self._test_control)
797 d.addCallback(self._test_cli)
798 # P now has four top-level children:
799 # P/personal/sekrit data
802 # P/test_put/ (empty)
803 d.addCallback(self._test_checker)
805 test_vdrive.timeout = 1100
807 def _test_introweb(self, res):
808 d = getPage(self.introweb_url, method="GET", followRedirect=True)
811 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
813 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
814 self.failUnless("Subscription Summary: storage: 5" in res)
815 except unittest.FailTest:
817 print "GET %s output was:" % self.introweb_url
820 d.addCallback(_check)
821 d.addCallback(lambda res:
822 getPage(self.introweb_url + "?t=json",
823 method="GET", followRedirect=True))
824 def _check_json(res):
825 data = simplejson.loads(res)
827 self.failUnlessEqual(data["subscription_summary"],
829 self.failUnlessEqual(data["announcement_summary"],
830 {"storage": 5, "stub_client": 5})
831 self.failUnlessEqual(data["announcement_distinct_hosts"],
832 {"storage": 1, "stub_client": 1})
833 except unittest.FailTest:
835 print "GET %s?t=json output was:" % self.introweb_url
838 d.addCallback(_check_json)
841 def _do_publish1(self, res):
842 ut = upload.Data(self.data, convergence=None)
844 d = c0.create_empty_dirnode()
845 def _made_root(new_dirnode):
846 self._root_directory_uri = new_dirnode.get_uri()
847 return c0.create_node_from_uri(self._root_directory_uri)
848 d.addCallback(_made_root)
849 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
850 def _made_subdir1(subdir1_node):
851 self._subdir1_node = subdir1_node
852 d1 = subdir1_node.add_file(u"mydata567", ut)
853 d1.addCallback(self.log, "publish finished")
854 def _stash_uri(filenode):
855 self.uri = filenode.get_uri()
856 assert isinstance(self.uri, str), (self.uri, filenode)
857 d1.addCallback(_stash_uri)
859 d.addCallback(_made_subdir1)
862 def _do_publish2(self, res):
863 ut = upload.Data(self.data, convergence=None)
864 d = self._subdir1_node.create_empty_directory(u"subdir2")
865 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
868 def log(self, res, *args, **kwargs):
869 # print "MSG: %s RES: %s" % (msg, args)
870 log.msg(*args, **kwargs)
873 def _do_publish_private(self, res):
874 self.smalldata = "sssh, very secret stuff"
875 ut = upload.Data(self.smalldata, convergence=None)
876 d = self.clients[0].create_empty_dirnode()
877 d.addCallback(self.log, "GOT private directory")
878 def _got_new_dir(privnode):
879 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
880 d1 = privnode.create_empty_directory(u"personal")
881 d1.addCallback(self.log, "made P/personal")
882 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
883 d1.addCallback(self.log, "made P/personal/sekrit data")
884 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
886 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
887 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
889 d1.addCallback(_got_s2)
890 d1.addCallback(lambda res: privnode)
892 d.addCallback(_got_new_dir)
def _check_publish1(self, res):
    """Verify the published tree with the iterative (get-one-child) API,
    reading through a different client than the one that published.
    """
    # this one uses the iterative API
    c1 = self.clients[1]
    d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
    d.addCallback(self.log, "check_publish1 got /")
    d.addCallback(lambda root: root.get(u"subdir1"))
    d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(self.log, "get finished")
    # NOTE(review): this wrapper def (original line 904) and the trailing
    # return were dropped by the extraction; reconstructed.
    def _get_done(data):
        self.failUnlessEqual(data, self.data)
    d.addCallback(_get_done)
    return d
def _check_publish2(self, res):
    """Verify the published tree via the path-based dirnode API."""
    # this one uses the path-based API
    rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
    d = rootnode.get_child_at_path(u"subdir1")
    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode)))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    def _got_filenode(filenode):
        fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
        # use a real test assertion: a bare `assert` is stripped under
        # `python -O` and would silently stop checking anything
        self.failUnlessEqual(fnode, filenode)
    d.addCallback(_got_filenode)
    # the extraction dropped the trailing `return d` (original line 924)
    return d
926 def _check_publish_private(self, resnode):
927 # this one uses the path-based API
928 self._private_node = resnode
930 d = self._private_node.get_child_at_path(u"personal")
931 def _got_personal(personal):
932 self._personal_node = personal
934 d.addCallback(_got_personal)
936 d.addCallback(lambda dirnode:
937 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
939 return self._private_node.get_child_at_path(path)
941 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
942 d.addCallback(lambda filenode: filenode.download_to_data())
943 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
944 d.addCallback(lambda res: get_path(u"s2-rw"))
945 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
946 d.addCallback(lambda res: get_path(u"s2-ro"))
947 def _got_s2ro(dirnode):
948 self.failUnless(dirnode.is_mutable(), dirnode)
949 self.failUnless(dirnode.is_readonly(), dirnode)
950 d1 = defer.succeed(None)
951 d1.addCallback(lambda res: dirnode.list())
952 d1.addCallback(self.log, "dirnode.list")
954 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
956 d1.addCallback(self.log, "doing add_file(ro)")
957 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
958 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
960 d1.addCallback(self.log, "doing get(ro)")
961 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
962 d1.addCallback(lambda filenode:
963 self.failUnless(IFileNode.providedBy(filenode)))
965 d1.addCallback(self.log, "doing delete(ro)")
966 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
968 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
970 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
972 personal = self._personal_node
973 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
975 d1.addCallback(self.log, "doing move_child_to(ro)2")
976 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
978 d1.addCallback(self.log, "finished with _got_s2ro")
980 d.addCallback(_got_s2ro)
981 def _got_home(dummy):
982 home = self._private_node
983 personal = self._personal_node
984 d1 = defer.succeed(None)
985 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
986 d1.addCallback(lambda res:
987 personal.move_child_to(u"sekrit data",home,u"sekrit"))
989 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
990 d1.addCallback(lambda res:
991 home.move_child_to(u"sekrit", home, u"sekrit data"))
993 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
994 d1.addCallback(lambda res:
995 home.move_child_to(u"sekrit data", personal))
997 d1.addCallback(lambda res: home.build_manifest().when_done())
998 d1.addCallback(self.log, "manifest")
1002 # P/personal/sekrit data
1003 # P/s2-rw (same as P/s2-ro)
1004 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
1005 d1.addCallback(lambda res:
1006 self.failUnlessEqual(len(res["manifest"]), 5))
1007 d1.addCallback(lambda res: home.start_deep_stats().when_done())
1008 def _check_stats(stats):
1009 expected = {"count-immutable-files": 1,
1010 "count-mutable-files": 0,
1011 "count-literal-files": 1,
1013 "count-directories": 3,
1014 "size-immutable-files": 112,
1015 "size-literal-files": 23,
1016 #"size-directories": 616, # varies
1017 #"largest-directory": 616,
1018 "largest-directory-children": 3,
1019 "largest-immutable-file": 112,
1021 for k,v in expected.iteritems():
1022 self.failUnlessEqual(stats[k], v,
1023 "stats[%s] was %s, not %s" %
1025 self.failUnless(stats["size-directories"] > 1300,
1026 stats["size-directories"])
1027 self.failUnless(stats["largest-directory"] > 800,
1028 stats["largest-directory"])
1029 self.failUnlessEqual(stats["size-files-histogram"],
1030 [ (11, 31, 1), (101, 316, 1) ])
1031 d1.addCallback(_check_stats)
1033 d.addCallback(_got_home)
def shouldFail(self, res, expected_failure, which, substring=None):
    """addBoth-style checker: `res` must be a Failure wrapping
    `expected_failure` (and, if given, containing `substring` in its
    string form); anything else fails the test, naming `which` operation
    was supposed to raise.
    """
    if isinstance(res, Failure):
        res.trap(expected_failure)
        # the `if substring:` guard (original line 1039) was dropped by
        # the extraction; without it a None substring would crash on `in`
        if substring:
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
    else:
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def shouldFail2(self, expected_failure, which, substring,
                callable, *args, **kwargs):
    """Invoke `callable(*args, **kwargs)` (synchronous or
    Deferred-returning) and require that it fails with
    `expected_failure`, optionally containing `substring`.

    Returns a Deferred that fires (successfully) once the expected
    failure has been observed and trapped.
    """
    assert substring is None or isinstance(substring, str)
    d = defer.maybeDeferred(callable, *args, **kwargs)
    # NOTE(review): the _done wrapper, the `if substring:` guard and the
    # addBoth/return lines were dropped by the extraction; reconstructed
    # to mirror shouldFail() above.
    def _done(res):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    d.addBoth(_done)
    return d
def PUT(self, urlpath, data):
    """HTTP PUT `data` to the node's web API at `urlpath`.

    Returns the Deferred from twisted's getPage.
    """
    target = self.webish_url + urlpath
    return getPage(target, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """HTTP GET `urlpath` from the node's web API.

    Returns the Deferred from twisted's getPage; redirects are only
    followed when `followRedirect` is true.
    """
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
    """HTTP POST a multipart/form-data body built from `fields` to
    `urlpath`, targeting the helper-using node when `use_helper` is set.

    Each field value is either a plain value or a (filename, value)
    tuple for file-upload parts. Returns the Deferred from getPage.

    NOTE(review): several interior lines of the form builder (original
    lines 1078-79, 1081, 1083, 1089, 1091, 1093-94, 1097) were dropped
    by the extraction and have been reconstructed.
    """
    if use_helper:
        url = self.helper_webish_url + urlpath
    else:
        url = self.webish_url + urlpath
    sepbase = "boogabooga"
    sep = "--" + sepbase
    form = []
    form.append(sep)
    # declare the charset first so the server decodes fields as UTF-8
    form.append('Content-Disposition: form-data; name="_charset"')
    form.append('')
    form.append('UTF-8')
    form.append(sep)
    for name, value in fields.iteritems():
        if isinstance(value, tuple):
            filename, value = value
            form.append('Content-Disposition: form-data; name="%s"; '
                        'filename="%s"' % (name, filename.encode("utf-8")))
        else:
            form.append('Content-Disposition: form-data; name="%s"' % name)
        form.append('')
        form.append(str(value))
        form.append(sep)
    # the last boundary must be the closing form "--BOUNDARY--"
    form[-1] += "--"
    body = "\r\n".join(form) + "\r\n"
    headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
               }
    return getPage(url, method="POST", postdata=body,
                   headers=headers, followRedirect=followRedirect)
1101 def _test_web(self, res):
1102 base = self.webish_url
1103 public = "uri/" + self._root_directory_uri
1105 def _got_welcome(page):
1106 # XXX This test is oversensitive to formatting
1107 expected = "Connected to <span>%d</span>\n of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1108 self.failUnless(expected in page,
1109 "I didn't see the right 'connected storage servers'"
1110 " message in: %s" % page
1112 expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1113 self.failUnless(expected in page,
1114 "I didn't see the right 'My nodeid' message "
1116 self.failUnless("Helper: 0 active uploads" in page)
1117 d.addCallback(_got_welcome)
1118 d.addCallback(self.log, "done with _got_welcome")
1120 # get the welcome page from the node that uses the helper too
1121 d.addCallback(lambda res: getPage(self.helper_webish_url))
1122 def _got_welcome_helper(page):
1123 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1125 self.failUnless("Not running helper" in page)
1126 d.addCallback(_got_welcome_helper)
1128 d.addCallback(lambda res: getPage(base + public))
1129 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1130 def _got_subdir1(page):
1131 # there ought to be an href for our file
1132 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1133 self.failUnless(">mydata567</a>" in page)
1134 d.addCallback(_got_subdir1)
1135 d.addCallback(self.log, "done with _got_subdir1")
1136 d.addCallback(lambda res:
1137 getPage(base + public + "/subdir1/mydata567"))
1138 def _got_data(page):
1139 self.failUnlessEqual(page, self.data)
1140 d.addCallback(_got_data)
1142 # download from a URI embedded in a URL
1143 d.addCallback(self.log, "_get_from_uri")
1144 def _get_from_uri(res):
1145 return getPage(base + "uri/%s?filename=%s"
1146 % (self.uri, "mydata567"))
1147 d.addCallback(_get_from_uri)
1148 def _got_from_uri(page):
1149 self.failUnlessEqual(page, self.data)
1150 d.addCallback(_got_from_uri)
1152 # download from a URI embedded in a URL, second form
1153 d.addCallback(self.log, "_get_from_uri2")
1154 def _get_from_uri2(res):
1155 return getPage(base + "uri?uri=%s" % (self.uri,))
1156 d.addCallback(_get_from_uri2)
1157 d.addCallback(_got_from_uri)
1159 # download from a bogus URI, make sure we get a reasonable error
1160 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1161 def _get_from_bogus_uri(res):
1162 d1 = getPage(base + "uri/%s?filename=%s"
1163 % (self.mangle_uri(self.uri), "mydata567"))
1164 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1167 d.addCallback(_get_from_bogus_uri)
1168 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1170 # upload a file with PUT
1171 d.addCallback(self.log, "about to try PUT")
1172 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1173 "new.txt contents"))
1174 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1175 d.addCallback(self.failUnlessEqual, "new.txt contents")
1176 # and again with something large enough to use multiple segments,
1177 # and hopefully trigger pauseProducing too
1178 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1179 "big" * 500000)) # 1.5MB
1180 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1181 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1183 # can we replace files in place?
1184 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1186 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1187 d.addCallback(self.failUnlessEqual, "NEWER contents")
1189 # test unlinked POST
1190 d.addCallback(lambda res: self.POST("uri", t="upload",
1191 file=("new.txt", "data" * 10000)))
1192 # and again using the helper, which exercises different upload-status
1194 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1195 file=("foo.txt", "data2" * 10000)))
1197 # check that the status page exists
1198 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1199 def _got_status(res):
1200 # find an interesting upload and download to look at. LIT files
1201 # are not interesting.
1202 for ds in self.clients[0].list_all_download_statuses():
1203 if ds.get_size() > 200:
1204 self._down_status = ds.get_counter()
1205 for us in self.clients[0].list_all_upload_statuses():
1206 if us.get_size() > 200:
1207 self._up_status = us.get_counter()
1208 rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1209 self._retrieve_status = rs.get_counter()
1210 ps = list(self.clients[0].list_all_publish_statuses())[0]
1211 self._publish_status = ps.get_counter()
1212 us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1213 self._update_status = us.get_counter()
1215 # and that there are some upload- and download- status pages
1216 return self.GET("status/up-%d" % self._up_status)
1217 d.addCallback(_got_status)
1219 return self.GET("status/down-%d" % self._down_status)
1220 d.addCallback(_got_up)
1222 return self.GET("status/mapupdate-%d" % self._update_status)
1223 d.addCallback(_got_down)
1224 def _got_update(res):
1225 return self.GET("status/publish-%d" % self._publish_status)
1226 d.addCallback(_got_update)
1227 def _got_publish(res):
1228 return self.GET("status/retrieve-%d" % self._retrieve_status)
1229 d.addCallback(_got_publish)
1231 # check that the helper status page exists
1232 d.addCallback(lambda res:
1233 self.GET("helper_status", followRedirect=True))
1234 def _got_helper_status(res):
1235 self.failUnless("Bytes Fetched:" in res)
1236 # touch a couple of files in the helper's working directory to
1237 # exercise more code paths
1238 workdir = os.path.join(self.getdir("client0"), "helper")
1239 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1240 f = open(incfile, "wb")
1241 f.write("small file")
1243 then = time.time() - 86400*3
1245 os.utime(incfile, (now, then))
1246 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1247 f = open(encfile, "wb")
1248 f.write("less small file")
1250 os.utime(encfile, (now, then))
1251 d.addCallback(_got_helper_status)
1252 # and that the json form exists
1253 d.addCallback(lambda res:
1254 self.GET("helper_status?t=json", followRedirect=True))
1255 def _got_helper_status_json(res):
1256 data = simplejson.loads(res)
1257 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1259 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1260 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1261 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1263 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1264 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1265 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1267 d.addCallback(_got_helper_status_json)
1269 # and check that client[3] (which uses a helper but does not run one
1270 # itself) doesn't explode when you ask for its status
1271 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1272 def _got_non_helper_status(res):
1273 self.failUnless("Upload and Download Status" in res)
1274 d.addCallback(_got_non_helper_status)
1276 # or for helper status with t=json
1277 d.addCallback(lambda res:
1278 getPage(self.helper_webish_url + "helper_status?t=json"))
1279 def _got_non_helper_status_json(res):
1280 data = simplejson.loads(res)
1281 self.failUnlessEqual(data, {})
1282 d.addCallback(_got_non_helper_status_json)
1284 # see if the statistics page exists
1285 d.addCallback(lambda res: self.GET("statistics"))
1286 def _got_stats(res):
1287 self.failUnless("Node Statistics" in res)
1288 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1289 d.addCallback(_got_stats)
1290 d.addCallback(lambda res: self.GET("statistics?t=json"))
1291 def _got_stats_json(res):
1292 data = simplejson.loads(res)
1293 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1294 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1295 d.addCallback(_got_stats_json)
1297 # TODO: mangle the second segment of a file, to test errors that
1298 # occur after we've already sent some good data, which uses a
1299 # different error path.
1301 # TODO: download a URI with a form
1302 # TODO: create a directory by using a form
1303 # TODO: upload by using a form on the directory page
1304 # url = base + "somedir/subdir1/freeform_post!!upload"
1305 # TODO: delete a file by using a button on the directory page
1309 def _test_runner(self, res):
1310 # exercise some of the diagnostic tools in runner.py
1313 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1314 if "storage" not in dirpath:
1318 pieces = dirpath.split(os.sep)
1319 if (len(pieces) >= 4
1320 and pieces[-4] == "storage"
1321 and pieces[-3] == "shares"):
1322 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1323 # are sharefiles here
1324 filename = os.path.join(dirpath, filenames[0])
1325 # peek at the magic to see if it is a chk share
1326 magic = open(filename, "rb").read(4)
1327 if magic == '\x00\x00\x00\x01':
1330 self.fail("unable to find any uri_extension files in %s"
1332 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1334 out,err = StringIO(), StringIO()
1335 rc = runner.runner(["debug", "dump-share", "--offsets",
1337 stdout=out, stderr=err)
1338 output = out.getvalue()
1339 self.failUnlessEqual(rc, 0)
1341 # we only upload a single file, so we can assert some things about
1342 # its size and shares.
1343 self.failUnless(("share filename: %s" % filename) in output)
1344 self.failUnless("size: %d\n" % len(self.data) in output)
1345 self.failUnless("num_segments: 1\n" in output)
1346 # segment_size is always a multiple of needed_shares
1347 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1348 self.failUnless("total_shares: 10\n" in output)
1349 # keys which are supposed to be present
1350 for key in ("size", "num_segments", "segment_size",
1351 "needed_shares", "total_shares",
1352 "codec_name", "codec_params", "tail_codec_params",
1353 #"plaintext_hash", "plaintext_root_hash",
1354 "crypttext_hash", "crypttext_root_hash",
1355 "share_root_hash", "UEB_hash"):
1356 self.failUnless("%s: " % key in output, key)
1357 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1359 # now use its storage index to find the other shares using the
1360 # 'find-shares' tool
1361 sharedir, shnum = os.path.split(filename)
1362 storagedir, storage_index_s = os.path.split(sharedir)
1363 out,err = StringIO(), StringIO()
1364 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1365 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1366 rc = runner.runner(cmd, stdout=out, stderr=err)
1367 self.failUnlessEqual(rc, 0)
1369 sharefiles = [sfn.strip() for sfn in out.readlines()]
1370 self.failUnlessEqual(len(sharefiles), 10)
1372 # also exercise the 'catalog-shares' tool
1373 out,err = StringIO(), StringIO()
1374 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1375 cmd = ["debug", "catalog-shares"] + nodedirs
1376 rc = runner.runner(cmd, stdout=out, stderr=err)
1377 self.failUnlessEqual(rc, 0)
1379 descriptions = [sfn.strip() for sfn in out.readlines()]
1380 self.failUnlessEqual(len(descriptions), 30)
1382 for line in descriptions
1383 if line.startswith("CHK %s " % storage_index_s)]
1384 self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
    """Connect to client0's control port via its FURL and run the
    remote-control checks in _test_control2.

    Returns a Deferred that fires when the checks complete.
    """
    # exercise the remote-control-the-client foolscap interfaces in
    # allmydata.control (mostly used for performance tests)
    c0 = self.clients[0]
    control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
    # read the FURL and close the handle promptly instead of leaking it
    f = open(control_furl_file, "r")
    try:
        control_furl = f.read().strip()
    finally:
        f.close()
    # it doesn't really matter which Tub we use to connect to the client,
    # so let's just use our IntroducerNode's
    d = self.introducer.tub.getReference(control_furl)
    d.addCallback(self._test_control2, control_furl_file)
    # the extraction dropped the trailing `return d` (original line 1396)
    return d
def _test_control2(self, rref, filename):
    """Drive the control-port remote methods: upload `filename`, download
    it back and compare, then run speed/memory/latency probes.

    Returns a Deferred that fires when all remote calls complete.
    """
    d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
    downfile = os.path.join(self.basedir, "control.downfile")
    # NOTE(review): the argument line of this callRemote (original line
    # 1402) was dropped by the extraction; reconstructed as (uri, downfile).
    d.addCallback(lambda uri:
                  rref.callRemote("download_from_uri_to_file",
                                  uri, downfile))
    def _check(res):
        self.failUnlessEqual(res, downfile)
        # close the file handles instead of leaking them
        f = open(downfile, "r")
        try:
            data = f.read()
        finally:
            f.close()
        f = open(filename, "r")
        try:
            expected_data = f.read()
        finally:
            f.close()
        self.failUnlessEqual(data, expected_data)
    d.addCallback(_check)
    d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
    if sys.platform == "linux2":
        d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
    d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    return d
1415 def _test_cli(self, res):
1416 # run various CLI commands (in a thread, since they use blocking
1419 private_uri = self._private_node.get_uri()
1420 some_uri = self._root_directory_uri
1421 client0_basedir = self.getdir("client0")
1424 "--node-directory", client0_basedir,
1426 TESTDATA = "I will not write the same thing over and over.\n" * 100
1428 d = defer.succeed(None)
1430 # for compatibility with earlier versions, private/root_dir.cap is
1431 # supposed to be treated as an alias named "tahoe:". Start by making
1432 # sure that works, before we add other aliases.
1434 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1435 f = open(root_file, "w")
1436 f.write(private_uri)
1439 def run(ignored, verb, *args, **kwargs):
1440 stdin = kwargs.get("stdin", "")
1441 newargs = [verb] + nodeargs + list(args)
1442 return self._run_cli(newargs, stdin=stdin)
1444 def _check_ls((out,err), expected_children, unexpected_children=[]):
1445 self.failUnlessEqual(err, "")
1446 for s in expected_children:
1447 self.failUnless(s in out, (s,out))
1448 for s in unexpected_children:
1449 self.failIf(s in out, (s,out))
1451 def _check_ls_root((out,err)):
1452 self.failUnless("personal" in out)
1453 self.failUnless("s2-ro" in out)
1454 self.failUnless("s2-rw" in out)
1455 self.failUnlessEqual(err, "")
1457 # this should reference private_uri
1458 d.addCallback(run, "ls")
1459 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1461 d.addCallback(run, "list-aliases")
1462 def _check_aliases_1((out,err)):
1463 self.failUnlessEqual(err, "")
1464 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1465 d.addCallback(_check_aliases_1)
1467 # now that that's out of the way, remove root_dir.cap and work with
1469 d.addCallback(lambda res: os.unlink(root_file))
1470 d.addCallback(run, "list-aliases")
1471 def _check_aliases_2((out,err)):
1472 self.failUnlessEqual(err, "")
1473 self.failUnlessEqual(out, "")
1474 d.addCallback(_check_aliases_2)
1476 d.addCallback(run, "mkdir")
1477 def _got_dir( (out,err) ):
1478 self.failUnless(uri.from_string_dirnode(out.strip()))
1480 d.addCallback(_got_dir)
1481 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1483 d.addCallback(run, "list-aliases")
1484 def _check_aliases_3((out,err)):
1485 self.failUnlessEqual(err, "")
1486 self.failUnless("tahoe: " in out)
1487 d.addCallback(_check_aliases_3)
1489 def _check_empty_dir((out,err)):
1490 self.failUnlessEqual(out, "")
1491 self.failUnlessEqual(err, "")
1492 d.addCallback(run, "ls")
1493 d.addCallback(_check_empty_dir)
1495 def _check_missing_dir((out,err)):
1496 # TODO: check that rc==2
1497 self.failUnlessEqual(out, "")
1498 self.failUnlessEqual(err, "No such file or directory\n")
1499 d.addCallback(run, "ls", "bogus")
1500 d.addCallback(_check_missing_dir)
1505 fn = os.path.join(self.basedir, "file%d" % i)
1507 data = "data to be uploaded: file%d\n" % i
1509 open(fn,"wb").write(data)
1511 def _check_stdout_against((out,err), filenum=None, data=None):
1512 self.failUnlessEqual(err, "")
1513 if filenum is not None:
1514 self.failUnlessEqual(out, datas[filenum])
1515 if data is not None:
1516 self.failUnlessEqual(out, data)
1518 # test all both forms of put: from a file, and from stdin
1520 d.addCallback(run, "put", files[0], "tahoe-file0")
1521 def _put_out((out,err)):
1522 self.failUnless("URI:LIT:" in out, out)
1523 self.failUnless("201 Created" in err, err)
1525 return run(None, "get", uri0)
1526 d.addCallback(_put_out)
1527 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1529 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1530 # tahoe put bar tahoe:FOO
1531 d.addCallback(run, "put", files[2], "tahoe:file2")
1532 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1533 def _check_put_mutable((out,err)):
1534 self._mutable_file3_uri = out.strip()
1535 d.addCallback(_check_put_mutable)
1536 d.addCallback(run, "get", "tahoe:file3")
1537 d.addCallback(_check_stdout_against, 3)
1540 STDIN_DATA = "This is the file to upload from stdin."
1541 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1542 # tahoe put tahoe:FOO
1543 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1544 stdin="Other file from stdin.")
1546 d.addCallback(run, "ls")
1547 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1548 "tahoe-file-stdin", "from-stdin"])
1549 d.addCallback(run, "ls", "subdir")
1550 d.addCallback(_check_ls, ["tahoe-file1"])
1553 d.addCallback(run, "mkdir", "subdir2")
1554 d.addCallback(run, "ls")
1555 # TODO: extract the URI, set an alias with it
1556 d.addCallback(_check_ls, ["subdir2"])
1558 # tahoe get: (to stdin and to a file)
1559 d.addCallback(run, "get", "tahoe-file0")
1560 d.addCallback(_check_stdout_against, 0)
1561 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1562 d.addCallback(_check_stdout_against, 1)
1563 outfile0 = os.path.join(self.basedir, "outfile0")
1564 d.addCallback(run, "get", "file2", outfile0)
1565 def _check_outfile0((out,err)):
1566 data = open(outfile0,"rb").read()
1567 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1568 d.addCallback(_check_outfile0)
1569 outfile1 = os.path.join(self.basedir, "outfile0")
1570 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1571 def _check_outfile1((out,err)):
1572 data = open(outfile1,"rb").read()
1573 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1574 d.addCallback(_check_outfile1)
1576 d.addCallback(run, "rm", "tahoe-file0")
1577 d.addCallback(run, "rm", "tahoe:file2")
1578 d.addCallback(run, "ls")
1579 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1581 d.addCallback(run, "ls", "-l")
1582 def _check_ls_l((out,err)):
1583 lines = out.split("\n")
1585 if "tahoe-file-stdin" in l:
1586 self.failUnless(l.startswith("-r-- "), l)
1587 self.failUnless(" %d " % len(STDIN_DATA) in l)
1589 self.failUnless(l.startswith("-rw- "), l) # mutable
1590 d.addCallback(_check_ls_l)
1592 d.addCallback(run, "ls", "--uri")
1593 def _check_ls_uri((out,err)):
1594 lines = out.split("\n")
1597 self.failUnless(self._mutable_file3_uri in l)
1598 d.addCallback(_check_ls_uri)
1600 d.addCallback(run, "ls", "--readonly-uri")
1601 def _check_ls_rouri((out,err)):
1602 lines = out.split("\n")
1605 rw_uri = self._mutable_file3_uri
1606 u = uri.from_string_mutable_filenode(rw_uri)
1607 ro_uri = u.get_readonly().to_string()
1608 self.failUnless(ro_uri in l)
1609 d.addCallback(_check_ls_rouri)
1612 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1613 d.addCallback(run, "ls")
1614 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1616 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1617 d.addCallback(run, "ls")
1618 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1620 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1621 d.addCallback(run, "ls")
1622 d.addCallback(_check_ls, ["file3", "file3-copy"])
1623 d.addCallback(run, "get", "tahoe:file3-copy")
1624 d.addCallback(_check_stdout_against, 3)
1626 # copy from disk into tahoe
1627 d.addCallback(run, "cp", files[4], "tahoe:file4")
1628 d.addCallback(run, "ls")
1629 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1630 d.addCallback(run, "get", "tahoe:file4")
1631 d.addCallback(_check_stdout_against, 4)
1633 # copy from tahoe into disk
1634 target_filename = os.path.join(self.basedir, "file-out")
1635 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1636 def _check_cp_out((out,err)):
1637 self.failUnless(os.path.exists(target_filename))
1638 got = open(target_filename,"rb").read()
1639 self.failUnlessEqual(got, datas[4])
1640 d.addCallback(_check_cp_out)
1642 # copy from disk to disk (silly case)
1643 target2_filename = os.path.join(self.basedir, "file-out-copy")
1644 d.addCallback(run, "cp", target_filename, target2_filename)
1645 def _check_cp_out2((out,err)):
1646 self.failUnless(os.path.exists(target2_filename))
1647 got = open(target2_filename,"rb").read()
1648 self.failUnlessEqual(got, datas[4])
1649 d.addCallback(_check_cp_out2)
1651 # copy from tahoe into disk, overwriting an existing file
1652 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1653 def _check_cp_out3((out,err)):
1654 self.failUnless(os.path.exists(target_filename))
1655 got = open(target_filename,"rb").read()
1656 self.failUnlessEqual(got, datas[3])
1657 d.addCallback(_check_cp_out3)
1659 # copy from disk into tahoe, overwriting an existing immutable file
1660 d.addCallback(run, "cp", files[5], "tahoe:file4")
1661 d.addCallback(run, "ls")
1662 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1663 d.addCallback(run, "get", "tahoe:file4")
1664 d.addCallback(_check_stdout_against, 5)
1666 # copy from disk into tahoe, overwriting an existing mutable file
1667 d.addCallback(run, "cp", files[5], "tahoe:file3")
1668 d.addCallback(run, "ls")
1669 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1670 d.addCallback(run, "get", "tahoe:file3")
1671 d.addCallback(_check_stdout_against, 5)
1673 # recursive copy: setup
1674 dn = os.path.join(self.basedir, "dir1")
1676 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1677 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1678 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1679 sdn2 = os.path.join(dn, "subdir2")
1681 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1682 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1684 # from disk into tahoe
1685 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1686 d.addCallback(run, "ls")
1687 d.addCallback(_check_ls, ["dir1"])
1688 d.addCallback(run, "ls", "dir1")
1689 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1690 ["rfile4", "rfile5"])
1691 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1692 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1693 ["rfile1", "rfile2", "rfile3"])
1694 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1695 d.addCallback(_check_stdout_against, data="rfile4")
1697 # and back out again
1698 dn_copy = os.path.join(self.basedir, "dir1-copy")
1699 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1700 def _check_cp_r_out((out,err)):
1702 old = open(os.path.join(dn, name), "rb").read()
1703 newfn = os.path.join(dn_copy, name)
1704 self.failUnless(os.path.exists(newfn))
1705 new = open(newfn, "rb").read()
1706 self.failUnlessEqual(old, new)
1710 _cmp(os.path.join("subdir2", "rfile4"))
1711 _cmp(os.path.join("subdir2", "rfile5"))
1712 d.addCallback(_check_cp_r_out)
1714 # and copy it a second time, which ought to overwrite the same files
1715 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1717 # and again, only writing filecaps
1718 dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1719 d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1720 def _check_capsonly((out,err)):
1721 # these should all be LITs
1722 x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1723 y = uri.from_string_filenode(x)
1724 self.failUnlessEqual(y.data, "rfile4")
1725 d.addCallback(_check_capsonly)
1727 # and tahoe-to-tahoe
1728 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1729 d.addCallback(run, "ls")
1730 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1731 d.addCallback(run, "ls", "dir1-copy")
1732 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1733 ["rfile4", "rfile5"])
1734 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1735 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1736 ["rfile1", "rfile2", "rfile3"])
1737 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1738 d.addCallback(_check_stdout_against, data="rfile4")
1740 # and copy it a second time, which ought to overwrite the same files
1741 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1743 # tahoe_ls doesn't currently handle the error correctly: it tries to
1744 # JSON-parse a traceback.
1745 ## def _ls_missing(res):
1746 ## argv = ["ls"] + nodeargs + ["bogus"]
1747 ## return self._run_cli(argv)
1748 ## d.addCallback(_ls_missing)
1749 ## def _check_ls_missing((out,err)):
1752 ## self.failUnlessEqual(err, "")
1753 ## d.addCallback(_check_ls_missing)
def _run_cli(self, argv, stdin=""):
    """Run a Tahoe CLI command in a worker thread.

    argv is the CLI argument list (without the program name); stdin is
    fed to the command as its standard input.  Returns a Deferred that
    fires with a (stdout, stderr) tuple of the captured output strings.
    """
    stdout, stderr = StringIO(), StringIO()
    # runner.runner is blocking, so push it onto a thread; run_by_human=False
    # keeps it from touching the real terminal.
    d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                              stdin=StringIO(stdin),
                              stdout=stdout, stderr=stderr)
    def _done(res):
        # Discard the runner's return value; callers only want the
        # captured output streams.
        return stdout.getvalue(), stderr.getvalue()
    d.addCallback(_done)
    return d
def _test_checker(self, res):
    """Exercise file/dirnode checking: upload a CHK file into the personal
    dirnode, then run check() (and check(verify=True)) against the dirnode,
    the CHK filenode, and a pre-existing LIT filenode.

    Dirnodes and CHK files must report healthy; LIT files have no shares,
    so their check() result is None.
    """
    # "too big to be literal" * 200 exceeds the LIT threshold, forcing a
    # real CHK upload with shares that the checker can inspect.
    ut = upload.Data("too big to be literal" * 200, convergence=None)
    d = self._personal_node.add_file(u"big file", ut)

    d.addCallback(lambda res: self._personal_node.check(Monitor()))
    def _check_dirnode_results(r):
        self.failUnless(r.is_healthy())
    d.addCallback(_check_dirnode_results)
    # verify=True downloads and validates every share, not just presence.
    d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
    d.addCallback(_check_dirnode_results)

    d.addCallback(lambda res: self._personal_node.get(u"big file"))
    def _got_chk_filenode(n):
        self.failUnless(isinstance(n, filenode.FileNode))
        d = n.check(Monitor())
        def _check_filenode_results(r):
            self.failUnless(r.is_healthy())
        d.addCallback(_check_filenode_results)
        d.addCallback(lambda res: n.check(Monitor(), verify=True))
        d.addCallback(_check_filenode_results)
        return d
    d.addCallback(_got_chk_filenode)

    # "sekrit data" was stored earlier in the test and is small enough to
    # be a LIT file: no shares exist, so check() reports None.
    d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
    def _got_lit_filenode(n):
        self.failUnless(isinstance(n, filenode.LiteralFileNode))
        d = n.check(Monitor())
        def _check_lit_filenode_results(r):
            self.failUnlessEqual(r, None)
        d.addCallback(_check_lit_filenode_results)
        d.addCallback(lambda res: n.check(Monitor(), verify=True))
        d.addCallback(_check_lit_filenode_results)
        return d
    d.addCallback(_got_lit_filenode)