2 from base64 import b32encode
3 import os, sys, time, re, simplejson
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
11 from allmydata import client, uri, download, upload, storage, mutable, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable import NotMutableError
18 from allmydata.stats import PickleStatsGatherer
19 from allmydata.key_generator import KeyGeneratorService
20 from foolscap.eventual import flushEventualQueue, fireEventually
21 from foolscap import DeadReferenceError, Tub
22 from twisted.python.failure import Failure
23 from twisted.web.client import getPage
24 from twisted.web.error import Error
def flush_but_dont_ignore(res):
    """Flush foolscap's eventual-send queue, then pass `res` through unchanged.

    Meant to be attached with addBoth() during teardown: the Deferred
    returned here fires with the original `res` (a result or a Failure)
    once the queue has been flushed, so an earlier error is not discarded.
    """
    d = flushEventualQueue()

    def _done(ignored):
        # hand back whatever we were originally called with
        return res
    d.addCallback(_done)
    return d
34 This is some data to publish to the virtual drive, which needs to be large
35 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts the bytes requested through read().

    When `interrupt_after` is set, the first read() that pushes the running
    total past that threshold fires `interrupt_after_d` with this uploadable
    as its result, and the threshold is cleared so it fires only once.
    """
    # running total of the `length` values passed to read(); it must start
    # at zero so that the += in read() has something to add to
    bytes_read = 0
    # threshold (in requested bytes) that triggers the interruption, or
    # None to disable it
    interrupt_after = None
    # Deferred to fire when the threshold is crossed
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so the Deferred is fired once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
52 class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
55 self.sparent = service.MultiService()
56 self.sparent.startService()
58 log.msg("shutting down SystemTest services")
59 d = self.sparent.stopService()
60 d.addBoth(flush_but_dont_ignore)
def getdir(self, subdir):
    """Return the path of `subdir` underneath this test's self.basedir."""
    base = self.basedir
    return os.path.join(base, subdir)
def add_service(self, s):
    """Attach service `s` to self.sparent and hand it back to the caller.

    Callers in this file assign the result (for example
    `self.introducer = self.add_service(iv)`), so the service itself must
    be returned.
    """
    s.setServiceParent(self.sparent)
    return s
70 def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
71 self.numclients = NUMCLIENTS
72 self.createprivdir = createprivdir
73 iv_dir = self.getdir("introducer")
74 if not os.path.isdir(iv_dir):
75 fileutil.make_dirs(iv_dir)
76 f = open(os.path.join(iv_dir, "webport"), "w")
77 f.write("tcp:0:interface=127.0.0.1\n")
79 iv = IntroducerNode(basedir=iv_dir)
80 self.introducer = self.add_service(iv)
81 d = self.introducer.when_tub_ready()
82 d.addCallback(self._get_introducer_web)
83 d.addCallback(self._set_up_stats_gatherer)
84 d.addCallback(self._set_up_key_generator)
85 d.addCallback(self._set_up_nodes_2)
86 d.addCallback(self._grab_stats)
89 def _get_introducer_web(self, res):
90 f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
91 self.introweb_url = f.read().strip()
94 def _set_up_stats_gatherer(self, res):
95 statsdir = self.getdir("stats_gatherer")
96 fileutil.make_dirs(statsdir)
99 l = t.listenOn("tcp:0")
101 t.setLocation("localhost:%d" % p)
103 self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
104 self.add_service(self.stats_gatherer)
105 self.stats_gatherer_furl = self.stats_gatherer.get_furl()
107 def _set_up_key_generator(self, res):
108 kgsdir = self.getdir("key_generator")
109 fileutil.make_dirs(kgsdir)
111 self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
112 self.key_generator_svc.key_generator.pool_size = 4
113 self.key_generator_svc.key_generator.pool_refresh_delay = 60
114 self.add_service(self.key_generator_svc)
117 def check_for_furl():
118 return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
119 d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
121 kgf = os.path.join(kgsdir, 'key_generator.furl')
122 self.key_generator_furl = file(kgf, 'rb').read().strip()
123 d.addCallback(get_furl)
126 def _set_up_nodes_2(self, res):
128 self.introducer_furl = q.introducer_url
131 for i in range(self.numclients):
132 basedir = self.getdir("client%d" % i)
133 basedirs.append(basedir)
134 fileutil.make_dirs(basedir)
136 # client[0] runs a webserver and a helper, no key_generator
137 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
138 open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
140 # client[3] runs a webserver and uses a helper, uses key_generator
141 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
142 kgf = "%s\n" % (self.key_generator_furl,)
143 open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
144 if self.createprivdir:
145 fileutil.make_dirs(os.path.join(basedir, "private"))
146 open(os.path.join(basedir, "private", "root_dir.cap"), "w")
147 open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
148 open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
150 # start client[0], wait for its tub to be ready (at which point it
151 # will have registered the helper furl).
152 c = self.add_service(client.Client(basedir=basedirs[0]))
153 self.clients.append(c)
154 d = c.when_tub_ready()
156 f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
157 helper_furl = f.read()
159 self.helper_furl = helper_furl
160 f = open(os.path.join(basedirs[3],"helper.furl"), "w")
164 # this starts the rest of the clients
165 for i in range(1, self.numclients):
166 c = self.add_service(client.Client(basedir=basedirs[i]))
167 self.clients.append(c)
169 return self.wait_for_connections()
170 d.addCallback(_ready)
173 # now find out where the web port was
174 l = self.clients[0].getServiceNamed("webish").listener
175 port = l._port.getHost().port
176 self.webish_url = "http://localhost:%d/" % port
177 # and the helper-using webport
178 l = self.clients[3].getServiceNamed("webish").listener
179 port = l._port.getHost().port
180 self.helper_webish_url = "http://localhost:%d/" % port
181 d.addCallback(_connected)
184 def _grab_stats(self, res):
185 d = self.stats_gatherer.poll()
188 def bounce_client(self, num):
189 c = self.clients[num]
190 d = c.disownServiceParent()
191 # I think windows requires a moment to let the connection really stop
192 # and the port number made available for re-use. TODO: examine the
193 # behavior, see if this is really the problem, see if we can do
194 # better than blindly waiting for a second.
195 d.addCallback(self.stall, 1.0)
197 new_c = client.Client(basedir=self.getdir("client%d" % num))
198 self.clients[num] = new_c
199 self.add_service(new_c)
200 return new_c.when_tub_ready()
201 d.addCallback(_stopped)
202 d.addCallback(lambda res: self.wait_for_connections())
203 def _maybe_get_webport(res):
205 # now find out where the web port was
206 l = self.clients[0].getServiceNamed("webish").listener
207 port = l._port.getHost().port
208 self.webish_url = "http://localhost:%d/" % port
209 d.addCallback(_maybe_get_webport)
212 def add_extra_node(self, client_num, helper_furl=None,
213 add_to_sparent=False):
214 # usually this node is *not* parented to our self.sparent, so we can
215 # shut it down separately from the rest, to exercise the
216 # connection-lost code
217 basedir = self.getdir("client%d" % client_num)
218 if not os.path.isdir(basedir):
219 fileutil.make_dirs(basedir)
220 open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
222 f = open(os.path.join(basedir, "helper.furl") ,"w")
223 f.write(helper_furl+"\n")
226 c = client.Client(basedir=basedir)
227 self.clients.append(c)
230 c.setServiceParent(self.sparent)
233 d = self.wait_for_connections()
234 d.addCallback(lambda res: c)
237 def _check_connections(self):
238 for c in self.clients:
239 ic = c.introducer_client
240 if not ic.connected_to_introducer():
242 if len(ic.get_all_peerids()) != self.numclients:
def wait_for_connections(self, ignored=None):
    """Poll self._check_connections with timeout=200 and return the result.

    `ignored` is accepted (and discarded) so this can be used directly as a
    Deferred callback.
    """
    # TODO: replace this with something that takes a list of peerids and
    # fires when they've all been heard from, instead of using a count
    predicate = self._check_connections
    return self.poll(predicate, timeout=200)
