2 from base64 import b32encode
3 import os, sys, time, re
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
11 from allmydata import client, uri, download, upload, storage, mutable, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable import NotMutableError
18 from allmydata.stats import PickleStatsGatherer
19 from foolscap.eventual import flushEventualQueue
20 from foolscap import DeadReferenceError, Tub
21 from twisted.python.failure import Failure
22 from twisted.web.client import getPage
23 from twisted.web.error import Error
25 def flush_but_dont_ignore(res):
# Drain foolscap's eventual-send queue between test phases, then (per the
# name) pass the original result/failure through instead of swallowing it.
# NOTE(review): the body appears truncated in this view -- confirm it
# re-fires `res` after the flush completes.
26 d = flushEventualQueue()
33 This is some data to publish to the virtual drive, which needs to be large
34 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts bytes read and can interrupt an upload.

    Tests set .interrupt_after to a byte threshold and .interrupt_after_d
    to a Deferred; once read() has handed out more than that many bytes,
    the Deferred is fired exactly once with this uploadable, letting the
    test bounce the helper mid-upload.
    """
    # Fix: bytes_read is incremented in read() but was never initialized in
    # the visible class body, which would raise AttributeError on the first
    # read() unless the base class happens to set it. A class-level default
    # of 0 is safe and backward-compatible either way.
    bytes_read = 0
    interrupt_after = None      # byte threshold; None disables interruption
    interrupt_after_d = None    # Deferred fired once the threshold is crossed

    def read(self, length):
        """Count the requested bytes, fire the interrupt hook once the
        threshold is crossed, then delegate to upload.Data.read."""
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None  # ensure we fire at most once
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
51 class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
54 self.sparent = service.MultiService()
55 self.sparent.startService()
57 log.msg("shutting down SystemTest services")
58 d = self.sparent.stopService()
59 d.addBoth(flush_but_dont_ignore)
def getdir(self, subdir):
    """Return the path of *subdir* inside this test's base directory."""
    path = os.path.join(self.basedir, subdir)
    return path
