2 from base64 import b32encode
3 import os, sys, time, re, simplejson
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
11 from allmydata import client, uri, download, upload, storage, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner, cli
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable.common import NotMutableError
18 from allmydata.mutable import layout as mutable_layout
19 from allmydata.stats import PickleStatsGatherer
20 from allmydata.key_generator import KeyGeneratorService
21 from foolscap.eventual import flushEventualQueue, fireEventually
22 from foolscap import DeadReferenceError, Tub
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
# Flush foolscap's eventual-send queue, then hand the original result back
# to the Deferred chain (a bare flushEventualQueue would discard it).
def flush_but_dont_ignore(res):
    d = flushEventualQueue()
35 This is some data to publish to the virtual drive, which needs to be large
36 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts how many bytes have been read and
    can fire a Deferred once a configurable byte threshold is crossed
    (used to interrupt an upload partway through).
    """
    # running total of bytes handed out by read(); must start at zero,
    # otherwise the += in read() would raise AttributeError on first use
    bytes_read = 0
    # byte threshold after which interrupt_after_d fires; None disables it
    interrupt_after = None
    # Deferred fired (with self) once bytes_read exceeds interrupt_after
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so the Deferred fires only once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
53 class SystemTest(testutil.SignalMixin, testutil.PollMixin, testutil.StallMixin,
57 self.sparent = service.MultiService()
58 self.sparent.startService()
60 log.msg("shutting down SystemTest services")
61 d = self.sparent.stopService()
62 d.addBoth(flush_but_dont_ignore)
def getdir(self, subdir):
    """Return the path of *subdir* inside this test's base directory."""
    base = self.basedir
    return os.path.join(base, subdir)
def add_service(self, s):
    # Parent the service to our MultiService so tearDown's stopService()
    # shuts it down along with everything else.
    s.setServiceParent(self.sparent)
def set_up_nodes(self, NUMCLIENTS=5):
    """Create an introducer node, then (via a Deferred chain) a stats
    gatherer, a key generator, and NUMCLIENTS client nodes."""
    self.numclients = NUMCLIENTS
    iv_dir = self.getdir("introducer")
    if not os.path.isdir(iv_dir):
        fileutil.make_dirs(iv_dir)
        # give the introducer a web server on an ephemeral localhost port
        f = open(os.path.join(iv_dir, "webport"), "w")
        f.write("tcp:0:interface=127.0.0.1\n")
    iv = IntroducerNode(basedir=iv_dir)
    self.introducer = self.add_service(iv)
    d = self.introducer.when_tub_ready()
    # once the introducer's tub is listening, bring up the rest in order
    d.addCallback(self._get_introducer_web)
    d.addCallback(self._set_up_stats_gatherer)
    d.addCallback(self._set_up_key_generator)
    d.addCallback(self._set_up_nodes_2)
    d.addCallback(self._grab_stats)
def _get_introducer_web(self, res):
    # node.url is written by the introducer's web service at startup;
    # remember it so tests can fetch the introducer status page
    f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
    self.introweb_url = f.read().strip()
def _set_up_stats_gatherer(self, res):
    """Start a PickleStatsGatherer on its own Tub and record its FURL."""
    statsdir = self.getdir("stats_gatherer")
    fileutil.make_dirs(statsdir)
    # NOTE(review): 't' (presumably a foolscap Tub) and 'p' (its port
    # number) are bound on earlier lines not shown in this excerpt —
    # confirm against the full file before editing.
    l = t.listenOn("tcp:0")
    t.setLocation("localhost:%d" % p)
    self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
    self.add_service(self.stats_gatherer)
    self.stats_gatherer_furl = self.stats_gatherer.get_furl()
def _set_up_key_generator(self, res):
    """Start the key-generator service and wait until it has published
    its FURL, which clients use to fetch pre-generated keys."""
    kgsdir = self.getdir("key_generator")
    fileutil.make_dirs(kgsdir)

    self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
    # small pool and short refresh delay keep the test fast
    self.key_generator_svc.key_generator.pool_size = 4
    self.key_generator_svc.key_generator.pool_refresh_delay = 60
    self.add_service(self.key_generator_svc)

    def check_for_furl():
        return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
    # NOTE(review): 'd' is created on a line not shown in this excerpt.
    d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
    # NOTE(review): the next two lines appear to belong to a 'get_furl'
    # helper whose 'def' line is not shown in this excerpt.
    kgf = os.path.join(kgsdir, 'key_generator.furl')
    self.key_generator_furl = file(kgf, 'rb').read().strip()
    d.addCallback(get_furl)
def _set_up_nodes_2(self, res):
    """Write per-client config files, start client[0] (which runs the
    helper), record its helper FURL, then start the remaining clients."""
    # NOTE(review): 'q' and 'basedirs' are bound on lines not shown in
    # this excerpt.
    self.introducer_furl = q.introducer_url
    for i in range(self.numclients):
        basedir = self.getdir("client%d" % i)
        basedirs.append(basedir)
        fileutil.make_dirs(basedir)
        # NOTE(review): the per-client writes below are presumably
        # guarded by 'if i == ...' lines not shown in this excerpt.
        # client[0] runs a webserver and a helper, no key_generator
        open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
        open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
        open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n")
        # client[3] runs a webserver and uses a helper, uses key_generator
        open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
        kgf = "%s\n" % (self.key_generator_furl,)
        open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
        # every client learns the introducer and stats-gatherer FURLs
        open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
        open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)

    # start client[0], wait for its tub to be ready (at which point it
    # will have registered the helper furl).
    c = self.add_service(client.Client(basedir=basedirs[0]))
    self.clients.append(c)
    d = c.when_tub_ready()
    # NOTE(review): the lines from here through the return below form
    # the body of a '_ready' helper whose 'def' line is not shown here.
    f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
    helper_furl = f.read()
    self.helper_furl = helper_furl
    # give client[3] the helper's FURL so it uploads via the helper
    f = open(os.path.join(basedirs[3],"helper.furl"), "w")
    # this starts the rest of the clients
    for i in range(1, self.numclients):
        c = self.add_service(client.Client(basedir=basedirs[i]))
        self.clients.append(c)
    return self.wait_for_connections()
    d.addCallback(_ready)
    # NOTE(review): the remaining lines form a '_connected' helper whose
    # 'def' line is not shown here.
    # now find out where the web port was
    l = self.clients[0].getServiceNamed("webish").listener
    port = l._port.getHost().port
    self.webish_url = "http://localhost:%d/" % port
    # and the helper-using webport
    l = self.clients[3].getServiceNamed("webish").listener
    port = l._port.getHost().port
    self.helper_webish_url = "http://localhost:%d/" % port
    d.addCallback(_connected)
def _grab_stats(self, res):
    # run one gatherer poll cycle so the stats path gets exercised
    d = self.stats_gatherer.poll()
def bounce_client(self, num):
    """Stop client[num] and start a fresh Client on the same basedir;
    the returned Deferred fires when the new client's tub is ready."""
    c = self.clients[num]
    d = c.disownServiceParent()
    # I think windows requires a moment to let the connection really stop
    # and the port number made available for re-use. TODO: examine the
    # behavior, see if this is really the problem, see if we can do
    # better than blindly waiting for a second.
    d.addCallback(self.stall, 1.0)
    # NOTE(review): the next four lines appear to form the body of a
    # '_stopped' helper whose 'def' line is not shown in this excerpt.
    new_c = client.Client(basedir=self.getdir("client%d" % num))
    self.clients[num] = new_c
    self.add_service(new_c)
    return new_c.when_tub_ready()
    d.addCallback(_stopped)
    d.addCallback(lambda res: self.wait_for_connections())
    def _maybe_get_webport(res):
        # now find out where the web port was
        # NOTE(review): a guard limiting this to the webserver-running
        # client appears to be elided from this excerpt.
        l = self.clients[0].getServiceNamed("webish").listener
        port = l._port.getHost().port
        self.webish_url = "http://localhost:%d/" % port
    d.addCallback(_maybe_get_webport)