252 def test_connections(self):
253 self.basedir = "system/SystemTest/test_connections"
254 d = self.set_up_nodes()
255 self.extra_node = None
256 d.addCallback(lambda res: self.add_extra_node(self.numclients))
257 def _check(extra_node):
258 self.extra_node = extra_node
259 for c in self.clients:
260 all_peerids = list(c.get_all_peerids())
261 self.failUnlessEqual(len(all_peerids), self.numclients+1)
262 permuted_peers = list(c.get_permuted_peers("storage", "a"))
263 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
265 d.addCallback(_check)
266 def _shutdown_extra_node(res):
268 return self.extra_node.stopService()
270 d.addBoth(_shutdown_extra_node)
272 test_connections.timeout = 300
273 # test_connections is subsumed by test_upload_and_download, and takes
274 # quite a while to run on a slow machine (because of all the TLS
275 # connections that must be established). If we ever rework the introducer
276 # code to such an extent that we're not sure if it works anymore, we can
277 # reinstate this test until it does.
280 def test_upload_and_download_random_key(self):
281 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
282 return self._test_upload_and_download(convergence=None)
283 test_upload_and_download_random_key.timeout = 4800
285 def test_upload_and_download_convergent(self):
286 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
287 return self._test_upload_and_download(convergence="some convergence string")
288 test_upload_and_download_convergent.timeout = 4800
290 def _test_upload_and_download(self, convergence):
291 # we use 4000 bytes of data, which will result in about 400k written
292 # to disk among all our simulated nodes
293 DATA = "Some data to upload\n" * 200
294 d = self.set_up_nodes()
295 def _check_connections(res):
296 for c in self.clients:
297 all_peerids = list(c.get_all_peerids())
298 self.failUnlessEqual(len(all_peerids), self.numclients)
299 permuted_peers = list(c.get_permuted_peers("storage", "a"))
300 self.failUnlessEqual(len(permuted_peers), self.numclients)
301 d.addCallback(_check_connections)
305 u = self.clients[0].getServiceNamed("uploader")
307 # we crank the max segsize down to 1024b for the duration of this
308 # test, so we can exercise multiple segments. It is important
309 # that this is not a multiple of the segment size, so that the
310 # tail segment is not the same length as the others. This actually
311 # gets rounded up to 1025 to be a multiple of the number of
312 # required shares (since we use 25 out of 100 FEC).
313 up = upload.Data(DATA, convergence=convergence)
314 up.max_segment_size = 1024
317 d.addCallback(_do_upload)
318 def _upload_done(results):
320 log.msg("upload finished: uri is %s" % (uri,))
322 dl = self.clients[1].getServiceNamed("downloader")
324 d.addCallback(_upload_done)
326 def _upload_again(res):
327 # Upload again. If using convergent encryption then this ought to be
328 # short-circuited, however with the way we currently generate URIs
329 # (i.e. because they include the roothash), we have to do all of the
330 # encoding work, and only get to save on the upload part.
331 log.msg("UPLOADING AGAIN")
332 up = upload.Data(DATA, convergence=convergence)
333 up.max_segment_size = 1024
334 d1 = self.uploader.upload(up)
335 d.addCallback(_upload_again)
337 def _download_to_data(res):
338 log.msg("DOWNLOADING")
339 return self.downloader.download_to_data(self.uri)
340 d.addCallback(_download_to_data)
341 def _download_to_data_done(data):
342 log.msg("download finished")
343 self.failUnlessEqual(data, DATA)
344 d.addCallback(_download_to_data_done)
346 target_filename = os.path.join(self.basedir, "download.target")
347 def _download_to_filename(res):
348 return self.downloader.download_to_filename(self.uri,
350 d.addCallback(_download_to_filename)
351 def _download_to_filename_done(res):
352 newdata = open(target_filename, "rb").read()
353 self.failUnlessEqual(newdata, DATA)
354 d.addCallback(_download_to_filename_done)
356 target_filename2 = os.path.join(self.basedir, "download.target2")
357 def _download_to_filehandle(res):
358 fh = open(target_filename2, "wb")
359 return self.downloader.download_to_filehandle(self.uri, fh)
360 d.addCallback(_download_to_filehandle)
361 def _download_to_filehandle_done(fh):
363 newdata = open(target_filename2, "rb").read()
364 self.failUnlessEqual(newdata, DATA)
365 d.addCallback(_download_to_filehandle_done)
367 def _download_nonexistent_uri(res):
368 baduri = self.mangle_uri(self.uri)
369 log.msg("about to download non-existent URI", level=log.UNUSUAL,
370 facility="tahoe.tests")
371 d1 = self.downloader.download_to_data(baduri)
372 def _baduri_should_fail(res):
373 log.msg("finished downloading non-existend URI",
374 level=log.UNUSUAL, facility="tahoe.tests")
375 self.failUnless(isinstance(res, Failure))
376 self.failUnless(res.check(download.NotEnoughPeersError),
377 "expected NotEnoughPeersError, got %s" % res)
378 # TODO: files that have zero peers should get a special kind
379 # of NotEnoughPeersError, which can be used to suggest that
380 # the URI might be wrong or that they've never uploaded the
381 # file in the first place.
382 d1.addBoth(_baduri_should_fail)
384 d.addCallback(_download_nonexistent_uri)
386 # add a new node, which doesn't accept shares, and only uses the
388 d.addCallback(lambda res: self.add_extra_node(self.numclients,
390 add_to_sparent=True))
391 def _added(extra_node):
392 self.extra_node = extra_node
393 extra_node.getServiceNamed("storage").sizelimit = 0
394 d.addCallback(_added)
396 HELPER_DATA = "Data that needs help to upload" * 1000
397 def _upload_with_helper(res):
398 u = upload.Data(HELPER_DATA, convergence=convergence)
399 d = self.extra_node.upload(u)
400 def _uploaded(results):
402 return self.downloader.download_to_data(uri)
403 d.addCallback(_uploaded)
405 self.failUnlessEqual(newdata, HELPER_DATA)
406 d.addCallback(_check)
408 d.addCallback(_upload_with_helper)
410 def _upload_duplicate_with_helper(res):
411 u = upload.Data(HELPER_DATA, convergence=convergence)
412 u.debug_stash_RemoteEncryptedUploadable = True
413 d = self.extra_node.upload(u)
414 def _uploaded(results):
416 return self.downloader.download_to_data(uri)
417 d.addCallback(_uploaded)
419 self.failUnlessEqual(newdata, HELPER_DATA)
420 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
421 "uploadable started uploading, should have been avoided")
422 d.addCallback(_check)
424 if convergence is not None:
425 d.addCallback(_upload_duplicate_with_helper)
427 def _upload_resumable(res):
428 DATA = "Data that needs help to upload and gets interrupted" * 1000
429 u1 = CountingDataUploadable(DATA, convergence=convergence)
430 u2 = CountingDataUploadable(DATA, convergence=convergence)
432 # we interrupt the connection after about 5kB by shutting down
433 # the helper, then restarting it.
434 u1.interrupt_after = 5000
435 u1.interrupt_after_d = defer.Deferred()
436 u1.interrupt_after_d.addCallback(lambda res:
437 self.bounce_client(0))
439 # sneak into the helper and reduce its chunk size, so that our
440 # debug_interrupt will sever the connection on about the fifth
441 # chunk fetched. This makes sure that we've started to write the
442 # new shares before we abandon them, which exercises the
443 # abort/delete-partial-share code. TODO: find a cleaner way to do
444 # this. I know that this will affect later uses of the helper in
445 # this same test run, but I'm not currently worried about it.
446 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
448 d = self.extra_node.upload(u1)
450 def _should_not_finish(res):
451 self.fail("interrupted upload should have failed, not finished"
452 " with result %s" % (res,))
454 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
456 # make sure we actually interrupted it before finishing the
458 self.failUnless(u1.bytes_read < len(DATA),
459 "read %d out of %d total" % (u1.bytes_read,
462 log.msg("waiting for reconnect", level=log.NOISY,
463 facility="tahoe.test.test_system")
464 # now, we need to give the nodes a chance to notice that this
465 # connection has gone away. When this happens, the storage
466 # servers will be told to abort their uploads, removing the
467 # partial shares. Unfortunately this involves TCP messages
468 # going through the loopback interface, and we can't easily
469 # predict how long that will take. If it were all local, we
470 # could use fireEventually() to stall. Since we don't have
471 # the right introduction hooks, the best we can do is use a
472 # fixed delay. TODO: this is fragile.
473 u1.interrupt_after_d.addCallback(self.stall, 2.0)
474 return u1.interrupt_after_d
475 d.addCallbacks(_should_not_finish, _interrupted)
477 def _disconnected(res):
478 # check to make sure the storage servers aren't still hanging
479 # on to the partial share: their incoming/ directories should
481 log.msg("disconnected", level=log.NOISY,
482 facility="tahoe.test.test_system")
483 for i in range(self.numclients):
484 incdir = os.path.join(self.getdir("client%d" % i),
485 "storage", "shares", "incoming")
486 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
487 d.addCallback(_disconnected)
489 # then we need to give the reconnector a chance to
490 # reestablish the connection to the helper.
491 d.addCallback(lambda res:
492 log.msg("wait_for_connections", level=log.NOISY,
493 facility="tahoe.test.test_system"))
494 d.addCallback(lambda res: self.wait_for_connections())
497 d.addCallback(lambda res:
498 log.msg("uploading again", level=log.NOISY,
499 facility="tahoe.test.test_system"))
500 d.addCallback(lambda res: self.extra_node.upload(u2))
502 def _uploaded(results):
504 log.msg("Second upload complete", level=log.NOISY,
505 facility="tahoe.test.test_system")
507 # this is really bytes received rather than sent, but it's
508 # convenient and basically measures the same thing
509 bytes_sent = results.ciphertext_fetched
511 # We currently don't support resumption of upload if the data is
512 # encrypted with a random key. (Because that would require us
513 # to store the key locally and re-use it on the next upload of
514 # this file, which isn't a bad thing to do, but we currently
516 if convergence is not None:
517 # Make sure we did not have to read the whole file the
518 # second time around.
519 self.failUnless(bytes_sent < len(DATA),
520 "resumption didn't save us any work:"
521 " read %d bytes out of %d total" %
522 (bytes_sent, len(DATA)))
524 # Make sure we did have to read the whole file the second
525 # time around -- because the one that we partially uploaded
526 # earlier was encrypted with a different random key.
527 self.failIf(bytes_sent < len(DATA),
528 "resumption saved us some work even though we were using random keys:"
529 " read %d bytes out of %d total" %
530 (bytes_sent, len(DATA)))
531 return self.downloader.download_to_data(uri)
532 d.addCallback(_uploaded)
535 self.failUnlessEqual(newdata, DATA)
536 # If using convergent encryption, then also check that the
537 # helper has removed the temp file from its directories.
538 if convergence is not None:
539 basedir = os.path.join(self.getdir("client0"), "helper")
540 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
541 self.failUnlessEqual(files, [])
542 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
543 self.failUnlessEqual(files, [])
544 d.addCallback(_check)
546 d.addCallback(_upload_resumable)
550 def _find_shares(self, basedir):
552 for (dirpath, dirnames, filenames) in os.walk(basedir):
553 if "storage" not in dirpath:
557 pieces = dirpath.split(os.sep)
558 if pieces[-4] == "storage" and pieces[-3] == "shares":
559 # we're sitting in .../storage/shares/$START/$SINDEX , and there
560 # are sharefiles here
561 assert pieces[-5].startswith("client")
562 client_num = int(pieces[-5][-1])
563 storage_index_s = pieces[-1]
564 storage_index = storage.si_a2b(storage_index_s)
565 for sharename in filenames:
566 shnum = int(sharename)
567 filename = os.path.join(dirpath, sharename)
568 data = (client_num, storage_index, filename, shnum)
571 self.fail("unable to find any share files in %s" % basedir)
574 def _corrupt_mutable_share(self, filename, which):
575 msf = storage.MutableShareFile(filename)
576 datav = msf.readv([ (0, 1000000) ])
577 final_share = datav[0]
578 assert len(final_share) < 1000000 # ought to be truncated
579 pieces = mutable.unpack_share(final_share)
580 (seqnum, root_hash, IV, k, N, segsize, datalen,
581 verification_key, signature, share_hash_chain, block_hash_tree,
582 share_data, enc_privkey) = pieces
584 if which == "seqnum":
587 root_hash = self.flip_bit(root_hash)
589 IV = self.flip_bit(IV)
590 elif which == "segsize":
591 segsize = segsize + 15
592 elif which == "pubkey":
593 verification_key = self.flip_bit(verification_key)
594 elif which == "signature":
595 signature = self.flip_bit(signature)
596 elif which == "share_hash_chain":
597 nodenum = share_hash_chain.keys()[0]
598 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
599 elif which == "block_hash_tree":
600 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
601 elif which == "share_data":
602 share_data = self.flip_bit(share_data)
603 elif which == "encprivkey":
604 enc_privkey = self.flip_bit(enc_privkey)
606 prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
608 final_share = mutable.pack_share(prefix,
615 msf.writev( [(0, final_share)], None)
617 def test_mutable(self):
618 self.basedir = "system/SystemTest/test_mutable"
619 DATA = "initial contents go here." # 25 bytes % 3 != 0
620 NEWDATA = "new contents yay"
621 NEWERDATA = "this is getting old"
623 d = self.set_up_nodes()
625 def _create_mutable(res):
627 log.msg("starting create_mutable_file")
628 d1 = c.create_mutable_file(DATA)
630 log.msg("DONE: %s" % (res,))
631 self._mutable_node_1 = res
633 d1.addCallback(_done)
635 d.addCallback(_create_mutable)
637 def _test_debug(res):
638 # find a share. It is important to run this while there is only
639 # one slot in the grid.
640 shares = self._find_shares(self.basedir)
641 (client_num, storage_index, filename, shnum) = shares[0]
642 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
644 log.msg(" for clients[%d]" % client_num)
646 out,err = StringIO(), StringIO()
647 rc = runner.runner(["dump-share",
649 stdout=out, stderr=err)
650 output = out.getvalue()
651 self.failUnlessEqual(rc, 0)
653 self.failUnless("Mutable slot found:\n" in output)
654 self.failUnless("share_type: SDMF\n" in output)
655 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
656 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
657 self.failUnless(" num_extra_leases: 0\n" in output)
658 # the pubkey size can vary by a byte, so the container might
659 # be a bit larger on some runs.
660 m = re.search(r'^ container_size: (\d+)$', output, re.M)
662 container_size = int(m.group(1))
663 self.failUnless(2037 <= container_size <= 2049, container_size)
664 m = re.search(r'^ data_length: (\d+)$', output, re.M)
666 data_length = int(m.group(1))
667 self.failUnless(2037 <= data_length <= 2049, data_length)
668 self.failUnless(" secrets are for nodeid: %s\n" % peerid
670 self.failUnless(" SDMF contents:\n" in output)
671 self.failUnless(" seqnum: 1\n" in output)
672 self.failUnless(" required_shares: 3\n" in output)
673 self.failUnless(" total_shares: 10\n" in output)
674 self.failUnless(" segsize: 27\n" in output, (output, filename))
675 self.failUnless(" datalen: 25\n" in output)
676 # the exact share_hash_chain nodes depends upon the sharenum,
677 # and is more of a hassle to compute than I want to deal with
679 self.failUnless(" share_hash_chain: " in output)
680 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
681 except unittest.FailTest:
683 print "dump-share output was:"
686 d.addCallback(_test_debug)
690 # first, let's see if we can use the existing node to retrieve the
691 # contents. This allows it to use the cached pubkey and maybe the
692 # latest-known sharemap.
694 d.addCallback(lambda res: self._mutable_node_1.download_to_data())
695 def _check_download_1(res):
696 self.failUnlessEqual(res, DATA)
697 # now we see if we can retrieve the data from a new node,
698 # constructed using the URI of the original one. We do this test
699 # on the same client that uploaded the data.
700 uri = self._mutable_node_1.get_uri()
701 log.msg("starting retrieve1")
702 newnode = self.clients[0].create_node_from_uri(uri)
703 return newnode.download_to_data()
704 d.addCallback(_check_download_1)
706 def _check_download_2(res):
707 self.failUnlessEqual(res, DATA)
708 # same thing, but with a different client
709 uri = self._mutable_node_1.get_uri()
710 newnode = self.clients[1].create_node_from_uri(uri)
711 log.msg("starting retrieve2")
712 d1 = newnode.download_to_data()
713 d1.addCallback(lambda res: (res, newnode))
715 d.addCallback(_check_download_2)
717 def _check_download_3((res, newnode)):
718 self.failUnlessEqual(res, DATA)
720 log.msg("starting replace1")
721 d1 = newnode.update(NEWDATA)
722 d1.addCallback(lambda res: newnode.download_to_data())
724 d.addCallback(_check_download_3)
726 def _check_download_4(res):
727 self.failUnlessEqual(res, NEWDATA)
728 # now create an even newer node and replace the data on it. This
729 # new node has never been used for download before.
730 uri = self._mutable_node_1.get_uri()
731 newnode1 = self.clients[2].create_node_from_uri(uri)
732 newnode2 = self.clients[3].create_node_from_uri(uri)
733 self._newnode3 = self.clients[3].create_node_from_uri(uri)
734 log.msg("starting replace2")
735 d1 = newnode1.overwrite(NEWERDATA)
736 d1.addCallback(lambda res: newnode2.download_to_data())
738 d.addCallback(_check_download_4)
740 def _check_download_5(res):
741 log.msg("finished replace2")
742 self.failUnlessEqual(res, NEWERDATA)
743 d.addCallback(_check_download_5)
745 def _corrupt_shares(res):
746 # run around and flip bits in all but k of the shares, to test
748 shares = self._find_shares(self.basedir)
749 ## sort by share number
750 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
751 where = dict([ (shnum, filename)
752 for (client_num, storage_index, filename, shnum)
754 assert len(where) == 10 # this test is designed for 3-of-10
755 for shnum, filename in where.items():
756 # shares 7,8,9 are left alone. read will check
757 # (share_hash_chain, block_hash_tree, share_data). New
758 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
759 # segsize, signature).
761 # read: this will trigger "pubkey doesn't match
763 self._corrupt_mutable_share(filename, "pubkey")
764 self._corrupt_mutable_share(filename, "encprivkey")
766 # triggers "signature is invalid"
767 self._corrupt_mutable_share(filename, "seqnum")
769 # triggers "signature is invalid"
770 self._corrupt_mutable_share(filename, "R")
772 # triggers "signature is invalid"
773 self._corrupt_mutable_share(filename, "segsize")
775 self._corrupt_mutable_share(filename, "share_hash_chain")
777 self._corrupt_mutable_share(filename, "block_hash_tree")
779 self._corrupt_mutable_share(filename, "share_data")
780 # other things to correct: IV, signature
781 # 7,8,9 are left alone
783 # note that initial_query_count=5 means that we'll hit the
784 # first 5 servers in effectively random order (based upon
785 # response time), so we won't necessarily ever get a "pubkey
786 # doesn't match fingerprint" error (if we hit shnum>=1 before
787 # shnum=0, we pull the pubkey from there). To get repeatable
788 # specific failures, we need to set initial_query_count=1,
789 # but of course that will change the sequencing behavior of
790 # the retrieval process. TODO: find a reasonable way to make
791 # this a parameter, probably when we expand this test to test
792 # for one failure mode at a time.
794 # when we retrieve this, we should get three signature
795 # failures (where we've mangled seqnum, R, and segsize). The
797 d.addCallback(_corrupt_shares)
799 d.addCallback(lambda res: self._newnode3.download_to_data())
800 d.addCallback(_check_download_5)
802 def _check_empty_file(res):
803 # make sure we can create empty files, this usually screws up the
805 d1 = self.clients[2].create_mutable_file("")
806 d1.addCallback(lambda newnode: newnode.download_to_data())
807 d1.addCallback(lambda res: self.failUnlessEqual("", res))
809 d.addCallback(_check_empty_file)
811 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
812 def _created_dirnode(dnode):
813 log.msg("_created_dirnode(%s)" % (dnode,))
815 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
816 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
817 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
818 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
819 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
820 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
821 d1.addCallback(lambda res: dnode.build_manifest())
822 d1.addCallback(lambda manifest:
823 self.failUnlessEqual(len(manifest), 1))
825 d.addCallback(_created_dirnode)
827 def wait_for_c3_kg_conn():
828 return self.clients[3]._key_generator is not None
829 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
831 def check_kg_poolsize(junk, size_delta):
832 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
833 self.key_generator_svc.key_generator.pool_size + size_delta)
835 d.addCallback(check_kg_poolsize, 0)
836 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
837 d.addCallback(check_kg_poolsize, -1)
838 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
839 d.addCallback(check_kg_poolsize, -2)
840 # use_helper induces use of clients[3], which is the using-key_gen client
841 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
842 d.addCallback(check_kg_poolsize, -3)
845 # The default 120 second timeout went off when running it under valgrind
846 # on my old Windows laptop, so I'm bumping up the timeout.
847 test_mutable.timeout = 240
def flip_bit(self, good):
    """Return a copy of `good` whose final character has its lowest bit inverted."""
    head, last = good[:-1], good[-1]
    flipped = chr(ord(last) ^ 0x01)
    return head + flipped
852 def mangle_uri(self, gooduri):
853 # change the key, which changes the storage index, which means we'll
854 # be asking about the wrong file, so nobody will have any shares
855 u = IFileURI(gooduri)
856 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
857 uri_extension_hash=u.uri_extension_hash,
858 needed_shares=u.needed_shares,
859 total_shares=u.total_shares,
861 return u2.to_string()
863 # TODO: add a test which mangles the uri_extension_hash instead, and
864 # should fail due to not being able to get a valid uri_extension block.
865 # Also a test which sneakily mangles the uri_extension block to change
866 # some of the validation data, so it will fail in the post-download phase
867 # when the file's crypttext integrity check fails. Do the same thing for
868 # the key, which should cause the download to fail the post-download
869 # plaintext_hash check.
def test_vdrive(self):
    """Run the end-to-end virtual-drive scenario as one Deferred chain.

    Stages, in order: node setup, introducer web check, first publish,
    runner tools, second publish, bounce of client 0, read-back checks,
    private-directory publish and check, web / control / CLI front ends,
    checker, verifier, and stats.

    Returns the Deferred for the whole chain so that trial waits for every
    stage and sees any failure raised along the way.
    """
    self.basedir = "system/SystemTest/test_vdrive"
    self.data = LARGE_DATA
    d = self.set_up_nodes(createprivdir=True)
    d.addCallback(self._test_introweb)
    d.addCallback(self.log, "starting publish")
    d.addCallback(self._do_publish1)
    d.addCallback(self._test_runner)
    d.addCallback(self._do_publish2)
    # at this point, we have the following filesystem (where "R" denotes
    # self._root_directory_uri):
    # R/subdir1/mydata567
    # R/subdir1/subdir2/mydata992

    d.addCallback(lambda res: self.bounce_client(0))
    d.addCallback(self.log, "bounced client0")

    d.addCallback(self._check_publish1)
    d.addCallback(self.log, "did _check_publish1")
    d.addCallback(self._check_publish2)
    d.addCallback(self.log, "did _check_publish2")
    d.addCallback(self._do_publish_private)
    d.addCallback(self.log, "did _do_publish_private")
    # now we also have (where "P" denotes a new dir):
    # P/personal/sekrit data
    # P/s2-rw -> /subdir1/subdir2/
    # P/s2-ro -> /subdir1/subdir2/ (read-only)
    d.addCallback(self._check_publish_private)
    d.addCallback(self.log, "did _check_publish_private")
    d.addCallback(self._test_web)
    d.addCallback(self._test_control)
    d.addCallback(self._test_cli)
    # P now has four top-level children:
    # P/personal/sekrit data
    # P/s2-ro/
    # P/s2-rw/
    # P/test_put/ (empty)
    d.addCallback(self._test_checker)
    d.addCallback(self._test_verifier)
    d.addCallback(self._grab_stats)
    # hand the chain to trial; without this the test would be reported as
    # finished before any of the callbacks above had run
    return d
test_vdrive.timeout = 1100
def _test_introweb(self, res):
    """Fetch the introducer's web page as HTML and as JSON and check its summaries.

    The HTML page is checked for the allmydata version string and the
    announcement / subscription summary lines; the "?t=json" form is checked
    for the matching summary dictionaries.

    NOTE(review): several lines of this method are not visible in this view
    (flagged inline); confirm against the full file.
    """
    d = getPage(self.introweb_url, method="GET", followRedirect=True)
    # NOTE(review): the `def _check(res):` header and the `try:` matching
    # the `except` below are not visible here.
            self.failUnless("allmydata: %s" % str(allmydata.__version__)
            # NOTE(review): the line that closes the call above is not visible.
            self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
            self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
            # on failure, announce which page was being checked
            print "GET %s output was:" % self.introweb_url
            # NOTE(review): the rest of this handler is not visible.
    d.addCallback(_check)
    d.addCallback(lambda res:
                  getPage(self.introweb_url + "?t=json",
                          method="GET", followRedirect=True))
    def _check_json(res):
        data = simplejson.loads(res)
        # NOTE(review): the `try:` matching the `except` below is not visible.
            self.failUnlessEqual(data["subscription_summary"],
            # NOTE(review): the expected-value line of the call above is not visible.
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
        except unittest.FailTest:
            print "GET %s?t=json output was:" % self.introweb_url
            # NOTE(review): the rest of this handler is not visible.
    d.addCallback(_check_json)
    # NOTE(review): no `return d` is visible; callers chain on this method.
def _do_publish1(self, res):
    """Create the root directory and R/subdir1, then upload R/subdir1/mydata567.

    Side effects: stores the root directory URI in self._root_directory_uri,
    the subdir1 node in self._subdir1_node, and the uploaded file's URI in
    self.uri.

    Returns a Deferred that fires once the file's URI has been stashed.
    """
    ut = upload.Data(self.data, convergence=None)
    c0 = self.clients[0]
    d = c0.create_empty_dirnode()
    def _made_root(new_dirnode):
        self._root_directory_uri = new_dirnode.get_uri()
        return c0.create_node_from_uri(self._root_directory_uri)
    d.addCallback(_made_root)
    d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
    def _made_subdir1(subdir1_node):
        self._subdir1_node = subdir1_node
        d1 = subdir1_node.add_file(u"mydata567", ut)
        d1.addCallback(self.log, "publish finished")
        def _stash_uri(filenode):
            self.uri = filenode.get_uri()
        d1.addCallback(_stash_uri)
        # make the outer chain wait for the upload to finish
        return d1
    d.addCallback(_made_subdir1)
    return d
def _do_publish2(self, res):
    """Create subdir2 under the stashed subdir1 node and upload mydata992 into it.

    Returns the Deferred for the upload so the caller's chain waits for it.
    """
    ut = upload.Data(self.data, convergence=None)
    d = self._subdir1_node.create_empty_directory(u"subdir2")
    d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    return d
def log(self, res, msg, **kwargs):
    """Log `msg` and return `res` unchanged.

    Meant to be dropped into a Deferred chain as
    ``d.addCallback(self.log, "message")``; returning `res` keeps the value
    flowing to the next callback, which several chains in this class rely on.
    Extra keyword arguments are forwarded to log.msg().
    """
    # print "MSG: %s RES: %s" % (msg, res)
    log.msg(msg, **kwargs)
    return res
def stall(self, res, delay=1.0):
    """Return a Deferred that fires with `res` after `delay` seconds.

    Usable as a callback to insert a pause into a Deferred chain without
    changing the value passed along it.
    """
    d = defer.Deferred()
    reactor.callLater(delay, d.callback, res)
    return d
def _do_publish_private(self, res):
    """Build a new private directory "P" and populate it.

    Creates P/personal/sekrit data (holding self.smalldata) and links
    /subdir1/subdir2 from the root directory into P twice: as "s2-rw" via
    its regular URI and as "s2-ro" via its read-only URI.

    Returns a Deferred that fires with the private directory node.
    """
    self.smalldata = "sssh, very secret stuff"
    ut = upload.Data(self.smalldata, convergence=None)
    d = self.clients[0].create_empty_dirnode()
    d.addCallback(self.log, "GOT private directory")
    def _got_new_dir(privnode):
        rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
        d1 = privnode.create_empty_directory(u"personal")
        d1.addCallback(self.log, "made P/personal")
        d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
        d1.addCallback(self.log, "made P/personal/sekrit data")
        d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
        def _got_s2(s2node):
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
            d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
            return d2
        d1.addCallback(_got_s2)
        # the chain's final value is the private directory itself
        d1.addCallback(lambda res: privnode)
        return d1
    d.addCallback(_got_new_dir)
    return d
def _check_publish1(self, res):
    """Read R/subdir1/mydata567 back through client 1 and compare it to self.data.

    Walks the tree one get() at a time. Returns the Deferred for the whole
    walk so the caller's chain waits for the comparison.
    """
    # this one uses the iterative API
    c1 = self.clients[1]
    d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
    d.addCallback(self.log, "check_publish1 got /")
    d.addCallback(lambda root: root.get(u"subdir1"))
    d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(self.log, "get finished")
    def _get_done(data):
        self.failUnlessEqual(data, self.data)
    d.addCallback(_get_done)
    return d
def _check_publish2(self, res):
    """Check R/subdir1 and R/subdir1/mydata567 through client 1 using path lookups.

    Verifies that subdir1 is a directory node, that the file's contents
    equal self.data, and that a node rebuilt from the file's URI compares
    equal to the node found by path. Returns the Deferred for the chain.
    """
    # this one uses the path-based API
    rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
    d = rootnode.get_child_at_path(u"subdir1")
    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode)))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    def _got_filenode(filenode):
        fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
        assert fnode == filenode
    d.addCallback(_got_filenode)
    return d
def _check_publish_private(self, resnode):
    """Check the private directory tree and its read-only restrictions.

    `resnode` is the private directory node; it is stored in
    self._private_node, and its "personal" child in self._personal_node.

    Checks, in order: P/personal is a directory; "sekrit data" downloads to
    self.smalldata; s2-rw is mutable; s2-ro is mutable but read-only and
    rejects every modifying operation with NotMutableError; a series of
    move_child_to() calls inside P succeeds; the manifest has four entries.

    Returns the Deferred for the whole chain.
    """
    # this one uses the path-based API
    self._private_node = resnode

    d = self._private_node.get_child_at_path(u"personal")
    def _got_personal(personal):
        self._personal_node = personal
        # the next callback inspects the node, so pass it along
        return personal
    d.addCallback(_got_personal)

    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
    def get_path(path):
        return self._private_node.get_child_at_path(path)

    d.addCallback(lambda res: get_path(u"personal/sekrit data"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
    d.addCallback(lambda res: get_path(u"s2-rw"))
    d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
    d.addCallback(lambda res: get_path(u"s2-ro"))
    def _got_s2ro(dirnode):
        self.failUnless(dirnode.is_mutable(), dirnode)
        self.failUnless(dirnode.is_readonly(), dirnode)
        d1 = defer.succeed(None)
        d1.addCallback(lambda res: dirnode.list())
        d1.addCallback(self.log, "dirnode.list")

        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

        d1.addCallback(self.log, "doing add_file(ro)")
        ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

        # reading through the read-only node is still allowed
        d1.addCallback(self.log, "doing get(ro)")
        d1.addCallback(lambda res: dirnode.get(u"mydata992"))
        d1.addCallback(lambda filenode:
                       self.failUnless(IFileNode.providedBy(filenode)))

        d1.addCallback(self.log, "doing delete(ro)")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

        d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

        personal = self._personal_node
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

        d1.addCallback(self.log, "doing move_child_to(ro)2")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

        d1.addCallback(self.log, "finished with _got_s2ro")
        return d1
    d.addCallback(_got_s2ro)
    def _got_home(dummy):
        home = self._private_node
        personal = self._personal_node
        d1 = defer.succeed(None)
        d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
        d1.addCallback(lambda res:
                       personal.move_child_to(u"sekrit data",home,u"sekrit"))

        d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit", home, u"sekrit data"))

        d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit data", personal))

        d1.addCallback(lambda res: home.build_manifest())
        d1.addCallback(self.log, "manifest")
        # the manifest is expected to hold four items, including:
        # P/personal/sekrit data
        # P/s2-rw (same as P/s2-ro)
        # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
        # (presumably P/personal/ itself is the fourth -- TODO confirm)
        d1.addCallback(lambda manifest:
                       self.failUnlessEqual(len(manifest), 4))
        return d1
    d.addCallback(_got_home)
    return d
def shouldFail(self, res, expected_failure, which, substring=None):
    """Assert that `res` is a Failure of type `expected_failure`.

    Intended for ``d.addBoth(self.shouldFail, ExcType, "description")``.

    res: the callback/errback result being inspected.
    expected_failure: exception class (or classes) passed to Failure.trap().
    which: description of the operation, used in the failure message.
    substring: if given, must appear in str(res).

    Fails the test when `res` is not a Failure; Failure.trap() re-raises
    when the failure is of a different type.
    """
    if isinstance(res, Failure):
        res.trap(expected_failure)
        # only check the text when the caller asked for it; `None in str`
        # would raise TypeError
        if substring:
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
    else:
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
    """Call `callable(*args, **kwargs)` and assert that it fails as expected.

    expected_failure: exception class (or classes) passed to Failure.trap().
    which: description of the operation, used in the failure message.
    substring: None, or a str that must appear in str() of the Failure.
    callable: invoked through defer.maybeDeferred, so it may raise
        synchronously or return a Deferred.

    Returns a Deferred that fires once the outcome has been checked.
    """
    assert substring is None or isinstance(substring, str)
    d = defer.maybeDeferred(callable, *args, **kwargs)
    def done(res):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    # addBoth: the check must see both successes and failures
    d.addBoth(done)
    return d
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of `data` to `urlpath` under self.webish_url.

    Returns whatever getPage() returns.
    """
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET for `urlpath` under self.webish_url.

    `followRedirect` is handed straight to getPage(); its result is returned.
    """
    target = self.webish_url + urlpath
    return getPage(target, followRedirect=followRedirect, method="GET")
def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
    """POST `fields` to `urlpath` as a multipart/form-data body.

    urlpath: path appended to self.helper_webish_url when `use_helper` is
        true, otherwise to self.webish_url.
    fields: form fields. A tuple value is taken as (filename, contents) and
        sent as a file part; any other value is sent as str(value).

    A "_charset" field with value UTF-8 is always sent first. Returns
    whatever getPage() returns.

    NOTE(review): the boundary and blank-line layout follows the
    multipart/form-data format -- confirm against the full file.
    """
    if use_helper:
        url = self.helper_webish_url + urlpath
    else:
        url = self.webish_url + urlpath
    sepbase = "boogabooga"
    sep = "--" + sepbase
    form = []
    form.append(sep)
    form.append('Content-Disposition: form-data; name="_charset"')
    form.append('')
    form.append('UTF-8')
    form.append(sep)
    for name, value in fields.iteritems():
        if isinstance(value, tuple):
            filename, value = value
            form.append('Content-Disposition: form-data; name="%s"; '
                        'filename="%s"' % (name, filename.encode("utf-8")))
        else:
            form.append('Content-Disposition: form-data; name="%s"' % name)
        # a blank line separates a part's headers from its content
        form.append('')
        form.append(str(value))
        form.append(sep)
    # the closing boundary carries a trailing "--"
    form[-1] += "--"
    body = "\r\n".join(form) + "\r\n"
    headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
               }
    return getPage(url, method="POST", postdata=body,
                   headers=headers, followRedirect=followRedirect)
def _test_web(self, res):
    """Exercise the node's web front end.

    Covers, in order: the welcome pages (self.webish_url and
    self.helper_webish_url), directory and file GETs under the public root,
    downloads by URI, a download of a mangled URI that must fail, PUT
    uploads and in-place replacement, unlinked POST uploads (direct and
    through the helper), and the status, helper_status, and statistics
    pages.

    NOTE(review): a number of lines of this method are not visible in this
    view (flagged inline); confirm each against the full file.
    """
    base = self.webish_url
    public = "uri/" + self._root_directory_uri
    # NOTE(review): the statement that creates `d` is not visible here.
    def _got_welcome(page):
        expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
        self.failUnless(expected in page,
                        "I didn't see the right 'connected storage servers'"
                        " message in: %s" % page
        # NOTE(review): the line closing the call above is not visible.
        expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
        self.failUnless(expected in page,
                        "I didn't see the right 'My nodeid' message "
        # NOTE(review): the end of the call above is not visible.
    d.addCallback(_got_welcome)
    d.addCallback(self.log, "done with _got_welcome")

    # get the welcome page from the node that uses the helper too
    d.addCallback(lambda res: getPage(self.helper_webish_url))
    def _got_welcome_helper(page):
        self.failUnless("Connected to helper?: <span>yes</span>" in page,
        # NOTE(review): the end of the call above is not visible.
    d.addCallback(_got_welcome_helper)

    d.addCallback(lambda res: getPage(base + public))
    d.addCallback(lambda res: getPage(base + public + "/subdir1"))
    def _got_subdir1(page):
        # there ought to be an href for our file
        self.failUnless(("<td>%d</td>" % len(self.data)) in page)
        self.failUnless(">mydata567</a>" in page)
    d.addCallback(_got_subdir1)
    d.addCallback(self.log, "done with _got_subdir1")
    d.addCallback(lambda res:
                  getPage(base + public + "/subdir1/mydata567"))
    def _got_data(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_data)

    # download from a URI embedded in a URL
    d.addCallback(self.log, "_get_from_uri")
    def _get_from_uri(res):
        return getPage(base + "uri/%s?filename=%s"
                       % (self.uri, "mydata567"))
    d.addCallback(_get_from_uri)
    def _got_from_uri(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_from_uri)

    # download from a URI embedded in a URL, second form
    d.addCallback(self.log, "_get_from_uri2")
    def _get_from_uri2(res):
        return getPage(base + "uri?uri=%s" % (self.uri,))
    d.addCallback(_get_from_uri2)
    d.addCallback(_got_from_uri)

    # download from a bogus URI, make sure we get a reasonable error
    d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
    def _get_from_bogus_uri(res):
        d1 = getPage(base + "uri/%s?filename=%s"
                     % (self.mangle_uri(self.uri), "mydata567"))
        d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        # NOTE(review): the final argument of the call above and the rest of
        # this callback are not visible.
    d.addCallback(_get_from_bogus_uri)
    d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

    # upload a file with PUT
    d.addCallback(self.log, "about to try PUT")
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                       "new.txt contents"))
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "new.txt contents")
    # and again with something large enough to use multiple segments,
    # and hopefully trigger pauseProducing too
    d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                       "big" * 500000)) # 1.5MB
    d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
    d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

    # can we replace files in place?
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
    # NOTE(review): the data argument of the PUT above is not visible; the
    # check below expects "NEWER contents".
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "NEWER contents")

    # test unlinked POST
    d.addCallback(lambda res: self.POST("uri", t="upload",
                                        file=("new.txt", "data" * 10000)))
    # and again using the helper, which exercises different upload-status
    # NOTE(review): the rest of the comment above is not visible.
    d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                        file=("foo.txt", "data2" * 10000)))

    # check that the status page exists
    d.addCallback(lambda res: self.GET("status", followRedirect=True))
    def _got_status(res):
        # find an interesting upload and download to look at. LIT files
        # are not interesting.
        for dl in self.clients[0].list_recent_downloads():
            if dl.get_size() > 200:
                self._down_status = dl.get_counter()
        for ul in self.clients[0].list_recent_uploads():
            if ul.get_size() > 200:
                self._up_status = ul.get_counter()
        rs = self.clients[0].list_recent_retrieve()[0]
        self._retrieve_status = rs.get_counter()
        ps = self.clients[0].list_recent_publish()[0]
        self._publish_status = ps.get_counter()

        # and that there are some upload- and download- status pages
        return self.GET("status/up-%d" % self._up_status)
    d.addCallback(_got_status)
    # NOTE(review): the header of `_got_up` is not visible.
        return self.GET("status/down-%d" % self._down_status)
    d.addCallback(_got_up)
    # NOTE(review): the header of `_got_down` is not visible.
        return self.GET("status/publish-%d" % self._publish_status)
    d.addCallback(_got_down)
    def _got_publish(res):
        return self.GET("status/retrieve-%d" % self._retrieve_status)
    d.addCallback(_got_publish)

    # check that the helper status page exists
    d.addCallback(lambda res:
                  self.GET("helper_status", followRedirect=True))
    def _got_helper_status(res):
        self.failUnless("Bytes Fetched:" in res)
        # touch a couple of files in the helper's working directory to
        # exercise more code paths
        workdir = os.path.join(self.getdir("client0"), "helper")
        incfile = os.path.join(workdir, "CHK_incoming", "spurious")
        f = open(incfile, "wb")
        f.write("small file")
        # NOTE(review): the close of `f` and the binding of `now` are not
        # visible.
        # `then` is three days in the past
        then = time.time() - 86400*3
        os.utime(incfile, (now, then))
        encfile = os.path.join(workdir, "CHK_encoding", "spurious")
        f = open(encfile, "wb")
        f.write("less small file")
        # NOTE(review): the close of `f` is not visible.
        os.utime(encfile, (now, then))
    d.addCallback(_got_helper_status)
    # and that the json form exists
    d.addCallback(lambda res:
                  self.GET("helper_status?t=json", followRedirect=True))
    def _got_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
        # NOTE(review): the expected value for the call above is not visible.
        # 10 and 15 are the byte lengths of the two spurious files written
        # in _got_helper_status
        self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
        # NOTE(review): the expected value for the call above is not visible.
        self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        # NOTE(review): the expected value for the call above is not visible.
    d.addCallback(_got_helper_status_json)

    # and check that client[3] (which uses a helper but does not run one
    # itself) doesn't explode when you ask for its helper status with
    # NOTE(review): the end of the comment above is not visible.
    d.addCallback(lambda res:
                  getPage(self.helper_webish_url + "helper_status?t=json"))
    def _got_non_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data, {})
    d.addCallback(_got_non_helper_status_json)

    # see if the statistics page exists
    d.addCallback(lambda res: self.GET("statistics"))
    def _got_stats(res):
        self.failUnless("Node Statistics" in res)
        self.failUnless("  'downloader.files_downloaded': 8," in res)
    d.addCallback(_got_stats)
    d.addCallback(lambda res: self.GET("statistics?t=json"))
    def _got_stats_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
        self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
    d.addCallback(_got_stats_json)

    # TODO: mangle the second segment of a file, to test errors that
    # occur after we've already sent some good data, which uses a
    # different error path.

    # TODO: download a URI with a form
    # TODO: create a directory by using a form
    # TODO: upload by using a form on the directory page
    #    url = base + "somedir/subdir1/freeform_post!!upload"
    # TODO: delete a file by using a button on the directory page
    # NOTE(review): no `return d` is visible; callers chain on this method.
