2 from base64 import b32encode
3 import os, sys, time, re
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.application import service
9 from allmydata import client, uri, download, upload, storage, mutable
10 from allmydata.introducer import IntroducerNode
11 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
12 from allmydata.scripts import runner
13 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
14 from allmydata.mutable import NotMutableError
15 from foolscap.eventual import flushEventualQueue
16 from twisted.python import log
17 from twisted.python.failure import Failure
18 from twisted.web.client import getPage
19 from twisted.web.error import Error
def flush_but_dont_ignore(res):
    """Flush the foolscap eventual-send queue, then pass *res* through.

    NOTE(review): the tail of this function (presumably a callback on the
    flush Deferred that re-returns *res*) is not visible in this view of
    the file — TODO confirm against the full source.
    """
    d = flushEventualQueue()
29 This is some data to publish to the virtual drive, which needs to be large
30 enough to not fit inside a LIT uri.
33 class SystemTest(testutil.SignalMixin, unittest.TestCase):
36 self.sparent = service.MultiService()
37 self.sparent.startService()
39 log.msg("shutting down SystemTest services")
40 d = self.sparent.stopService()
41 d.addBoth(flush_but_dont_ignore)
44 def getdir(self, subdir):
45 return os.path.join(self.basedir, subdir)
    def add_service(self, s):
        """Attach service *s* to the test's parent MultiService.

        NOTE(review): callers use the return value (e.g.
        ``self.introducer = self.add_service(iv)``), but the ``return s``
        line is not visible in this view — TODO confirm.
        """
        s.setServiceParent(self.sparent)
    def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
        """Create an introducer node plus NUMCLIENTS client nodes.

        The returned Deferred chain (via _set_up_nodes_2) fires once the
        introducer's tub is ready and the clients have been created.
        """
        self.numclients = NUMCLIENTS
        self.createprivdir = createprivdir
        iv_dir = self.getdir("introducer")
        if not os.path.isdir(iv_dir):
            fileutil.make_dirs(iv_dir)
        iv = IntroducerNode(basedir=iv_dir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        d.addCallback(self._set_up_nodes_2)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _set_up_nodes_2(self, res):
        """Create the client nodes and discover client0's web port."""
        # NOTE(review): 'q' is bound on a line missing from this view
        # (presumably the introducer service) — TODO confirm.
        self.introducer_furl = q.introducer_url
        for i in range(self.numclients):
            basedir = self.getdir("client%d" % i)
            fileutil.make_dirs(basedir)
            # client0 gets a webport file so we can exercise the web UI
            open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
            if self.createprivdir:
                open(os.path.join(basedir, "my_private_dir.uri"), "w")
            open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
            c = self.add_service(client.Client(basedir=basedir))
            self.clients.append(c)
        d = self.wait_for_connections()
        # NOTE(review): a 'def _connected(res):' wrapper appears to be
        # missing from this view; the next three lines read as its body.
        # now find out where the web port was
        l = self.clients[0].getServiceNamed("webish").listener
        port = l._port.getHost().port
        self.webish_url = "http://localhost:%d/" % port
        d.addCallback(_connected)
    def add_extra_node(self, client_num):
        """Create one more client node, deliberately NOT parented to
        self.sparent, so it can be shut down independently of the rest."""
        # this node is *not* parented to our self.sparent, so we can shut it
        # down separately from the rest, to exercise the connection-lost code
        basedir = self.getdir("client%d" % client_num)
        if not os.path.isdir(basedir):
            fileutil.make_dirs(basedir)
            open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
        c = client.Client(basedir=basedir)
        self.clients.append(c)
        # NOTE(review): the lines that start the new node's service are
        # missing from this view — TODO confirm.
        d = self.wait_for_connections()
        d.addCallback(lambda res: c)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def wait_for_connections(self, ignored=None):
        """Poll (every 0.05s) until every client has heard from all
        self.numclients peers; fires the returned Deferred when so."""
        # TODO: replace this with something that takes a list of peerids and
        # fires when they've all been heard from, instead of using a count
        for c in self.clients:
            if (not c.introducer_client or
                len(list(c.get_all_peerids())) != self.numclients):
                # NOTE(review): the 'd = defer.Deferred()' line is missing
                # from this view — TODO confirm.
                d.addCallback(self.wait_for_connections)
                reactor.callLater(0.05, d.callback, None)
                # NOTE(review): 'return d' is not visible in this view.
        return defer.succeed(None)
    def test_connections(self):
        """System test: bring up a grid, add one extra node, and verify
        that every client sees the expected peer counts."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("a", True))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
                permuted_other_peers = list(c.get_permuted_peers("a", False))
                self.failUnlessEqual(len(permuted_other_peers), self.numclients)
        d.addCallback(_check)
        def _shutdown_extra_node(res):
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    def test_upload_and_download(self):
        """System test: upload a multi-segment immutable file, then download
        it back via data/filename/filehandle targets, and check that a
        mangled (nonexistent) URI fails with NotEnoughPeersError."""
        self.basedir = "system/SystemTest/test_upload_and_download"
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("a", True))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
                permuted_other_peers = list(c.get_permuted_peers("a", False))
                self.failUnlessEqual(len(permuted_other_peers), self.numclients-1)
        d.addCallback(_check_connections)
        # NOTE(review): the 'def _do_upload(res):' wrapper appears to be
        # missing from this view; the following lines read as its body.
        u = self.clients[0].getServiceNamed("uploader")
        # we crank the max segsize down to 1024b for the duration of this
        # test, so we can exercise multiple segments. It is important
        # that this is not a multiple of the segment size, so that the
        # tail segment is not the same length as the others. This actualy
        # gets rounded up to 1025 to be a multiple of the number of
        # required shares (since we use 25 out of 100 FEC).
        options = {"max_segment_size": 1024}
        d1 = u.upload_data(DATA, options)
        d.addCallback(_do_upload)
        def _upload_done(uri):
            log.msg("upload finished: uri is %s" % (uri,))
            # NOTE(review): the lines stashing self.uri / self.downloader
            # appear to be missing from this view — TODO confirm.
            dl = self.clients[1].getServiceNamed("downloader")
        d.addCallback(_upload_done)

        def _upload_again(res):
            # upload again. This ought to be short-circuited, however with
            # the way we currently generate URIs (i.e. because they include
            # the roothash), we have to do all of the encoding work, and only
            # get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            options = {"max_segment_size": 1024}
            d1 = self.uploader.upload_data(DATA, options)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
            # NOTE(review): the continuation line passing target_filename
            # is missing from this view.
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            # NOTE(review): the fh.close() line appears to be missing here.
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(download.NotEnoughPeersError),
                                "expected NotEnoughPeersError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughPeersError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_download_nonexistent_uri)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    test_upload_and_download.timeout = 4800
    def _find_shares(self, basedir):
        """Walk *basedir* looking for storage share files.

        Returns a list of (client_num, storage_index, filename, shnum)
        tuples; calls self.fail() if none are found.
        """
        # NOTE(review): the results-list initialization is missing from
        # this view — TODO confirm.
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                # NOTE(review): the skip branch (presumably 'continue') is
                # missing from this view.
            pieces = dirpath.split(os.sep)
            if pieces[-3] == "storage" and pieces[-2] == "shares":
                # we're sitting in .../storage/shares/$SINDEX , and there
                # are sharefiles here
                assert pieces[-4].startswith("client")
                client_num = int(pieces[-4][-1])
                storage_index_s = pieces[-1]
                storage_index = idlib.a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    # NOTE(review): the append of 'data' to the results
                    # list, and the emptiness check before the fail() below,
                    # are missing from this view.
        self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Deliberately corrupt one field (*which*) of the mutable share
        stored in *filename*, then write the re-packed share back in place.

        *which* selects the field: "seqnum", "R" (root hash), "IV",
        "segsize", "pubkey", "signature", "share_hash_chain",
        "block_hash_tree", "share_data", or "encprivkey".
        """
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            # NOTE(review): the seqnum-mutation line and the
            # 'elif which == "R":' header are missing from this view.
            root_hash = self.flip_bit(root_hash)
            # NOTE(review): the 'elif which == "IV":' header is missing.
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
        # NOTE(review): the pack_prefix continuation arguments and the
        # pack_share argument lines are missing from this view.
        final_share = mutable.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """System test for SDMF mutable files: create one, download it from
        several clients, replace its contents twice, check the 'dump-share'
        CLI debug output, and verify retrieval still works after deliberate
        corruption of most shares. Also covers empty mutable files and a
        basic dirnode round-trip."""
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes()

        def _create_mutable(res):
            # NOTE(review): the line binding 'c' (presumably one of the
            # clients) is missing from this view — TODO confirm.
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA, wait_for_numpeers=self.numclients)
            # NOTE(review): a 'def _done(res):' wrapper is missing from this
            # view; the next two lines read as its body.
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
            d1.addCallback(_done)
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            # NOTE(review): the format-argument continuation line is missing.
            log.msg(" for clients[%d]" % client_num)
            # NOTE(review): a 'try:' appears to be missing before the block
            # below (see the 'except unittest.FailTest' further down).
            out,err = StringIO(), StringIO()
            rc = runner.runner(["dump-share",
            # NOTE(review): the filename-argument line is missing here.
                                stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            # the pubkey size can vary by a byte, so the container might
            # be a bit larger on some runs.
            m = re.search(r'^ container_size: (\d+)$', output, re.M)
            # NOTE(review): a guard asserting the match succeeded appears
            # to be missing here.
            container_size = int(m.group(1))
            self.failUnless(2037 <= container_size <= 2049, container_size)
            m = re.search(r'^ data_length: (\d+)$', output, re.M)
            data_length = int(m.group(1))
            self.failUnless(2037 <= data_length <= 2049, data_length)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
            # NOTE(review): the 'in output)' continuation is missing.
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            except unittest.FailTest:
                print "dump-share output was:"
                # NOTE(review): the lines printing the captured output and
                # re-raising are missing from this view.
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.
        d.addCallback(lambda res: self._mutable_node_1.download_to_data())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            return newnode.download_to_data()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_to_data()
            d1.addCallback(lambda res: (res, newnode))
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.replace(NEWDATA, wait_for_numpeers=self.numclients)
            d1.addCallback(lambda res: newnode.download_to_data())
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.replace(NEWERDATA, wait_for_numpeers=self.numclients)
            d1.addCallback(lambda res: newnode2.download_to_data())
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           # NOTE(review): the 'in shares ])' continuation
                           # is missing from this view.
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                # NOTE(review): the per-shnum dispatch lines (presumably
                # 'if shnum == N:' branches) are missing throughout this
                # loop — TODO confirm against the full source.
                # read: this will trigger "pubkey doesn't match
                self._corrupt_mutable_share(filename, "pubkey")
                self._corrupt_mutable_share(filename, "encprivkey")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "seqnum")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "R")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "segsize")
                self._corrupt_mutable_share(filename, "share_hash_chain")
                self._corrupt_mutable_share(filename, "block_hash_tree")
                self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_to_data())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("", wait_for_numpeers=self.numclients)
            d1.addCallback(lambda newnode: newnode.download_to_data())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode(wait_for_numpeers=self.numclients))
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # NOTE(review): the line binding 'd1' (presumably dnode.list())
            # is missing from this view — TODO confirm.
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child("edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node("see recursive", dnode, wait_for_numpeers=self.numclients))
            d1.addCallback(lambda res: dnode.has_child("see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_created_dirnode)

        # NOTE(review): the trailing 'return d' is not visible in this view.
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
522 def flip_bit(self, good):
523 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a CHK URI identical to *gooduri* except with its key
        bit-flipped, so the derived storage index matches no real shares."""
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            # NOTE(review): the final constructor argument
                            # (presumably size=u.size) is missing from this
                            # view — TODO confirm.
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.
    def test_vdrive(self):
        """System test of the virtual drive: publish files and directories,
        bounce client0, then verify contents via the node APIs, the web UI,
        the control interface, and the CLI."""
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(createprivdir=True)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992

        d.addCallback(self._bounce_client0)
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes clients[0]'s automatic private
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_web_start)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._test_verifier)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    test_vdrive.timeout = 1100
    def _do_publish1(self, res):
        """Create the root directory and publish R/subdir1/mydata567,
        stashing the file's URI in self.uri."""
        ut = upload.Data(self.data)
        # NOTE(review): the line binding 'c0' (presumably self.clients[0])
        # is missing from this view — TODO confirm.
        d = c0.create_empty_dirnode(wait_for_numpeers=self.numclients)
        def _made_root(new_dirnode):
            log.msg("ZZZ %s -> %s" % (hasattr(self, '_root_directory_uri') and self._root_directory_uri, new_dirnode.get_uri(),))
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory("subdir1", wait_for_numpeers=self.numclients))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file("mydata567", ut, wait_for_numpeers=self.numclients)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_made_subdir1)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _do_publish2(self, res):
        """Publish R/subdir1/subdir2/mydata992."""
        ut = upload.Data(self.data)
        d = self._subdir1_node.create_empty_directory("subdir2", wait_for_numpeers=self.numclients)
        d.addCallback(lambda subdir2: subdir2.add_file("mydata992", ut, wait_for_numpeers=self.numclients))
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _bounce_client0(self, res):
        """Stop client0, restart it from the same basedir, wait for it to
        reconnect to the grid, and re-discover its web port."""
        old_client0 = self.clients[0]
        d = old_client0.disownServiceParent()
        assert isinstance(d, defer.Deferred)
        d.addCallback(self.log, "STOPPED")
        # I think windows requires a moment to let the connection really stop
        # and the port number made available for re-use. TODO: examine the
        # behavior, see if this is really the problem, see if we can do
        # better than blindly waiting for a second.
        d.addCallback(self.stall, 1.0)
        # NOTE(review): a 'def _stopped(res):' wrapper appears to be missing
        # from this view; the next four lines read as its body.
        new_client0 = client.Client(basedir=self.getdir("client0"))
        self.add_service(new_client0)
        self.clients[0] = new_client0
        return self.wait_for_connections()
        d.addCallback(_stopped)
        d.addCallback(self.log, "CONNECTED")
        # NOTE(review): a 'def _connected(res):' wrapper appears to be
        # missing from this view; the next three lines read as its body.
        # now find out where the web port was
        l = self.clients[0].getServiceNamed("webish").listener
        port = l._port.getHost().port
        self.webish_url = "http://localhost:%d/" % port
        d.addCallback(_connected)
        d.addCallback(self.log, "GOT WEB LISTENER")
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def log(self, res, msg):
        """Deferred-chain-friendly logger: log *msg* and pass *res* through.

        NOTE(review): the log.msg call and the 'return res' line are not
        visible in this view — TODO confirm.
        """
        # print "MSG: %s RES: %s" % (msg, res)
    def stall(self, res, delay=1.0):
        """Return a Deferred that fires with *res* after *delay* seconds.

        NOTE(review): the Deferred creation ('d = ...') and the 'return d'
        are not visible in this view — TODO confirm.
        """
        reactor.callLater(delay, d.callback, res)
    def _do_publish_private(self, res):
        """Populate client0's private directory: P/personal/'sekrit data'
        plus s2-rw and s2-ro links to /subdir1/subdir2."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata)
        d = self.clients[0].get_private_uri()
        d.addCallback(self.log, "GOT private directory")
        def _got_root_uri(privuri):
            # NOTE(review): a line is missing here (orig source) — content
            # unknown from this view.
            privnode = self.clients[0].create_node_from_uri(privuri)
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory("personal", wait_for_numpeers=self.numclients)
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file("sekrit data", ut, wait_for_numpeers=self.numclients))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path(["subdir1", "subdir2"]))
            # NOTE(review): a 'def _got_s2(s2node):' wrapper appears to be
            # missing from this view; the next two lines read as its body.
            d2 = privnode.set_uri("s2-rw", s2node.get_uri(), wait_for_numpeers=self.numclients)
            d2.addCallback(lambda node: privnode.set_uri("s2-ro", s2node.get_readonly_uri(), wait_for_numpeers=self.numclients))
            d1.addCallback(_got_s2)
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_got_root_uri)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 via the iterative get() API."""
        # this one uses the iterative API
        # NOTE(review): the line binding 'c1' (presumably self.clients[1])
        # is missing from this view — TODO confirm.
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get("subdir1"))
        d.addCallback(lambda subdir1: subdir1.get("mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        # NOTE(review): a 'def _get_done(data):' wrapper appears to be
        # missing from this view; the next line reads as its body.
        self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 via the path-based API."""
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path("subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path("subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path("subdir1/mydata567"))
        def _got_filenode(filenode):
            # creating a node from the same URI must compare equal
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _check_publish_private(self, res):
        """Verify the contents of client0's private directory, including
        read-only enforcement on s2-ro and child-move operations."""
        # this one uses the path-based API
        d = self.clients[0].get_private_uri()
        def _got_private_uri(privateuri):
            self._private_node = self.clients[0].create_node_from_uri(privateuri)
        d.addCallback(_got_private_uri)

        d.addCallback(lambda res: self._private_node.get_child_at_path("personal"))
        def _got_personal(personal):
            self._personal_node = personal
            # NOTE(review): 'return personal' is not visible in this view.
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): the 'def get_path(path):' header is missing from
        # this view; the next line reads as its body.
        return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path("personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path("s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path("s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, "nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, "hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get("mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, "mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, "hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, "missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, "mydata992", personal, "nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, "sekrit data", dirnode, "nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to("sekrit data",home,"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to("sekrit", home, "sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to("sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            # expected manifest entries (list partially missing from this
            # view):
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            # NOTE(review): 'return d1' is not visible in this view.
        d.addCallback(_got_home)
        # NOTE(review): the trailing 'return d' is not visible in this view.
    def shouldFail(self, res, expected_failure, which, substring=None):
        """addBoth-style checker: *res* must be a Failure wrapping
        *expected_failure* (optionally containing *substring* in its text);
        anything else fails the test, labeled by *which*."""
        if isinstance(res, Failure):
            res.trap(expected_failure)
            # NOTE(review): an 'if substring:' guard appears to be missing
            # before this check — TODO confirm.
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        # NOTE(review): the 'else:' introducing the non-Failure branch is
        # missing from this view.
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Invoke *callable* (via maybeDeferred) and require that it fail
        with *expected_failure*, optionally containing *substring*."""
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        # NOTE(review): a 'def done(res):' wrapper appears to be missing
        # from this view; the following lines read as its body.
        if isinstance(res, Failure):
            res.trap(expected_failure)
            # NOTE(review): 'if substring:' guard missing from this view.
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        # NOTE(review): the 'else:' branch header is missing from this view.
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
        # NOTE(review): 'd.addBoth(done)' and 'return d' are not visible in
        # this view.
821 def PUT(self, urlpath, data):
822 url = self.webish_url + urlpath
823 return getPage(url, method="PUT", postdata=data)
825 def GET(self, urlpath, followRedirect=False):
826 url = self.webish_url + urlpath
827 return getPage(url, method="GET", followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the web UI: welcome page, directory listing, raw file
        downloads (by path and by URI), error handling for a bogus URI, and
        PUT-based upload/replacement of files."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri.replace("/", "!")
        # NOTE(review): the line binding 'd' (presumably getPage(base)) is
        # missing from this view — TODO confirm.
        def _got_welcome(page):
            expected = "Connected Peers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected peers' message "
                            # NOTE(review): message continuation missing.
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            # NOTE(review): message continuation missing.
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")
        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        # NOTE(review): a 'def _got_data(page):' wrapper appears to be
        # missing from this view; the next line reads as its body.
        self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri")
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
            # NOTE(review): the substring argument continuation and
            # 'return d1' are missing from this view.
        d.addCallback(_get_from_bogus_uri)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        # NOTE(review): the data-argument continuation line is missing.
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        # NOTE(review): the data-argument continuation line is missing.
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        # NOTE(review): the trailing 'return d' is not visible in this view.
    def _test_web_start(self, res):
        """Check that client 0 wrote a start.html containing both its public
        webish URL and (via the callback) its private-vdrive URL.

        NOTE(review): the tail of this method (wiring up ``done`` and
        returning the Deferred) is not visible in this excerpt.
        """
        basedir = self.clients[0].basedir
        startfile = os.path.join(basedir, "start.html")
        self.failUnless(os.path.exists(startfile))
        start_html = open(startfile, "r").read()
        self.failUnless(self.webish_url in start_html)
        d = self.clients[0].get_private_uri()
        def done(private_uri):
            # private URIs get the same "/" -> "!" escaping as public ones
            private_url = self.webish_url + "uri/" + private_uri.replace("/","!")
            self.failUnless(private_url in start_html)
    def _test_runner(self, res):
        # exercise some of the diagnostic tools in runner.py

        # find a CHK share file on disk by walking the test's basedir;
        # NOTE(review): the loop-control lines (continue/break/else) are
        # missing from this excerpt.
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if pieces[-3] == "storage" and pieces[-2] == "shares":
                # we're sitting in .../storage/shares/$SINDEX , and there are
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
            self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)
        # run the "dump-share" diagnostic command, capturing stdout/stderr
        out,err = StringIO(), StringIO()
        rc = runner.runner(["dump-share",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)
        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    "plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
            self.failUnless("%s: " % key in output, key)
    def _test_control(self, res):
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        # NOTE(review): ``c0`` (presumably self.clients[0]) is assigned on a
        # line missing from this excerpt.
        control_furl_file = os.path.join(c0.basedir, "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
    def _test_control2(self, rref, filename):
        """Given a remote reference to the client's control interface, upload
        *filename* via the control channel, download it back, compare the
        bytes, then run the remote speed/measurement probes."""
        d = rref.callRemote("upload_from_file_to_uri", filename)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
        # NOTE(review): the ``def _check(res):`` header is missing from this
        # excerpt; the round-tripped file must match the original exactly.
            self.failUnlessEqual(res, downfile)
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        # "linux2" is the Python-2 platform string for Linux; the memory-usage
        # probe is only exercised there
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        # NOTE(review): the opening lines of the nodeargs / public_nodeargs
        # list literals are missing from this excerpt; these are their
        # continuation lines.
                    "--node-url", self.webish_url,
                    "--root-uri", private_uri,
                    "--node-url", self.webish_url,
                    "--root-uri", some_uri,
        TESTDATA = "I will not write the same thing over and over.\n" * 100
        d = defer.succeed(None)
        # "ls" of the private root: expect the standard top-level entries
            argv = ["ls"] + nodeargs
            return self._run_cli(argv)
        d.addCallback(_ls_root)
        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_root)
        # "ls" of a private subdirectory
        def _ls_subdir(res):
            argv = ["ls"] + nodeargs + ["personal"]
            return self._run_cli(argv)
        d.addCallback(_ls_subdir)
        def _check_ls_subdir((out,err)):
            self.failUnless("sekrit data" in out)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_subdir)
        # "ls" against the public root
        def _ls_public_subdir(res):
            argv = ["ls"] + public_nodeargs + ["subdir1"]
            return self._run_cli(argv)
        d.addCallback(_ls_public_subdir)
        def _check_ls_public_subdir((out,err)):
            self.failUnless("subdir2" in out)
            self.failUnless("mydata567" in out)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_public_subdir)
        # "ls" of a single file: expect "<size> <path>" output
            argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
            return self._run_cli(argv)
        d.addCallback(_ls_file)
        def _check_ls_file((out,err)):
            self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_file)

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
        ## def _ls_missing(res):
        ##     argv = ["ls"] + nodeargs + ["bogus"]
        ##     return self._run_cli(argv)
        ## d.addCallback(_ls_missing)
        ## def _check_ls_missing((out,err)):
        ##     self.failUnlessEqual(err, "")
        ## d.addCallback(_check_ls_missing)

        # "put": upload a local file into the virtual drive
        tdir = self.getdir("cli_put")
        fileutil.make_dirs(tdir)
        fn = os.path.join(tdir, "upload_me")
            argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
            return self._run_cli(argv)
        def _check_put((out,err)):
            self.failUnless("200 OK" in out)
            self.failUnlessEqual(err, "")
            # verify by reading the uploaded child back through the node API
            d = self._private_node.get_child_at_path("test_put/upload.txt")
            d.addCallback(lambda filenode: filenode.download_to_data())
            def _check_put2(res):
                self.failUnlessEqual(res, TESTDATA)
            d.addCallback(_check_put2)
        d.addCallback(_check_put)
        # "get" to stdout
        def _get_to_stdout(res):
            argv = ["get"] + nodeargs + ["test_put/upload.txt"]
            return self._run_cli(argv)
        d.addCallback(_get_to_stdout)
        def _check_get_to_stdout((out,err)):
            self.failUnlessEqual(out, TESTDATA)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_get_to_stdout)
        # "get" to a local file: data on disk, status message on stderr
        get_to_file_target = self.basedir + "/get.downfile"
        def _get_to_file(res):
            argv = ["get"] + nodeargs + ["test_put/upload.txt",
            return self._run_cli(argv)
        d.addCallback(_get_to_file)
        def _check_get_to_file((out,err)):
            data = open(get_to_file_target, "rb").read()
            self.failUnlessEqual(data, TESTDATA)
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
        d.addCallback(_check_get_to_file)
        # "mv": rename within the vdrive
            argv = ["mv"] + nodeargs + ["test_put/upload.txt",
                                        "test_put/moved.txt"]
            return self._run_cli(argv)
        def _check_mv((out,err)):
            self.failUnless("OK" in out)
            self.failUnlessEqual(err, "")
            # the old name must no longer resolve...
            d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, "test_put/upload.txt")
            # ...and the new name must hold the same data
            d.addCallback(lambda res:
                          self._private_node.get_child_at_path("test_put/moved.txt"))
            d.addCallback(lambda filenode: filenode.download_to_data())
            def _check_mv2(res):
                self.failUnlessEqual(res, TESTDATA)
            d.addCallback(_check_mv2)
        d.addCallback(_check_mv)
        # "rm": delete the moved file
            argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
            return self._run_cli(argv)
        def _check_rm((out,err)):
            self.failUnless("200 OK" in out)
            self.failUnlessEqual(err, "")
            # the deleted child must no longer resolve
            d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, "test_put/moved.txt")
        d.addCallback(_check_rm)
    def _run_cli(self, argv):
        """Run one CLI command in a worker thread (runner does blocking I/O)
        and deliver (stdout, stderr) as strings via a Deferred.

        NOTE(review): the ``def _done(res):`` header and the final
        ``return d`` appear to be missing from this excerpt.
        """
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdout=stdout, stderr=stderr)
            return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
    def _test_checker(self, res):
        # build a manifest of everything reachable from the private node,
        # then hand it to _test_checker_2 for the actual checks
        d = self._private_node.build_manifest()
        d.addCallback(self._test_checker_2)
    def _test_checker_2(self, manifest):
        """Run the checker service on every storage index in *manifest* and
        verify both the live results and the stored per-SI results.

        NOTE(review): several lines are missing from this excerpt (e.g. the
        ``dl = []`` / ``for si in ...`` loop headers and the accumulator
        setup inside the callbacks).
        """
        checker1 = self.clients[1].getServiceNamed("checker")
        # nothing has been checked yet, so there are no stored results
        self.failUnlessEqual(checker1.checker_results_for(None), [])
        self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
        starting_time = time.time()
            dl.append(checker1.check(si))
        d = deferredutil.DeferredListShouldSucceed(dl)
        def _check_checker_results(res):
                self.failUnless(i is True)
                # CHK check results are (needed, total, found, sharemap)
                (needed, total, found, sharemap) = i
                self.failUnlessEqual(needed, 3)
                self.failUnlessEqual(total, 10)
                self.failUnlessEqual(found, total)
                self.failUnlessEqual(len(sharemap.keys()), 10)
            for shpeers in sharemap.values():
                peers.update(shpeers)
            # shares are spread over every client except (presumably) the
            # checker itself -- hence numclients-1
            self.failUnlessEqual(len(peers), self.numclients-1)
        d.addCallback(_check_checker_results)
        def _check_stored_results(res):
            finish_time = time.time()
            results = checker1.checker_results_for(si)
            # TODO: implement checker for mutable files and implement tests of that checker
            self.failUnlessEqual(len(results), 1)
            when, those_results = results[0]
            # stored results are timestamped; the timestamp must fall inside
            # the window in which we ran the checks
            self.failUnless(isinstance(when, (int, float)))
            self.failUnless(starting_time <= when <= finish_time)
            all_results.append(those_results)
            # stored results should pass the same assertions as live ones
            _check_checker_results(all_results)
        d.addCallback(_check_stored_results)
        d.addCallback(self._test_checker_3)
    def _test_checker_3(self, res):
        # check one file, through FileNode.check()
        d = self._private_node.get_child_at_path("personal/sekrit data")
        d.addCallback(lambda n: n.check())
        def _checked(results):
            # 'sekrit data' is small, and fits in a LiteralFileNode, so
            # checking it is trivial and always returns True
            self.failUnlessEqual(results, True)
        d.addCallback(_checked)
        # now check a large file, via a node built from the public root URI
        c0 = self.clients[1]
        n = c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(lambda res: n.get_child_at_path("subdir1/mydata567"))
        d.addCallback(lambda n: n.check())
        def _checked2(results):
            # mydata567 is large and lives in a CHK
            (needed, total, found, sharemap) = results
            self.failUnlessEqual(needed, 3)
            self.failUnlessEqual(total, 10)
            self.failUnlessEqual(found, 10)
            self.failUnlessEqual(len(sharemap), 10)
            # each of the 10 shares lives on exactly one peer
            for shnum in range(10):
                self.failUnlessEqual(len(sharemap[shnum]), 1)
        d.addCallback(_checked2)
    def _test_verifier(self, res):
        """Run the verifier over every entry in the private node's manifest,
        expect each to verify as True, then verify(None) as a degenerate case.

        NOTE(review): the loop header inside _check_all and the ``def
        _done`` header are missing from this excerpt.
        """
        checker1 = self.clients[1].getServiceNamed("checker")
        d = self._private_node.build_manifest()
        def _check_all(manifest):
                dl.append(checker1.verify(si))
            return deferredutil.DeferredListShouldSucceed(dl)
        d.addCallback(_check_all)
                self.failUnless(i is True)
        d.addCallback(_done)
        d.addCallback(lambda res: checker1.verify(None))
        d.addCallback(self.failUnlessEqual, True)