def add_extra_node(self, client_num, helper_furl=None,
                   add_to_sparent=False):
    """Create and start one extra client node.

    When add_to_sparent is True the node is parented to self.sparent
    (stopped by tearDown); otherwise the caller owns its shutdown.
    Returns a Deferred that fires with the new Client once connections
    have settled.
    """
    # usually this node is *not* parented to our self.sparent, so we can
    # shut it down separately from the rest, to exercise the
    # connection-lost code
    basedir = self.getdir("client%d" % client_num)
    if not os.path.isdir(basedir):
        fileutil.make_dirs(basedir)
    open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
    # NOTE(review): the helper.furl write is presumably guarded by an
    # 'if helper_furl:' line not shown in this excerpt.
    f = open(os.path.join(basedir, "helper.furl") ,"w")
    f.write(helper_furl+"\n")
    c = client.Client(basedir=basedir)
    self.clients.append(c)
    # NOTE(review): presumably guarded by 'if add_to_sparent:' — the
    # guard line is not shown in this excerpt.
    c.setServiceParent(self.sparent)
    d = self.wait_for_connections()
    d.addCallback(lambda res: c)
def _check_connections(self):
    # Poll predicate: true once every client is connected to the
    # introducer and has heard about all self.numclients peers.
    # NOTE(review): the bodies of both 'if' tests (presumably early
    # 'return False' lines) are not shown in this excerpt.
    for c in self.clients:
        ic = c.introducer_client
        if not ic.connected_to_introducer():
        if len(ic.get_all_peerids()) != self.numclients:
def wait_for_connections(self, ignored=None):
    """Return a Deferred that fires once _check_connections() is true."""
    # TODO: replace this with something that takes a list of peerids and
    # fires when they've all been heard from, instead of using a count
    return self.poll(self._check_connections, timeout=200)
def test_connections(self):
    """Grid smoke test: every node (plus one extra) sees all peers."""
    self.basedir = "system/SystemTest/test_connections"
    d = self.set_up_nodes()
    self.extra_node = None
    d.addCallback(lambda res: self.add_extra_node(self.numclients))
    def _check(extra_node):
        self.extra_node = extra_node
        for c in self.clients:
            all_peerids = list(c.get_all_peerids())
            self.failUnlessEqual(len(all_peerids), self.numclients+1)
            # ask for a storage permutation to prove peer selection works
            permuted_peers = list(c.get_permuted_peers("storage", "a"))
            self.failUnlessEqual(len(permuted_peers), self.numclients+1)
    d.addCallback(_check)
    def _shutdown_extra_node(res):
        return self.extra_node.stopService()
    # addBoth: shut the extra node down on success or failure
    d.addBoth(_shutdown_extra_node)