def _test_runner(self, res):
    """Run the share-inspection tools from runner.py against an on-disk share.

    Walks self.basedir for a share file whose first four bytes are
    '\\x00\\x00\\x00\\x01', runs "dump-share" on it and checks the output, then
    runs "find-shares" and "catalog-shares" across every client's node
    directory and checks the number of results.

    NOTE(review): several lines of this method are not visible in this view
    (flagged inline); confirm against the full file.
    """
    # exercise some of the diagnostic tools in runner.py
    for (dirpath, dirnames, filenames) in os.walk(self.basedir):
        if "storage" not in dirpath:
        # NOTE(review): the body of the test above, and whatever follows it
        # before `pieces` is computed, is not visible.
        pieces = dirpath.split(os.sep)
        if pieces[-4] == "storage" and pieces[-3] == "shares":
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            filename = os.path.join(dirpath, filenames[0])
            # peek at the magic to see if it is a chk share
            magic = open(filename, "rb").read(4)
            if magic == '\x00\x00\x00\x01':
            # NOTE(review): the body of the test above and the clause that
            # owns the self.fail() call below are not visible.
        self.fail("unable to find any uri_extension files in %s"
        # NOTE(review): the end of the call above is not visible.
    log.msg("test_system.SystemTest._test_runner using %s" % filename)

    out,err = StringIO(), StringIO()
    rc = runner.runner(["dump-share",
    # NOTE(review): the rest of the argument list above is not visible.
                       stdout=out, stderr=err)
    output = out.getvalue()
    self.failUnlessEqual(rc, 0)

    # we only upload a single file, so we can assert some things about
    # its size and shares.
    self.failUnless(("share filename: %s" % filename) in output)
    self.failUnless("size: %d\n" % len(self.data) in output)
    self.failUnless("num_segments: 1\n" in output)
    # segment_size is always a multiple of needed_shares
    self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
    self.failUnless("total_shares: 10\n" in output)
    # keys which are supposed to be present
    for key in ("size", "num_segments", "segment_size",
                "needed_shares", "total_shares",
                "codec_name", "codec_params", "tail_codec_params",
                #"plaintext_hash", "plaintext_root_hash",
                "crypttext_hash", "crypttext_root_hash",
                "share_root_hash", "UEB_hash"):
        self.failUnless("%s: " % key in output, key)

    # now use its storage index to find the other shares using the
    # 'find-shares' tool
    sharedir, shnum = os.path.split(filename)
    storagedir, storage_index_s = os.path.split(sharedir)
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["find-shares", storage_index_s] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    # NOTE(review): a line between the check above and the readlines() call
    # below is not visible (presumably it rewinds `out` -- confirm).
    sharefiles = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(sharefiles), 10)

    # also exercise the 'catalog-shares' tool
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["catalog-shares"] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    # NOTE(review): a line is not visible here as well (same caveat).
    descriptions = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(descriptions), 30)
    # NOTE(review): the opening of the comprehension that binds `matching`
    # is not visible.
                for line in descriptions
                if line.startswith("CHK %s " % storage_index_s)]
    self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
    """Connect to client 0's control port and run the remote-control checks.

    Reads the control FURL from client 0's private/control.furl, obtains a
    remote reference through the introducer's Tub, and hands it to
    _test_control2 with the FURL file's path (which doubles as the file to
    upload).

    Returns the Deferred for the chain so the caller waits for those checks.
    """
    # exercise the remote-control-the-client foolscap interfaces in
    # allmydata.control (mostly used for performance tests)
    c0 = self.clients[0]
    control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
    control_furl = open(control_furl_file, "r").read().strip()
    # it doesn't really matter which Tub we use to connect to the client,
    # so let's just use our IntroducerNode's
    d = self.introducer.tub.getReference(control_furl)
    d.addCallback(self._test_control2, control_furl_file)
    return d
