2 from base64 import b32encode
3 import os, sys, time, re, simplejson
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
11 from allmydata import client, uri, download, upload, storage, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable.common import NotMutableError
18 from allmydata.mutable import layout as mutable_layout
19 from allmydata.stats import PickleStatsGatherer
20 from allmydata.key_generator import KeyGeneratorService
21 from foolscap.eventual import flushEventualQueue, fireEventually
22 from foolscap import DeadReferenceError, Tub
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 def flush_but_dont_ignore(res):
28 d = flushEventualQueue()
35 This is some data to publish to the virtual drive, which needs to be large
36 enough to not fit inside a LIT uri.
39 class CountingDataUploadable(upload.Data):
41 interrupt_after = None
42 interrupt_after_d = None
44 def read(self, length):
45 self.bytes_read += length
46 if self.interrupt_after is not None:
47 if self.bytes_read > self.interrupt_after:
48 self.interrupt_after = None
49 self.interrupt_after_d.callback(self)
50 return upload.Data.read(self, length)
53 class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
56 self.sparent = service.MultiService()
57 self.sparent.startService()
59 log.msg("shutting down SystemTest services")
60 d = self.sparent.stopService()
61 d.addBoth(flush_but_dont_ignore)
64 def getdir(self, subdir):
65 return os.path.join(self.basedir, subdir)
67 def add_service(self, s):
68 s.setServiceParent(self.sparent)
71 def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
72 self.numclients = NUMCLIENTS
73 self.createprivdir = createprivdir
74 iv_dir = self.getdir("introducer")
75 if not os.path.isdir(iv_dir):
76 fileutil.make_dirs(iv_dir)
77 f = open(os.path.join(iv_dir, "webport"), "w")
78 f.write("tcp:0:interface=127.0.0.1\n")
80 iv = IntroducerNode(basedir=iv_dir)
81 self.introducer = self.add_service(iv)
82 d = self.introducer.when_tub_ready()
83 d.addCallback(self._get_introducer_web)
84 d.addCallback(self._set_up_stats_gatherer)
85 d.addCallback(self._set_up_key_generator)
86 d.addCallback(self._set_up_nodes_2)
87 d.addCallback(self._grab_stats)
90 def _get_introducer_web(self, res):
91 f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
92 self.introweb_url = f.read().strip()
95 def _set_up_stats_gatherer(self, res):
96 statsdir = self.getdir("stats_gatherer")
97 fileutil.make_dirs(statsdir)
100 l = t.listenOn("tcp:0")
102 t.setLocation("localhost:%d" % p)
104 self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
105 self.add_service(self.stats_gatherer)
106 self.stats_gatherer_furl = self.stats_gatherer.get_furl()
108 def _set_up_key_generator(self, res):
109 kgsdir = self.getdir("key_generator")
110 fileutil.make_dirs(kgsdir)
112 self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
113 self.key_generator_svc.key_generator.pool_size = 4
114 self.key_generator_svc.key_generator.pool_refresh_delay = 60
115 self.add_service(self.key_generator_svc)
118 def check_for_furl():
119 return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
120 d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
122 kgf = os.path.join(kgsdir, 'key_generator.furl')
123 self.key_generator_furl = file(kgf, 'rb').read().strip()
124 d.addCallback(get_furl)
127 def _set_up_nodes_2(self, res):
129 self.introducer_furl = q.introducer_url
132 for i in range(self.numclients):
133 basedir = self.getdir("client%d" % i)
134 basedirs.append(basedir)
135 fileutil.make_dirs(basedir)
137 # client[0] runs a webserver and a helper, no key_generator
138 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
139 open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
141 # client[3] runs a webserver and uses a helper, uses key_generator
142 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
143 kgf = "%s\n" % (self.key_generator_furl,)
144 open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
145 if self.createprivdir:
146 fileutil.make_dirs(os.path.join(basedir, "private"))
147 open(os.path.join(basedir, "private", "root_dir.cap"), "w")
148 open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
149 open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
151 # start client[0], wait for it's tub to be ready (at which point it
152 # will have registered the helper furl).
153 c = self.add_service(client.Client(basedir=basedirs[0]))
154 self.clients.append(c)
155 d = c.when_tub_ready()
157 f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
158 helper_furl = f.read()
160 self.helper_furl = helper_furl
161 f = open(os.path.join(basedirs[3],"helper.furl"), "w")
165 # this starts the rest of the clients
166 for i in range(1, self.numclients):
167 c = self.add_service(client.Client(basedir=basedirs[i]))
168 self.clients.append(c)
170 return self.wait_for_connections()
171 d.addCallback(_ready)
174 # now find out where the web port was
175 l = self.clients[0].getServiceNamed("webish").listener
176 port = l._port.getHost().port
177 self.webish_url = "http://localhost:%d/" % port
178 # and the helper-using webport
179 l = self.clients[3].getServiceNamed("webish").listener
180 port = l._port.getHost().port
181 self.helper_webish_url = "http://localhost:%d/" % port
182 d.addCallback(_connected)
185 def _grab_stats(self, res):
186 d = self.stats_gatherer.poll()
189 def bounce_client(self, num):
190 c = self.clients[num]
191 d = c.disownServiceParent()
192 # I think windows requires a moment to let the connection really stop
193 # and the port number made available for re-use. TODO: examine the
194 # behavior, see if this is really the problem, see if we can do
195 # better than blindly waiting for a second.
196 d.addCallback(self.stall, 1.0)
198 new_c = client.Client(basedir=self.getdir("client%d" % num))
199 self.clients[num] = new_c
200 self.add_service(new_c)
201 return new_c.when_tub_ready()
202 d.addCallback(_stopped)
203 d.addCallback(lambda res: self.wait_for_connections())
204 def _maybe_get_webport(res):
206 # now find out where the web port was
207 l = self.clients[0].getServiceNamed("webish").listener
208 port = l._port.getHost().port
209 self.webish_url = "http://localhost:%d/" % port
210 d.addCallback(_maybe_get_webport)
213 def add_extra_node(self, client_num, helper_furl=None,
214 add_to_sparent=False):
215 # usually this node is *not* parented to our self.sparent, so we can
216 # shut it down separately from the rest, to exercise the
217 # connection-lost code
218 basedir = self.getdir("client%d" % client_num)
219 if not os.path.isdir(basedir):
220 fileutil.make_dirs(basedir)
221 open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
223 f = open(os.path.join(basedir, "helper.furl") ,"w")
224 f.write(helper_furl+"\n")
227 c = client.Client(basedir=basedir)
228 self.clients.append(c)
231 c.setServiceParent(self.sparent)
234 d = self.wait_for_connections()
235 d.addCallback(lambda res: c)
238 def _check_connections(self):
239 for c in self.clients:
240 ic = c.introducer_client
241 if not ic.connected_to_introducer():
243 if len(ic.get_all_peerids()) != self.numclients:
247 def wait_for_connections(self, ignored=None):
248 # TODO: replace this with something that takes a list of peerids and
249 # fires when they've all been heard from, instead of using a count
251 return self.poll(self._check_connections, timeout=200)
253 def test_connections(self):
254 self.basedir = "system/SystemTest/test_connections"
255 d = self.set_up_nodes()
256 self.extra_node = None
257 d.addCallback(lambda res: self.add_extra_node(self.numclients))
258 def _check(extra_node):
259 self.extra_node = extra_node
260 for c in self.clients:
261 all_peerids = list(c.get_all_peerids())
262 self.failUnlessEqual(len(all_peerids), self.numclients+1)
263 permuted_peers = list(c.get_permuted_peers("storage", "a"))
264 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
266 d.addCallback(_check)
267 def _shutdown_extra_node(res):
269 return self.extra_node.stopService()
271 d.addBoth(_shutdown_extra_node)
273 test_connections.timeout = 300
274 # test_connections is subsumed by test_upload_and_download, and takes
275 # quite a while to run on a slow machine (because of all the TLS
276 # connections that must be established). If we ever rework the introducer
277 # code to such an extent that we're not sure if it works anymore, we can
278 # reinstate this test until it does.
281 def test_upload_and_download_random_key(self):
282 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
283 return self._test_upload_and_download(convergence=None)
284 test_upload_and_download_random_key.timeout = 4800
286 def test_upload_and_download_convergent(self):
287 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
288 return self._test_upload_and_download(convergence="some convergence string")
289 test_upload_and_download_convergent.timeout = 4800
291 def _test_upload_and_download(self, convergence):
292 # we use 4000 bytes of data, which will result in about 400k written
293 # to disk among all our simulated nodes
294 DATA = "Some data to upload\n" * 200
295 d = self.set_up_nodes()
296 def _check_connections(res):
297 for c in self.clients:
298 all_peerids = list(c.get_all_peerids())
299 self.failUnlessEqual(len(all_peerids), self.numclients)
300 permuted_peers = list(c.get_permuted_peers("storage", "a"))
301 self.failUnlessEqual(len(permuted_peers), self.numclients)
302 d.addCallback(_check_connections)
306 u = self.clients[0].getServiceNamed("uploader")
308 # we crank the max segsize down to 1024b for the duration of this
309 # test, so we can exercise multiple segments. It is important
310 # that this is not a multiple of the segment size, so that the
311 # tail segment is not the same length as the others. This actualy
312 # gets rounded up to 1025 to be a multiple of the number of
313 # required shares (since we use 25 out of 100 FEC).
314 up = upload.Data(DATA, convergence=convergence)
315 up.max_segment_size = 1024
318 d.addCallback(_do_upload)
319 def _upload_done(results):
321 log.msg("upload finished: uri is %s" % (uri,))
323 dl = self.clients[1].getServiceNamed("downloader")
325 d.addCallback(_upload_done)
327 def _upload_again(res):
328 # Upload again. If using convergent encryption then this ought to be
329 # short-circuited, however with the way we currently generate URIs
330 # (i.e. because they include the roothash), we have to do all of the
331 # encoding work, and only get to save on the upload part.
332 log.msg("UPLOADING AGAIN")
333 up = upload.Data(DATA, convergence=convergence)
334 up.max_segment_size = 1024
335 d1 = self.uploader.upload(up)
336 d.addCallback(_upload_again)
338 def _download_to_data(res):
339 log.msg("DOWNLOADING")
340 return self.downloader.download_to_data(self.uri)
341 d.addCallback(_download_to_data)
342 def _download_to_data_done(data):
343 log.msg("download finished")
344 self.failUnlessEqual(data, DATA)
345 d.addCallback(_download_to_data_done)
347 target_filename = os.path.join(self.basedir, "download.target")
348 def _download_to_filename(res):
349 return self.downloader.download_to_filename(self.uri,
351 d.addCallback(_download_to_filename)
352 def _download_to_filename_done(res):
353 newdata = open(target_filename, "rb").read()
354 self.failUnlessEqual(newdata, DATA)
355 d.addCallback(_download_to_filename_done)
357 target_filename2 = os.path.join(self.basedir, "download.target2")
358 def _download_to_filehandle(res):
359 fh = open(target_filename2, "wb")
360 return self.downloader.download_to_filehandle(self.uri, fh)
361 d.addCallback(_download_to_filehandle)
362 def _download_to_filehandle_done(fh):
364 newdata = open(target_filename2, "rb").read()
365 self.failUnlessEqual(newdata, DATA)
366 d.addCallback(_download_to_filehandle_done)
368 def _download_nonexistent_uri(res):
369 baduri = self.mangle_uri(self.uri)
370 log.msg("about to download non-existent URI", level=log.UNUSUAL,
371 facility="tahoe.tests")
372 d1 = self.downloader.download_to_data(baduri)
373 def _baduri_should_fail(res):
374 log.msg("finished downloading non-existend URI",
375 level=log.UNUSUAL, facility="tahoe.tests")
376 self.failUnless(isinstance(res, Failure))
377 self.failUnless(res.check(download.NotEnoughSharesError),
378 "expected NotEnoughSharesError, got %s" % res)
379 # TODO: files that have zero peers should get a special kind
380 # of NotEnoughSharesError, which can be used to suggest that
381 # the URI might be wrong or that they've never uploaded the
382 # file in the first place.
383 d1.addBoth(_baduri_should_fail)
385 d.addCallback(_download_nonexistent_uri)
387 # add a new node, which doesn't accept shares, and only uses the
389 d.addCallback(lambda res: self.add_extra_node(self.numclients,
391 add_to_sparent=True))
392 def _added(extra_node):
393 self.extra_node = extra_node
394 extra_node.getServiceNamed("storage").sizelimit = 0
395 d.addCallback(_added)
397 HELPER_DATA = "Data that needs help to upload" * 1000
398 def _upload_with_helper(res):
399 u = upload.Data(HELPER_DATA, convergence=convergence)
400 d = self.extra_node.upload(u)
401 def _uploaded(results):
403 return self.downloader.download_to_data(uri)
404 d.addCallback(_uploaded)
406 self.failUnlessEqual(newdata, HELPER_DATA)
407 d.addCallback(_check)
409 d.addCallback(_upload_with_helper)
411 def _upload_duplicate_with_helper(res):
412 u = upload.Data(HELPER_DATA, convergence=convergence)
413 u.debug_stash_RemoteEncryptedUploadable = True
414 d = self.extra_node.upload(u)
415 def _uploaded(results):
417 return self.downloader.download_to_data(uri)
418 d.addCallback(_uploaded)
420 self.failUnlessEqual(newdata, HELPER_DATA)
421 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
422 "uploadable started uploading, should have been avoided")
423 d.addCallback(_check)
425 if convergence is not None:
426 d.addCallback(_upload_duplicate_with_helper)
428 def _upload_resumable(res):
429 DATA = "Data that needs help to upload and gets interrupted" * 1000
430 u1 = CountingDataUploadable(DATA, convergence=convergence)
431 u2 = CountingDataUploadable(DATA, convergence=convergence)
433 # we interrupt the connection after about 5kB by shutting down
434 # the helper, then restartingit.
435 u1.interrupt_after = 5000
436 u1.interrupt_after_d = defer.Deferred()
437 u1.interrupt_after_d.addCallback(lambda res:
438 self.bounce_client(0))
440 # sneak into the helper and reduce its chunk size, so that our
441 # debug_interrupt will sever the connection on about the fifth
442 # chunk fetched. This makes sure that we've started to write the
443 # new shares before we abandon them, which exercises the
444 # abort/delete-partial-share code. TODO: find a cleaner way to do
445 # this. I know that this will affect later uses of the helper in
446 # this same test run, but I'm not currently worried about it.
447 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
449 d = self.extra_node.upload(u1)
451 def _should_not_finish(res):
452 self.fail("interrupted upload should have failed, not finished"
453 " with result %s" % (res,))
455 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
457 # make sure we actually interrupted it before finishing the
459 self.failUnless(u1.bytes_read < len(DATA),
460 "read %d out of %d total" % (u1.bytes_read,
463 log.msg("waiting for reconnect", level=log.NOISY,
464 facility="tahoe.test.test_system")
465 # now, we need to give the nodes a chance to notice that this
466 # connection has gone away. When this happens, the storage
467 # servers will be told to abort their uploads, removing the
468 # partial shares. Unfortunately this involves TCP messages
469 # going through the loopback interface, and we can't easily
470 # predict how long that will take. If it were all local, we
471 # could use fireEventually() to stall. Since we don't have
472 # the right introduction hooks, the best we can do is use a
473 # fixed delay. TODO: this is fragile.
474 u1.interrupt_after_d.addCallback(self.stall, 2.0)
475 return u1.interrupt_after_d
476 d.addCallbacks(_should_not_finish, _interrupted)
478 def _disconnected(res):
479 # check to make sure the storage servers aren't still hanging
480 # on to the partial share: their incoming/ directories should
482 log.msg("disconnected", level=log.NOISY,
483 facility="tahoe.test.test_system")
484 for i in range(self.numclients):
485 incdir = os.path.join(self.getdir("client%d" % i),
486 "storage", "shares", "incoming")
487 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
488 d.addCallback(_disconnected)
490 # then we need to give the reconnector a chance to
491 # reestablish the connection to the helper.
492 d.addCallback(lambda res:
493 log.msg("wait_for_connections", level=log.NOISY,
494 facility="tahoe.test.test_system"))
495 d.addCallback(lambda res: self.wait_for_connections())
498 d.addCallback(lambda res:
499 log.msg("uploading again", level=log.NOISY,
500 facility="tahoe.test.test_system"))
501 d.addCallback(lambda res: self.extra_node.upload(u2))
503 def _uploaded(results):
505 log.msg("Second upload complete", level=log.NOISY,
506 facility="tahoe.test.test_system")
508 # this is really bytes received rather than sent, but it's
509 # convenient and basically measures the same thing
510 bytes_sent = results.ciphertext_fetched
512 # We currently don't support resumption of upload if the data is
513 # encrypted with a random key. (Because that would require us
514 # to store the key locally and re-use it on the next upload of
515 # this file, which isn't a bad thing to do, but we currently
517 if convergence is not None:
518 # Make sure we did not have to read the whole file the
519 # second time around .
520 self.failUnless(bytes_sent < len(DATA),
521 "resumption didn't save us any work:"
522 " read %d bytes out of %d total" %
523 (bytes_sent, len(DATA)))
525 # Make sure we did have to read the whole file the second
526 # time around -- because the one that we partially uploaded
527 # earlier was encrypted with a different random key.
528 self.failIf(bytes_sent < len(DATA),
529 "resumption saved us some work even though we were using random keys:"
530 " read %d bytes out of %d total" %
531 (bytes_sent, len(DATA)))
532 return self.downloader.download_to_data(uri)
533 d.addCallback(_uploaded)
536 self.failUnlessEqual(newdata, DATA)
537 # If using convergent encryption, then also check that the
538 # helper has removed the temp file from its directories.
539 if convergence is not None:
540 basedir = os.path.join(self.getdir("client0"), "helper")
541 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
542 self.failUnlessEqual(files, [])
543 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
544 self.failUnlessEqual(files, [])
545 d.addCallback(_check)
547 d.addCallback(_upload_resumable)
551 def _find_shares(self, basedir):
553 for (dirpath, dirnames, filenames) in os.walk(basedir):
554 if "storage" not in dirpath:
558 pieces = dirpath.split(os.sep)
559 if pieces[-4] == "storage" and pieces[-3] == "shares":
560 # we're sitting in .../storage/shares/$START/$SINDEX , and there
561 # are sharefiles here
562 assert pieces[-5].startswith("client")
563 client_num = int(pieces[-5][-1])
564 storage_index_s = pieces[-1]
565 storage_index = storage.si_a2b(storage_index_s)
566 for sharename in filenames:
567 shnum = int(sharename)
568 filename = os.path.join(dirpath, sharename)
569 data = (client_num, storage_index, filename, shnum)
572 self.fail("unable to find any share files in %s" % basedir)
575 def _corrupt_mutable_share(self, filename, which):
576 msf = storage.MutableShareFile(filename)
577 datav = msf.readv([ (0, 1000000) ])
578 final_share = datav[0]
579 assert len(final_share) < 1000000 # ought to be truncated
580 pieces = mutable_layout.unpack_share(final_share)
581 (seqnum, root_hash, IV, k, N, segsize, datalen,
582 verification_key, signature, share_hash_chain, block_hash_tree,
583 share_data, enc_privkey) = pieces
585 if which == "seqnum":
588 root_hash = self.flip_bit(root_hash)
590 IV = self.flip_bit(IV)
591 elif which == "segsize":
592 segsize = segsize + 15
593 elif which == "pubkey":
594 verification_key = self.flip_bit(verification_key)
595 elif which == "signature":
596 signature = self.flip_bit(signature)
597 elif which == "share_hash_chain":
598 nodenum = share_hash_chain.keys()[0]
599 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
600 elif which == "block_hash_tree":
601 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
602 elif which == "share_data":
603 share_data = self.flip_bit(share_data)
604 elif which == "encprivkey":
605 enc_privkey = self.flip_bit(enc_privkey)
607 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
609 final_share = mutable_layout.pack_share(prefix,
616 msf.writev( [(0, final_share)], None)
618 def test_mutable(self):
619 self.basedir = "system/SystemTest/test_mutable"
620 DATA = "initial contents go here." # 25 bytes % 3 != 0
621 NEWDATA = "new contents yay"
622 NEWERDATA = "this is getting old"
624 d = self.set_up_nodes()
626 def _create_mutable(res):
628 log.msg("starting create_mutable_file")
629 d1 = c.create_mutable_file(DATA)
631 log.msg("DONE: %s" % (res,))
632 self._mutable_node_1 = res
634 d1.addCallback(_done)
636 d.addCallback(_create_mutable)
638 def _test_debug(res):
639 # find a share. It is important to run this while there is only
640 # one slot in the grid.
641 shares = self._find_shares(self.basedir)
642 (client_num, storage_index, filename, shnum) = shares[0]
643 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
645 log.msg(" for clients[%d]" % client_num)
647 out,err = StringIO(), StringIO()
648 rc = runner.runner(["dump-share",
650 stdout=out, stderr=err)
651 output = out.getvalue()
652 self.failUnlessEqual(rc, 0)
654 self.failUnless("Mutable slot found:\n" in output)
655 self.failUnless("share_type: SDMF\n" in output)
656 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
657 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
658 self.failUnless(" num_extra_leases: 0\n" in output)
659 # the pubkey size can vary by a byte, so the container might
660 # be a bit larger on some runs.
661 m = re.search(r'^ container_size: (\d+)$', output, re.M)
663 container_size = int(m.group(1))
664 self.failUnless(2037 <= container_size <= 2049, container_size)
665 m = re.search(r'^ data_length: (\d+)$', output, re.M)
667 data_length = int(m.group(1))
668 self.failUnless(2037 <= data_length <= 2049, data_length)
669 self.failUnless(" secrets are for nodeid: %s\n" % peerid
671 self.failUnless(" SDMF contents:\n" in output)
672 self.failUnless(" seqnum: 1\n" in output)
673 self.failUnless(" required_shares: 3\n" in output)
674 self.failUnless(" total_shares: 10\n" in output)
675 self.failUnless(" segsize: 27\n" in output, (output, filename))
676 self.failUnless(" datalen: 25\n" in output)
677 # the exact share_hash_chain nodes depends upon the sharenum,
678 # and is more of a hassle to compute than I want to deal with
680 self.failUnless(" share_hash_chain: " in output)
681 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
682 except unittest.FailTest:
684 print "dump-share output was:"
687 d.addCallback(_test_debug)
691 # first, let's see if we can use the existing node to retrieve the
692 # contents. This allows it to use the cached pubkey and maybe the
693 # latest-known sharemap.
695 d.addCallback(lambda res: self._mutable_node_1.download_to_data())
696 def _check_download_1(res):
697 self.failUnlessEqual(res, DATA)
698 # now we see if we can retrieve the data from a new node,
699 # constructed using the URI of the original one. We do this test
700 # on the same client that uploaded the data.
701 uri = self._mutable_node_1.get_uri()
702 log.msg("starting retrieve1")
703 newnode = self.clients[0].create_node_from_uri(uri)
704 return newnode.download_to_data()
705 d.addCallback(_check_download_1)
707 def _check_download_2(res):
708 self.failUnlessEqual(res, DATA)
709 # same thing, but with a different client
710 uri = self._mutable_node_1.get_uri()
711 newnode = self.clients[1].create_node_from_uri(uri)
712 log.msg("starting retrieve2")
713 d1 = newnode.download_to_data()
714 d1.addCallback(lambda res: (res, newnode))
716 d.addCallback(_check_download_2)
718 def _check_download_3((res, newnode)):
719 self.failUnlessEqual(res, DATA)
721 log.msg("starting replace1")
722 d1 = newnode.update(NEWDATA)
723 d1.addCallback(lambda res: newnode.download_to_data())
725 d.addCallback(_check_download_3)
727 def _check_download_4(res):
728 self.failUnlessEqual(res, NEWDATA)
729 # now create an even newer node and replace the data on it. This
730 # new node has never been used for download before.
731 uri = self._mutable_node_1.get_uri()
732 newnode1 = self.clients[2].create_node_from_uri(uri)
733 newnode2 = self.clients[3].create_node_from_uri(uri)
734 self._newnode3 = self.clients[3].create_node_from_uri(uri)
735 log.msg("starting replace2")
736 d1 = newnode1.overwrite(NEWERDATA)
737 d1.addCallback(lambda res: newnode2.download_to_data())
739 d.addCallback(_check_download_4)
741 def _check_download_5(res):
742 log.msg("finished replace2")
743 self.failUnlessEqual(res, NEWERDATA)
744 d.addCallback(_check_download_5)
746 def _corrupt_shares(res):
747 # run around and flip bits in all but k of the shares, to test
749 shares = self._find_shares(self.basedir)
750 ## sort by share number
751 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
752 where = dict([ (shnum, filename)
753 for (client_num, storage_index, filename, shnum)
755 assert len(where) == 10 # this test is designed for 3-of-10
756 for shnum, filename in where.items():
757 # shares 7,8,9 are left alone. read will check
758 # (share_hash_chain, block_hash_tree, share_data). New
759 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
760 # segsize, signature).
762 # read: this will trigger "pubkey doesn't match
764 self._corrupt_mutable_share(filename, "pubkey")
765 self._corrupt_mutable_share(filename, "encprivkey")
767 # triggers "signature is invalid"
768 self._corrupt_mutable_share(filename, "seqnum")
770 # triggers "signature is invalid"
771 self._corrupt_mutable_share(filename, "R")
773 # triggers "signature is invalid"
774 self._corrupt_mutable_share(filename, "segsize")
776 self._corrupt_mutable_share(filename, "share_hash_chain")
778 self._corrupt_mutable_share(filename, "block_hash_tree")
780 self._corrupt_mutable_share(filename, "share_data")
781 # other things to correct: IV, signature
782 # 7,8,9 are left alone
784 # note that initial_query_count=5 means that we'll hit the
785 # first 5 servers in effectively random order (based upon
786 # response time), so we won't necessarily ever get a "pubkey
787 # doesn't match fingerprint" error (if we hit shnum>=1 before
788 # shnum=0, we pull the pubkey from there). To get repeatable
789 # specific failures, we need to set initial_query_count=1,
790 # but of course that will change the sequencing behavior of
791 # the retrieval process. TODO: find a reasonable way to make
792 # this a parameter, probably when we expand this test to test
793 # for one failure mode at a time.
795 # when we retrieve this, we should get three signature
796 # failures (where we've mangled seqnum, R, and segsize). The
798 d.addCallback(_corrupt_shares)
800 d.addCallback(lambda res: self._newnode3.download_to_data())
801 d.addCallback(_check_download_5)
803 def _check_empty_file(res):
804 # make sure we can create empty files, this usually screws up the
806 d1 = self.clients[2].create_mutable_file("")
807 d1.addCallback(lambda newnode: newnode.download_to_data())
808 d1.addCallback(lambda res: self.failUnlessEqual("", res))
810 d.addCallback(_check_empty_file)
812 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
813 def _created_dirnode(dnode):
814 log.msg("_created_dirnode(%s)" % (dnode,))
816 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
817 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
818 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
819 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
820 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
821 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
822 d1.addCallback(lambda res: dnode.build_manifest())
823 d1.addCallback(lambda manifest:
824 self.failUnlessEqual(len(manifest), 1))
826 d.addCallback(_created_dirnode)
828 def wait_for_c3_kg_conn():
829 return self.clients[3]._key_generator is not None
830 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
832 def check_kg_poolsize(junk, size_delta):
833 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
834 self.key_generator_svc.key_generator.pool_size + size_delta)
836 d.addCallback(check_kg_poolsize, 0)
837 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
838 d.addCallback(check_kg_poolsize, -1)
839 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
840 d.addCallback(check_kg_poolsize, -2)
841 # use_helper induces use of clients[3], which is the using-key_gen client
842 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
843 d.addCallback(check_kg_poolsize, -3)
846 # The default 120 second timeout went off when running it under valgrind
847 # on my old Windows laptop, so I'm bumping up the timeout.
848 test_mutable.timeout = 240
850 def flip_bit(self, good):
851 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
853 def mangle_uri(self, gooduri):
854 # change the key, which changes the storage index, which means we'll
855 # be asking about the wrong file, so nobody will have any shares
856 u = IFileURI(gooduri)
857 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
858 uri_extension_hash=u.uri_extension_hash,
859 needed_shares=u.needed_shares,
860 total_shares=u.total_shares,
862 return u2.to_string()
864 # TODO: add a test which mangles the uri_extension_hash instead, and
865 # should fail due to not being able to get a valid uri_extension block.
866 # Also a test which sneakily mangles the uri_extension block to change
867 # some of the validation data, so it will fail in the post-download phase
868 # when the file's crypttext integrity check fails. Do the same thing for
869 # the key, which should cause the download to fail the post-download
870 # plaintext_hash check.
872 def test_vdrive(self):
873 self.basedir = "system/SystemTest/test_vdrive"
874 self.data = LARGE_DATA
875 d = self.set_up_nodes(createprivdir=True)
876 d.addCallback(self._test_introweb)
877 d.addCallback(self.log, "starting publish")
878 d.addCallback(self._do_publish1)
879 d.addCallback(self._test_runner)
880 d.addCallback(self._do_publish2)
881 # at this point, we have the following filesystem (where "R" denotes
882 # self._root_directory_uri):
885 # R/subdir1/mydata567
887 # R/subdir1/subdir2/mydata992
889 d.addCallback(lambda res: self.bounce_client(0))
890 d.addCallback(self.log, "bounced client0")
892 d.addCallback(self._check_publish1)
893 d.addCallback(self.log, "did _check_publish1")
894 d.addCallback(self._check_publish2)
895 d.addCallback(self.log, "did _check_publish2")
896 d.addCallback(self._do_publish_private)
897 d.addCallback(self.log, "did _do_publish_private")
898 # now we also have (where "P" denotes a new dir):
899 # P/personal/sekrit data
900 # P/s2-rw -> /subdir1/subdir2/
901 # P/s2-ro -> /subdir1/subdir2/ (read-only)
902 d.addCallback(self._check_publish_private)
903 d.addCallback(self.log, "did _check_publish_private")
904 d.addCallback(self._test_web)
905 d.addCallback(self._test_control)
906 d.addCallback(self._test_cli)
907 # P now has four top-level children:
908 # P/personal/sekrit data
911 # P/test_put/ (empty)
912 d.addCallback(self._test_checker)
913 d.addCallback(self._test_verifier)
914 d.addCallback(self._grab_stats)
916 test_vdrive.timeout = 1100
918 def _test_introweb(self, res):
919 d = getPage(self.introweb_url, method="GET", followRedirect=True)
922 self.failUnless("allmydata: %s" % str(allmydata.__version__)
924 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
925 self.failUnless("Subscription Summary: storage: 5" in res)
926 except unittest.FailTest:
928 print "GET %s output was:" % self.introweb_url
931 d.addCallback(_check)
932 d.addCallback(lambda res:
933 getPage(self.introweb_url + "?t=json",
934 method="GET", followRedirect=True))
935 def _check_json(res):
936 data = simplejson.loads(res)
938 self.failUnlessEqual(data["subscription_summary"],
940 self.failUnlessEqual(data["announcement_summary"],
941 {"storage": 5, "stub_client": 5})
942 except unittest.FailTest:
944 print "GET %s?t=json output was:" % self.introweb_url
947 d.addCallback(_check_json)
950 def _do_publish1(self, res):
951 ut = upload.Data(self.data, convergence=None)
953 d = c0.create_empty_dirnode()
954 def _made_root(new_dirnode):
955 self._root_directory_uri = new_dirnode.get_uri()
956 return c0.create_node_from_uri(self._root_directory_uri)
957 d.addCallback(_made_root)
958 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
959 def _made_subdir1(subdir1_node):
960 self._subdir1_node = subdir1_node
961 d1 = subdir1_node.add_file(u"mydata567", ut)
962 d1.addCallback(self.log, "publish finished")
963 def _stash_uri(filenode):
964 self.uri = filenode.get_uri()
965 d1.addCallback(_stash_uri)
967 d.addCallback(_made_subdir1)
970 def _do_publish2(self, res):
971 ut = upload.Data(self.data, convergence=None)
972 d = self._subdir1_node.create_empty_directory(u"subdir2")
973 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
976 def log(self, res, msg, **kwargs):
977 # print "MSG: %s RES: %s" % (msg, res)
978 log.msg(msg, **kwargs)
981 def stall(self, res, delay=1.0):
983 reactor.callLater(delay, d.callback, res)
986 def _do_publish_private(self, res):
987 self.smalldata = "sssh, very secret stuff"
988 ut = upload.Data(self.smalldata, convergence=None)
989 d = self.clients[0].create_empty_dirnode()
990 d.addCallback(self.log, "GOT private directory")
991 def _got_new_dir(privnode):
992 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
993 d1 = privnode.create_empty_directory(u"personal")
994 d1.addCallback(self.log, "made P/personal")
995 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
996 d1.addCallback(self.log, "made P/personal/sekrit data")
997 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
999 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
1000 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
1002 d1.addCallback(_got_s2)
1003 d1.addCallback(lambda res: privnode)
1005 d.addCallback(_got_new_dir)
1008 def _check_publish1(self, res):
1009 # this one uses the iterative API
1010 c1 = self.clients[1]
1011 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
1012 d.addCallback(self.log, "check_publish1 got /")
1013 d.addCallback(lambda root: root.get(u"subdir1"))
1014 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
1015 d.addCallback(lambda filenode: filenode.download_to_data())
1016 d.addCallback(self.log, "get finished")
1017 def _get_done(data):
1018 self.failUnlessEqual(data, self.data)
1019 d.addCallback(_get_done)
1022 def _check_publish2(self, res):
1023 # this one uses the path-based API
1024 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
1025 d = rootnode.get_child_at_path(u"subdir1")
1026 d.addCallback(lambda dirnode:
1027 self.failUnless(IDirectoryNode.providedBy(dirnode)))
1028 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
1029 d.addCallback(lambda filenode: filenode.download_to_data())
1030 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
1032 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
1033 def _got_filenode(filenode):
1034 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
1035 assert fnode == filenode
1036 d.addCallback(_got_filenode)
1039 def _check_publish_private(self, resnode):
1040 # this one uses the path-based API
1041 self._private_node = resnode
1043 d = self._private_node.get_child_at_path(u"personal")
1044 def _got_personal(personal):
1045 self._personal_node = personal
1047 d.addCallback(_got_personal)
1049 d.addCallback(lambda dirnode:
1050 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
1052 return self._private_node.get_child_at_path(path)
1054 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
1055 d.addCallback(lambda filenode: filenode.download_to_data())
1056 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
1057 d.addCallback(lambda res: get_path(u"s2-rw"))
1058 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
1059 d.addCallback(lambda res: get_path(u"s2-ro"))
1060 def _got_s2ro(dirnode):
1061 self.failUnless(dirnode.is_mutable(), dirnode)
1062 self.failUnless(dirnode.is_readonly(), dirnode)
1063 d1 = defer.succeed(None)
1064 d1.addCallback(lambda res: dirnode.list())
1065 d1.addCallback(self.log, "dirnode.list")
1067 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
1069 d1.addCallback(self.log, "doing add_file(ro)")
1070 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
1071 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
1073 d1.addCallback(self.log, "doing get(ro)")
1074 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
1075 d1.addCallback(lambda filenode:
1076 self.failUnless(IFileNode.providedBy(filenode)))
1078 d1.addCallback(self.log, "doing delete(ro)")
1079 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
1081 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
1083 d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))
1085 personal = self._personal_node
1086 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
1088 d1.addCallback(self.log, "doing move_child_to(ro)2")
1089 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
1091 d1.addCallback(self.log, "finished with _got_s2ro")
1093 d.addCallback(_got_s2ro)
1094 def _got_home(dummy):
1095 home = self._private_node
1096 personal = self._personal_node
1097 d1 = defer.succeed(None)
1098 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
1099 d1.addCallback(lambda res:
1100 personal.move_child_to(u"sekrit data",home,u"sekrit"))
1102 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
1103 d1.addCallback(lambda res:
1104 home.move_child_to(u"sekrit", home, u"sekrit data"))
1106 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
1107 d1.addCallback(lambda res:
1108 home.move_child_to(u"sekrit data", personal))
1110 d1.addCallback(lambda res: home.build_manifest())
1111 d1.addCallback(self.log, "manifest")
1114 # P/personal/sekrit data
1115 # P/s2-rw (same as P/s2-ro)
1116 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
1117 d1.addCallback(lambda manifest:
1118 self.failUnlessEqual(len(manifest), 4))
1120 d.addCallback(_got_home)
1123 def shouldFail(self, res, expected_failure, which, substring=None):
1124 if isinstance(res, Failure):
1125 res.trap(expected_failure)
1127 self.failUnless(substring in str(res),
1128 "substring '%s' not in '%s'"
1129 % (substring, str(res)))
1131 self.fail("%s was supposed to raise %s, not get '%s'" %
1132 (which, expected_failure, res))
1134 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1135 assert substring is None or isinstance(substring, str)
1136 d = defer.maybeDeferred(callable, *args, **kwargs)
1138 if isinstance(res, Failure):
1139 res.trap(expected_failure)
1141 self.failUnless(substring in str(res),
1142 "substring '%s' not in '%s'"
1143 % (substring, str(res)))
1145 self.fail("%s was supposed to raise %s, not get '%s'" %
1146 (which, expected_failure, res))
1150 def PUT(self, urlpath, data):
1151 url = self.webish_url + urlpath
1152 return getPage(url, method="PUT", postdata=data)
1154 def GET(self, urlpath, followRedirect=False):
1155 url = self.webish_url + urlpath
1156 return getPage(url, method="GET", followRedirect=followRedirect)
1158 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1160 url = self.helper_webish_url + urlpath
1162 url = self.webish_url + urlpath
1163 sepbase = "boogabooga"
1164 sep = "--" + sepbase
1167 form.append('Content-Disposition: form-data; name="_charset"')
1169 form.append('UTF-8')
1171 for name, value in fields.iteritems():
1172 if isinstance(value, tuple):
1173 filename, value = value
1174 form.append('Content-Disposition: form-data; name="%s"; '
1175 'filename="%s"' % (name, filename.encode("utf-8")))
1177 form.append('Content-Disposition: form-data; name="%s"' % name)
1179 form.append(str(value))
1182 body = "\r\n".join(form) + "\r\n"
1183 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1185 return getPage(url, method="POST", postdata=body,
1186 headers=headers, followRedirect=followRedirect)
1188 def _test_web(self, res):
1189 base = self.webish_url
1190 public = "uri/" + self._root_directory_uri
1192 def _got_welcome(page):
1193 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1194 self.failUnless(expected in page,
1195 "I didn't see the right 'connected storage servers'"
1196 " message in: %s" % page
1198 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1199 self.failUnless(expected in page,
1200 "I didn't see the right 'My nodeid' message "
1202 self.failUnless("Helper: 0 active uploads" in page)
1203 d.addCallback(_got_welcome)
1204 d.addCallback(self.log, "done with _got_welcome")
1206 # get the welcome page from the node that uses the helper too
1207 d.addCallback(lambda res: getPage(self.helper_webish_url))
1208 def _got_welcome_helper(page):
1209 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1211 self.failUnless("Not running helper" in page)
1212 d.addCallback(_got_welcome_helper)
1214 d.addCallback(lambda res: getPage(base + public))
1215 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1216 def _got_subdir1(page):
1217 # there ought to be an href for our file
1218 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1219 self.failUnless(">mydata567</a>" in page)
1220 d.addCallback(_got_subdir1)
1221 d.addCallback(self.log, "done with _got_subdir1")
1222 d.addCallback(lambda res:
1223 getPage(base + public + "/subdir1/mydata567"))
1224 def _got_data(page):
1225 self.failUnlessEqual(page, self.data)
1226 d.addCallback(_got_data)
1228 # download from a URI embedded in a URL
1229 d.addCallback(self.log, "_get_from_uri")
1230 def _get_from_uri(res):
1231 return getPage(base + "uri/%s?filename=%s"
1232 % (self.uri, "mydata567"))
1233 d.addCallback(_get_from_uri)
1234 def _got_from_uri(page):
1235 self.failUnlessEqual(page, self.data)
1236 d.addCallback(_got_from_uri)
1238 # download from a URI embedded in a URL, second form
1239 d.addCallback(self.log, "_get_from_uri2")
1240 def _get_from_uri2(res):
1241 return getPage(base + "uri?uri=%s" % (self.uri,))
1242 d.addCallback(_get_from_uri2)
1243 d.addCallback(_got_from_uri)
1245 # download from a bogus URI, make sure we get a reasonable error
1246 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1247 def _get_from_bogus_uri(res):
1248 d1 = getPage(base + "uri/%s?filename=%s"
1249 % (self.mangle_uri(self.uri), "mydata567"))
1250 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1253 d.addCallback(_get_from_bogus_uri)
1254 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1256 # upload a file with PUT
1257 d.addCallback(self.log, "about to try PUT")
1258 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1259 "new.txt contents"))
1260 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1261 d.addCallback(self.failUnlessEqual, "new.txt contents")
1262 # and again with something large enough to use multiple segments,
1263 # and hopefully trigger pauseProducing too
1264 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1265 "big" * 500000)) # 1.5MB
1266 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1267 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1269 # can we replace files in place?
1270 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1272 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1273 d.addCallback(self.failUnlessEqual, "NEWER contents")
1275 # test unlinked POST
1276 d.addCallback(lambda res: self.POST("uri", t="upload",
1277 file=("new.txt", "data" * 10000)))
1278 # and again using the helper, which exercises different upload-status
1280 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1281 file=("foo.txt", "data2" * 10000)))
1283 # check that the status page exists
1284 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1285 def _got_status(res):
1286 # find an interesting upload and download to look at. LIT files
1287 # are not interesting.
1288 for dl in self.clients[0].list_recent_downloads():
1289 if dl.get_size() > 200:
1290 self._down_status = dl.get_counter()
1291 for ul in self.clients[0].list_recent_uploads():
1292 if ul.get_size() > 200:
1293 self._up_status = ul.get_counter()
1294 rs = self.clients[0].list_recent_retrieve()[0]
1295 self._retrieve_status = rs.get_counter()
1296 ps = self.clients[0].list_recent_publish()[0]
1297 self._publish_status = ps.get_counter()
1299 # and that there are some upload- and download- status pages
1300 return self.GET("status/up-%d" % self._up_status)
1301 d.addCallback(_got_status)
1303 return self.GET("status/down-%d" % self._down_status)
1304 d.addCallback(_got_up)
1306 return self.GET("status/publish-%d" % self._publish_status)
1307 d.addCallback(_got_down)
1308 def _got_publish(res):
1309 return self.GET("status/retrieve-%d" % self._retrieve_status)
1310 d.addCallback(_got_publish)
1312 # check that the helper status page exists
1313 d.addCallback(lambda res:
1314 self.GET("helper_status", followRedirect=True))
1315 def _got_helper_status(res):
1316 self.failUnless("Bytes Fetched:" in res)
1317 # touch a couple of files in the helper's working directory to
1318 # exercise more code paths
1319 workdir = os.path.join(self.getdir("client0"), "helper")
1320 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1321 f = open(incfile, "wb")
1322 f.write("small file")
1324 then = time.time() - 86400*3
1326 os.utime(incfile, (now, then))
1327 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1328 f = open(encfile, "wb")
1329 f.write("less small file")
1331 os.utime(encfile, (now, then))
1332 d.addCallback(_got_helper_status)
1333 # and that the json form exists
1334 d.addCallback(lambda res:
1335 self.GET("helper_status?t=json", followRedirect=True))
1336 def _got_helper_status_json(res):
1337 data = simplejson.loads(res)
1338 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1340 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1341 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1342 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1344 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1345 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1346 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1348 d.addCallback(_got_helper_status_json)
1350 # and check that client[3] (which uses a helper but does not run one
1351 # itself) doesn't explode when you ask for its status
1352 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1353 def _got_non_helper_status(res):
1354 self.failUnless("Upload and Download Status" in res)
1355 d.addCallback(_got_non_helper_status)
1357 # or for helper status with t=json
1358 d.addCallback(lambda res:
1359 getPage(self.helper_webish_url + "helper_status?t=json"))
1360 def _got_non_helper_status_json(res):
1361 data = simplejson.loads(res)
1362 self.failUnlessEqual(data, {})
1363 d.addCallback(_got_non_helper_status_json)
1365 # see if the statistics page exists
1366 d.addCallback(lambda res: self.GET("statistics"))
1367 def _got_stats(res):
1368 self.failUnless("Node Statistics" in res)
1369 self.failUnless(" 'downloader.files_downloaded': 8," in res)
1370 d.addCallback(_got_stats)
1371 d.addCallback(lambda res: self.GET("statistics?t=json"))
1372 def _got_stats_json(res):
1373 data = simplejson.loads(res)
1374 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1375 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1376 d.addCallback(_got_stats_json)
1378 # TODO: mangle the second segment of a file, to test errors that
1379 # occur after we've already sent some good data, which uses a
1380 # different error path.
1382 # TODO: download a URI with a form
1383 # TODO: create a directory by using a form
1384 # TODO: upload by using a form on the directory page
1385 # url = base + "somedir/subdir1/freeform_post!!upload"
1386 # TODO: delete a file by using a button on the directory page
1390 def _test_runner(self, res):
1391 # exercise some of the diagnostic tools in runner.py
1394 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1395 if "storage" not in dirpath:
1399 pieces = dirpath.split(os.sep)
1400 if pieces[-4] == "storage" and pieces[-3] == "shares":
1401 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1402 # are sharefiles here
1403 filename = os.path.join(dirpath, filenames[0])
1404 # peek at the magic to see if it is a chk share
1405 magic = open(filename, "rb").read(4)
1406 if magic == '\x00\x00\x00\x01':
1409 self.fail("unable to find any uri_extension files in %s"
1411 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1413 out,err = StringIO(), StringIO()
1414 rc = runner.runner(["dump-share",
1416 stdout=out, stderr=err)
1417 output = out.getvalue()
1418 self.failUnlessEqual(rc, 0)
1420 # we only upload a single file, so we can assert some things about
1421 # its size and shares.
1422 self.failUnless(("share filename: %s" % filename) in output)
1423 self.failUnless("size: %d\n" % len(self.data) in output)
1424 self.failUnless("num_segments: 1\n" in output)
1425 # segment_size is always a multiple of needed_shares
1426 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1427 self.failUnless("total_shares: 10\n" in output)
1428 # keys which are supposed to be present
1429 for key in ("size", "num_segments", "segment_size",
1430 "needed_shares", "total_shares",
1431 "codec_name", "codec_params", "tail_codec_params",
1432 #"plaintext_hash", "plaintext_root_hash",
1433 "crypttext_hash", "crypttext_root_hash",
1434 "share_root_hash", "UEB_hash"):
1435 self.failUnless("%s: " % key in output, key)
1437 # now use its storage index to find the other shares using the
1438 # 'find-shares' tool
1439 sharedir, shnum = os.path.split(filename)
1440 storagedir, storage_index_s = os.path.split(sharedir)
1441 out,err = StringIO(), StringIO()
1442 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1443 cmd = ["find-shares", storage_index_s] + nodedirs
1444 rc = runner.runner(cmd, stdout=out, stderr=err)
1445 self.failUnlessEqual(rc, 0)
1447 sharefiles = [sfn.strip() for sfn in out.readlines()]
1448 self.failUnlessEqual(len(sharefiles), 10)
1450 # also exercise the 'catalog-shares' tool
1451 out,err = StringIO(), StringIO()
1452 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1453 cmd = ["catalog-shares"] + nodedirs
1454 rc = runner.runner(cmd, stdout=out, stderr=err)
1455 self.failUnlessEqual(rc, 0)
1457 descriptions = [sfn.strip() for sfn in out.readlines()]
1458 self.failUnlessEqual(len(descriptions), 30)
1460 for line in descriptions
1461 if line.startswith("CHK %s " % storage_index_s)]
1462 self.failUnlessEqual(len(matching), 10)
1464 def _test_control(self, res):
1465 # exercise the remote-control-the-client foolscap interfaces in
1466 # allmydata.control (mostly used for performance tests)
1467 c0 = self.clients[0]
1468 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1469 control_furl = open(control_furl_file, "r").read().strip()
1470 # it doesn't really matter which Tub we use to connect to the client,
1471 # so let's just use our IntroducerNode's
1472 d = self.introducer.tub.getReference(control_furl)
1473 d.addCallback(self._test_control2, control_furl_file)
1475 def _test_control2(self, rref, filename):
1476 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1477 downfile = os.path.join(self.basedir, "control.downfile")
1478 d.addCallback(lambda uri:
1479 rref.callRemote("download_from_uri_to_file",
1482 self.failUnlessEqual(res, downfile)
1483 data = open(downfile, "r").read()
1484 expected_data = open(filename, "r").read()
1485 self.failUnlessEqual(data, expected_data)
1486 d.addCallback(_check)
1487 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1488 if sys.platform == "linux2":
1489 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1490 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1493 def _test_cli(self, res):
1494 # run various CLI commands (in a thread, since they use blocking
1497 private_uri = self._private_node.get_uri()
1498 some_uri = self._root_directory_uri
1499 client0_basedir = self.getdir("client0")
1502 "--node-directory", client0_basedir,
1503 "--dir-cap", private_uri,
1506 "--node-url", self.webish_url,
1507 "--dir-cap", some_uri,
1509 TESTDATA = "I will not write the same thing over and over.\n" * 100
1511 d = defer.succeed(None)
1514 argv = ["ls"] + nodeargs
1515 return self._run_cli(argv)
1516 d.addCallback(_ls_root)
1517 def _check_ls_root((out,err)):
1518 self.failUnless("personal" in out)
1519 self.failUnless("s2-ro" in out)
1520 self.failUnless("s2-rw" in out)
1521 self.failUnlessEqual(err, "")
1522 d.addCallback(_check_ls_root)
1524 def _ls_subdir(res):
1525 argv = ["ls"] + nodeargs + ["personal"]
1526 return self._run_cli(argv)
1527 d.addCallback(_ls_subdir)
1528 def _check_ls_subdir((out,err)):
1529 self.failUnless("sekrit data" in out)
1530 self.failUnlessEqual(err, "")
1531 d.addCallback(_check_ls_subdir)
1533 def _ls_public_subdir(res):
1534 argv = ["ls"] + public_nodeargs + ["subdir1"]
1535 return self._run_cli(argv)
1536 d.addCallback(_ls_public_subdir)
1537 def _check_ls_public_subdir((out,err)):
1538 self.failUnless("subdir2" in out)
1539 self.failUnless("mydata567" in out)
1540 self.failUnlessEqual(err, "")
1541 d.addCallback(_check_ls_public_subdir)
1544 argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
1545 return self._run_cli(argv)
1546 d.addCallback(_ls_file)
1547 def _check_ls_file((out,err)):
1548 self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
1549 self.failUnlessEqual(err, "")
1550 d.addCallback(_check_ls_file)
1552 # tahoe_ls doesn't currently handle the error correctly: it tries to
1553 # JSON-parse a traceback.
1554 ## def _ls_missing(res):
1555 ## argv = ["ls"] + nodeargs + ["bogus"]
1556 ## return self._run_cli(argv)
1557 ## d.addCallback(_ls_missing)
1558 ## def _check_ls_missing((out,err)):
1561 ## self.failUnlessEqual(err, "")
1562 ## d.addCallback(_check_ls_missing)
1565 tdir = self.getdir("cli_put")
1566 fileutil.make_dirs(tdir)
1567 fn = os.path.join(tdir, "upload_me")
1571 argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
1572 return self._run_cli(argv)
1574 def _check_put((out,err)):
1575 self.failUnless("200 OK" in out)
1576 self.failUnlessEqual(err, "")
1577 d = self._private_node.get_child_at_path(u"test_put/upload.txt")
1578 d.addCallback(lambda filenode: filenode.download_to_data())
1579 def _check_put2(res):
1580 self.failUnlessEqual(res, TESTDATA)
1581 d.addCallback(_check_put2)
1583 d.addCallback(_check_put)
1585 def _get_to_stdout(res):
1586 argv = ["get"] + nodeargs + ["test_put/upload.txt"]
1587 return self._run_cli(argv)
1588 d.addCallback(_get_to_stdout)
1589 def _check_get_to_stdout((out,err)):
1590 self.failUnlessEqual(out, TESTDATA)
1591 self.failUnlessEqual(err, "")
1592 d.addCallback(_check_get_to_stdout)
1594 get_to_file_target = self.basedir + "/get.downfile"
1595 def _get_to_file(res):
1596 argv = ["get"] + nodeargs + ["test_put/upload.txt",
1598 return self._run_cli(argv)
1599 d.addCallback(_get_to_file)
1600 def _check_get_to_file((out,err)):
1601 data = open(get_to_file_target, "rb").read()
1602 self.failUnlessEqual(data, TESTDATA)
1603 self.failUnlessEqual(out, "")
1604 self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
1605 d.addCallback(_check_get_to_file)
1609 argv = ["mv"] + nodeargs + ["test_put/upload.txt",
1610 "test_put/moved.txt"]
1611 return self._run_cli(argv)
1613 def _check_mv((out,err)):
1614 self.failUnless("OK" in out)
1615 self.failUnlessEqual(err, "")
1616 d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")
1618 d.addCallback(lambda res:
1619 self._private_node.get_child_at_path(u"test_put/moved.txt"))
1620 d.addCallback(lambda filenode: filenode.download_to_data())
1621 def _check_mv2(res):
1622 self.failUnlessEqual(res, TESTDATA)
1623 d.addCallback(_check_mv2)
1625 d.addCallback(_check_mv)
1628 argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
1629 return self._run_cli(argv)
1631 def _check_rm((out,err)):
1632 self.failUnless("200 OK" in out)
1633 self.failUnlessEqual(err, "")
1634 d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
1636 d.addCallback(_check_rm)
1639 def _run_cli(self, argv):
1640 stdout, stderr = StringIO(), StringIO()
1641 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1642 stdout=stdout, stderr=stderr)
1644 return stdout.getvalue(), stderr.getvalue()
1645 d.addCallback(_done)
1648 def _test_checker(self, res):
1649 d = self._private_node.build_manifest()
1650 d.addCallback(self._test_checker_2)
1653 def _test_checker_2(self, manifest):
1654 checker1 = self.clients[1].getServiceNamed("checker")
1655 self.failUnlessEqual(checker1.checker_results_for(None), [])
1656 self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
1659 starting_time = time.time()
1661 dl.append(checker1.check(si))
1662 d = deferredutil.DeferredListShouldSucceed(dl)
1664 def _check_checker_results(res):
1667 self.failUnless(i is True)
1669 (needed, total, found, sharemap) = i
1670 self.failUnlessEqual(needed, 3)
1671 self.failUnlessEqual(total, 10)
1672 self.failUnlessEqual(found, total)
1673 self.failUnlessEqual(len(sharemap.keys()), 10)
1675 for shpeers in sharemap.values():
1676 peers.update(shpeers)
1677 self.failUnlessEqual(len(peers), self.numclients)
1678 d.addCallback(_check_checker_results)
1680 def _check_stored_results(res):
1681 finish_time = time.time()
1684 results = checker1.checker_results_for(si)
1686 # TODO: implement checker for mutable files and implement tests of that checker
1688 self.failUnlessEqual(len(results), 1)
1689 when, those_results = results[0]
1690 self.failUnless(isinstance(when, (int, float)))
1691 self.failUnless(starting_time <= when <= finish_time)
1692 all_results.append(those_results)
1693 _check_checker_results(all_results)
1694 d.addCallback(_check_stored_results)
1696 d.addCallback(self._test_checker_3)
1699 def _test_checker_3(self, res):
1700 # check one file, through FileNode.check()
1701 d = self._private_node.get_child_at_path(u"personal/sekrit data")
1702 d.addCallback(lambda n: n.check())
1703 def _checked(results):
1704 # 'sekrit data' is small, and fits in a LiteralFileNode, so
1705 # checking it is trivial and always returns True
1706 self.failUnlessEqual(results, True)
1707 d.addCallback(_checked)
1709 c0 = self.clients[1]
1710 n = c0.create_node_from_uri(self._root_directory_uri)
1711 d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1712 d.addCallback(lambda n: n.check())
1713 def _checked2(results):
1714 # mydata567 is large and lives in a CHK
1715 (needed, total, found, sharemap) = results
1716 self.failUnlessEqual(needed, 3)
1717 self.failUnlessEqual(total, 10)
1718 self.failUnlessEqual(found, 10)
1719 self.failUnlessEqual(len(sharemap), 10)
1720 for shnum in range(10):
1721 self.failUnlessEqual(len(sharemap[shnum]), 1)
1722 d.addCallback(_checked2)
1726 def _test_verifier(self, res):
1727 checker1 = self.clients[1].getServiceNamed("checker")
1728 d = self._private_node.build_manifest()
1729 def _check_all(manifest):
1732 dl.append(checker1.verify(si))
1733 return deferredutil.DeferredListShouldSucceed(dl)
1734 d.addCallback(_check_all)
1737 self.failUnless(i is True)
1738 d.addCallback(_done)
1739 d.addCallback(lambda res: checker1.verify(None))
1740 d.addCallback(self.failUnlessEqual, True)