65 def add_service(self, s):
# Parent `s` to the test's MultiService so it starts now and is stopped
# automatically when self.sparent is shut down in tearDown.
66 s.setServiceParent(self.sparent)
# NOTE(review): callers such as _set_up_nodes_2 use the return value, so
# the full source presumably ends with `return s` -- confirm.
69 def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
# Launch a full local grid: one introducer (with a loopback web port),
# a stats gatherer, and NUMCLIENTS client nodes, all parented to
# self.sparent. Returns a Deferred firing once everything is connected.
70 self.numclients = NUMCLIENTS
71 self.createprivdir = createprivdir
72 iv_dir = self.getdir("introducer")
73 if not os.path.isdir(iv_dir):
74 fileutil.make_dirs(iv_dir)
# Bind the introducer's web server to an ephemeral loopback port.
75 f = open(os.path.join(iv_dir, "webport"), "w")
76 f.write("tcp:0:interface=127.0.0.1\n")
78 iv = IntroducerNode(basedir=iv_dir)
79 self.introducer = self.add_service(iv)
80 d = self.introducer.when_tub_ready()
# Chain the remaining setup phases once the introducer's tub is ready.
81 d.addCallback(self._get_introducer_web)
82 d.addCallback(self._set_up_stats_gatherer)
83 d.addCallback(self._set_up_nodes_2)
84 d.addCallback(self._grab_stats)
87 def _get_introducer_web(self, res):
# Record the introducer's web URL (written to node.url once its web
# server picked an ephemeral port) for later use by _test_introweb.
88 f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
89 self.introweb_url = f.read().strip()
92 def _set_up_stats_gatherer(self, res):
# Start a PickleStatsGatherer on its own Tub, listening on an ephemeral
# localhost port, and stash its FURL for the client nodes' config files.
93 statsdir = self.getdir("stats_gatherer")
94 fileutil.make_dirs(statsdir)
# NOTE(review): `t` is a Tub created on a line elided from this view;
# `p` below is presumably the port number obtained from the listener.
97 l = t.listenOn("tcp:0")
99 t.setLocation("localhost:%d" % p)
101 self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
102 self.add_service(self.stats_gatherer)
103 self.stats_gatherer_furl = self.stats_gatherer.get_furl()
105 def _set_up_nodes_2(self, res):
107 self.introducer_furl = q.introducer_url
110 for i in range(self.numclients):
111 basedir = self.getdir("client%d" % i)
112 basedirs.append(basedir)
113 fileutil.make_dirs(basedir)
115 # client[0] runs a webserver and a helper
116 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
117 open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
119 # client[3] runs a webserver and uses a helper
120 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
121 if self.createprivdir:
122 fileutil.make_dirs(os.path.join(basedir, "private"))
123 open(os.path.join(basedir, "private", "root_dir.cap"), "w")
124 open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
125 open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
127 # start client[0], wait for it's tub to be ready (at which point it
128 # will have registered the helper furl).
129 c = self.add_service(client.Client(basedir=basedirs[0]))
130 self.clients.append(c)
131 d = c.when_tub_ready()
133 f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
134 helper_furl = f.read()
136 self.helper_furl = helper_furl
137 f = open(os.path.join(basedirs[3],"helper.furl"), "w")
141 # this starts the rest of the clients
142 for i in range(1, self.numclients):
143 c = self.add_service(client.Client(basedir=basedirs[i]))
144 self.clients.append(c)
146 return self.wait_for_connections()
147 d.addCallback(_ready)
150 # now find out where the web port was
151 l = self.clients[0].getServiceNamed("webish").listener
152 port = l._port.getHost().port
153 self.webish_url = "http://localhost:%d/" % port
154 # and the helper-using webport
155 l = self.clients[3].getServiceNamed("webish").listener
156 port = l._port.getHost().port
157 self.helper_webish_url = "http://localhost:%d/" % port
158 d.addCallback(_connected)
161 def _grab_stats(self, res):
162 d = self.stats_gatherer.poll()
165 def bounce_client(self, num):
# Stop client `num`, wait briefly, then start a fresh Client on the same
# basedir and wait for the grid to become fully connected again.
166 c = self.clients[num]
167 d = c.disownServiceParent()
168 # I think windows requires a moment to let the connection really stop
169 # and the port number made available for re-use. TODO: examine the
170 # behavior, see if this is really the problem, see if we can do
171 # better than blindly waiting for a second.
172 d.addCallback(self.stall, 1.0)
# NOTE(review): the `def _stopped(res):` header for the next three lines
# is elided from this view.
174 new_c = client.Client(basedir=self.getdir("client%d" % num))
175 self.clients[num] = new_c
176 self.add_service(new_c)
177 return new_c.when_tub_ready()
178 d.addCallback(_stopped)
179 d.addCallback(lambda res: self.wait_for_connections())
180 def _maybe_get_webport(res):
# client[0] serves the web UI; re-discover its (possibly new) port.
182 # now find out where the web port was
183 l = self.clients[0].getServiceNamed("webish").listener
184 port = l._port.getHost().port
185 self.webish_url = "http://localhost:%d/" % port
186 d.addCallback(_maybe_get_webport)
189 def add_extra_node(self, client_num, helper_furl=None,
190 add_to_sparent=False):
191 # usually this node is *not* parented to our self.sparent, so we can
192 # shut it down separately from the rest, to exercise the
193 # connection-lost code
194 basedir = self.getdir("client%d" % client_num)
195 if not os.path.isdir(basedir):
196 fileutil.make_dirs(basedir)
197 open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
# If a helper FURL was supplied, configure the new node to use it.
199 f = open(os.path.join(basedir, "helper.furl") ,"w")
200 f.write(helper_furl+"\n")
203 c = client.Client(basedir=basedir)
204 self.clients.append(c)
# Only parented to sparent when the caller opts in (see comment above).
207 c.setServiceParent(self.sparent)
# Fires with the new client once the whole grid is connected.
210 d = self.wait_for_connections()
211 d.addCallback(lambda res: c)
214 def _check_connections(self):
# Poll predicate for wait_for_connections(): every client must be
# connected to the introducer and know about all self.numclients peers.
# NOTE(review): the `return False` bodies of the two checks (and the
# final `return True`) are elided from this view.
215 for c in self.clients:
216 ic = c.introducer_client
217 if not ic.connected_to_introducer():
219 if len(ic.get_all_peerids()) != self.numclients:
223 def wait_for_connections(self, ignored=None):
# Return a Deferred that fires once _check_connections() reports the
# grid is fully connected; `ignored` lets this sit in a callback chain.
224 # TODO: replace this with something that takes a list of peerids and
225 # fires when they've all been heard from, instead of using a count
227 return self.poll(self._check_connections, timeout=200)
229 def test_connections(self):
# Bring up a grid, add one extra node, and verify every client sees all
# numclients+1 peers; the extra node is shut down at the end (addBoth,
# so it runs even on failure).
230 self.basedir = "system/SystemTest/test_connections"
231 d = self.set_up_nodes()
232 self.extra_node = None
233 d.addCallback(lambda res: self.add_extra_node(self.numclients))
234 def _check(extra_node):
235 self.extra_node = extra_node
236 for c in self.clients:
237 all_peerids = list(c.get_all_peerids())
238 self.failUnlessEqual(len(all_peerids), self.numclients+1)
239 permuted_peers = list(c.get_permuted_peers("storage", "a"))
240 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
242 d.addCallback(_check)
243 def _shutdown_extra_node(res):
245 return self.extra_node.stopService()
247 d.addBoth(_shutdown_extra_node)
249 test_connections.timeout = 300
250 # test_connections is subsumed by test_upload_and_download, and takes
251 # quite a while to run on a slow machine (because of all the TLS
252 # connections that must be established). If we ever rework the introducer
253 # code to such an extent that we're not sure if it works anymore, we can
254 # reinstate this test until it does.
257 def test_upload_and_download_random_key(self):
# Run the shared upload/download scenario with random-key encryption
# (contenthashkey=False).
258 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
259 return self._test_upload_and_download(False)
260 test_upload_and_download_random_key.timeout = 4800
262 def test_upload_and_download_content_hash_key(self):
# Same scenario but with convergent (content-hash-key) encryption
# (contenthashkey=True).
263 self.basedir = "system/SystemTest/test_upload_and_download_CHK"
264 return self._test_upload_and_download(True)
265 test_upload_and_download_content_hash_key.timeout = 4800
267 def _test_upload_and_download(self, contenthashkey):
268 # we use 4000 bytes of data, which will result in about 400k written
269 # to disk among all our simulated nodes
270 DATA = "Some data to upload\n" * 200
271 d = self.set_up_nodes()
272 def _check_connections(res):
273 for c in self.clients:
274 all_peerids = list(c.get_all_peerids())
275 self.failUnlessEqual(len(all_peerids), self.numclients)
276 permuted_peers = list(c.get_permuted_peers("storage", "a"))
277 self.failUnlessEqual(len(permuted_peers), self.numclients)
278 d.addCallback(_check_connections)
282 u = self.clients[0].getServiceNamed("uploader")
284 # we crank the max segsize down to 1024b for the duration of this
285 # test, so we can exercise multiple segments. It is important
286 # that this is not a multiple of the segment size, so that the
287 # tail segment is not the same length as the others. This actualy
288 # gets rounded up to 1025 to be a multiple of the number of
289 # required shares (since we use 25 out of 100 FEC).
290 up = upload.Data(DATA, contenthashkey=contenthashkey)
291 up.max_segment_size = 1024
294 d.addCallback(_do_upload)
295 def _upload_done(results):
297 log.msg("upload finished: uri is %s" % (uri,))
299 dl = self.clients[1].getServiceNamed("downloader")
301 d.addCallback(_upload_done)
303 def _upload_again(res):
304 # Upload again. If contenthashkey then this ought to be
305 # short-circuited, however with the way we currently generate URIs
306 # (i.e. because they include the roothash), we have to do all of the
307 # encoding work, and only get to save on the upload part.
308 log.msg("UPLOADING AGAIN")
309 up = upload.Data(DATA, contenthashkey=contenthashkey)
310 up.max_segment_size = 1024
311 d1 = self.uploader.upload(up)
312 d.addCallback(_upload_again)
314 def _download_to_data(res):
315 log.msg("DOWNLOADING")
316 return self.downloader.download_to_data(self.uri)
317 d.addCallback(_download_to_data)
318 def _download_to_data_done(data):
319 log.msg("download finished")
320 self.failUnlessEqual(data, DATA)
321 d.addCallback(_download_to_data_done)
323 target_filename = os.path.join(self.basedir, "download.target")
324 def _download_to_filename(res):
325 return self.downloader.download_to_filename(self.uri,
327 d.addCallback(_download_to_filename)
328 def _download_to_filename_done(res):
329 newdata = open(target_filename, "rb").read()
330 self.failUnlessEqual(newdata, DATA)
331 d.addCallback(_download_to_filename_done)
333 target_filename2 = os.path.join(self.basedir, "download.target2")
334 def _download_to_filehandle(res):
335 fh = open(target_filename2, "wb")
336 return self.downloader.download_to_filehandle(self.uri, fh)
337 d.addCallback(_download_to_filehandle)
338 def _download_to_filehandle_done(fh):
340 newdata = open(target_filename2, "rb").read()
341 self.failUnlessEqual(newdata, DATA)
342 d.addCallback(_download_to_filehandle_done)
344 def _download_nonexistent_uri(res):
345 baduri = self.mangle_uri(self.uri)
346 log.msg("about to download non-existent URI", level=log.UNUSUAL,
347 facility="tahoe.tests")
348 d1 = self.downloader.download_to_data(baduri)
349 def _baduri_should_fail(res):
350 log.msg("finished downloading non-existend URI",
351 level=log.UNUSUAL, facility="tahoe.tests")
352 self.failUnless(isinstance(res, Failure))
353 self.failUnless(res.check(download.NotEnoughPeersError),
354 "expected NotEnoughPeersError, got %s" % res)
355 # TODO: files that have zero peers should get a special kind
356 # of NotEnoughPeersError, which can be used to suggest that
357 # the URI might be wrong or that they've never uploaded the
358 # file in the first place.
359 d1.addBoth(_baduri_should_fail)
361 d.addCallback(_download_nonexistent_uri)
363 # add a new node, which doesn't accept shares, and only uses the
365 d.addCallback(lambda res: self.add_extra_node(self.numclients,
367 add_to_sparent=True))
368 def _added(extra_node):
369 self.extra_node = extra_node
370 extra_node.getServiceNamed("storage").sizelimit = 0
371 d.addCallback(_added)
373 HELPER_DATA = "Data that needs help to upload" * 1000
374 def _upload_with_helper(res):
375 u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
376 d = self.extra_node.upload(u)
377 def _uploaded(results):
379 return self.downloader.download_to_data(uri)
380 d.addCallback(_uploaded)
382 self.failUnlessEqual(newdata, HELPER_DATA)
383 d.addCallback(_check)
385 d.addCallback(_upload_with_helper)
387 def _upload_duplicate_with_helper(res):
388 u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
389 u.debug_stash_RemoteEncryptedUploadable = True
390 d = self.extra_node.upload(u)
391 def _uploaded(results):
393 return self.downloader.download_to_data(uri)
394 d.addCallback(_uploaded)
396 self.failUnlessEqual(newdata, HELPER_DATA)
397 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
398 "uploadable started uploading, should have been avoided")
399 d.addCallback(_check)
402 d.addCallback(_upload_duplicate_with_helper)
404 def _upload_resumable(res):
405 DATA = "Data that needs help to upload and gets interrupted" * 1000
406 u1 = CountingDataUploadable(DATA, contenthashkey=contenthashkey)
407 u2 = CountingDataUploadable(DATA, contenthashkey=contenthashkey)
409 # we interrupt the connection after about 5kB by shutting down
410 # the helper, then restartingit.
411 u1.interrupt_after = 5000
412 u1.interrupt_after_d = defer.Deferred()
413 u1.interrupt_after_d.addCallback(lambda res:
414 self.bounce_client(0))
416 # sneak into the helper and reduce its chunk size, so that our
417 # debug_interrupt will sever the connection on about the fifth
418 # chunk fetched. This makes sure that we've started to write the
419 # new shares before we abandon them, which exercises the
420 # abort/delete-partial-share code. TODO: find a cleaner way to do
421 # this. I know that this will affect later uses of the helper in
422 # this same test run, but I'm not currently worried about it.
423 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
425 d = self.extra_node.upload(u1)
427 def _should_not_finish(res):
428 self.fail("interrupted upload should have failed, not finished"
429 " with result %s" % (res,))
431 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
433 # make sure we actually interrupted it before finishing the
435 self.failUnless(u1.bytes_read < len(DATA),
436 "read %d out of %d total" % (u1.bytes_read,
439 log.msg("waiting for reconnect", level=log.NOISY,
440 facility="tahoe.test.test_system")
441 # now, we need to give the nodes a chance to notice that this
442 # connection has gone away. When this happens, the storage
443 # servers will be told to abort their uploads, removing the
444 # partial shares. Unfortunately this involves TCP messages
445 # going through the loopback interface, and we can't easily
446 # predict how long that will take. If it were all local, we
447 # could use fireEventually() to stall. Since we don't have
448 # the right introduction hooks, the best we can do is use a
449 # fixed delay. TODO: this is fragile.
450 u1.interrupt_after_d.addCallback(self.stall, 2.0)
451 return u1.interrupt_after_d
452 d.addCallbacks(_should_not_finish, _interrupted)
454 def _disconnected(res):
455 # check to make sure the storage servers aren't still hanging
456 # on to the partial share: their incoming/ directories should
458 log.msg("disconnected", level=log.NOISY,
459 facility="tahoe.test.test_system")
460 for i in range(self.numclients):
461 incdir = os.path.join(self.getdir("client%d" % i),
462 "storage", "shares", "incoming")
463 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
464 d.addCallback(_disconnected)
466 # then we need to give the reconnector a chance to
467 # reestablish the connection to the helper.
468 d.addCallback(lambda res:
469 log.msg("wait_for_connections", level=log.NOISY,
470 facility="tahoe.test.test_system"))
471 d.addCallback(lambda res: self.wait_for_connections())
474 d.addCallback(lambda res:
475 log.msg("uploading again", level=log.NOISY,
476 facility="tahoe.test.test_system"))
477 d.addCallback(lambda res: self.extra_node.upload(u2))
479 def _uploaded(results):
481 log.msg("Second upload complete", level=log.NOISY,
482 facility="tahoe.test.test_system")
484 # this is really bytes received rather than sent, but it's
485 # convenient and basically measures the same thing
486 bytes_sent = results.ciphertext_fetched
488 # We currently don't support resumption of upload if the data is
489 # encrypted with a random key. (Because that would require us
490 # to store the key locally and re-use it on the next upload of
491 # this file, which isn't a bad thing to do, but we currently
494 # Make sure we did not have to read the whole file the
495 # second time around .
496 self.failUnless(bytes_sent < len(DATA),
497 "resumption didn't save us any work:"
498 " read %d bytes out of %d total" %
499 (bytes_sent, len(DATA)))
501 # Make sure we did have to read the whole file the second
502 # time around -- because the one that we partially uploaded
503 # earlier was encrypted with a different random key.
504 self.failIf(bytes_sent < len(DATA),
505 "resumption saved us some work even though we were using random keys:"
506 " read %d bytes out of %d total" %
507 (bytes_sent, len(DATA)))
508 return self.downloader.download_to_data(uri)
509 d.addCallback(_uploaded)
512 self.failUnlessEqual(newdata, DATA)
513 # If using a content hash key, then also check that the helper
514 # has removed the temp file from its directories.
516 basedir = os.path.join(self.getdir("client0"), "helper")
517 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
518 self.failUnlessEqual(files, [])
519 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
520 self.failUnlessEqual(files, [])
521 d.addCallback(_check)
523 d.addCallback(_upload_resumable)
527 def _find_shares(self, basedir):
# Walk every node's storage directory under `basedir` and collect
# (client_num, storage_index, filename, shnum) tuples for each share
# file found; fails the test if no shares exist at all.
# NOTE(review): the accumulator list, the `continue` for non-storage
# dirs, the append, and the final return are elided from this view.
529 for (dirpath, dirnames, filenames) in os.walk(basedir):
530 if "storage" not in dirpath:
534 pieces = dirpath.split(os.sep)
535 if pieces[-4] == "storage" and pieces[-3] == "shares":
536 # we're sitting in .../storage/shares/$START/$SINDEX , and there
537 # are sharefiles here
538 assert pieces[-5].startswith("client")
# Relies on single-digit client numbers ("client0".."client9").
539 client_num = int(pieces[-5][-1])
540 storage_index_s = pieces[-1]
541 storage_index = storage.si_a2b(storage_index_s)
542 for sharename in filenames:
543 shnum = int(sharename)
544 filename = os.path.join(dirpath, sharename)
545 data = (client_num, storage_index, filename, shnum)
548 self.fail("unable to find any share files in %s" % basedir)
551 def _corrupt_mutable_share(self, filename, which):
# Flip a bit (or perturb a value) in one named field of an SDMF mutable
# share on disk, then write the re-packed share back in place. `which`
# selects the field, so test_mutable can exercise each integrity check.
552 msf = storage.MutableShareFile(filename)
553 datav = msf.readv([ (0, 1000000) ])
554 final_share = datav[0]
555 assert len(final_share) < 1000000 # ought to be truncated
556 pieces = mutable.unpack_share(final_share)
557 (seqnum, root_hash, IV, k, N, segsize, datalen,
558 verification_key, signature, share_hash_chain, block_hash_tree,
559 share_data, enc_privkey) = pieces
561 if which == "seqnum":
# NOTE(review): the seqnum-mutation line and the "R" branch header are
# elided from this view; root_hash below is the "R" case.
564 root_hash = self.flip_bit(root_hash)
566 IV = self.flip_bit(IV)
567 elif which == "segsize":
# segsize is an integer, so it is nudged rather than bit-flipped.
568 segsize = segsize + 15
569 elif which == "pubkey":
570 verification_key = self.flip_bit(verification_key)
571 elif which == "signature":
572 signature = self.flip_bit(signature)
573 elif which == "share_hash_chain":
574 nodenum = share_hash_chain.keys()[0]
575 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
576 elif which == "block_hash_tree":
577 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
578 elif which == "share_data":
579 share_data = self.flip_bit(share_data)
580 elif which == "encprivkey":
581 enc_privkey = self.flip_bit(enc_privkey)
# Re-pack with the (possibly corrupted) fields and overwrite in place.
583 prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
585 final_share = mutable.pack_share(prefix,
592 msf.writev( [(0, final_share)], None)
594 def test_mutable(self):
595 self.basedir = "system/SystemTest/test_mutable"
596 DATA = "initial contents go here." # 25 bytes % 3 != 0
597 NEWDATA = "new contents yay"
598 NEWERDATA = "this is getting old"
600 d = self.set_up_nodes()
602 def _create_mutable(res):
604 log.msg("starting create_mutable_file")
605 d1 = c.create_mutable_file(DATA)
607 log.msg("DONE: %s" % (res,))
608 self._mutable_node_1 = res
610 d1.addCallback(_done)
612 d.addCallback(_create_mutable)
614 def _test_debug(res):
615 # find a share. It is important to run this while there is only
616 # one slot in the grid.
617 shares = self._find_shares(self.basedir)
618 (client_num, storage_index, filename, shnum) = shares[0]
619 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
621 log.msg(" for clients[%d]" % client_num)
623 out,err = StringIO(), StringIO()
624 rc = runner.runner(["dump-share",
626 stdout=out, stderr=err)
627 output = out.getvalue()
628 self.failUnlessEqual(rc, 0)
630 self.failUnless("Mutable slot found:\n" in output)
631 self.failUnless("share_type: SDMF\n" in output)
632 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
633 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
634 self.failUnless(" num_extra_leases: 0\n" in output)
635 # the pubkey size can vary by a byte, so the container might
636 # be a bit larger on some runs.
637 m = re.search(r'^ container_size: (\d+)$', output, re.M)
639 container_size = int(m.group(1))
640 self.failUnless(2037 <= container_size <= 2049, container_size)
641 m = re.search(r'^ data_length: (\d+)$', output, re.M)
643 data_length = int(m.group(1))
644 self.failUnless(2037 <= data_length <= 2049, data_length)
645 self.failUnless(" secrets are for nodeid: %s\n" % peerid
647 self.failUnless(" SDMF contents:\n" in output)
648 self.failUnless(" seqnum: 1\n" in output)
649 self.failUnless(" required_shares: 3\n" in output)
650 self.failUnless(" total_shares: 10\n" in output)
651 self.failUnless(" segsize: 27\n" in output, (output, filename))
652 self.failUnless(" datalen: 25\n" in output)
653 # the exact share_hash_chain nodes depends upon the sharenum,
654 # and is more of a hassle to compute than I want to deal with
656 self.failUnless(" share_hash_chain: " in output)
657 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
658 except unittest.FailTest:
660 print "dump-share output was:"
663 d.addCallback(_test_debug)
667 # first, let's see if we can use the existing node to retrieve the
668 # contents. This allows it to use the cached pubkey and maybe the
669 # latest-known sharemap.
671 d.addCallback(lambda res: self._mutable_node_1.download_to_data())
672 def _check_download_1(res):
673 self.failUnlessEqual(res, DATA)
674 # now we see if we can retrieve the data from a new node,
675 # constructed using the URI of the original one. We do this test
676 # on the same client that uploaded the data.
677 uri = self._mutable_node_1.get_uri()
678 log.msg("starting retrieve1")
679 newnode = self.clients[0].create_node_from_uri(uri)
680 return newnode.download_to_data()
681 d.addCallback(_check_download_1)
683 def _check_download_2(res):
684 self.failUnlessEqual(res, DATA)
685 # same thing, but with a different client
686 uri = self._mutable_node_1.get_uri()
687 newnode = self.clients[1].create_node_from_uri(uri)
688 log.msg("starting retrieve2")
689 d1 = newnode.download_to_data()
690 d1.addCallback(lambda res: (res, newnode))
692 d.addCallback(_check_download_2)
694 def _check_download_3((res, newnode)):
695 self.failUnlessEqual(res, DATA)
697 log.msg("starting replace1")
698 d1 = newnode.replace(NEWDATA)
699 d1.addCallback(lambda res: newnode.download_to_data())
701 d.addCallback(_check_download_3)
703 def _check_download_4(res):
704 self.failUnlessEqual(res, NEWDATA)
705 # now create an even newer node and replace the data on it. This
706 # new node has never been used for download before.
707 uri = self._mutable_node_1.get_uri()
708 newnode1 = self.clients[2].create_node_from_uri(uri)
709 newnode2 = self.clients[3].create_node_from_uri(uri)
710 self._newnode3 = self.clients[3].create_node_from_uri(uri)
711 log.msg("starting replace2")
712 d1 = newnode1.replace(NEWERDATA)
713 d1.addCallback(lambda res: newnode2.download_to_data())
715 d.addCallback(_check_download_4)
717 def _check_download_5(res):
718 log.msg("finished replace2")
719 self.failUnlessEqual(res, NEWERDATA)
720 d.addCallback(_check_download_5)
722 def _corrupt_shares(res):
723 # run around and flip bits in all but k of the shares, to test
725 shares = self._find_shares(self.basedir)
726 ## sort by share number
727 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
728 where = dict([ (shnum, filename)
729 for (client_num, storage_index, filename, shnum)
731 assert len(where) == 10 # this test is designed for 3-of-10
732 for shnum, filename in where.items():
733 # shares 7,8,9 are left alone. read will check
734 # (share_hash_chain, block_hash_tree, share_data). New
735 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
736 # segsize, signature).
738 # read: this will trigger "pubkey doesn't match
740 self._corrupt_mutable_share(filename, "pubkey")
741 self._corrupt_mutable_share(filename, "encprivkey")
743 # triggers "signature is invalid"
744 self._corrupt_mutable_share(filename, "seqnum")
746 # triggers "signature is invalid"
747 self._corrupt_mutable_share(filename, "R")
749 # triggers "signature is invalid"
750 self._corrupt_mutable_share(filename, "segsize")
752 self._corrupt_mutable_share(filename, "share_hash_chain")
754 self._corrupt_mutable_share(filename, "block_hash_tree")
756 self._corrupt_mutable_share(filename, "share_data")
757 # other things to correct: IV, signature
758 # 7,8,9 are left alone
760 # note that initial_query_count=5 means that we'll hit the
761 # first 5 servers in effectively random order (based upon
762 # response time), so we won't necessarily ever get a "pubkey
763 # doesn't match fingerprint" error (if we hit shnum>=1 before
764 # shnum=0, we pull the pubkey from there). To get repeatable
765 # specific failures, we need to set initial_query_count=1,
766 # but of course that will change the sequencing behavior of
767 # the retrieval process. TODO: find a reasonable way to make
768 # this a parameter, probably when we expand this test to test
769 # for one failure mode at a time.
771 # when we retrieve this, we should get three signature
772 # failures (where we've mangled seqnum, R, and segsize). The
774 d.addCallback(_corrupt_shares)
776 d.addCallback(lambda res: self._newnode3.download_to_data())
777 d.addCallback(_check_download_5)
779 def _check_empty_file(res):
780 # make sure we can create empty files, this usually screws up the
782 d1 = self.clients[2].create_mutable_file("")
783 d1.addCallback(lambda newnode: newnode.download_to_data())
784 d1.addCallback(lambda res: self.failUnlessEqual("", res))
786 d.addCallback(_check_empty_file)
788 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
789 def _created_dirnode(dnode):
790 log.msg("_created_dirnode(%s)" % (dnode,))
792 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
793 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
794 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
795 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
796 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
797 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
798 d1.addCallback(lambda res: dnode.build_manifest())
799 d1.addCallback(lambda manifest:
800 self.failUnlessEqual(len(manifest), 1))
802 d.addCallback(_created_dirnode)
805 # The default 120 second timeout went off when running it under valgrind
806 # on my old Windows laptop, so I'm bumping up the timeout.
807 test_mutable.timeout = 240
def flip_bit(self, good):
    """Return a copy of *good* with the low bit of its last byte inverted."""
    head, last = good[:-1], good[-1]
    return head + chr(ord(last) ^ 0x01)
812 def mangle_uri(self, gooduri):
813 # change the key, which changes the storage index, which means we'll
814 # be asking about the wrong file, so nobody will have any shares
815 u = IFileURI(gooduri)
816 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
817 uri_extension_hash=u.uri_extension_hash,
818 needed_shares=u.needed_shares,
819 total_shares=u.total_shares,
# NOTE(review): one keyword argument (presumably size=u.size) is elided
# from this view between the lines above and the return.
821 return u2.to_string()
823 # TODO: add a test which mangles the uri_extension_hash instead, and
824 # should fail due to not being able to get a valid uri_extension block.
825 # Also a test which sneakily mangles the uri_extension block to change
826 # some of the validation data, so it will fail in the post-download phase
827 # when the file's crypttext integrity check fails. Do the same thing for
828 # the key, which should cause the download to fail the post-download
829 # plaintext_hash check.
831 def test_vdrive(self):
832 self.basedir = "system/SystemTest/test_vdrive"
833 self.data = LARGE_DATA
834 d = self.set_up_nodes(createprivdir=True)
835 d.addCallback(self._test_introweb)
836 d.addCallback(self.log, "starting publish")
837 d.addCallback(self._do_publish1)
838 d.addCallback(self._test_runner)
839 d.addCallback(self._do_publish2)
840 # at this point, we have the following filesystem (where "R" denotes
841 # self._root_directory_uri):
844 # R/subdir1/mydata567
846 # R/subdir1/subdir2/mydata992
848 d.addCallback(lambda res: self.bounce_client(0))
849 d.addCallback(self.log, "bounced client0")
851 d.addCallback(self._check_publish1)
852 d.addCallback(self.log, "did _check_publish1")
853 d.addCallback(self._check_publish2)
854 d.addCallback(self.log, "did _check_publish2")
855 d.addCallback(self._do_publish_private)
856 d.addCallback(self.log, "did _do_publish_private")
857 # now we also have (where "P" denotes a new dir):
858 # P/personal/sekrit data
859 # P/s2-rw -> /subdir1/subdir2/
860 # P/s2-ro -> /subdir1/subdir2/ (read-only)
861 d.addCallback(self._check_publish_private)
862 d.addCallback(self.log, "did _check_publish_private")
863 d.addCallback(self._test_web)
864 d.addCallback(self._test_control)
865 d.addCallback(self._test_cli)
866 # P now has four top-level children:
867 # P/personal/sekrit data
870 # P/test_put/ (empty)
871 d.addCallback(self._test_checker)
872 d.addCallback(self._test_verifier)
873 d.addCallback(self._grab_stats)
875 test_vdrive.timeout = 1100
877 def _test_introweb(self, res):
878 d = getPage(self.introweb_url, method="GET", followRedirect=True)
881 self.failUnless("allmydata: %s" % str(allmydata.__version__)
883 self.failUnless("Clients:" in res)
884 except unittest.FailTest:
886 print "GET %s output was:" % self.introweb_url
889 d.addCallback(_check)
892 def _do_publish1(self, res):
893 ut = upload.Data(self.data)
895 d = c0.create_empty_dirnode()
896 def _made_root(new_dirnode):
897 self._root_directory_uri = new_dirnode.get_uri()
898 return c0.create_node_from_uri(self._root_directory_uri)
899 d.addCallback(_made_root)
900 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
901 def _made_subdir1(subdir1_node):
902 self._subdir1_node = subdir1_node
903 d1 = subdir1_node.add_file(u"mydata567", ut)
904 d1.addCallback(self.log, "publish finished")
905 def _stash_uri(filenode):
906 self.uri = filenode.get_uri()
907 d1.addCallback(_stash_uri)
909 d.addCallback(_made_subdir1)
912 def _do_publish2(self, res):
913 ut = upload.Data(self.data)
914 d = self._subdir1_node.create_empty_directory(u"subdir2")
915 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
918 def log(self, res, msg, **kwargs):
# Callback-chain helper: log `msg` and pass `res` through untouched, so
# it can be dropped into d.addCallback(self.log, "message") sequences.
919 # print "MSG: %s RES: %s" % (msg, res)
920 log.msg(msg, **kwargs)
# NOTE(review): the trailing `return res` is elided from this view.
923 def stall(self, res, delay=1.0):
# Pause the callback chain: fires `res` through a fresh Deferred after
# `delay` seconds via reactor.callLater.
# NOTE(review): lines 924 and 926 are elided -- presumably
# `d = defer.Deferred()` and `return d`.
925 reactor.callLater(delay, d.callback, res)
928 def _do_publish_private(self, res):
# Build the private directory "P": P/personal/sekrit data (a small file
# that fits in a LIT URI), plus P/s2-rw and P/s2-ro pointing at
# /subdir1/subdir2 with read-write and read-only caps respectively.
# Fires with the new private dirnode (stashed later by
# _check_publish_private). Listing is elided at 940, 943, 946, 948-949.
929 self.smalldata = "sssh, very secret stuff"
930 ut = upload.Data(self.smalldata)
931 d = self.clients[0].create_empty_dirnode()
932 d.addCallback(self.log, "GOT private directory")
933 def _got_new_dir(privnode):
934 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
935 d1 = privnode.create_empty_directory(u"personal")
936 d1.addCallback(self.log, "made P/personal")
937 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
938 d1.addCallback(self.log, "made P/personal/sekrit data")
939 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
# Link subdir2 into P twice: once writable, once via its read-only cap.
# (The `def _got_s2(s2node):` header at line 940 is elided.)
941 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
942 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
944 d1.addCallback(_got_s2)
# Make the chain fire with the private node itself, not the last link.
945 d1.addCallback(lambda res: privnode)
947 d.addCallback(_got_new_dir)
950 def _check_publish1(self, res):
# Verify _do_publish1's upload from a *different* client, navigating
# one child at a time with the iterative get() API and comparing the
# downloaded bytes against self.data.
# NOTE(review): line 952 (presumably `c1 = self.clients[1]`), the
# `def _get_done(data):` header at 959, and the trailing `return d`
# are elided from this listing.
951 # this one uses the iterative API
953 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
954 d.addCallback(self.log, "check_publish1 got /")
955 d.addCallback(lambda root: root.get(u"subdir1"))
956 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
957 d.addCallback(lambda filenode: filenode.download_to_data())
958 d.addCallback(self.log, "get finished")
960 self.failUnlessEqual(data, self.data)
961 d.addCallback(_get_done)
964 def _check_publish2(self, res):
# Same verification as _check_publish1, but resolving whole paths with
# get_child_at_path, and additionally checking that a node re-created
# from the file's URI compares equal to the original filenode.
# NOTE(review): lines 973 and 979-980 (trailing `return d`) are elided.
965 # this one uses the path-based API
966 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
967 d = rootnode.get_child_at_path(u"subdir1")
968 d.addCallback(lambda dirnode:
969 self.failUnless(IDirectoryNode.providedBy(dirnode)))
970 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
971 d.addCallback(lambda filenode: filenode.download_to_data())
972 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
974 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
975 def _got_filenode(filenode):
# URI round-trip: building a node from the same URI must yield an
# equal node.
976 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
977 assert fnode == filenode
978 d.addCallback(_got_filenode)
981 def _check_publish_private(self, resnode):
# Verify the private directory built by _do_publish_private: download
# P/personal/sekrit data, confirm s2-rw is mutable, and confirm every
# mutating operation on the read-only s2-ro alias raises
# NotMutableError. Finally exercise move_child_to between P and
# P/personal and check the resulting manifest size.
# NOTE(review): this listing is elided in many places (984, 988, 990,
# 993/995 around `def get_path(path):`, 1008, 1010, 1014, 1019, 1022,
# 1024, 1026, 1029, 1032, 1034, 1043, 1047, 1051, 1054-1055, 1061,
# 1063-1064); code kept verbatim.
982 # this one uses the path-based API
983 self._private_node = resnode
985 d = self._private_node.get_child_at_path(u"personal")
986 def _got_personal(personal):
987 self._personal_node = personal
989 d.addCallback(_got_personal)
991 d.addCallback(lambda dirnode:
992 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
# Helper (header elided at 993): resolve `path` relative to P.
994 return self._private_node.get_child_at_path(path)
996 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
997 d.addCallback(lambda filenode: filenode.download_to_data())
998 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
999 d.addCallback(lambda res: get_path(u"s2-rw"))
1000 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
1001 d.addCallback(lambda res: get_path(u"s2-ro"))
1002 def _got_s2ro(dirnode):
# A read-only dirnode is still "mutable" in kind, but flagged readonly.
1003 self.failUnless(dirnode.is_mutable(), dirnode)
1004 self.failUnless(dirnode.is_readonly(), dirnode)
1005 d1 = defer.succeed(None)
1006 d1.addCallback(lambda res: dirnode.list())
1007 d1.addCallback(self.log, "dirnode.list")
# Each write attempt below must fail with NotMutableError.
1009 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
1011 d1.addCallback(self.log, "doing add_file(ro)")
1012 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.")
1013 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
1015 d1.addCallback(self.log, "doing get(ro)")
# Plain reads must still work on the read-only alias.
1016 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
1017 d1.addCallback(lambda filenode:
1018 self.failUnless(IFileNode.providedBy(filenode)))
1020 d1.addCallback(self.log, "doing delete(ro)")
1021 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
1023 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
1025 d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))
1027 personal = self._personal_node
1028 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
1030 d1.addCallback(self.log, "doing move_child_to(ro)2")
1031 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
1033 d1.addCallback(self.log, "finished with _got_s2ro")
1035 d.addCallback(_got_s2ro)
1036 def _got_home(dummy):
# Exercise rename/move on writable directories: P/personal/sekrit data
# -> P/sekrit -> P/sekrit data -> back into P/personal.
1037 home = self._private_node
1038 personal = self._personal_node
1039 d1 = defer.succeed(None)
1040 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
1041 d1.addCallback(lambda res:
1042 personal.move_child_to(u"sekrit data",home,u"sekrit"))
1044 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
1045 d1.addCallback(lambda res:
1046 home.move_child_to(u"sekrit", home, u"sekrit data"))
1048 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
1049 d1.addCallback(lambda res:
1050 home.move_child_to(u"sekrit data", personal))
1052 d1.addCallback(lambda res: home.build_manifest())
1053 d1.addCallback(self.log, "manifest")
1056 # P/personal/sekrit data
1057 # P/s2-rw (same as P/s2-ro)
1058 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
1059 d1.addCallback(lambda manifest:
1060 self.failUnlessEqual(len(manifest), 4))
1062 d.addCallback(_got_home)
1065 def shouldFail(self, res, expected_failure, which, substring=None):
# addBoth-style assertion: `res` must be a Failure wrapping
# `expected_failure` (optionally containing `substring` in its str());
# a plain success value means the expected exception never happened, so
# the test fails with a message naming `which` operation was tried.
# NOTE(review): lines 1068 (presumably `if substring:`) and 1072
# (presumably `else:`) are elided from this listing.
1066 if isinstance(res, Failure):
1067 res.trap(expected_failure)
1069 self.failUnless(substring in str(res),
1070 "substring '%s' not in '%s'"
1071 % (substring, str(res)))
1073 self.fail("%s was supposed to raise %s, not get '%s'" %
1074 (which, expected_failure, res))
1076 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
# Like shouldFail, but drives the operation itself: invokes
# `callable(*args, **kwargs)` via maybeDeferred and asserts the result
# is a Failure of type `expected_failure` (with optional `substring`).
# NOTE(review): the listing elides the inner callback header at 1079
# (presumably `def _done(res):`), the `if substring:`/`else:` lines at
# 1082/1086, and the trailing addBoth/return at 1089-1090.
1077 assert substring is None or isinstance(substring, str)
1078 d = defer.maybeDeferred(callable, *args, **kwargs)
1080 if isinstance(res, Failure):
1081 res.trap(expected_failure)
1083 self.failUnless(substring in str(res),
1084 "substring '%s' not in '%s'"
1085 % (substring, str(res)))
1087 self.fail("%s was supposed to raise %s, not get '%s'" %
1088 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of ``data`` to the node's web API.

    ``urlpath`` is appended to ``self.webish_url``; returns the Deferred
    from twisted.web.client.getPage, firing with the response body.
    """
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET against the node's web API.

    ``urlpath`` is appended to ``self.webish_url``; redirects are only
    followed when ``followRedirect`` is true. Returns getPage's Deferred.
    """
    return getPage(self.webish_url + urlpath, method="GET",
                   followRedirect=followRedirect)
1100 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
# Hand-roll a multipart/form-data HTTP POST to either the plain node's
# web port or (when use_helper is true) the helper-using node's port.
# Each keyword in `fields` becomes a form field; a (filename, value)
# tuple becomes a file-upload part. Returns getPage's Deferred.
# NOTE(review): this listing is elided (lines 1101, 1103, 1107-1108,
# 1110, 1112, 1118, 1120, 1122-1123, 1126 missing) -- the
# `if use_helper:`/`else:` split, the `form = []` initialisation,
# the per-part separators/blank lines, and the content-length header
# are not visible here.
1102 url = self.helper_webish_url + urlpath
1104 url = self.webish_url + urlpath
# Fixed multipart boundary; fine for a test, never appears in the data.
1105 sepbase = "boogabooga"
1106 sep = "--" + sepbase
1109 form.append('Content-Disposition: form-data; name="_charset"')
1111 form.append('UTF-8')
1113 for name, value in fields.iteritems():
1114 if isinstance(value, tuple):
# A tuple means "upload as a named file part".
1115 filename, value = value
1116 form.append('Content-Disposition: form-data; name="%s"; '
1117 'filename="%s"' % (name, filename.encode("utf-8")))
1119 form.append('Content-Disposition: form-data; name="%s"' % name)
1121 form.append(str(value))
1124 body = "\r\n".join(form) + "\r\n"
1125 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1127 return getPage(url, method="POST", postdata=body,
1128 headers=headers, followRedirect=followRedirect)
1130 def _test_web(self, res):
# End-to-end exercise of the node's web API: welcome pages (plain and
# helper-using node), directory listings, file downloads via several
# URL forms, a bogus-URI error path, PUT uploads (small, multi-segment,
# and replace-in-place), unlinked POST uploads (with and without the
# helper), and the status pages for upload/download/publish/retrieve.
# NOTE(review): the listing is elided throughout (e.g. 1133 `d = ...`,
# 1139/1143 assertion tails, 1151, 1153, 1167, 1177, 1184, 1191-1192,
# 1195, 1208, 1211, 1214, 1219, 1222, 1238, 1242 `def _got_up`,
# 1245 `def _got_down`, 1251, 1255, 1261-1263 missing); code verbatim.
1131 base = self.webish_url
1132 public = "uri/" + self._root_directory_uri
1134 def _got_welcome(page):
1135 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1136 self.failUnless(expected in page,
1137 "I didn't see the right 'connected storage servers'"
1138 " message in: %s" % page
1140 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1141 self.failUnless(expected in page,
1142 "I didn't see the right 'My nodeid' message "
1144 d.addCallback(_got_welcome)
1145 d.addCallback(self.log, "done with _got_welcome")
1147 # get the welcome page from the node that uses the helper too
1148 d.addCallback(lambda res: getPage(self.helper_webish_url))
1149 def _got_welcome_helper(page):
1150 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1152 d.addCallback(_got_welcome_helper)
1154 d.addCallback(lambda res: getPage(base + public))
1155 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1156 def _got_subdir1(page):
1157 # there ought to be an href for our file
1158 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1159 self.failUnless(">mydata567</a>" in page)
1160 d.addCallback(_got_subdir1)
1161 d.addCallback(self.log, "done with _got_subdir1")
1162 d.addCallback(lambda res:
1163 getPage(base + public + "/subdir1/mydata567"))
1164 def _got_data(page):
1165 self.failUnlessEqual(page, self.data)
1166 d.addCallback(_got_data)
1168 # download from a URI embedded in a URL
1169 d.addCallback(self.log, "_get_from_uri")
1170 def _get_from_uri(res):
1171 return getPage(base + "uri/%s?filename=%s"
1172 % (self.uri, "mydata567"))
1173 d.addCallback(_get_from_uri)
1174 def _got_from_uri(page):
1175 self.failUnlessEqual(page, self.data)
1176 d.addCallback(_got_from_uri)
1178 # download from a URI embedded in a URL, second form
1179 d.addCallback(self.log, "_get_from_uri2")
1180 def _get_from_uri2(res):
1181 return getPage(base + "uri?uri=%s" % (self.uri,))
1182 d.addCallback(_get_from_uri2)
1183 d.addCallback(_got_from_uri)
1185 # download from a bogus URI, make sure we get a reasonable error
1186 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1187 def _get_from_bogus_uri(res):
1188 d1 = getPage(base + "uri/%s?filename=%s"
1189 % (self.mangle_uri(self.uri), "mydata567"))
1190 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1193 d.addCallback(_get_from_bogus_uri)
1194 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1196 # upload a file with PUT
1197 d.addCallback(self.log, "about to try PUT")
1198 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1199 "new.txt contents"))
1200 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1201 d.addCallback(self.failUnlessEqual, "new.txt contents")
1202 # and again with something large enough to use multiple segments,
1203 # and hopefully trigger pauseProducing too
1204 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1205 "big" * 500000)) # 1.5MB
1206 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1207 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1209 # can we replace files in place?
1210 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1212 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1213 d.addCallback(self.failUnlessEqual, "NEWER contents")
1215 # test unlinked POST
1216 d.addCallback(lambda res: self.POST("uri", t="upload",
1217 file=("new.txt", "data" * 10000)))
1218 # and again using the helper, which exercises different upload-status
1220 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1221 file=("foo.txt", "data2" * 10000)))
1223 # check that the status page exists
1224 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1225 def _got_status(res):
1226 # find an interesting upload and download to look at. LIT files
1227 # are not interesting.
1228 for dl in self.clients[0].list_recent_downloads():
1229 if dl.get_size() > 200:
1230 self._down_status = dl.get_counter()
1231 for ul in self.clients[0].list_recent_uploads():
1232 if ul.get_size() > 200:
1233 self._up_status = ul.get_counter()
1234 rs = self.clients[0].list_recent_retrieve()[0]
1235 self._retrieve_status = rs.get_counter()
1236 ps = self.clients[0].list_recent_publish()[0]
1237 self._publish_status = ps.get_counter()
1239 # and that there are some upload- and download- status pages
1240 return self.GET("status/up-%d" % self._up_status)
1241 d.addCallback(_got_status)
1243 return self.GET("status/down-%d" % self._down_status)
1244 d.addCallback(_got_up)
1246 return self.GET("status/publish-%d" % self._publish_status)
1247 d.addCallback(_got_down)
1248 def _got_publish(res):
1249 return self.GET("status/retrieve-%d" % self._retrieve_status)
1250 d.addCallback(_got_publish)
1252 # TODO: mangle the second segment of a file, to test errors that
1253 # occur after we've already sent some good data, which uses a
1254 # different error path.
1256 # TODO: download a URI with a form
1257 # TODO: create a directory by using a form
1258 # TODO: upload by using a form on the directory page
1259 # url = base + "somedir/subdir1/freeform_post!!upload"
1260 # TODO: delete a file by using a button on the directory page
1264 def _test_runner(self, res):
# Exercise the command-line diagnostic tools in runner.py against the
# shares written by earlier uploads: walk the storage directories to
# find a CHK share file, run "dump-share" on it and check the reported
# metadata, then use "find-shares" and "catalog-shares" to locate and
# describe all 10 shares across the 5 client nodes.
# NOTE(review): the listing is elided (1266-1267, 1270-1272, 1281-1282,
# 1284, 1286, 1289, 1293, 1310, 1320, 1323, 1330, 1333 missing) --
# e.g. the `filename = None` init, `continue` branches, the `break`
# after a share is found, and list-comprehension openings; verbatim.
1265 # exercise some of the diagnostic tools in runner.py
1268 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1269 if "storage" not in dirpath:
1273 pieces = dirpath.split(os.sep)
1274 if pieces[-4] == "storage" and pieces[-3] == "shares":
1275 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1276 # are sharefiles here
1277 filename = os.path.join(dirpath, filenames[0])
1278 # peek at the magic to see if it is a chk share
1279 magic = open(filename, "rb").read(4)
1280 if magic == '\x00\x00\x00\x01':
1283 self.fail("unable to find any uri_extension files in %s"
1285 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1287 out,err = StringIO(), StringIO()
1288 rc = runner.runner(["dump-share",
1290 stdout=out, stderr=err)
1291 output = out.getvalue()
1292 self.failUnlessEqual(rc, 0)
1294 # we only upload a single file, so we can assert some things about
1295 # its size and shares.
1296 self.failUnless(("share filename: %s" % filename) in output)
1297 self.failUnless("size: %d\n" % len(self.data) in output)
1298 self.failUnless("num_segments: 1\n" in output)
1299 # segment_size is always a multiple of needed_shares
1300 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1301 self.failUnless("total_shares: 10\n" in output)
1302 # keys which are supposed to be present
1303 for key in ("size", "num_segments", "segment_size",
1304 "needed_shares", "total_shares",
1305 "codec_name", "codec_params", "tail_codec_params",
1306 "plaintext_hash", "plaintext_root_hash",
1307 "crypttext_hash", "crypttext_root_hash",
1308 "share_root_hash", "UEB_hash"):
1309 self.failUnless("%s: " % key in output, key)
1311 # now use its storage index to find the other shares using the
1312 # 'find-shares' tool
1313 sharedir, shnum = os.path.split(filename)
1314 storagedir, storage_index_s = os.path.split(sharedir)
1315 out,err = StringIO(), StringIO()
1316 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1317 cmd = ["find-shares", storage_index_s] + nodedirs
1318 rc = runner.runner(cmd, stdout=out, stderr=err)
1319 self.failUnlessEqual(rc, 0)
# 10 total shares spread across the nodes should all be located.
1321 sharefiles = [sfn.strip() for sfn in out.readlines()]
1322 self.failUnlessEqual(len(sharefiles), 10)
1324 # also exercise the 'catalog-shares' tool
1325 out,err = StringIO(), StringIO()
1326 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1327 cmd = ["catalog-shares"] + nodedirs
1328 rc = runner.runner(cmd, stdout=out, stderr=err)
1329 self.failUnlessEqual(rc, 0)
# 3 files x 10 shares each = 30 catalogued shares -- TODO confirm, the
# derivation is not visible in this elided listing.
1331 descriptions = [sfn.strip() for sfn in out.readlines()]
1332 self.failUnlessEqual(len(descriptions), 30)
1334 for line in descriptions
1335 if line.startswith("CHK %s " % storage_index_s)]
1336 self.failUnlessEqual(len(matching), 10)
1338 def _test_control(self, res):
# Read client0's control.furl from its private directory and connect
# to it through the IntroducerNode's Tub, then hand the remote
# reference to _test_control2 for the actual exercises.
# NOTE(review): line 1348 is elided (presumably `return d`).
1339 # exercise the remote-control-the-client foolscap interfaces in
1340 # allmydata.control (mostly used for performance tests)
1341 c0 = self.clients[0]
1342 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1343 control_furl = open(control_furl_file, "r").read().strip()
1344 # it doesn't really matter which Tub we use to connect to the client,
1345 # so let's just use our IntroducerNode's
1346 d = self.introducer.tub.getReference(control_furl)
# Pass the furl file's own path along as a convenient test payload.
1347 d.addCallback(self._test_control2, control_furl_file)
1349 def _test_control2(self, rref, filename):
# Drive the control interface over foolscap: upload `filename`,
# download it back to a local file and compare contents, then run the
# speed_test and (on Linux) memory/peer-response-time probes.
# NOTE(review): lines 1354-1355 (the download call's remaining
# arguments and `def _check(res):`) and 1365-1366 (trailing
# `return d`) are elided from this listing.
1350 d = rref.callRemote("upload_from_file_to_uri", filename)
1351 downfile = os.path.join(self.basedir, "control.downfile")
1352 d.addCallback(lambda uri:
1353 rref.callRemote("download_from_uri_to_file",
1356 self.failUnlessEqual(res, downfile)
# Round-trip check: downloaded bytes must equal the original file.
1357 data = open(downfile, "r").read()
1358 expected_data = open(filename, "r").read()
1359 self.failUnlessEqual(data, expected_data)
1360 d.addCallback(_check)
1361 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
# get_memory_usage is only implemented for Linux in this codebase,
# hence the platform guard.
1362 if sys.platform == "linux2":
1363 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1364 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1367 def _test_cli(self, res):
# Run the CLI commands (ls, put, get, mv, rm) against the grid via
# _run_cli (deferToThread, since the CLI blocks), checking both their
# stdout/stderr and -- via direct node operations -- their effects on
# the private directory. Two argument sets are used: `nodeargs`
# (private dir via --dir-cap) and `public_nodeargs` (public root).
# NOTE(review): the listing is elided in many places (1369-1370,
# 1374-1375, 1378-1379, 1382, 1384, 1386-1387 `def _ls_root`, 1397,
# 1406, 1416-1417 `def _ls_file`, 1425, 1433-1434, 1437-1438,
# 1442-1444 writing TESTDATA to fn, 1447, 1456, 1458, 1467, 1471,
# 1480-1482 `def _mv`, 1486, 1491, 1498, 1500-1501 `def _rm`, 1504,
# 1509, 1511-1512 missing); code kept verbatim.
1368 # run various CLI commands (in a thread, since they use blocking
1371 private_uri = self._private_node.get_uri()
1372 some_uri = self._root_directory_uri
1373 client0_basedir = self.getdir("client0")
1376 "--node-directory", client0_basedir,
1377 "--dir-cap", private_uri,
1380 "--node-url", self.webish_url,
1381 "--dir-cap", some_uri,
1383 TESTDATA = "I will not write the same thing over and over.\n" * 100
1385 d = defer.succeed(None)
1388 argv = ["ls"] + nodeargs
1389 return self._run_cli(argv)
1390 d.addCallback(_ls_root)
1391 def _check_ls_root((out,err)):
# The private root should list the three entries made earlier.
1392 self.failUnless("personal" in out)
1393 self.failUnless("s2-ro" in out)
1394 self.failUnless("s2-rw" in out)
1395 self.failUnlessEqual(err, "")
1396 d.addCallback(_check_ls_root)
1398 def _ls_subdir(res):
1399 argv = ["ls"] + nodeargs + ["personal"]
1400 return self._run_cli(argv)
1401 d.addCallback(_ls_subdir)
1402 def _check_ls_subdir((out,err)):
1403 self.failUnless("sekrit data" in out)
1404 self.failUnlessEqual(err, "")
1405 d.addCallback(_check_ls_subdir)
1407 def _ls_public_subdir(res):
1408 argv = ["ls"] + public_nodeargs + ["subdir1"]
1409 return self._run_cli(argv)
1410 d.addCallback(_ls_public_subdir)
1411 def _check_ls_public_subdir((out,err)):
1412 self.failUnless("subdir2" in out)
1413 self.failUnless("mydata567" in out)
1414 self.failUnlessEqual(err, "")
1415 d.addCallback(_check_ls_public_subdir)
1418 argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
1419 return self._run_cli(argv)
1420 d.addCallback(_ls_file)
1421 def _check_ls_file((out,err)):
1422 self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
1423 self.failUnlessEqual(err, "")
1424 d.addCallback(_check_ls_file)
1426 # tahoe_ls doesn't currently handle the error correctly: it tries to
1427 # JSON-parse a traceback.
1428 ## def _ls_missing(res):
1429 ## argv = ["ls"] + nodeargs + ["bogus"]
1430 ## return self._run_cli(argv)
1431 ## d.addCallback(_ls_missing)
1432 ## def _check_ls_missing((out,err)):
1435 ## self.failUnlessEqual(err, "")
1436 ## d.addCallback(_check_ls_missing)
1439 tdir = self.getdir("cli_put")
1440 fileutil.make_dirs(tdir)
1441 fn = os.path.join(tdir, "upload_me")
1445 argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
1446 return self._run_cli(argv)
1448 def _check_put((out,err)):
1449 self.failUnless("200 OK" in out)
1450 self.failUnlessEqual(err, "")
# Verify the put actually landed by downloading through the node API.
1451 d = self._private_node.get_child_at_path(u"test_put/upload.txt")
1452 d.addCallback(lambda filenode: filenode.download_to_data())
1453 def _check_put2(res):
1454 self.failUnlessEqual(res, TESTDATA)
1455 d.addCallback(_check_put2)
1457 d.addCallback(_check_put)
1459 def _get_to_stdout(res):
1460 argv = ["get"] + nodeargs + ["test_put/upload.txt"]
1461 return self._run_cli(argv)
1462 d.addCallback(_get_to_stdout)
1463 def _check_get_to_stdout((out,err)):
1464 self.failUnlessEqual(out, TESTDATA)
1465 self.failUnlessEqual(err, "")
1466 d.addCallback(_check_get_to_stdout)
1468 get_to_file_target = self.basedir + "/get.downfile"
1469 def _get_to_file(res):
1470 argv = ["get"] + nodeargs + ["test_put/upload.txt",
1472 return self._run_cli(argv)
1473 d.addCallback(_get_to_file)
1474 def _check_get_to_file((out,err)):
1475 data = open(get_to_file_target, "rb").read()
1476 self.failUnlessEqual(data, TESTDATA)
1477 self.failUnlessEqual(out, "")
1478 self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
1479 d.addCallback(_check_get_to_file)
1483 argv = ["mv"] + nodeargs + ["test_put/upload.txt",
1484 "test_put/moved.txt"]
1485 return self._run_cli(argv)
1487 def _check_mv((out,err)):
1488 self.failUnless("OK" in out)
1489 self.failUnlessEqual(err, "")
# After mv, the old name must be gone and the new name must resolve.
1490 d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")
1492 d.addCallback(lambda res:
1493 self._private_node.get_child_at_path(u"test_put/moved.txt"))
1494 d.addCallback(lambda filenode: filenode.download_to_data())
1495 def _check_mv2(res):
1496 self.failUnlessEqual(res, TESTDATA)
1497 d.addCallback(_check_mv2)
1499 d.addCallback(_check_mv)
1502 argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
1503 return self._run_cli(argv)
1505 def _check_rm((out,err)):
1506 self.failUnless("200 OK" in out)
1507 self.failUnlessEqual(err, "")
1508 d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
1510 d.addCallback(_check_rm)
1513 def _run_cli(self, argv):
# Run one CLI command in a worker thread (runner.runner blocks on HTTP)
# with stdout/stderr captured into StringIOs; fires with the
# (stdout_text, stderr_text) pair.
# NOTE(review): line 1517 (presumably `def _done(res):`) and 1520
# (presumably `return d`) are elided from this listing.
1514 stdout, stderr = StringIO(), StringIO()
1515 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1516 stdout=stdout, stderr=stderr)
1518 return stdout.getvalue(), stderr.getvalue()
1519 d.addCallback(_done)
1522 def _test_checker(self, res):
# Build a manifest of every storage index reachable from the private
# directory and feed it to _test_checker_2.
# NOTE(review): lines 1525-1526 are elided (presumably `return d`).
1523 d = self._private_node.build_manifest()
1524 d.addCallback(self._test_checker_2)
1527 def _test_checker_2(self, manifest):
# Exercise client1's "checker" service over every storage index in the
# manifest: first verify the results cache is empty, run check() on
# each SI, assert the expected 3-of-10 share layout, then confirm the
# same results were stored (with timestamps inside the run window).
# NOTE(review): the listing is elided (1531-1532 second empty-cache
# check, 1534 `dl = []` / the for-loop header over manifest,
# 1537, 1539-1540 and 1542 inside _check_checker_results, 1548
# `peers = set()`, 1553, 1556-1557 the for-loop in
# _check_stored_results, 1559, 1561, 1569, 1571 missing); verbatim.
1528 checker1 = self.clients[1].getServiceNamed("checker")
# Before any check runs, the results cache must be empty.
1529 self.failUnlessEqual(checker1.checker_results_for(None), [])
1530 self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
1533 starting_time = time.time()
1535 dl.append(checker1.check(si))
1536 d = deferredutil.DeferredListShouldSucceed(dl)
1538 def _check_checker_results(res):
# LIT/mutable entries report a plain True; CHK entries report the
# (needed, total, found, sharemap) tuple asserted below.
1541 self.failUnless(i is True)
1543 (needed, total, found, sharemap) = i
1544 self.failUnlessEqual(needed, 3)
1545 self.failUnlessEqual(total, 10)
1546 self.failUnlessEqual(found, total)
1547 self.failUnlessEqual(len(sharemap.keys()), 10)
1549 for shpeers in sharemap.values():
1550 peers.update(shpeers)
1551 self.failUnlessEqual(len(peers), self.numclients)
1552 d.addCallback(_check_checker_results)
1554 def _check_stored_results(res):
1555 finish_time = time.time()
1558 results = checker1.checker_results_for(si)
1560 # TODO: implement checker for mutable files and implement tests of that checker
1562 self.failUnlessEqual(len(results), 1)
1563 when, those_results = results[0]
# Stored timestamps must fall within the window of this test run.
1564 self.failUnless(isinstance(when, (int, float)))
1565 self.failUnless(starting_time <= when <= finish_time)
1566 all_results.append(those_results)
1567 _check_checker_results(all_results)
1568 d.addCallback(_check_stored_results)
1570 d.addCallback(self._test_checker_3)
1573 def _test_checker_3(self, res):
# Check individual files through FileNode.check(): a LIT file
# ('sekrit data', which returns a bare True) and a CHK file
# (mydata567, which returns the full share-count tuple).
# NOTE(review): lines 1582 and 1597-1599 (presumably the trailing
# `return d`) are elided from this listing.
1574 # check one file, through FileNode.check()
1575 d = self._private_node.get_child_at_path(u"personal/sekrit data")
1576 d.addCallback(lambda n: n.check())
1577 def _checked(results):
1578 # 'sekrit data' is small, and fits in a LiteralFileNode, so
1579 # checking it is trivial and always returns True
1580 self.failUnlessEqual(results, True)
1581 d.addCallback(_checked)
# Despite the name, c0 is bound to clients[1] here -- kept verbatim.
1583 c0 = self.clients[1]
1584 n = c0.create_node_from_uri(self._root_directory_uri)
1585 d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1586 d.addCallback(lambda n: n.check())
1587 def _checked2(results):
1588 # mydata567 is large and lives in a CHK
1589 (needed, total, found, sharemap) = results
1590 self.failUnlessEqual(needed, 3)
1591 self.failUnlessEqual(total, 10)
1592 self.failUnlessEqual(found, 10)
1593 self.failUnlessEqual(len(sharemap), 10)
# Every share number maps to exactly one holding peer.
1594 for shnum in range(10):
1595 self.failUnlessEqual(len(sharemap[shnum]), 1)
1596 d.addCallback(_checked2)
1600 def _test_verifier(self, res):
1601 checker1 = self.clients[1].getServiceNamed("checker")
1602 d = self._private_node.build_manifest()
1603 def _check_all(manifest):
1606 dl.append(checker1.verify(si))
1607 return deferredutil.DeferredListShouldSucceed(dl)
1608 d.addCallback(_check_all)
1611 self.failUnless(i is True)
1612 d.addCallback(_done)
1613 d.addCallback(lambda res: checker1.verify(None))
1614 d.addCallback(self.failUnlessEqual, True)