def _test_control2(self, rref, filename):
    """Drive the control-port remote methods through `rref`.

    Uploads `filename`, downloads the result, and compares the downloaded
    bytes with the original file; then calls "speed_test",
    "get_memory_usage" (only when sys.platform is "linux2"), and
    "measure_peer_response_time".

    NOTE(review): some lines are not visible in this view (flagged inline).
    """
    d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
    downfile = os.path.join(self.basedir, "control.downfile")
    d.addCallback(lambda uri:
                  rref.callRemote("download_from_uri_to_file",
    # NOTE(review): the remaining arguments of the call above and the header
    # of the `_check` callback registered below are not visible.
        self.failUnlessEqual(res, downfile)
        data = open(downfile, "r").read()
        expected_data = open(filename, "r").read()
        self.failUnlessEqual(data, expected_data)
    d.addCallback(_check)
    d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
    if sys.platform == "linux2":
        d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
    d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    # NOTE(review): no `return d` is visible; _test_control chains on this.
def _test_cli(self, res):
    """Run the ls / put / get / mv / rm CLI commands and check their effects.

    Each command goes through self._run_cli(), whose result is unpacked as
    an (out, err) pair of strings. Directory-changing commands are also
    verified by looking the path up on self._private_node.

    NOTE(review): many lines of this method are not visible in this view
    (flagged inline); confirm each against the full file.
    """
    # run various CLI commands (in a thread, since they use blocking
    # NOTE(review): the end of the comment above is not visible.
    private_uri = self._private_node.get_uri()
    some_uri = self._root_directory_uri
    client0_basedir = self.getdir("client0")
    # NOTE(review): the opening and closing lines of the two argument lists
    # below (used later as `nodeargs` and `public_nodeargs`) are not visible.
        "--node-directory", client0_basedir,
        "--dir-cap", private_uri,
        "--node-url", self.webish_url,
        "--dir-cap", some_uri,
    TESTDATA = "I will not write the same thing over and over.\n" * 100

    d = defer.succeed(None)
    # NOTE(review): the header of `_ls_root` is not visible.
        argv = ["ls"] + nodeargs
        return self._run_cli(argv)
    d.addCallback(_ls_root)
    def _check_ls_root((out,err)):
        self.failUnless("personal" in out)
        self.failUnless("s2-ro" in out)
        self.failUnless("s2-rw" in out)
        self.failUnlessEqual(err, "")
    d.addCallback(_check_ls_root)

    def _ls_subdir(res):
        argv = ["ls"] + nodeargs + ["personal"]
        return self._run_cli(argv)
    d.addCallback(_ls_subdir)
    def _check_ls_subdir((out,err)):
        self.failUnless("sekrit data" in out)
        self.failUnlessEqual(err, "")
    d.addCallback(_check_ls_subdir)

    def _ls_public_subdir(res):
        argv = ["ls"] + public_nodeargs + ["subdir1"]
        return self._run_cli(argv)
    d.addCallback(_ls_public_subdir)
    def _check_ls_public_subdir((out,err)):
        self.failUnless("subdir2" in out)
        self.failUnless("mydata567" in out)
        self.failUnlessEqual(err, "")
    d.addCallback(_check_ls_public_subdir)

    # NOTE(review): the header of `_ls_file` is not visible.
        argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
        return self._run_cli(argv)
    d.addCallback(_ls_file)
    def _check_ls_file((out,err)):
        self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
        self.failUnlessEqual(err, "")
    d.addCallback(_check_ls_file)

    # tahoe_ls doesn't currently handle the error correctly: it tries to
    # JSON-parse a traceback.
##     def _ls_missing(res):
##         argv = ["ls"] + nodeargs + ["bogus"]
##         return self._run_cli(argv)
##     d.addCallback(_ls_missing)
##     def _check_ls_missing((out,err)):
##         self.failUnlessEqual(err, "")
##     d.addCallback(_check_ls_missing)

    # NOTE(review): the header of the put step is not visible, nor are the
    # lines that write the local file at `fn`.
        tdir = self.getdir("cli_put")
        fileutil.make_dirs(tdir)
        fn = os.path.join(tdir, "upload_me")
        argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
        return self._run_cli(argv)
    # NOTE(review): the addCallback registering the put step is not visible.
    def _check_put((out,err)):
        self.failUnless("200 OK" in out)
        self.failUnlessEqual(err, "")
        d = self._private_node.get_child_at_path(u"test_put/upload.txt")
        d.addCallback(lambda filenode: filenode.download_to_data())
        def _check_put2(res):
            self.failUnlessEqual(res, TESTDATA)
        d.addCallback(_check_put2)
        # NOTE(review): no return of the inner `d` is visible.
    d.addCallback(_check_put)

    def _get_to_stdout(res):
        argv = ["get"] + nodeargs + ["test_put/upload.txt"]
        return self._run_cli(argv)
    d.addCallback(_get_to_stdout)
    def _check_get_to_stdout((out,err)):
        self.failUnlessEqual(out, TESTDATA)
        self.failUnlessEqual(err, "")
    d.addCallback(_check_get_to_stdout)

    get_to_file_target = self.basedir + "/get.downfile"
    def _get_to_file(res):
        argv = ["get"] + nodeargs + ["test_put/upload.txt",
        # NOTE(review): the end of the list above is not visible.
        return self._run_cli(argv)
    d.addCallback(_get_to_file)
    def _check_get_to_file((out,err)):
        data = open(get_to_file_target, "rb").read()
        self.failUnlessEqual(data, TESTDATA)
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
    d.addCallback(_check_get_to_file)

    # NOTE(review): the header of the mv step is not visible.
        argv = ["mv"] + nodeargs + ["test_put/upload.txt",
                                    "test_put/moved.txt"]
        return self._run_cli(argv)
    # NOTE(review): the addCallback registering the mv step is not visible.
    def _check_mv((out,err)):
        self.failUnless("OK" in out)
        self.failUnlessEqual(err, "")
        # the old name must be gone, and the new one must hold the data
        d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")

        d.addCallback(lambda res:
                      self._private_node.get_child_at_path(u"test_put/moved.txt"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        def _check_mv2(res):
            self.failUnlessEqual(res, TESTDATA)
        d.addCallback(_check_mv2)
        # NOTE(review): no return of the inner `d` is visible.
    d.addCallback(_check_mv)

    # NOTE(review): the header of the rm step is not visible.
        argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
        return self._run_cli(argv)
    # NOTE(review): the addCallback registering the rm step is not visible.
    def _check_rm((out,err)):
        self.failUnless("200 OK" in out)
        self.failUnlessEqual(err, "")
        d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
        # NOTE(review): no return of the inner `d` is visible.
    d.addCallback(_check_rm)
    # NOTE(review): no `return d` is visible; callers chain on this method.
def _run_cli(self, argv):
    """Run runner.runner(argv) in a worker thread and capture its output.

    argv: CLI argument list, passed to runner.runner() with
        run_by_human=False.

    Returns a Deferred that fires with a (stdout_text, stderr_text) tuple
    once the command has finished.
    """
    stdout, stderr = StringIO(), StringIO()
    d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                              stdout=stdout, stderr=stderr)
    def _done(res):
        # the runner's own return value is dropped; callers want its output
        return stdout.getvalue(), stderr.getvalue()
    d.addCallback(_done)
    return d
def _test_checker(self, res):
    """Build the private directory's manifest and pass it to _test_checker_2.

    Returns the Deferred for the chain so the caller waits for the checks.
    """
    d = self._private_node.build_manifest()
    d.addCallback(self._test_checker_2)
    return d
def _test_checker_2(self, manifest):
    """Check entries of `manifest` with client 1's "checker" service.

    Verifies the fresh check results, then that the checker stored a
    timestamped copy of each result, and finally chains to _test_checker_3.

    NOTE(review): several loop and branch headers are not visible in this
    view (flagged inline); confirm against the full file.
    """
    checker1 = self.clients[1].getServiceNamed("checker")
    self.failUnlessEqual(checker1.checker_results_for(None), [])
    self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
    # NOTE(review): the expected value for the call above and the creation
    # of `dl` are not visible.
    starting_time = time.time()
    # NOTE(review): the loop header that binds `si` is not visible.
        dl.append(checker1.check(si))
    d = deferredutil.DeferredListShouldSucceed(dl)

    def _check_checker_results(res):
        # NOTE(review): the loop over `res` binding `i`, and the test that
        # picks between the two branches below, are not visible.
                self.failUnless(i is True)
                # otherwise the result unpacks into four fields
                (needed, total, found, sharemap) = i
                self.failUnlessEqual(needed, 3)
                self.failUnlessEqual(total, 10)
                self.failUnlessEqual(found, total)
                self.failUnlessEqual(len(sharemap.keys()), 10)
                # NOTE(review): the creation of `peers` is not visible.
                for shpeers in sharemap.values():
                    peers.update(shpeers)
                self.failUnlessEqual(len(peers), self.numclients)
    d.addCallback(_check_checker_results)

    def _check_stored_results(res):
        finish_time = time.time()
        # NOTE(review): the creation of `all_results` and the loop header
        # binding `si` are not visible.
            results = checker1.checker_results_for(si)
            # NOTE(review): a statement is not visible around this TODO.
            # TODO: implement checker for mutable files and implement tests of that checker
            self.failUnlessEqual(len(results), 1)
            when, those_results = results[0]
            self.failUnless(isinstance(when, (int, float)))
            # the stored timestamp must fall inside this test's time window
            self.failUnless(starting_time <= when <= finish_time)
            all_results.append(those_results)
        _check_checker_results(all_results)
    d.addCallback(_check_stored_results)

    d.addCallback(self._test_checker_3)
    # NOTE(review): no `return d` is visible; _test_checker chains on this.
def _test_checker_3(self, res):
    """Check two individual files through their nodes' check() method.

    P/personal/sekrit data is expected to check as True; R/subdir1/mydata567
    (looked up through client 1) is expected to report 3-of-10 encoding with
    all ten shares found, one holder per share.

    Returns the Deferred for the chain so the caller waits for both checks.
    """
    # check one file, through FileNode.check()
    d = self._private_node.get_child_at_path(u"personal/sekrit data")
    d.addCallback(lambda n: n.check())
    def _checked(results):
        # 'sekrit data' is small, and fits in a LiteralFileNode, so
        # checking it is trivial and always returns True
        self.failUnlessEqual(results, True)
    d.addCallback(_checked)

    c1 = self.clients[1]
    rootnode = c1.create_node_from_uri(self._root_directory_uri)
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda n: n.check())
    def _checked2(results):
        # mydata567 is large and lives in a CHK
        (needed, total, found, sharemap) = results
        self.failUnlessEqual(needed, 3)
        self.failUnlessEqual(total, 10)
        self.failUnlessEqual(found, 10)
        self.failUnlessEqual(len(sharemap), 10)
        for shnum in range(10):
            self.failUnlessEqual(len(sharemap[shnum]), 1)
    d.addCallback(_checked2)
    return d
def _test_verifier(self, res):
    """Run client 1's checker in verify mode over the private manifest.

    Each verify() result must be True, and so must verify(None).

    NOTE(review): some lines are not visible in this view (flagged inline),
    and the method may continue past the end of this view.
    """
    checker1 = self.clients[1].getServiceNamed("checker")
    d = self._private_node.build_manifest()
    def _check_all(manifest):
        # NOTE(review): the creation of `dl` and the loop header binding
        # `si` are not visible.
            dl.append(checker1.verify(si))
        return deferredutil.DeferredListShouldSucceed(dl)
    d.addCallback(_check_all)
    # NOTE(review): the header of `_done` and its loop binding `i` are not
    # visible.
            self.failUnless(i is True)
    d.addCallback(_done)
    d.addCallback(lambda res: checker1.verify(None))
    d.addCallback(self.failUnlessEqual, True)