test_connections.timeout = 300
# test_connections is subsumed by test_upload_and_download, and takes
# quite a while to run on a slow machine (because of all the TLS
# connections that must be established). If we ever rework the introducer
# code to such an extent that we're not sure if it works anymore, we can
# reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Immutable upload/download using a random per-file key."""
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    return self._test_upload_and_download(convergence=None)
test_upload_and_download_random_key.timeout = 4800
def test_upload_and_download_convergent(self):
    """Immutable upload/download with convergent encryption enabled."""
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    return self._test_upload_and_download(convergence="some convergence string")
test_upload_and_download_convergent.timeout = 4800
def _test_upload_and_download(self, convergence):
    """Drive a full immutable-file lifecycle across the grid: upload,
    re-upload, several download styles, bad-URI failure, helper-assisted
    upload, and resumption of an interrupted helper upload."""
    # we use 4000 bytes of data, which will result in about 400k written
    # to disk among all our simulated nodes
    DATA = "Some data to upload\n" * 200
    d = self.set_up_nodes()
    def _check_connections(res):
        for c in self.clients:
            all_peerids = list(c.get_all_peerids())
            self.failUnlessEqual(len(all_peerids), self.numclients)
            permuted_peers = list(c.get_permuted_peers("storage", "a"))
            self.failUnlessEqual(len(permuted_peers), self.numclients)
    d.addCallback(_check_connections)

    # NOTE(review): the next lines appear to belong to a '_do_upload'
    # helper whose 'def' line is not shown in this excerpt.
    u = self.clients[0].getServiceNamed("uploader")
    # we crank the max segsize down to 1024b for the duration of this
    # test, so we can exercise multiple segments. It is important
    # that this is not a multiple of the segment size, so that the
    # tail segment is not the same length as the others. This actually
    # gets rounded up to 1025 to be a multiple of the number of
    # required shares (since we use 25 out of 100 FEC).
    up = upload.Data(DATA, convergence=convergence)
    up.max_segment_size = 1024
    d.addCallback(_do_upload)
    def _upload_done(results):
        # NOTE(review): the line binding 'uri' from 'results' is not
        # shown in this excerpt.
        log.msg("upload finished: uri is %s" % (uri,))
        # download via a *different* client to prove shares are remote
        dl = self.clients[1].getServiceNamed("downloader")
    d.addCallback(_upload_done)

    def _upload_again(res):
        # Upload again. If using convergent encryption then this ought to be
        # short-circuited, however with the way we currently generate URIs
        # (i.e. because they include the roothash), we have to do all of the
        # encoding work, and only get to save on the upload part.
        log.msg("UPLOADING AGAIN")
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        d1 = self.uploader.upload(up)
    d.addCallback(_upload_again)

    def _download_to_data(res):
        log.msg("DOWNLOADING")
        return self.downloader.download_to_data(self.uri)
    d.addCallback(_download_to_data)
    def _download_to_data_done(data):
        log.msg("download finished")
        self.failUnlessEqual(data, DATA)
    d.addCallback(_download_to_data_done)

    target_filename = os.path.join(self.basedir, "download.target")
    def _download_to_filename(res):
        return self.downloader.download_to_filename(self.uri,
    d.addCallback(_download_to_filename)
    def _download_to_filename_done(res):
        newdata = open(target_filename, "rb").read()
        self.failUnlessEqual(newdata, DATA)
    d.addCallback(_download_to_filename_done)

    target_filename2 = os.path.join(self.basedir, "download.target2")
    def _download_to_filehandle(res):
        fh = open(target_filename2, "wb")
        return self.downloader.download_to_filehandle(self.uri, fh)
    d.addCallback(_download_to_filehandle)
    def _download_to_filehandle_done(fh):
        newdata = open(target_filename2, "rb").read()
        self.failUnlessEqual(newdata, DATA)
    d.addCallback(_download_to_filehandle_done)

    def _download_nonexistent_uri(res):
        baduri = self.mangle_uri(self.uri)
        log.msg("about to download non-existent URI", level=log.UNUSUAL,
                facility="tahoe.tests")
        d1 = self.downloader.download_to_data(baduri)
        def _baduri_should_fail(res):
            log.msg("finished downloading non-existend URI",
                    level=log.UNUSUAL, facility="tahoe.tests")
            self.failUnless(isinstance(res, Failure))
            self.failUnless(res.check(download.NotEnoughSharesError),
                            "expected NotEnoughSharesError, got %s" % res)
            # TODO: files that have zero peers should get a special kind
            # of NotEnoughSharesError, which can be used to suggest that
            # the URI might be wrong or that they've never uploaded the
            # file in the first place.
        d1.addBoth(_baduri_should_fail)
    d.addCallback(_download_nonexistent_uri)

    # add a new node, which doesn't accept shares, and only uses the
    d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                  add_to_sparent=True))
    def _added(extra_node):
        self.extra_node = extra_node
        # sizelimit=0: this node refuses to accept any shares itself
        extra_node.getServiceNamed("storage").sizelimit = 0
    d.addCallback(_added)

    HELPER_DATA = "Data that needs help to upload" * 1000
    def _upload_with_helper(res):
        u = upload.Data(HELPER_DATA, convergence=convergence)
        d = self.extra_node.upload(u)
        def _uploaded(results):
            # NOTE(review): 'uri' is bound from 'results' on a line not
            # shown in this excerpt.
            return self.downloader.download_to_data(uri)
        d.addCallback(_uploaded)
        # NOTE(review): the next line appears to belong to a
        # '_check(newdata)' helper whose 'def' line is not shown here.
        self.failUnlessEqual(newdata, HELPER_DATA)
        d.addCallback(_check)
    d.addCallback(_upload_with_helper)

    def _upload_duplicate_with_helper(res):
        u = upload.Data(HELPER_DATA, convergence=convergence)
        u.debug_stash_RemoteEncryptedUploadable = True
        d = self.extra_node.upload(u)
        def _uploaded(results):
            # NOTE(review): 'uri' is bound from 'results' on a line not
            # shown in this excerpt.
            return self.downloader.download_to_data(uri)
        d.addCallback(_uploaded)
        # NOTE(review): the next assertions appear to belong to a
        # '_check(newdata)' helper whose 'def' line is not shown here.
        self.failUnlessEqual(newdata, HELPER_DATA)
        self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                    "uploadable started uploading, should have been avoided")
        d.addCallback(_check)
    if convergence is not None:
        # duplicate short-circuiting only applies with convergence
        d.addCallback(_upload_duplicate_with_helper)

    def _upload_resumable(res):
        DATA = "Data that needs help to upload and gets interrupted" * 1000
        u1 = CountingDataUploadable(DATA, convergence=convergence)
        u2 = CountingDataUploadable(DATA, convergence=convergence)

        # we interrupt the connection after about 5kB by shutting down
        # the helper, then restarting it.
        u1.interrupt_after = 5000
        u1.interrupt_after_d = defer.Deferred()
        u1.interrupt_after_d.addCallback(lambda res:
                                         self.bounce_client(0))

        # sneak into the helper and reduce its chunk size, so that our
        # debug_interrupt will sever the connection on about the fifth
        # chunk fetched. This makes sure that we've started to write the
        # new shares before we abandon them, which exercises the
        # abort/delete-partial-share code. TODO: find a cleaner way to do
        # this. I know that this will affect later uses of the helper in
        # this same test run, but I'm not currently worried about it.
        offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

        d = self.extra_node.upload(u1)

        def _should_not_finish(res):
            self.fail("interrupted upload should have failed, not finished"
                      " with result %s" % (res,))
        # NOTE(review): the following lines appear to form an
        # '_interrupted(f)' errback whose 'def' line is not shown here.
        f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
        # make sure we actually interrupted it before finishing the
        self.failUnless(u1.bytes_read < len(DATA),
                        "read %d out of %d total" % (u1.bytes_read,
        log.msg("waiting for reconnect", level=log.NOISY,
                facility="tahoe.test.test_system")
        # now, we need to give the nodes a chance to notice that this
        # connection has gone away. When this happens, the storage
        # servers will be told to abort their uploads, removing the
        # partial shares. Unfortunately this involves TCP messages
        # going through the loopback interface, and we can't easily
        # predict how long that will take. If it were all local, we
        # could use fireEventually() to stall. Since we don't have
        # the right introduction hooks, the best we can do is use a
        # fixed delay. TODO: this is fragile.
        u1.interrupt_after_d.addCallback(self.stall, 2.0)
        return u1.interrupt_after_d
        d.addCallbacks(_should_not_finish, _interrupted)

        def _disconnected(res):
            # check to make sure the storage servers aren't still hanging
            # on to the partial share: their incoming/ directories should
            log.msg("disconnected", level=log.NOISY,
                    facility="tahoe.test.test_system")
            for i in range(self.numclients):
                incdir = os.path.join(self.getdir("client%d" % i),
                                      "storage", "shares", "incoming")
                self.failIf(os.path.exists(incdir) and os.listdir(incdir))
        d.addCallback(_disconnected)

        # then we need to give the reconnector a chance to
        # reestablish the connection to the helper.
        d.addCallback(lambda res:
                      log.msg("wait_for_connections", level=log.NOISY,
                              facility="tahoe.test.test_system"))
        d.addCallback(lambda res: self.wait_for_connections())

        d.addCallback(lambda res:
                      log.msg("uploading again", level=log.NOISY,
                              facility="tahoe.test.test_system"))
        d.addCallback(lambda res: self.extra_node.upload(u2))

        def _uploaded(results):
            log.msg("Second upload complete", level=log.NOISY,
                    facility="tahoe.test.test_system")

            # this is really bytes received rather than sent, but it's
            # convenient and basically measures the same thing
            bytes_sent = results.ciphertext_fetched

            # We currently don't support resumption of upload if the data is
            # encrypted with a random key. (Because that would require us
            # to store the key locally and re-use it on the next upload of
            # this file, which isn't a bad thing to do, but we currently
            if convergence is not None:
                # Make sure we did not have to read the whole file the
                # second time around .
                self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                # NOTE(review): the 'else:' that introduces this branch
                # is not shown in this excerpt.
                # Make sure we did have to read the whole file the second
                # time around -- because the one that we partially uploaded
                # earlier was encrypted with a different random key.
                self.failIf(bytes_sent < len(DATA),
                            "resumption saved us some work even though we were using random keys:"
                            " read %d bytes out of %d total" %
                            (bytes_sent, len(DATA)))
            return self.downloader.download_to_data(uri)
        d.addCallback(_uploaded)

        # NOTE(review): the following lines appear to form a
        # '_check(newdata)' helper whose 'def' line is not shown here.
        self.failUnlessEqual(newdata, DATA)
        # If using convergent encryption, then also check that the
        # helper has removed the temp file from its directories.
        if convergence is not None:
            basedir = os.path.join(self.getdir("client0"), "helper")
            files = os.listdir(os.path.join(basedir, "CHK_encoding"))
            self.failUnlessEqual(files, [])
            files = os.listdir(os.path.join(basedir, "CHK_incoming"))
            self.failUnlessEqual(files, [])
        d.addCallback(_check)
    d.addCallback(_upload_resumable)
def _find_shares(self, basedir):
    """Walk basedir collecting (client_num, storage_index, filename,
    shnum) tuples for every share file found; fail if none exist."""
    # NOTE(review): the accumulator list, the 'continue' for non-storage
    # dirs, the append, and the normal return are on lines not shown in
    # this excerpt.
    for (dirpath, dirnames, filenames) in os.walk(basedir):
        if "storage" not in dirpath:
        pieces = dirpath.split(os.sep)
        if pieces[-4] == "storage" and pieces[-3] == "shares":
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            assert pieces[-5].startswith("client")
            client_num = int(pieces[-5][-1])
            storage_index_s = pieces[-1]
            storage_index = storage.si_a2b(storage_index_s)
            for sharename in filenames:
                shnum = int(sharename)
                filename = os.path.join(dirpath, sharename)
                data = (client_num, storage_index, filename, shnum)
    self.fail("unable to find any share files in %s" % basedir)
def _corrupt_mutable_share(self, filename, which):
    """Mangle one field of an SDMF mutable-share file on disk, selected
    by the 'which' string, then write the share back in place."""
    msf = storage.MutableShareFile(filename)
    datav = msf.readv([ (0, 1000000) ])
    final_share = datav[0]
    assert len(final_share) < 1000000 # ought to be truncated
    pieces = mutable_layout.unpack_share(final_share)
    (seqnum, root_hash, IV, k, N, segsize, datalen,
     verification_key, signature, share_hash_chain, block_hash_tree,
     share_data, enc_privkey) = pieces

    if which == "seqnum":
        # NOTE(review): the seqnum branch body and the 'elif' lines for
        # the "R" (root_hash) and "IV" cases are not shown in this
        # excerpt; the two flip_bit lines below belong to those branches.
        root_hash = self.flip_bit(root_hash)
        IV = self.flip_bit(IV)
    elif which == "segsize":
        segsize = segsize + 15
    elif which == "pubkey":
        verification_key = self.flip_bit(verification_key)
    elif which == "signature":
        signature = self.flip_bit(signature)
    elif which == "share_hash_chain":
        nodenum = share_hash_chain.keys()[0]
        share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
    elif which == "block_hash_tree":
        block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
    elif which == "share_data":
        share_data = self.flip_bit(share_data)
    elif which == "encprivkey":
        enc_privkey = self.flip_bit(enc_privkey)

    # re-pack the (possibly corrupted) fields and overwrite the file
    prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
    final_share = mutable_layout.pack_share(prefix,
    msf.writev( [(0, final_share)], None)
def test_mutable(self):
    """End-to-end SDMF mutable-file exercise: create, debug-dump a share,
    read from several clients, overwrite, corrupt shares and recover,
    create empty files/dirnodes, and check the key-generator pool."""
    self.basedir = "system/SystemTest/test_mutable"
    DATA = "initial contents go here." # 25 bytes % 3 != 0
    NEWDATA = "new contents yay"
    NEWERDATA = "this is getting old"

    d = self.set_up_nodes()

    def _create_mutable(res):
        # NOTE(review): 'c' is bound on a line not shown in this excerpt.
        log.msg("starting create_mutable_file")
        d1 = c.create_mutable_file(DATA)
        # NOTE(review): the next two lines appear to belong to a
        # '_done(res)' helper whose 'def' line is not shown here.
        log.msg("DONE: %s" % (res,))
        self._mutable_node_1 = res
        d1.addCallback(_done)
    d.addCallback(_create_mutable)

    def _test_debug(res):
        # find a share. It is important to run this while there is only
        # one slot in the grid.
        shares = self._find_shares(self.basedir)
        (client_num, storage_index, filename, shnum) = shares[0]
        log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
        log.msg(" for clients[%d]" % client_num)
        out,err = StringIO(), StringIO()
        rc = runner.runner(["dump-share",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)
        # NOTE(review): the assertions below appear to sit inside a
        # 'try:' whose opening line is not shown here; failures fall
        # through to the except below, which dumps the output.
        self.failUnless("Mutable slot found:\n" in output)
        self.failUnless("share_type: SDMF\n" in output)
        peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
        self.failUnless(" WE for nodeid: %s\n" % peerid in output)
        self.failUnless(" num_extra_leases: 0\n" in output)
        # the pubkey size can vary by a byte, so the container might
        # be a bit larger on some runs.
        m = re.search(r'^ container_size: (\d+)$', output, re.M)
        container_size = int(m.group(1))
        self.failUnless(2037 <= container_size <= 2049, container_size)
        m = re.search(r'^ data_length: (\d+)$', output, re.M)
        data_length = int(m.group(1))
        self.failUnless(2037 <= data_length <= 2049, data_length)
        self.failUnless(" secrets are for nodeid: %s\n" % peerid
        self.failUnless(" SDMF contents:\n" in output)
        self.failUnless(" seqnum: 1\n" in output)
        self.failUnless(" required_shares: 3\n" in output)
        self.failUnless(" total_shares: 10\n" in output)
        self.failUnless(" segsize: 27\n" in output, (output, filename))
        self.failUnless(" datalen: 25\n" in output)
        # the exact share_hash_chain nodes depends upon the sharenum,
        # and is more of a hassle to compute than I want to deal with
        self.failUnless(" share_hash_chain: " in output)
        self.failUnless(" block_hash_tree: 1 nodes\n" in output)
        except unittest.FailTest:
            print "dump-share output was:"
    d.addCallback(_test_debug)

    # first, let's see if we can use the existing node to retrieve the
    # contents. This allows it to use the cached pubkey and maybe the
    # latest-known sharemap.
    d.addCallback(lambda res: self._mutable_node_1.download_best_version())
    def _check_download_1(res):
        self.failUnlessEqual(res, DATA)
        # now we see if we can retrieve the data from a new node,
        # constructed using the URI of the original one. We do this test
        # on the same client that uploaded the data.
        uri = self._mutable_node_1.get_uri()
        log.msg("starting retrieve1")
        newnode = self.clients[0].create_node_from_uri(uri)
        # nodes created from the same URI should be the same object
        newnode_2 = self.clients[0].create_node_from_uri(uri)
        self.failUnlessIdentical(newnode, newnode_2)
        return newnode.download_best_version()
    d.addCallback(_check_download_1)

    def _check_download_2(res):
        self.failUnlessEqual(res, DATA)
        # same thing, but with a different client
        uri = self._mutable_node_1.get_uri()
        newnode = self.clients[1].create_node_from_uri(uri)
        log.msg("starting retrieve2")
        d1 = newnode.download_best_version()
        d1.addCallback(lambda res: (res, newnode))
    d.addCallback(_check_download_2)

    def _check_download_3((res, newnode)):
        self.failUnlessEqual(res, DATA)
        # replace the data on the node and read it back
        log.msg("starting replace1")
        d1 = newnode.overwrite(NEWDATA)
        d1.addCallback(lambda res: newnode.download_best_version())
    d.addCallback(_check_download_3)

    def _check_download_4(res):
        self.failUnlessEqual(res, NEWDATA)
        # now create an even newer node and replace the data on it. This
        # new node has never been used for download before.
        uri = self._mutable_node_1.get_uri()
        newnode1 = self.clients[2].create_node_from_uri(uri)
        newnode2 = self.clients[3].create_node_from_uri(uri)
        self._newnode3 = self.clients[3].create_node_from_uri(uri)
        log.msg("starting replace2")
        d1 = newnode1.overwrite(NEWERDATA)
        d1.addCallback(lambda res: newnode2.download_best_version())
    d.addCallback(_check_download_4)

    def _check_download_5(res):
        log.msg("finished replace2")
        self.failUnlessEqual(res, NEWERDATA)
    d.addCallback(_check_download_5)

    def _corrupt_shares(res):
        # run around and flip bits in all but k of the shares, to test
        shares = self._find_shares(self.basedir)
        ## sort by share number
        #shares.sort( lambda a,b: cmp(a[3], b[3]) )
        where = dict([ (shnum, filename)
                       for (client_num, storage_index, filename, shnum)
        assert len(where) == 10 # this test is designed for 3-of-10
        for shnum, filename in where.items():
            # shares 7,8,9 are left alone. read will check
            # (share_hash_chain, block_hash_tree, share_data). New
            # seqnum+R pairs will trigger a check of (seqnum, R, IV,
            # segsize, signature).
            # NOTE(review): the per-shnum 'if/elif' dispatch lines that
            # select one corruption per share are not shown here.
            # read: this will trigger "pubkey doesn't match
            self._corrupt_mutable_share(filename, "pubkey")
            self._corrupt_mutable_share(filename, "encprivkey")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "seqnum")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "R")
            # triggers "signature is invalid"
            self._corrupt_mutable_share(filename, "segsize")
            self._corrupt_mutable_share(filename, "share_hash_chain")
            self._corrupt_mutable_share(filename, "block_hash_tree")
            self._corrupt_mutable_share(filename, "share_data")
            # other things to correct: IV, signature
            # 7,8,9 are left alone

        # note that initial_query_count=5 means that we'll hit the
        # first 5 servers in effectively random order (based upon
        # response time), so we won't necessarily ever get a "pubkey
        # doesn't match fingerprint" error (if we hit shnum>=1 before
        # shnum=0, we pull the pubkey from there). To get repeatable
        # specific failures, we need to set initial_query_count=1,
        # but of course that will change the sequencing behavior of
        # the retrieval process. TODO: find a reasonable way to make
        # this a parameter, probably when we expand this test to test
        # for one failure mode at a time.

        # when we retrieve this, we should get three signature
        # failures (where we've mangled seqnum, R, and segsize). The
    d.addCallback(_corrupt_shares)

    # a fresh node must still recover the data despite the corruption
    d.addCallback(lambda res: self._newnode3.download_best_version())
    d.addCallback(_check_download_5)

    def _check_empty_file(res):
        # make sure we can create empty files, this usually screws up the
        d1 = self.clients[2].create_mutable_file("")
        d1.addCallback(lambda newnode: newnode.download_best_version())
        d1.addCallback(lambda res: self.failUnlessEqual("", res))
    d.addCallback(_check_empty_file)

    d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
    def _created_dirnode(dnode):
        log.msg("_created_dirnode(%s)" % (dnode,))
        # NOTE(review): 'd1' is created from a dnode call on a line not
        # shown in this excerpt.
        d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
        d1.addCallback(lambda res: dnode.has_child(u"edgar"))
        d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
        d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
        d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
        d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
        d1.addCallback(lambda res: dnode.build_manifest())
        d1.addCallback(lambda manifest:
                       self.failUnlessEqual(len(manifest), 1))
    d.addCallback(_created_dirnode)

    def wait_for_c3_kg_conn():
        return self.clients[3]._key_generator is not None
    d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

    def check_kg_poolsize(junk, size_delta):
        self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                             self.key_generator_svc.key_generator.pool_size + size_delta)

    d.addCallback(check_kg_poolsize, 0)
    d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
    d.addCallback(check_kg_poolsize, -1)
    d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
    d.addCallback(check_kg_poolsize, -2)
    # use_helper induces use of clients[3], which is the using-key_gen client
    d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
    d.addCallback(check_kg_poolsize, -3)
# The default 120 second timeout went off when running it under valgrind
# on my old Windows laptop, so I'm bumping up the timeout.
test_mutable.timeout = 240
def flip_bit(self, good):
    """Return a copy of *good* with the low bit of its last byte inverted."""
    flipped_last = chr(ord(good[-1]) ^ 0x01)
    return good[:-1] + flipped_last
def mangle_uri(self, gooduri):
    """Return a CHK URI whose key differs from gooduri's by one bit, so
    it names a file for which no server holds shares."""
    # change the key, which changes the storage index, which means we'll
    # be asking about the wrong file, so nobody will have any shares
    u = IFileURI(gooduri)
    # NOTE(review): the final constructor argument line (presumably
    # 'size=') is not shown in this excerpt.
    u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                        uri_extension_hash=u.uri_extension_hash,
                        needed_shares=u.needed_shares,
                        total_shares=u.total_shares,
    return u2.to_string()
864 # TODO: add a test which mangles the uri_extension_hash instead, and
865 # should fail due to not being able to get a valid uri_extension block.
866 # Also a test which sneakily mangles the uri_extension block to change
867 # some of the validation data, so it will fail in the post-download phase
868 # when the file's crypttext integrity check fails. Do the same thing for
869 # the key, which should cause the download to fail the post-download
870 # plaintext_hash check.
def test_vdrive(self):
    """Exercise the directory ('vdrive') layer: publish files and
    directories, bounce a client, then verify via the web, control, CLI,
    checker and verifier interfaces."""
    self.basedir = "system/SystemTest/test_vdrive"
    self.data = LARGE_DATA
    d = self.set_up_nodes()
    d.addCallback(self._test_introweb)
    d.addCallback(self.log, "starting publish")
    d.addCallback(self._do_publish1)
    d.addCallback(self._test_runner)
    d.addCallback(self._do_publish2)
    # at this point, we have the following filesystem (where "R" denotes
    # self._root_directory_uri):
    # R/subdir1/mydata567
    # R/subdir1/subdir2/mydata992
    d.addCallback(lambda res: self.bounce_client(0))
    d.addCallback(self.log, "bounced client0")
    d.addCallback(self._check_publish1)
    d.addCallback(self.log, "did _check_publish1")
    d.addCallback(self._check_publish2)
    d.addCallback(self.log, "did _check_publish2")
    d.addCallback(self._do_publish_private)
    d.addCallback(self.log, "did _do_publish_private")
    # now we also have (where "P" denotes a new dir):
    # P/personal/sekrit data
    # P/s2-rw -> /subdir1/subdir2/
    # P/s2-ro -> /subdir1/subdir2/ (read-only)
    d.addCallback(self._check_publish_private)
    d.addCallback(self.log, "did _check_publish_private")
    d.addCallback(self._test_web)
    d.addCallback(self._test_control)
    d.addCallback(self._test_cli)
    # P now has four top-level children:
    # P/personal/sekrit data
    # P/test_put/ (empty)
    d.addCallback(self._test_checker)
    d.addCallback(self._test_verifier)
    d.addCallback(self._grab_stats)
test_vdrive.timeout = 1100
    def _test_introweb(self, res):
        """Fetch the introducer's web page (HTML and ?t=json) and check the
        version banner plus announcement/subscription summaries."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        # NOTE(review): the 'def _check(res):' / 'try:' lines appear to be
        # missing from this view — the FailTest handler below implies them
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
            self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
            self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
            # on failure, dump the raw page to aid debugging
            print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            # NOTE(review): a 'try:' line and some expected-value arguments
            # appear to be missing from this view — confirm upstream
            self.failUnlessEqual(data["subscription_summary"],
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
        except unittest.FailTest:
            print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        """Create the root directory and R/subdir1/mydata567, stashing
        self._root_directory_uri, self._subdir1_node and self.uri."""
        ut = upload.Data(self.data, convergence=None)
        # NOTE(review): 'c0 = self.clients[0]' appears to be missing from
        # this view — 'c0' is used below
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            # remember the root cap, then re-derive a node from it
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                # keep the file cap for later web/CLI checks
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
            # NOTE(review): 'return d1' appears to be missing from this view
        d.addCallback(_made_subdir1)
        # NOTE(review): trailing 'return d' appears to be missing as well
    def _do_publish2(self, res):
        """Create R/subdir1/subdir2 and upload mydata992 into it."""
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        # NOTE(review): trailing 'return d' is not visible in this view
    def log(self, res, msg, **kwargs):
        """Log 'msg'; designed to sit in a Deferred callback chain, taking
        the chained result as the first argument."""
        # print "MSG: %s RES: %s" % (msg, res)
        log.msg(msg, **kwargs)
        # NOTE(review): 'return res' (to pass the result down the chain)
        # appears to be missing from this view — confirm upstream
    def _do_publish_private(self, res):
        """Build the private tree P: P/personal/sekrit data plus s2-rw and
        s2-ro links to R/subdir1/subdir2; fires with the private dirnode."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            # NOTE(review): 'def _got_s2(s2node):' appears to be missing
            # from this view — the addCallback(_got_s2) below implies it
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            # fire with the private node itself for _check_publish_private
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
        # NOTE(review): 'return d1' / 'return d' not visible in this view
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 from client1 using the iterative
        (node-by-node get) API."""
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            # downloaded plaintext must match what we uploaded
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        # NOTE(review): trailing 'return d' is not visible in this view
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 from client1 using the path-based
        (get_child_at_path) API."""
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # a node rebuilt from the same URI should compare equal
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        # NOTE(review): trailing 'return d' is not visible in this view
    def _check_publish_private(self, resnode):
        """Verify the private tree P: contents of personal/sekrit data, that
        s2-rw is mutable, that s2-ro rejects all mutation attempts, then
        exercise move_child_to, build_manifest and deep_stats."""
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            # NOTE(review): this inner function's 'return personal' (or
            # similar) is not visible in this view
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): 'def get_path(path):' appears to be missing from
        # this view — the helper is used repeatedly below
            return self._private_node.get_child_at_path(path)
        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # a read-only view of a mutable directory: mutable but readonly
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")
            # every mutation attempt on the read-only dirnode must raise
            # NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
            d1.addCallback(self.log, "doing get(ro)")
            # plain reads must still work on the read-only view
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))
            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
            # a missing child raises KeyError, not NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))
            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
            d1.addCallback(self.log, "finished with _got_s2ro")
            # NOTE(review): 'return d1' is not visible in this view
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            # shuffle 'sekrit data' around with move_child_to, ending back
            # under P/personal/
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))
            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))
            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))
            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            d1.addCallback(lambda res: home.deep_stats())
            def _check_stats(stats):
                # exact counts are deterministic for this fixed tree;
                # directory sizes vary, so those are range-checked below
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                # NOTE(review): the dict's closing brace and part of the
                # failure-message arguments are not visible in this view
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            # NOTE(review): 'return d1' is not visible in this view
        d.addCallback(_got_home)
        # NOTE(review): trailing 'return d' is not visible in this view
    def shouldFail(self, res, expected_failure, which, substring=None):
        """addBoth-style checker: pass iff 'res' is a Failure wrapping
        'expected_failure' (optionally containing 'substring'); fail the
        test if the operation 'which' succeeded instead."""
        if isinstance(res, Failure):
            # re-raises if the failure is of an unexpected type
            res.trap(expected_failure)
            # NOTE(review): the guarding 'if substring:' line appears to be
            # missing from this view (substring defaults to None)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
            # NOTE(review): the 'else:' introducing the success branch is
            # not visible in this view
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Invoke 'callable(*args, **kwargs)' via maybeDeferred and require
        that it fails with 'expected_failure' (optionally containing
        'substring'); 'which' labels the operation in failure messages."""
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        # NOTE(review): 'def _done(res):', its 'if substring:'/'else:'
        # guards, 'd.addBoth(_done)' and 'return d' appear to be missing
        # from this view — the body below implies them
            if isinstance(res, Failure):
                res.trap(expected_failure)
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
1170 def PUT(self, urlpath, data):
1171 url = self.webish_url + urlpath
1172 return getPage(url, method="PUT", postdata=data)
1174 def GET(self, urlpath, followRedirect=False):
1175 url = self.webish_url + urlpath
1176 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        """POST a hand-built multipart/form-data body of 'fields' to
        'urlpath'; tuple values become file uploads (filename, contents).
        With use_helper=True, target the helper node's webish URL."""
        # NOTE(review): the 'if use_helper:'/'else:' pair, 'form = []'
        # initialization, and the boundary-separator appends between fields
        # are not visible in this view — confirm upstream
            url = self.helper_webish_url + urlpath
            url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('UTF-8')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                # tuple => simulated file upload
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        # multipart bodies use CRLF line endings
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the webish frontend: welcome pages, directory listings,
        downloads by path and by URI (including a mangled URI), PUT/POST
        uploads, status pages, helper status, and the statistics pages."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        # NOTE(review): the initial 'd = getPage(base)' (or similar) is not
        # visible in this view — '_got_welcome' clearly consumes that page
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            # NOTE(review): the remaining shouldFail arguments and 'return
            # d1' are not visible in this view
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()
            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): 'def _got_up(res):' and 'def _got_down(res):' header
        # lines appear to be missing from this view
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # NOTE(review): 'f.close()' and 'now = time.time()' appear to be
            # missing from this view — 'now' is used by os.utime below
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            # NOTE(review): several expected-value arguments/close-parens
            # are not visible in this view
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 8," in res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
        # NOTE(review): trailing 'return d' is not visible in this view
    def _test_runner(self, res):
        """Exercise the runner.py diagnostic tools: locate a CHK sharefile
        on disk, then run 'dump-share', 'find-shares' and 'catalog-shares'
        against the node directories and check their output."""
        # exercise some of the diagnostic tools in runner.py

        # find a share by walking the storage directories
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                # NOTE(review): the 'continue' body of this guard (and other
                # loop-control lines) are not visible in this view
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    # NOTE(review): the 'break' body and the else-branch
                    # before self.fail are not visible in this view
            self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        # dump-share: inspect one share's fields
        out,err = StringIO(), StringIO()
        rc = runner.runner(["dump-share",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): an 'out.seek(0)' before readlines() is not visible
        # in this view — confirm upstream
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        # NOTE(review): the 'matching = [line' head of this comprehension is
        # not visible in this view
        for line in descriptions
        if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        """Connect to client0's private control.furl over foolscap and hand
        the resulting remote reference to _test_control2."""
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
        # NOTE(review): trailing 'return d' is not visible in this view
    def _test_control2(self, rref, filename):
        """Drive the control interface on 'rref': round-trip 'filename'
        through upload/download, then run speed_test, (on linux2)
        get_memory_usage, and measure_peer_response_time."""
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        # NOTE(review): the remaining download_from_uri_to_file arguments
        # and the 'def _check(res):' header are not visible in this view
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
            self.failUnlessEqual(res, downfile)
            # downloaded copy must match the original file byte-for-byte
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        # get_memory_usage is only supported on linux2
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
        # NOTE(review): trailing 'return d' is not visible in this view
1518 def _test_cli(self, res):
1519 # run various CLI commands (in a thread, since they use blocking
1522 private_uri = self._private_node.get_uri()
1523 some_uri = self._root_directory_uri
1524 client0_basedir = self.getdir("client0")
1527 "--node-directory", client0_basedir,
1529 TESTDATA = "I will not write the same thing over and over.\n" * 100
1531 d = defer.succeed(None)
1533 # for compatibility with earlier versions, private/root_dir.cap is
1534 # supposed to be treated as an alias named "tahoe:". Start by making
1535 # sure that works, before we add other aliases.
1537 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1538 f = open(root_file, "w")
1539 f.write(private_uri)
1542 def run(ignored, verb, *args):
1543 newargs = [verb] + nodeargs + list(args)
1544 return self._run_cli(newargs)
1546 def _check_ls((out,err), expected_children, unexpected_children=[]):
1547 self.failUnlessEqual(err, "")
1548 for s in expected_children:
1549 self.failUnless(s in out, s)
1550 for s in unexpected_children:
1551 self.failIf(s in out, s)
1553 def _check_ls_root((out,err)):
1554 self.failUnless("personal" in out)
1555 self.failUnless("s2-ro" in out)
1556 self.failUnless("s2-rw" in out)
1557 self.failUnlessEqual(err, "")
1559 # this should reference private_uri
1560 d.addCallback(run, "ls")
1561 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1563 d.addCallback(run, "list-aliases")
1564 def _check_aliases_1((out,err)):
1565 self.failUnlessEqual(err, "")
1566 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1567 d.addCallback(_check_aliases_1)
1569 # now that that's out of the way, remove root_dir.cap and work with
1571 d.addCallback(lambda res: os.unlink(root_file))
1572 d.addCallback(run, "list-aliases")
1573 def _check_aliases_2((out,err)):
1574 self.failUnlessEqual(err, "")
1575 self.failUnlessEqual(out, "")
1576 d.addCallback(_check_aliases_2)
1578 d.addCallback(run, "mkdir")
1579 def _got_dir( (out,err) ):
1580 self.failUnless(uri.from_string_dirnode(out.strip()))
1582 d.addCallback(_got_dir)
1583 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1585 d.addCallback(run, "list-aliases")
1586 def _check_aliases_3((out,err)):
1587 self.failUnlessEqual(err, "")
1588 self.failUnless("tahoe: " in out)
1589 d.addCallback(_check_aliases_3)
1591 def _check_empty_dir((out,err)):
1592 self.failUnlessEqual(out, "")
1593 self.failUnlessEqual(err, "")
1594 d.addCallback(run, "ls")
1595 d.addCallback(_check_empty_dir)
1597 def _check_missing_dir((out,err)):
1598 # TODO: check that rc==2
1599 self.failUnlessEqual(out, "")
1600 self.failUnlessEqual(err, "No such file or directory\n")
1601 d.addCallback(run, "ls", "bogus")
1602 d.addCallback(_check_missing_dir)
1607 fn = os.path.join(self.basedir, "file%d" % i)
1609 data = "data to be uploaded: file%d\n" % i
1611 open(fn,"wb").write(data)
1613 def _check_stdout_against((out,err), filenum=None, data=None):
1614 self.failUnlessEqual(err, "")
1615 if filenum is not None:
1616 self.failUnlessEqual(out, datas[filenum])
1617 if data is not None:
1618 self.failUnlessEqual(out, data)
1620 # test all both forms of put: from a file, and from stdin
1622 d.addCallback(run, "put", files[0], "tahoe-file0")
1623 def _put_out((out,err)):
1624 self.failUnless("URI:LIT:" in out, out)
1625 self.failUnless("201 Created" in err, err)
1627 return run(None, "get", uri0)
1628 d.addCallback(_put_out)
1629 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1631 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1632 # tahoe put bar tahoe:FOO
1633 d.addCallback(run, "put", files[2], "tahoe:file2")
1634 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1635 def _check_put_mutable((out,err)):
1636 self._mutable_file3_uri = out.strip()
1637 d.addCallback(_check_put_mutable)
1638 d.addCallback(run, "get", "tahoe:file3")
1639 d.addCallback(_check_stdout_against, 3)
1641 def _put_from_stdin(res, data, *args):
1642 args = nodeargs + list(args)
1643 o = cli.PutOptions()
1644 o.parseOptions(args)
1645 stdin = StringIO(data)
1646 stdout, stderr = StringIO(), StringIO()
1647 d = threads.deferToThread(cli.put, o,
1648 stdout=stdout, stderr=stderr, stdin=stdin)
1650 return stdout.getvalue(), stderr.getvalue()
1651 d.addCallback(_done)
1655 STDIN_DATA = "This is the file to upload from stdin."
1656 d.addCallback(_put_from_stdin,
1659 # tahoe put tahoe:FOO
1660 d.addCallback(_put_from_stdin,
1661 "Other file from stdin.",
1664 d.addCallback(run, "ls")
1665 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1666 "tahoe-file-stdin", "from-stdin"])
1667 d.addCallback(run, "ls", "subdir")
1668 d.addCallback(_check_ls, ["tahoe-file1"])
1671 d.addCallback(run, "mkdir", "subdir2")
1672 d.addCallback(run, "ls")
1673 # TODO: extract the URI, set an alias with it
1674 d.addCallback(_check_ls, ["subdir2"])
1676 # tahoe get: (to stdin and to a file)
1677 d.addCallback(run, "get", "tahoe-file0")
1678 d.addCallback(_check_stdout_against, 0)
1679 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1680 d.addCallback(_check_stdout_against, 1)
1681 outfile0 = os.path.join(self.basedir, "outfile0")
1682 d.addCallback(run, "get", "file2", outfile0)
1683 def _check_outfile0((out,err)):
1684 data = open(outfile0,"rb").read()
1685 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1686 d.addCallback(_check_outfile0)
1687 outfile1 = os.path.join(self.basedir, "outfile0")
1688 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1689 def _check_outfile1((out,err)):
1690 data = open(outfile1,"rb").read()
1691 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1692 d.addCallback(_check_outfile1)
1694 d.addCallback(run, "rm", "tahoe-file0")
1695 d.addCallback(run, "rm", "tahoe:file2")
1696 d.addCallback(run, "ls")
1697 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1699 d.addCallback(run, "ls", "-l")
1700 def _check_ls_l((out,err)):
1701 lines = out.split("\n")
1703 if "tahoe-file-stdin" in l:
1704 self.failUnless(l.startswith("-r-- "), l)
1705 self.failUnless(" %d " % len(STDIN_DATA) in l)
1707 self.failUnless(l.startswith("-rw- "), l) # mutable
1708 d.addCallback(_check_ls_l)
1710 d.addCallback(run, "ls", "--uri")
1711 def _check_ls_uri((out,err)):
1712 lines = out.split("\n")
1715 self.failUnless(self._mutable_file3_uri in l)
1716 d.addCallback(_check_ls_uri)
1718 d.addCallback(run, "ls", "--readonly-uri")
1719 def _check_ls_rouri((out,err)):
1720 lines = out.split("\n")
1723 rw_uri = self._mutable_file3_uri
1724 u = uri.from_string_mutable_filenode(rw_uri)
1725 ro_uri = u.get_readonly().to_string()
1726 self.failUnless(ro_uri in l)
1727 d.addCallback(_check_ls_rouri)
# "tahoe mv": the old name must disappear and the new one appear.
# (_check_ls takes a list of names that must be present and a list that
# must be absent -- defined earlier in this method, outside this excerpt.)
1730 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1731 d.addCallback(run, "ls")
1732 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
# "tahoe ln": hard-link style alias; both names must now be listed.
1734 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1735 d.addCallback(run, "ls")
1736 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
# copy within the tahoe grid (tahoe: -> tahoe:), then read the copy back
# and compare it against the original fixture data (index 3).
1738 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1739 d.addCallback(run, "ls")
1740 d.addCallback(_check_ls, ["file3", "file3-copy"])
1741 d.addCallback(run, "get", "tahoe:file3-copy")
1742 d.addCallback(_check_stdout_against, 3)
1744 # copy from disk into tahoe
1745 d.addCallback(run, "cp", files[4], "tahoe:file4")
1746 d.addCallback(run, "ls")
1747 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1748 d.addCallback(run, "get", "tahoe:file4")
1749 d.addCallback(_check_stdout_against, 4)
1751 # copy from tahoe into disk
1752 target_filename = os.path.join(self.basedir, "file-out")
1753 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1754 def _check_cp_out((out,err)):
# Verify the file landed on local disk with exactly the fixture contents.
1755 self.failUnless(os.path.exists(target_filename))
1756 got = open(target_filename,"rb").read()
1757 self.failUnlessEqual(got, datas[4])
1758 d.addCallback(_check_cp_out)
1760 # copy from disk to disk (silly case)
1761 target2_filename = os.path.join(self.basedir, "file-out-copy")
1762 d.addCallback(run, "cp", target_filename, target2_filename)
1763 def _check_cp_out2((out,err)):
1764 self.failUnless(os.path.exists(target2_filename))
1765 got = open(target2_filename,"rb").read()
1766 self.failUnlessEqual(got, datas[4])
1767 d.addCallback(_check_cp_out2)
1769 # copy from tahoe into disk, overwriting an existing file
# Overwrite must replace the previous contents (datas[4]) with datas[3].
1770 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1771 def _check_cp_out3((out,err)):
1772 self.failUnless(os.path.exists(target_filename))
1773 got = open(target_filename,"rb").read()
1774 self.failUnlessEqual(got, datas[3])
1775 d.addCallback(_check_cp_out3)
1777 # copy from disk into tahoe, overwriting an existing immutable file
1778 d.addCallback(run, "cp", files[5], "tahoe:file4")
1779 d.addCallback(run, "ls")
1780 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1781 d.addCallback(run, "get", "tahoe:file4")
1782 d.addCallback(_check_stdout_against, 5)
1784 # copy from disk into tahoe, overwriting an existing mutable file
1785 d.addCallback(run, "cp", files[5], "tahoe:file3")
1786 d.addCallback(run, "ls")
1787 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1788 d.addCallback(run, "get", "tahoe:file3")
1789 d.addCallback(_check_stdout_against, 5)
1791 # recursive copy: setup
# Build a local directory tree to exercise "cp -r".
# NOTE(review): lines 1793 and 1798 are elided from this excerpt --
# presumably the mkdir calls that create dn and sdn2 before the writes
# below; TODO confirm against the full file.
1792 dn = os.path.join(self.basedir, "dir1")
1794 open(os.path.join(dn, "file1"), "wb").write("file1")
1795 open(os.path.join(dn, "file2"), "wb").write("file2")
1796 open(os.path.join(dn, "file3"), "wb").write("file3")
1797 sdn2 = os.path.join(dn, "subdir2")
1799 open(os.path.join(dn, "file4"), "wb").write("file4")
1800 open(os.path.join(dn, "file5"), "wb").write("file5")
1802 # from disk into tahoe
# NOTE(review): the recursive-copy assertions below are disabled
# (commented out) in the original; left as-is.
1803 #d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1804 #d.addCallback(run, "ls")
1805 #d.addCallback(_check_ls, ["dir1"])
1806 #d.addCallback(run, "ls", "dir1")
1807 #d.addCallback(_check_ls, ["file1", "file2", "file3", "subdir2"])
1808 #d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1809 #d.addCallback(_check_ls, ["file4", "file5"])
1810 #d.addCallback(run, "get", "dir1/subdir2/file4")
1811 #d.addCallback(_check_stdout_against, data="file4")
1813 # tahoe_ls doesn't currently handle the error correctly: it tries to
1814 # JSON-parse a traceback.
1815 ## def _ls_missing(res):
1816 ## argv = ["ls"] + nodeargs + ["bogus"]
1817 ## return self._run_cli(argv)
1818 ## d.addCallback(_ls_missing)
1819 ## def _check_ls_missing((out,err)):
1822 ## self.failUnlessEqual(err, "")
1823 ## d.addCallback(_check_ls_missing)
1827 def _run_cli(self, argv):
# Run one CLI command in a worker thread -- runner.runner does blocking
# work, and a trial test must not block the reactor, hence deferToThread.
# Returns a Deferred that fires with the (stdout, stderr) text captured
# from the command.
# NOTE(review): line 1828 and the "def _done(res):" header (line 1832,
# which encloses the return below) plus the trailing "return d" are
# elided from this excerpt -- confirm against the full file.
1829 stdout, stderr = StringIO(), StringIO()
# run_by_human=False tells the runner it is being driven programmatically,
# with output redirected into the in-memory StringIO buffers.
1830 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1831 stdout=stdout, stderr=stderr)
1833 return stdout.getvalue(), stderr.getvalue()
1834 d.addCallback(_done)
1837 def _test_checker(self, res):
# Build a manifest of the private directory's contents, then hand it to
# _test_checker_2 for the actual checking.  "res" is the ignored result
# of the previous callback in the test chain.
# NOTE(review): the trailing "return d" (line 1840) is elided from this
# excerpt -- confirm against the full file.
1838 d = self._private_node.build_manifest()
1839 d.addCallback(self._test_checker_2)
1842 def _test_checker_2(self, manifest):
# Exercise client[1]'s "checker" service against every storage index in
# the manifest, then verify both the live results and the results the
# checker stored for later retrieval.
# NOTE(review): several interior lines are elided from this excerpt
# (1846-1847, 1849, 1852, 1854-1855, 1857, 1863, 1868, 1871-1872, 1874,
# 1876, 1884) -- presumably the loop headers and accumulator
# initializations ("dl = []", "for si in manifest:", "peers = set()",
# etc.) referenced below; confirm against the full file.
1843 checker1 = self.clients[1].getServiceNamed("checker")
# Before any checks run, there are no stored results, either for an
# unknown key (None) or for a real storage index.
1844 self.failUnlessEqual(checker1.checker_results_for(None), [])
1845 self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
# Record a lower bound for the "when" timestamps asserted further down.
1848 starting_time = time.time()
1850 dl.append(checker1.check(si))
1851 d = deferredutil.DeferredListShouldSucceed(dl)
1853 def _check_checker_results(res):
# Mutable-file checks currently report a bare True ...
1856 self.failUnless(i is True)
# ... while CHK checks report a (needed, total, found, sharemap) tuple:
# 3-of-10 encoding, all shares found, one share number per bucket.
1858 (needed, total, found, sharemap) = i
1859 self.failUnlessEqual(needed, 3)
1860 self.failUnlessEqual(total, 10)
1861 self.failUnlessEqual(found, total)
1862 self.failUnlessEqual(len(sharemap.keys()), 10)
# Shares must be spread over every client in the test grid.
1864 for shpeers in sharemap.values():
1865 peers.update(shpeers)
1866 self.failUnlessEqual(len(peers), self.numclients)
1867 d.addCallback(_check_checker_results)
1869 def _check_stored_results(res):
1870 finish_time = time.time()
# The checker must have persisted exactly one timestamped result per
# storage index, with a timestamp inside the [starting_time, finish_time]
# window; the collected results are then re-validated with the same
# assertions as the live ones.
1873 results = checker1.checker_results_for(si)
1875 # TODO: implement checker for mutable files and implement tests of that checker
1877 self.failUnlessEqual(len(results), 1)
1878 when, those_results = results[0]
1879 self.failUnless(isinstance(when, (int, float)))
1880 self.failUnless(starting_time <= when <= finish_time)
1881 all_results.append(those_results)
1882 _check_checker_results(all_results)
1883 d.addCallback(_check_stored_results)
1885 d.addCallback(self._test_checker_3)
1888 def _test_checker_3(self, res):
1889 # check one file, through FileNode.check()
# First a small file ('sekrit data', stored as a LIT/literal node), then
# a large CHK file, verifying the per-share placement in detail.
# NOTE(review): lines 1912-1914 (presumably the final addCallback and
# "return d") are elided from this excerpt -- confirm against the full
# file.
1890 d = self._private_node.get_child_at_path(u"personal/sekrit data")
1891 d.addCallback(lambda n: n.check())
1892 def _checked(results):
1893 # 'sekrit data' is small, and fits in a LiteralFileNode, so
1894 # checking it is trivial and always returns True
1895 self.failUnlessEqual(results, True)
1896 d.addCallback(_checked)
# NOTE(review): the name "c0" is bound to clients[1], not clients[0] --
# possibly intentional (check from a different client) but the name is
# misleading; worth confirming.
1898 c0 = self.clients[1]
1899 n = c0.create_node_from_uri(self._root_directory_uri)
1900 d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1901 d.addCallback(lambda n: n.check())
1902 def _checked2(results):
1903 # mydata567 is large and lives in a CHK
# Expect the full 3-of-10 encoding with every share found and each of
# the 10 share numbers held by exactly one peer.
1904 (needed, total, found, sharemap) = results
1905 self.failUnlessEqual(needed, 3)
1906 self.failUnlessEqual(total, 10)
1907 self.failUnlessEqual(found, 10)
1908 self.failUnlessEqual(len(sharemap), 10)
1909 for shnum in range(10):
1910 self.failUnlessEqual(len(sharemap[shnum]), 1)
1911 d.addCallback(_checked2)
1915 def _test_verifier(self, res):
# Run the (deeper) verify operation over every storage index in the
# private directory's manifest, then verify(None) as the trivial case.
# NOTE(review): interior lines 1919-1920 and 1924-1925 are elided from
# this excerpt (presumably "dl = []" / "for si in manifest:" and the
# "def _done(res): for i in res:" headers for the loop bodies below);
# the method may also continue past this excerpt -- confirm against the
# full file.
1916 checker1 = self.clients[1].getServiceNamed("checker")
1917 d = self._private_node.build_manifest()
1918 def _check_all(manifest):
1921 dl.append(checker1.verify(si))
1922 return deferredutil.DeferredListShouldSucceed(dl)
1923 d.addCallback(_check_all)
# Every individual verify result must be exactly True.
1926 self.failUnless(i is True)
1927 d.addCallback(_done)
1928 d.addCallback(lambda res: checker1.verify(None))
1929 d.addCallback(self.failUnlessEqual, True)