1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults
19 from allmydata.mutable.common import NotMutableError
20 from allmydata.mutable import layout as mutable_layout
21 from foolscap import DeadReferenceError
22 from twisted.python.failure import Failure
23 from twisted.web.client import getPage
24 from twisted.web.error import Error
26 from allmydata.test.common import SystemTestMixin
29 This is some data to publish to the virtual drive, which needs to be large
30 enough to not fit inside a LIT uri.
# An upload.Data subclass that counts how many bytes have been read from it
# and, when `interrupt_after` is set, fires `interrupt_after_d` once the
# count passes that threshold -- lets a test interrupt an in-flight upload
# at a known point.
# NOTE(review): this listing has dropped lines (embedded numbering skips 34
# and 37); a `bytes_read = 0` initializer presumably lives in a missing
# line -- confirm against the original file.
33 class CountingDataUploadable(upload.Data):
35 interrupt_after = None
36 interrupt_after_d = None
38 def read(self, length):
39 self.bytes_read += length
40 if self.interrupt_after is not None:
41 if self.bytes_read > self.interrupt_after:
# fire only once: clear the threshold before firing the Deferred
42 self.interrupt_after = None
43 self.interrupt_after_d.callback(self)
44 return upload.Data.read(self, length)
# Minimal consumer used to exercise the download-to-consumer API; it only
# accepts a push (streaming) producer, per the assert in registerProducer.
# NOTE(review): lines are missing from this listing (embedded numbering
# skips 47-51, 53, 55, 57-58, 60-61); the `contents` accumulator that the
# tests read, and the write()/unregisterProducer() bodies, are presumably
# in the dropped lines -- confirm against the original file.
46 class GrabEverythingConsumer:
52 def registerProducer(self, producer, streaming):
54 assert IPushProducer.providedBy(producer)
56 def write(self, data):
59 def unregisterProducer(self):
62 class SystemTest(SystemTestMixin, unittest.TestCase):
# Bring up a grid, add one extra node, and verify that every client then
# sees numclients+1 peers -- both via get_all_peerids() and via the
# permuted storage-server list. The extra node is shut down via addBoth
# so cleanup happens on success or failure.
# NOTE(review): the listing drops lines here (embedded numbering skips
# 76-77, 79, 81, 83); the d.addCallback(_check) hookup and the final
# `return d` are presumably among them -- confirm against the original.
64 def test_connections(self):
65 self.basedir = "system/SystemTest/test_connections"
66 d = self.set_up_nodes()
67 self.extra_node = None
68 d.addCallback(lambda res: self.add_extra_node(self.numclients))
69 def _check(extra_node):
70 self.extra_node = extra_node
71 for c in self.clients:
72 all_peerids = list(c.get_all_peerids())
73 self.failUnlessEqual(len(all_peerids), self.numclients+1)
74 permuted_peers = list(c.get_permuted_peers("storage", "a"))
75 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
78 def _shutdown_extra_node(res):
80 return self.extra_node.stopService()
82 d.addBoth(_shutdown_extra_node)
84 test_connections.timeout = 300
85 # test_connections is subsumed by test_upload_and_download, and takes
86 # quite a while to run on a slow machine (because of all the TLS
87 # connections that must be established). If we ever rework the introducer
88 # code to such an extent that we're not sure if it works anymore, we can
89 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    # Exercise a full upload/download round trip using a randomly
    # generated encryption key (convergence=None), delegating to the
    # shared _test_upload_and_download driver.
    # Fix: stripped the corrupted baked-in listing-number prefixes so the
    # method is valid Python again; logic is unchanged.
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    return self._test_upload_and_download(convergence=None)
test_upload_and_download_random_key.timeout = 4800
def test_upload_and_download_convergent(self):
    # Same round trip as test_upload_and_download_random_key but with
    # convergent encryption (a fixed convergence string), so duplicate
    # uploads can be short-circuited by the helper.
    # Fix: stripped the corrupted baked-in listing-number prefixes so the
    # method is valid Python again; logic is unchanged.
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    return self._test_upload_and_download(convergence="some convergence string")
test_upload_and_download_convergent.timeout = 4800
# Shared driver for the upload/download system tests. Uploads 4000 bytes
# with a small (1024b) max segment size, downloads it back through the
# data/filename/filehandle/consumer APIs, checks a mangled URI fails with
# NotEnoughSharesError, then exercises helper-assisted upload, duplicate
# upload short-circuiting (convergent only), and resumption of an
# interrupted helper upload.
# NOTE(review): many lines are missing from this listing (the embedded
# numbering skips repeatedly, e.g. 114-116, 127-128, 131, 133, 135, 137,
# 148, 161, 174, 221, 224, 235, 238, 269, 273, 275, 280-281, 353-354,
# 365-368); callback hookups such as d.addCallback(_check_connections)'s
# neighbors, `def _do_upload`, `def _interrupted(f)`, `def _check(...)`
# headers and the final `return d` are presumably among them -- confirm
# against the original file before relying on this block.
102 def _test_upload_and_download(self, convergence):
103 # we use 4000 bytes of data, which will result in about 400k written
104 # to disk among all our simulated nodes
105 DATA = "Some data to upload\n" * 200
106 d = self.set_up_nodes()
107 def _check_connections(res):
108 for c in self.clients:
109 all_peerids = list(c.get_all_peerids())
110 self.failUnlessEqual(len(all_peerids), self.numclients)
111 permuted_peers = list(c.get_permuted_peers("storage", "a"))
112 self.failUnlessEqual(len(permuted_peers), self.numclients)
113 d.addCallback(_check_connections)
117 u = self.clients[0].getServiceNamed("uploader")
119 # we crank the max segsize down to 1024b for the duration of this
120 # test, so we can exercise multiple segments. It is important
121 # that this is not a multiple of the segment size, so that the
122 # tail segment is not the same length as the others. This actualy
123 # gets rounded up to 1025 to be a multiple of the number of
124 # required shares (since we use 25 out of 100 FEC).
125 up = upload.Data(DATA, convergence=convergence)
126 up.max_segment_size = 1024
129 d.addCallback(_do_upload)
130 def _upload_done(results):
132 log.msg("upload finished: uri is %s" % (uri,))
134 dl = self.clients[1].getServiceNamed("downloader")
136 d.addCallback(_upload_done)
138 def _upload_again(res):
139 # Upload again. If using convergent encryption then this ought to be
140 # short-circuited, however with the way we currently generate URIs
141 # (i.e. because they include the roothash), we have to do all of the
142 # encoding work, and only get to save on the upload part.
143 log.msg("UPLOADING AGAIN")
144 up = upload.Data(DATA, convergence=convergence)
145 up.max_segment_size = 1024
146 d1 = self.uploader.upload(up)
147 d.addCallback(_upload_again)
149 def _download_to_data(res):
150 log.msg("DOWNLOADING")
151 return self.downloader.download_to_data(self.uri)
152 d.addCallback(_download_to_data)
153 def _download_to_data_done(data):
154 log.msg("download finished")
155 self.failUnlessEqual(data, DATA)
156 d.addCallback(_download_to_data_done)
158 target_filename = os.path.join(self.basedir, "download.target")
159 def _download_to_filename(res):
160 return self.downloader.download_to_filename(self.uri,
162 d.addCallback(_download_to_filename)
163 def _download_to_filename_done(res):
164 newdata = open(target_filename, "rb").read()
165 self.failUnlessEqual(newdata, DATA)
166 d.addCallback(_download_to_filename_done)
168 target_filename2 = os.path.join(self.basedir, "download.target2")
169 def _download_to_filehandle(res):
170 fh = open(target_filename2, "wb")
171 return self.downloader.download_to_filehandle(self.uri, fh)
172 d.addCallback(_download_to_filehandle)
173 def _download_to_filehandle_done(fh):
175 newdata = open(target_filename2, "rb").read()
176 self.failUnlessEqual(newdata, DATA)
177 d.addCallback(_download_to_filehandle_done)
179 consumer = GrabEverythingConsumer()
180 ct = download.ConsumerAdapter(consumer)
181 d.addCallback(lambda res:
182 self.downloader.download(self.uri, ct))
183 def _download_to_consumer_done(ign):
184 self.failUnlessEqual(consumer.contents, DATA)
185 d.addCallback(_download_to_consumer_done)
187 def _download_nonexistent_uri(res):
188 baduri = self.mangle_uri(self.uri)
189 log.msg("about to download non-existent URI", level=log.UNUSUAL,
190 facility="tahoe.tests")
191 d1 = self.downloader.download_to_data(baduri)
192 def _baduri_should_fail(res):
193 log.msg("finished downloading non-existend URI",
194 level=log.UNUSUAL, facility="tahoe.tests")
195 self.failUnless(isinstance(res, Failure))
196 self.failUnless(res.check(download.NotEnoughSharesError),
197 "expected NotEnoughSharesError, got %s" % res)
198 # TODO: files that have zero peers should get a special kind
199 # of NotEnoughSharesError, which can be used to suggest that
200 # the URI might be wrong or that they've never uploaded the
201 # file in the first place.
202 d1.addBoth(_baduri_should_fail)
204 d.addCallback(_download_nonexistent_uri)
206 # add a new node, which doesn't accept shares, and only uses the
208 d.addCallback(lambda res: self.add_extra_node(self.numclients,
210 add_to_sparent=True))
211 def _added(extra_node):
212 self.extra_node = extra_node
# sizelimit=0 means this node refuses to store any shares itself
213 extra_node.getServiceNamed("storage").sizelimit = 0
214 d.addCallback(_added)
216 HELPER_DATA = "Data that needs help to upload" * 1000
217 def _upload_with_helper(res):
218 u = upload.Data(HELPER_DATA, convergence=convergence)
219 d = self.extra_node.upload(u)
220 def _uploaded(results):
222 return self.downloader.download_to_data(uri)
223 d.addCallback(_uploaded)
225 self.failUnlessEqual(newdata, HELPER_DATA)
226 d.addCallback(_check)
228 d.addCallback(_upload_with_helper)
230 def _upload_duplicate_with_helper(res):
231 u = upload.Data(HELPER_DATA, convergence=convergence)
232 u.debug_stash_RemoteEncryptedUploadable = True
233 d = self.extra_node.upload(u)
234 def _uploaded(results):
236 return self.downloader.download_to_data(uri)
237 d.addCallback(_uploaded)
239 self.failUnlessEqual(newdata, HELPER_DATA)
240 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
241 "uploadable started uploading, should have been avoided")
242 d.addCallback(_check)
# duplicate-upload short-circuiting only applies to convergent encryption
244 if convergence is not None:
245 d.addCallback(_upload_duplicate_with_helper)
247 def _upload_resumable(res):
248 DATA = "Data that needs help to upload and gets interrupted" * 1000
249 u1 = CountingDataUploadable(DATA, convergence=convergence)
250 u2 = CountingDataUploadable(DATA, convergence=convergence)
252 # we interrupt the connection after about 5kB by shutting down
253 # the helper, then restartingit.
254 u1.interrupt_after = 5000
255 u1.interrupt_after_d = defer.Deferred()
256 u1.interrupt_after_d.addCallback(lambda res:
257 self.bounce_client(0))
259 # sneak into the helper and reduce its chunk size, so that our
260 # debug_interrupt will sever the connection on about the fifth
261 # chunk fetched. This makes sure that we've started to write the
262 # new shares before we abandon them, which exercises the
263 # abort/delete-partial-share code. TODO: find a cleaner way to do
264 # this. I know that this will affect later uses of the helper in
265 # this same test run, but I'm not currently worried about it.
266 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
268 d = self.extra_node.upload(u1)
270 def _should_not_finish(res):
271 self.fail("interrupted upload should have failed, not finished"
272 " with result %s" % (res,))
# only connection-teardown failures are expected here
274 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
276 # make sure we actually interrupted it before finishing the
278 self.failUnless(u1.bytes_read < len(DATA),
279 "read %d out of %d total" % (u1.bytes_read,
282 log.msg("waiting for reconnect", level=log.NOISY,
283 facility="tahoe.test.test_system")
284 # now, we need to give the nodes a chance to notice that this
285 # connection has gone away. When this happens, the storage
286 # servers will be told to abort their uploads, removing the
287 # partial shares. Unfortunately this involves TCP messages
288 # going through the loopback interface, and we can't easily
289 # predict how long that will take. If it were all local, we
290 # could use fireEventually() to stall. Since we don't have
291 # the right introduction hooks, the best we can do is use a
292 # fixed delay. TODO: this is fragile.
293 u1.interrupt_after_d.addCallback(self.stall, 2.0)
294 return u1.interrupt_after_d
295 d.addCallbacks(_should_not_finish, _interrupted)
297 def _disconnected(res):
298 # check to make sure the storage servers aren't still hanging
299 # on to the partial share: their incoming/ directories should
301 log.msg("disconnected", level=log.NOISY,
302 facility="tahoe.test.test_system")
303 for i in range(self.numclients):
304 incdir = os.path.join(self.getdir("client%d" % i),
305 "storage", "shares", "incoming")
306 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
307 d.addCallback(_disconnected)
309 # then we need to give the reconnector a chance to
310 # reestablish the connection to the helper.
311 d.addCallback(lambda res:
312 log.msg("wait_for_connections", level=log.NOISY,
313 facility="tahoe.test.test_system"))
314 d.addCallback(lambda res: self.wait_for_connections())
317 d.addCallback(lambda res:
318 log.msg("uploading again", level=log.NOISY,
319 facility="tahoe.test.test_system"))
320 d.addCallback(lambda res: self.extra_node.upload(u2))
322 def _uploaded(results):
324 log.msg("Second upload complete", level=log.NOISY,
325 facility="tahoe.test.test_system")
327 # this is really bytes received rather than sent, but it's
328 # convenient and basically measures the same thing
329 bytes_sent = results.ciphertext_fetched
331 # We currently don't support resumption of upload if the data is
332 # encrypted with a random key. (Because that would require us
333 # to store the key locally and re-use it on the next upload of
334 # this file, which isn't a bad thing to do, but we currently
336 if convergence is not None:
337 # Make sure we did not have to read the whole file the
338 # second time around .
339 self.failUnless(bytes_sent < len(DATA),
340 "resumption didn't save us any work:"
341 " read %d bytes out of %d total" %
342 (bytes_sent, len(DATA)))
344 # Make sure we did have to read the whole file the second
345 # time around -- because the one that we partially uploaded
346 # earlier was encrypted with a different random key.
347 self.failIf(bytes_sent < len(DATA),
348 "resumption saved us some work even though we were using random keys:"
349 " read %d bytes out of %d total" %
350 (bytes_sent, len(DATA)))
351 return self.downloader.download_to_data(uri)
352 d.addCallback(_uploaded)
355 self.failUnlessEqual(newdata, DATA)
356 # If using convergent encryption, then also check that the
357 # helper has removed the temp file from its directories.
358 if convergence is not None:
359 basedir = os.path.join(self.getdir("client0"), "helper")
360 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
361 self.failUnlessEqual(files, [])
362 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
363 self.failUnlessEqual(files, [])
364 d.addCallback(_check)
366 d.addCallback(_upload_resumable)
# Walk `basedir` looking for stored share files under any
# .../storage/shares/$START/$SINDEX directory, and collect
# (client_num, storage_index, filename, shnum) tuples; fails the test if
# no share files are found at all.
# NOTE(review): the listing drops lines (embedded numbering skips 371,
# 374-376, 389-390); the accumulator list, the `continue` for non-storage
# paths, the append of `data`, and the `return shares` are presumably in
# the missing lines -- confirm against the original file.
370 def _find_shares(self, basedir):
372 for (dirpath, dirnames, filenames) in os.walk(basedir):
373 if "storage" not in dirpath:
377 pieces = dirpath.split(os.sep)
378 if pieces[-4] == "storage" and pieces[-3] == "shares":
379 # we're sitting in .../storage/shares/$START/$SINDEX , and there
380 # are sharefiles here
381 assert pieces[-5].startswith("client")
# client directories are named client0..client9, so the last char is the number
382 client_num = int(pieces[-5][-1])
383 storage_index_s = pieces[-1]
384 storage_index = storage.si_a2b(storage_index_s)
385 for sharename in filenames:
386 shnum = int(sharename)
387 filename = os.path.join(dirpath, sharename)
388 data = (client_num, storage_index, filename, shnum)
391 self.fail("unable to find any share files in %s" % basedir)
# Read a mutable share file, unpack it, corrupt the single field named by
# `which` (usually by flipping one bit via self.flip_bit), re-pack, and
# write it back in place. Used by test_mutable to verify that retrieval
# detects each kind of corruption.
# NOTE(review): the listing drops lines (embedded numbering skips 403,
# 405-406, 408, 425, 427, 429-434); the seqnum/root_hash/IV branch bodies
# and most of the pack_prefix/pack_share argument lists are among the
# missing lines -- confirm against the original file.
394 def _corrupt_mutable_share(self, filename, which):
395 msf = storage.MutableShareFile(filename)
396 datav = msf.readv([ (0, 1000000) ])
397 final_share = datav[0]
398 assert len(final_share) < 1000000 # ought to be truncated
399 pieces = mutable_layout.unpack_share(final_share)
400 (seqnum, root_hash, IV, k, N, segsize, datalen,
401 verification_key, signature, share_hash_chain, block_hash_tree,
402 share_data, enc_privkey) = pieces
404 if which == "seqnum":
407 root_hash = self.flip_bit(root_hash)
409 IV = self.flip_bit(IV)
410 elif which == "segsize":
411 segsize = segsize + 15
412 elif which == "pubkey":
413 verification_key = self.flip_bit(verification_key)
414 elif which == "signature":
415 signature = self.flip_bit(signature)
416 elif which == "share_hash_chain":
417 nodenum = share_hash_chain.keys()[0]
418 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
419 elif which == "block_hash_tree":
420 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
421 elif which == "share_data":
422 share_data = self.flip_bit(share_data)
423 elif which == "encprivkey":
424 enc_privkey = self.flip_bit(enc_privkey)
426 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
428 final_share = mutable_layout.pack_share(prefix,
# writev at offset 0 overwrites the share in place; None = no test vector
435 msf.writev( [(0, final_share)], None)
# End-to-end mutable-file test: create an SDMF mutable file, inspect a
# share with the "tahoe debug dump-share" CLI, read it back through new
# nodes on several clients, overwrite it twice, corrupt all but the last
# three shares and confirm retrieval still succeeds (3-of-10 encoding),
# check empty mutable files and dirnodes, and verify the key-generator
# pool shrinks by one per mutable-file/dirnode creation.
# NOTE(review): many lines are dropped from this listing (embedded
# numbering skips e.g. 443, 445, 447, 450, 453, 455, 457, 464, 466, 469,
# 473, 482, 486, 490, 499, 506, 508-509, 511-513, 517, 531, 540, 542,
# 545, 549, 551, 563, 565, 570, 573, 579, 586, 588, 591, 594, 597, 600,
# 602, 604, 608, 619, 622, 624, 627, 630, 634, 636, 640, 650, 652, 656,
# 660, 669-670); `try:` headers, `c = self.clients[0]`, `def _done(res)`,
# branch bodies in _corrupt_shares, `d1 = dnode.list()` and the final
# `return d` are presumably among them -- confirm against the original.
438 def test_mutable(self):
439 self.basedir = "system/SystemTest/test_mutable"
440 DATA = "initial contents go here." # 25 bytes % 3 != 0
441 NEWDATA = "new contents yay"
442 NEWERDATA = "this is getting old"
444 d = self.set_up_nodes(use_key_generator=True)
446 def _create_mutable(res):
448 log.msg("starting create_mutable_file")
449 d1 = c.create_mutable_file(DATA)
451 log.msg("DONE: %s" % (res,))
452 self._mutable_node_1 = res
454 d1.addCallback(_done)
456 d.addCallback(_create_mutable)
458 def _test_debug(res):
459 # find a share. It is important to run this while there is only
460 # one slot in the grid.
461 shares = self._find_shares(self.basedir)
462 (client_num, storage_index, filename, shnum) = shares[0]
463 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
465 log.msg(" for clients[%d]" % client_num)
467 out,err = StringIO(), StringIO()
468 rc = runner.runner(["debug", "dump-share", "--offsets",
470 stdout=out, stderr=err)
471 output = out.getvalue()
472 self.failUnlessEqual(rc, 0)
474 self.failUnless("Mutable slot found:\n" in output)
475 self.failUnless("share_type: SDMF\n" in output)
476 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
477 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
478 self.failUnless(" num_extra_leases: 0\n" in output)
479 # the pubkey size can vary by a byte, so the container might
480 # be a bit larger on some runs.
481 m = re.search(r'^ container_size: (\d+)$', output, re.M)
483 container_size = int(m.group(1))
484 self.failUnless(2037 <= container_size <= 2049, container_size)
485 m = re.search(r'^ data_length: (\d+)$', output, re.M)
487 data_length = int(m.group(1))
488 self.failUnless(2037 <= data_length <= 2049, data_length)
489 self.failUnless(" secrets are for nodeid: %s\n" % peerid
491 self.failUnless(" SDMF contents:\n" in output)
492 self.failUnless(" seqnum: 1\n" in output)
493 self.failUnless(" required_shares: 3\n" in output)
494 self.failUnless(" total_shares: 10\n" in output)
495 self.failUnless(" segsize: 27\n" in output, (output, filename))
496 self.failUnless(" datalen: 25\n" in output)
497 # the exact share_hash_chain nodes depends upon the sharenum,
498 # and is more of a hassle to compute than I want to deal with
500 self.failUnless(" share_hash_chain: " in output)
501 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
502 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
503 base32.b2a(storage_index))
504 self.failUnless(expected in output)
505 except unittest.FailTest:
507 print "dump-share output was:"
510 d.addCallback(_test_debug)
514 # first, let's see if we can use the existing node to retrieve the
515 # contents. This allows it to use the cached pubkey and maybe the
516 # latest-known sharemap.
518 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
519 def _check_download_1(res):
520 self.failUnlessEqual(res, DATA)
521 # now we see if we can retrieve the data from a new node,
522 # constructed using the URI of the original one. We do this test
523 # on the same client that uploaded the data.
524 uri = self._mutable_node_1.get_uri()
525 log.msg("starting retrieve1")
526 newnode = self.clients[0].create_node_from_uri(uri)
527 newnode_2 = self.clients[0].create_node_from_uri(uri)
# node cache: same URI on the same client must yield the same object
528 self.failUnlessIdentical(newnode, newnode_2)
529 return newnode.download_best_version()
530 d.addCallback(_check_download_1)
532 def _check_download_2(res):
533 self.failUnlessEqual(res, DATA)
534 # same thing, but with a different client
535 uri = self._mutable_node_1.get_uri()
536 newnode = self.clients[1].create_node_from_uri(uri)
537 log.msg("starting retrieve2")
538 d1 = newnode.download_best_version()
539 d1.addCallback(lambda res: (res, newnode))
541 d.addCallback(_check_download_2)
543 def _check_download_3((res, newnode)):
544 self.failUnlessEqual(res, DATA)
546 log.msg("starting replace1")
547 d1 = newnode.overwrite(NEWDATA)
548 d1.addCallback(lambda res: newnode.download_best_version())
550 d.addCallback(_check_download_3)
552 def _check_download_4(res):
553 self.failUnlessEqual(res, NEWDATA)
554 # now create an even newer node and replace the data on it. This
555 # new node has never been used for download before.
556 uri = self._mutable_node_1.get_uri()
557 newnode1 = self.clients[2].create_node_from_uri(uri)
558 newnode2 = self.clients[3].create_node_from_uri(uri)
559 self._newnode3 = self.clients[3].create_node_from_uri(uri)
560 log.msg("starting replace2")
561 d1 = newnode1.overwrite(NEWERDATA)
562 d1.addCallback(lambda res: newnode2.download_best_version())
564 d.addCallback(_check_download_4)
566 def _check_download_5(res):
567 log.msg("finished replace2")
568 self.failUnlessEqual(res, NEWERDATA)
569 d.addCallback(_check_download_5)
571 def _corrupt_shares(res):
572 # run around and flip bits in all but k of the shares, to test
574 shares = self._find_shares(self.basedir)
575 ## sort by share number
576 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
577 where = dict([ (shnum, filename)
578 for (client_num, storage_index, filename, shnum)
580 assert len(where) == 10 # this test is designed for 3-of-10
581 for shnum, filename in where.items():
582 # shares 7,8,9 are left alone. read will check
583 # (share_hash_chain, block_hash_tree, share_data). New
584 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
585 # segsize, signature).
587 # read: this will trigger "pubkey doesn't match
589 self._corrupt_mutable_share(filename, "pubkey")
590 self._corrupt_mutable_share(filename, "encprivkey")
592 # triggers "signature is invalid"
593 self._corrupt_mutable_share(filename, "seqnum")
595 # triggers "signature is invalid"
596 self._corrupt_mutable_share(filename, "R")
598 # triggers "signature is invalid"
599 self._corrupt_mutable_share(filename, "segsize")
601 self._corrupt_mutable_share(filename, "share_hash_chain")
603 self._corrupt_mutable_share(filename, "block_hash_tree")
605 self._corrupt_mutable_share(filename, "share_data")
606 # other things to correct: IV, signature
607 # 7,8,9 are left alone
609 # note that initial_query_count=5 means that we'll hit the
610 # first 5 servers in effectively random order (based upon
611 # response time), so we won't necessarily ever get a "pubkey
612 # doesn't match fingerprint" error (if we hit shnum>=1 before
613 # shnum=0, we pull the pubkey from there). To get repeatable
614 # specific failures, we need to set initial_query_count=1,
615 # but of course that will change the sequencing behavior of
616 # the retrieval process. TODO: find a reasonable way to make
617 # this a parameter, probably when we expand this test to test
618 # for one failure mode at a time.
620 # when we retrieve this, we should get three signature
621 # failures (where we've mangled seqnum, R, and segsize). The
623 d.addCallback(_corrupt_shares)
# despite the corruption above, the 3 intact shares must still suffice
625 d.addCallback(lambda res: self._newnode3.download_best_version())
626 d.addCallback(_check_download_5)
628 def _check_empty_file(res):
629 # make sure we can create empty files, this usually screws up the
631 d1 = self.clients[2].create_mutable_file("")
632 d1.addCallback(lambda newnode: newnode.download_best_version())
633 d1.addCallback(lambda res: self.failUnlessEqual("", res))
635 d.addCallback(_check_empty_file)
637 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
638 def _created_dirnode(dnode):
639 log.msg("_created_dirnode(%s)" % (dnode,))
641 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
642 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
643 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
644 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
645 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
646 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
647 d1.addCallback(lambda res: dnode.build_manifest())
648 d1.addCallback(lambda manifest:
649 self.failUnlessEqual(len(manifest), 1))
651 d.addCallback(_created_dirnode)
653 def wait_for_c3_kg_conn():
654 return self.clients[3]._key_generator is not None
655 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
657 def check_kg_poolsize(junk, size_delta):
658 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
659 self.key_generator_svc.key_generator.pool_size + size_delta)
661 d.addCallback(check_kg_poolsize, 0)
662 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
663 d.addCallback(check_kg_poolsize, -1)
664 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
665 d.addCallback(check_kg_poolsize, -2)
666 # use_helper induces use of clients[3], which is the using-key_gen client
667 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
668 d.addCallback(check_kg_poolsize, -3)
671 # The default 120 second timeout went off when running it under valgrind
672 # on my old Windows laptop, so I'm bumping up the timeout.
673 test_mutable.timeout = 240
def flip_bit(self, good):
    # Return `good` with the low bit of its final character inverted --
    # the one-bit corruption primitive used by the share-mangling tests.
    # Fix: stripped the corrupted baked-in listing-number prefixes so the
    # method is valid Python again; logic is unchanged.
    return good[:-1] + chr(ord(good[-1]) ^ 0x01)
# Build a syntactically valid CHK URI that refers to a file nobody holds:
# flipping one bit of the key changes the derived storage index, so every
# storage query for the mangled URI comes up empty.
# NOTE(review): the listing drops lines 677, 686 and 688; line 686 is
# presumably the `size=u.size,` keyword argument of the CHKFileURI
# constructor -- confirm against the original file.
678 def mangle_uri(self, gooduri):
679 # change the key, which changes the storage index, which means we'll
680 # be asking about the wrong file, so nobody will have any shares
681 u = IFileURI(gooduri)
682 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
683 uri_extension_hash=u.uri_extension_hash,
684 needed_shares=u.needed_shares,
685 total_shares=u.total_shares,
687 return u2.to_string()
689 # TODO: add a test which mangles the uri_extension_hash instead, and
690 # should fail due to not being able to get a valid uri_extension block.
691 # Also a test which sneakily mangles the uri_extension block to change
692 # some of the validation data, so it will fail in the post-download phase
693 # when the file's crypttext integrity check fails. Do the same thing for
694 # the key, which should cause the download to fail the post-download
695 # plaintext_hash check.
# Drive the virtual-drive scenario end to end: publish files into a
# shared root directory R and a private directory P, bounce client0 in
# the middle, then run the web/control/CLI/checker/stats sub-tests over
# the resulting filesystem. The ASCII diagrams in the comments describe
# the tree each stage leaves behind.
# NOTE(review): some lines are missing from this listing (embedded
# numbering skips 708-709, 711, 713, 716, 734-735, 739); additional tree
# diagram lines and the final `return d` are presumably among them --
# confirm against the original file.
697 def test_vdrive(self):
698 self.basedir = "system/SystemTest/test_vdrive"
699 self.data = LARGE_DATA
700 d = self.set_up_nodes(use_stats_gatherer=True)
701 d.addCallback(self._test_introweb)
702 d.addCallback(self.log, "starting publish")
703 d.addCallback(self._do_publish1)
704 d.addCallback(self._test_runner)
705 d.addCallback(self._do_publish2)
706 # at this point, we have the following filesystem (where "R" denotes
707 # self._root_directory_uri):
710 # R/subdir1/mydata567
712 # R/subdir1/subdir2/mydata992
714 d.addCallback(lambda res: self.bounce_client(0))
715 d.addCallback(self.log, "bounced client0")
717 d.addCallback(self._check_publish1)
718 d.addCallback(self.log, "did _check_publish1")
719 d.addCallback(self._check_publish2)
720 d.addCallback(self.log, "did _check_publish2")
721 d.addCallback(self._do_publish_private)
722 d.addCallback(self.log, "did _do_publish_private")
723 # now we also have (where "P" denotes a new dir):
724 # P/personal/sekrit data
725 # P/s2-rw -> /subdir1/subdir2/
726 # P/s2-ro -> /subdir1/subdir2/ (read-only)
727 d.addCallback(self._check_publish_private)
728 d.addCallback(self.log, "did _check_publish_private")
729 d.addCallback(self._test_web)
730 d.addCallback(self._test_control)
731 d.addCallback(self._test_cli)
732 # P now has four top-level children:
733 # P/personal/sekrit data
736 # P/test_put/ (empty)
737 d.addCallback(self._test_checker)
738 d.addCallback(self._grab_stats)
740 test_vdrive.timeout = 1100
# Fetch the introducer's web status page (HTML, then ?t=json) and verify
# it reports the expected version plus 5 storage / 5 stub_client
# announcements and 5 storage subscriptions. On assertion failure the
# raw page is printed for debugging before re-raising.
# NOTE(review): lines are missing from this listing (embedded numbering
# skips 744-745, 747, 751, 753-754, 761, 763, 767, 769-770, 772-773);
# the `def _check(res):`/`try:` headers, the `raise` after the debug
# prints, and the final `return d` are presumably among them -- confirm
# against the original file.
742 def _test_introweb(self, res):
743 d = getPage(self.introweb_url, method="GET", followRedirect=True)
746 self.failUnless("allmydata: %s" % str(allmydata.__version__)
748 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
749 self.failUnless("Subscription Summary: storage: 5" in res)
750 except unittest.FailTest:
752 print "GET %s output was:" % self.introweb_url
755 d.addCallback(_check)
756 d.addCallback(lambda res:
757 getPage(self.introweb_url + "?t=json",
758 method="GET", followRedirect=True))
759 def _check_json(res):
760 data = simplejson.loads(res)
762 self.failUnlessEqual(data["subscription_summary"],
764 self.failUnlessEqual(data["announcement_summary"],
765 {"storage": 5, "stub_client": 5})
766 except unittest.FailTest:
768 print "GET %s?t=json output was:" % self.introweb_url
771 d.addCallback(_check_json)
# First publish stage: create the root dirnode R on client0, stash its
# URI in self._root_directory_uri, create R/subdir1, upload self.data as
# R/subdir1/mydata567, and stash the file's URI in self.uri.
# NOTE(review): the listing drops lines 776, 790 and 792-793; the
# `c0 = self.clients[0]` binding, `return d1` inside _made_subdir1, and
# the final `return d` are presumably among them -- confirm against the
# original file.
774 def _do_publish1(self, res):
775 ut = upload.Data(self.data, convergence=None)
777 d = c0.create_empty_dirnode()
778 def _made_root(new_dirnode):
779 self._root_directory_uri = new_dirnode.get_uri()
780 return c0.create_node_from_uri(self._root_directory_uri)
781 d.addCallback(_made_root)
782 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
783 def _made_subdir1(subdir1_node):
784 self._subdir1_node = subdir1_node
785 d1 = subdir1_node.add_file(u"mydata567", ut)
786 d1.addCallback(self.log, "publish finished")
787 def _stash_uri(filenode):
788 self.uri = filenode.get_uri()
789 d1.addCallback(_stash_uri)
791 d.addCallback(_made_subdir1)
# Second publish stage: create R/subdir1/subdir2 beneath the subdir1
# node stashed by _do_publish1, and upload self.data as
# R/subdir1/subdir2/mydata992.
# NOTE(review): the listing drops line 798, presumably `return d` --
# confirm against the original file.
794 def _do_publish2(self, res):
795 ut = upload.Data(self.data, convergence=None)
796 d = self._subdir1_node.create_empty_directory(u"subdir2")
797 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
# Deferred-chain-friendly logging helper: forwards *args/**kwargs to
# log.msg so it can be used directly as an addCallback target.
# NOTE(review): the listing drops line 803, presumably `return res` (so
# the callback passes its input through the chain) -- confirm against
# the original file.
800 def log(self, res, *args, **kwargs):
801 # print "MSG: %s RES: %s" % (msg, args)
802 log.msg(*args, **kwargs)
# Build the private directory P on client0: P/personal/"sekrit data"
# holding self.smalldata, plus P/s2-rw and P/s2-ro pointing at
# R/subdir1/subdir2 (read-write and read-only caps respectively). The
# final callback returns the privnode itself so the Deferred chain hands
# P to _check_publish_private.
# NOTE(review): the listing drops lines 817, 820, 823 and 825-826; the
# `def _got_s2(s2node):` header, `return d2`/`return d1`, and the final
# `return d` are presumably among them -- confirm against the original.
805 def _do_publish_private(self, res):
806 self.smalldata = "sssh, very secret stuff"
807 ut = upload.Data(self.smalldata, convergence=None)
808 d = self.clients[0].create_empty_dirnode()
809 d.addCallback(self.log, "GOT private directory")
810 def _got_new_dir(privnode):
811 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
812 d1 = privnode.create_empty_directory(u"personal")
813 d1.addCallback(self.log, "made P/personal")
814 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
815 d1.addCallback(self.log, "made P/personal/sekrit data")
816 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
818 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
819 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
821 d1.addCallback(_got_s2)
# hand the private dirnode itself to the next stage of the outer chain
822 d1.addCallback(lambda res: privnode)
824 d.addCallback(_got_new_dir)
# Verify the published tree from client1 using the iterative dirnode
# API: walk R -> subdir1 -> mydata567 one get() at a time, download the
# file, and compare against self.data.
# NOTE(review): the listing drops lines 829, 836 and 839-840; the
# `c1 = self.clients[1]` binding, the `def _get_done(data):` header and
# the final `return d` are presumably among them -- confirm against the
# original file.
827 def _check_publish1(self, res):
828 # this one uses the iterative API
830 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
831 d.addCallback(self.log, "check_publish1 got /")
832 d.addCallback(lambda root: root.get(u"subdir1"))
833 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
834 d.addCallback(lambda filenode: filenode.download_to_data())
835 d.addCallback(self.log, "get finished")
837 self.failUnlessEqual(data, self.data)
838 d.addCallback(_get_done)
# Verify the same tree from client1 using the path-based API
# (get_child_at_path with a "subdir1/mydata567" path), and additionally
# check that reconstructing a filenode from its URI yields an equal node.
# NOTE(review): the listing drops lines 850 and 856-857; the final
# `return d` is presumably among them -- confirm against the original
# file.
841 def _check_publish2(self, res):
842 # this one uses the path-based API
843 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
844 d = rootnode.get_child_at_path(u"subdir1")
845 d.addCallback(lambda dirnode:
846 self.failUnless(IDirectoryNode.providedBy(dirnode)))
847 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
848 d.addCallback(lambda filenode: filenode.download_to_data())
849 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
851 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
852 def _got_filenode(filenode):
# node-from-URI must compare equal to the node obtained by traversal
853 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
854 assert fnode == filenode
855 d.addCallback(_got_filenode)
    def _check_publish_private(self, resnode):
        """Verify the private ('P/') directory tree: personal/, s2-rw and the
        read-only view s2-ro.  Mutating operations on s2-ro must raise
        NotMutableError; then exercise move_child_to between P/ and
        P/personal/, the manifest, and deep_stats.
        NOTE(review): several original lines are elided from this excerpt
        (marked inline) -- confirm against the full file."""
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember personal/ for the move_child_to tests below
            self._personal_node = personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): the 'def get_path(path):' header is elided here; the
        # next line is its body.
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # s2-ro is a read-only view of a mutable directory
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            # every mutating operation on the read-only view must fail
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            # NOTE(review): 'return d1' appears to be elided here.
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            # expected manifest entries (comment list partially elided):
            #  P/personal/sekrit data
            #  P/s2-rw  (same as P/s2-ro)
            #  P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            d1.addCallback(lambda res: home.deep_stats())
            def _check_stats(stats):
                # exact counts/sizes for the small fixed tree built above;
                # directory sizes vary, so those are range-checked below
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            # NOTE(review): the dict-closing '}' line is
                            # elided from this excerpt.
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         # NOTE(review): the '(k, ...)'
                                         # continuation line is elided.
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            # NOTE(review): 'return d1' appears to be elided here.
        d.addCallback(_got_home)
    def shouldFail(self, res, expected_failure, which, substring=None):
        """addBoth-style checker: *res* must be a Failure wrapping
        *expected_failure*; if *substring* is given it must appear in the
        failure text.  A non-Failure result means the expected error never
        happened, so the test fails with a message naming *which*.
        NOTE(review): an 'if substring:' guard and the 'else:' introducing
        self.fail() are elided from this excerpt -- confirm in the full
        file before relying on the control flow shown here."""
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Call *callable(*args, **kwargs)* via maybeDeferred and require it
        to fail with *expected_failure* (optionally containing *substring*).
        NOTE(review): the 'def _done(res):' header, its 'if substring:' /
        'else:' lines, the 'd.addBoth(_done)' and 'return d' are elided
        from this excerpt -- confirm in the full file."""
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        # the following lines belong to the elided _done(res) callback:
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
994 def PUT(self, urlpath, data):
995 url = self.webish_url + urlpath
996 return getPage(url, method="PUT", postdata=data)
998 def GET(self, urlpath, followRedirect=False):
999 url = self.webish_url + urlpath
1000 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        """HTTP POST of a hand-built multipart/form-data body to *urlpath*,
        against the helper's webapi when *use_helper* is true, otherwise
        client 0's.  Each keyword in *fields* becomes a form field; a tuple
        value is treated as (filename, content).
        NOTE(review): this excerpt elides the 'if use_helper:'/'else:'
        lines, the 'form = []' initialization, the boundary/blank-line
        appends, and the closing brace of the headers dict -- confirm
        against the full file."""
        url = self.helper_webish_url + urlpath
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('UTF-8')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                # (filename, content) pair: emit a file-upload part
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the web-API end to end: welcome pages (plain node and
        helper-using node), directory listing, file download by path and by
        URI, error on a mangled URI, PUT upload (small and multi-segment),
        unlinked POST uploads (direct and via helper), the various status
        pages, helper status (HTML and JSON), and the statistics page.
        NOTE(review): many original lines are elided from this excerpt
        (marked inline) -- confirm against the full file."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        # NOTE(review): the initial 'd = getPage(base)' line is elided.
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            # NOTE(review): closing ')' line elided.
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            # NOTE(review): continuation + ')' elided.
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            # NOTE(review): second argument line elided.
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       # NOTE(review): substring argument and 'return d1'
                       # lines elided.
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           # NOTE(review): new-contents
                                           # argument line elided.
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): the 'def _got_up(res):' header is elided here.
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        # NOTE(review): the 'def _got_down(res):' header is elided here.
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # NOTE(review): 'f.close()' elided.
            then = time.time() - 86400*3
            # NOTE(review): 'now = time.time()' elided.
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            # NOTE(review): 'f.close()' elided.
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 # NOTE(review): expected-value line elided.
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 # NOTE(review): expected-value line elided.
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 # NOTE(review): expected-value line elided.
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
        # NOTE(review): trailing 'return d' elided from this excerpt.
    def _test_runner(self, res):
        """Exercise the runner.py diagnostic tools: walk the storage
        directories to find a CHK share file, then run 'debug dump-share',
        'debug find-shares' and 'debug catalog-shares' on it and check
        their output.
        NOTE(review): several original lines are elided from this excerpt
        (marked inline) -- confirm against the full file."""
        # exercise some of the diagnostic tools in runner.py

        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                # NOTE(review): 'continue' and an empty-filenames guard
                # are elided here.
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    # NOTE(review): 'break' and the for/else raising
                    # self.fail are partially elided below.
        self.fail("unable to find any uri_extension files in %s"
                  # NOTE(review): '% self.basedir)' continuation elided.
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            # NOTE(review): 'filename],' line elided.
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): 'out.seek(0)' elided before readlines().
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # NOTE(review): 'out.seek(0)' elided before readlines().
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        # NOTE(review): 'matching = [line' opener elided before the next
        # two list-comprehension lines.
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        """Connect to client 0's control-port FURL via the introducer's Tub
        and hand the reference to _test_control2.
        NOTE(review): the trailing 'return d' is elided from this excerpt."""
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
    def _test_control2(self, rref, filename):
        """Round-trip *filename* through the control-port remote API
        (upload_from_file_to_uri / download_from_uri_to_file), verify the
        bytes match, then run speed_test, get_memory_usage (linux only) and
        measure_peer_response_time.
        NOTE(review): some argument lines, the 'def _check(res):' header and
        the trailing 'return d' are elided from this excerpt."""
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
                                      # NOTE(review): 'uri, downfile))' and
                                      # 'def _check(res):' elided here.
            self.failUnlessEqual(res, downfile)
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        # get_memory_usage is only implemented on the linux platform
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Drive the 'tahoe' command-line tool against client 0: aliases
        (root_dir.cap compatibility, list-aliases, add-alias), put/get
        (file, stdin, mutable), ls (plain/-l/--uri/--readonly-uri), rm, mv,
        ln, cp (tahoe<->tahoe, disk<->tahoe, recursive), checking output
        and directory contents at each step.
        NOTE(review): many original lines are elided from this excerpt
        (marked inline) -- confirm against the full file."""
        # run various CLI commands (in a thread, since they use blocking
        # network calls)
        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        client0_basedir = self.getdir("client0")
        # NOTE(review): 'nodeargs = [' opener and closing ']' elided around
        # the next line.
            "--node-directory", client0_basedir,
        TESTDATA = "I will not write the same thing over and over.\n" * 100

        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.

        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)
        # NOTE(review): 'f.close()' elided.

        def run(ignored, verb, *args, **kwargs):
            # helper: run one CLI verb with the standard node args
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        # new-style aliases
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
            # NOTE(review): 'return out.strip()' elided.
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

        # NOTE(review): the 'files = []', 'datas = []' and 'for i in
        # range(...):' lines that introduce this loop body are elided.
            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            # NOTE(review): 'uri0 = out.strip()' elided.
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        #  tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        #  tahoe put FOO
        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        #  tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        # tahoe mkdir FOO
        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
            # NOTE(review): 'for l in lines:' elided.
                if "tahoe-file-stdin" in l:
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                # NOTE(review): 'if "file3" in l:' elided.
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
            # NOTE(review): 'for l in lines:' / 'if "file3" in l:' elided.
                    self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
            # NOTE(review): 'for l in lines:' / 'if "file3" in l:' elided.
                    rw_uri = self._mutable_file3_uri
                    u = uri.from_string_mutable_filenode(rw_uri)
                    ro_uri = u.get_readonly().to_string()
                    self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)


        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        # NOTE(review): 'os.makedirs(dn)' elided.
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        # NOTE(review): 'os.makedirs(sdn2)' elided.
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
            # NOTE(review): the 'def _cmp(name):' header is elided; the
            # next four lines are its body.
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            # NOTE(review): the _cmp calls for rfile1..rfile3 are elided.
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
##         def _ls_missing(res):
##             argv = ["ls"] + nodeargs + ["bogus"]
##             return self._run_cli(argv)
##         d.addCallback(_ls_missing)
##         def _check_ls_missing((out,err)):
##             self.failUnlessEqual(err, "")
##         d.addCallback(_check_ls_missing)
        # NOTE(review): trailing 'return d' elided from this excerpt.
    def _run_cli(self, argv, stdin=""):
        """Run a Tahoe CLI command in a worker thread (runner uses blocking
        network calls) and return a Deferred firing with (stdout, stderr).
        NOTE(review): the 'def _done(res):' header and the trailing
        'return d' are elided from this excerpt."""
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdin=StringIO(stdin),
                                  stdout=stdout, stderr=stderr)
        # the following line belongs to the elided _done(res) callback:
            return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
    def _test_checker(self, res):
        """Run the checker on the personal dirnode, a CHK file node and a
        LIT file node (both plain check and verify=True); the dirnode and
        CHK node must report healthy, the LIT node check returns None.
        NOTE(review): a few original lines ('d1 = n.check()'-style and
        'return d1' lines, and the final 'return d') are elided from this
        excerpt -- confirm against the full file."""
        ut = upload.Data("too big to be literal" * 200, convergence=None)
        d = self._personal_node.add_file(u"big file", ut)

        d.addCallback(lambda res: self._personal_node.check())
        def _check_dirnode_results(r):
            self.failUnless(r.is_healthy())
        d.addCallback(_check_dirnode_results)
        d.addCallback(lambda res: self._personal_node.check(verify=True))
        d.addCallback(_check_dirnode_results)

        d.addCallback(lambda res: self._personal_node.get(u"big file"))
        def _got_chk_filenode(n):
            self.failUnless(isinstance(n, filenode.FileNode))
            # NOTE(review): 'd = n.check()' elided here.
            def _check_filenode_results(r):
                self.failUnless(r.is_healthy())
            d.addCallback(_check_filenode_results)
            d.addCallback(lambda res: n.check(verify=True))
            d.addCallback(_check_filenode_results)
            # NOTE(review): 'return d' elided here.
        d.addCallback(_got_chk_filenode)

        d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
        def _got_lit_filenode(n):
            self.failUnless(isinstance(n, filenode.LiteralFileNode))
            # NOTE(review): 'd = n.check()' elided here.
            def _check_lit_filenode_results(r):
                # LIT files carry their data in the URI; check returns None
                self.failUnlessEqual(r, None)
            d.addCallback(_check_lit_filenode_results)
            d.addCallback(lambda res: n.check(verify=True))
            d.addCallback(_check_lit_filenode_results)
            # NOTE(review): 'return d' elided here.
        d.addCallback(_got_lit_filenode)
class MutableChecker(SystemTestMixin, unittest.TestCase):
    # System tests for checking/repairing mutable files via the webapi.
    # NOTE(review): the rest of this class continues beyond this excerpt.

    def _run_cli(self, argv):
        """Run a Tahoe CLI command synchronously and return its stdout
        (stderr is captured and discarded)."""
        stdout, stderr = StringIO(), StringIO()
        runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
        return stdout.getvalue()
    def test_good(self):
        """Create a healthy mutable file and confirm the webapi verifier
        (POST ?t=check&verify=true) reports it healthy with 10 shares of
        seq1 and no corruption.
        NOTE(review): the '_created'/'_do_check' function headers and the
        trailing 'return d' are elided from this excerpt."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        # the following line belongs to the elided _created(node) callback:
            si = self.node.get_storage_index()
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        # NOTE(review): the 'def _do_check(res):' header is elided here.
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<div>Healthy!</div>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
1755 def test_corrupt(self):
1756 self.basedir = self.mktemp()
1757 d = self.set_up_nodes()
1758 CONTENTS = "a little bit of data"
1759 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1762 si = self.node.get_storage_index()
1763 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1764 self.clients[1].basedir])
1765 files = out.split("\n")
1766 # corrupt one of them, using the CLI debug command
1768 shnum = os.path.basename(f)
1769 nodeid = self.clients[1].nodeid
1770 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1771 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1772 out = self._run_cli(["debug", "corrupt-share", files[0]])
1773 d.addCallback(_created)
1774 # now make sure the webapi verifier notices it
1776 url = (self.webish_url +
1777 "uri/%s" % urllib.quote(self.node.get_uri()) +
1778 "?t=check&verify=true")
1779 return getPage(url, method="POST")
1780 d.addCallback(_do_check)
1781 def _got_results(out):
1782 self.failUnless("Not Healthy!" in out, out)
1783 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1784 self.failUnless("Corrupt Shares:" in out, out)
1785 d.addCallback(_got_results)
1787 # now make sure the webapi repairer can fix it
1788 def _do_repair(res):
1789 url = (self.webish_url +
1790 "uri/%s" % urllib.quote(self.node.get_uri()) +
1791 "?t=check&verify=true&repair=true")
1792 return getPage(url, method="POST")
1793 d.addCallback(_do_repair)
1794 def _got_repair_results(out):
1795 self.failUnless("<div>Repair successful</div>" in out, out)
1796 d.addCallback(_got_repair_results)
1797 d.addCallback(_do_check)
1798 def _got_postrepair_results(out):
1799 self.failIf("Not Healthy!" in out, out)
1800 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1801 d.addCallback(_got_postrepair_results)
1805 def test_delete_share(self):
1806 self.basedir = self.mktemp()
1807 d = self.set_up_nodes()
1808 CONTENTS = "a little bit of data"
1809 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1812 si = self.node.get_storage_index()
1813 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1814 self.clients[1].basedir])
1815 files = out.split("\n")
1816 # corrupt one of them, using the CLI debug command
1818 shnum = os.path.basename(f)
1819 nodeid = self.clients[1].nodeid
1820 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1821 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1823 d.addCallback(_created)
1824 # now make sure the webapi checker notices it
1826 url = (self.webish_url +
1827 "uri/%s" % urllib.quote(self.node.get_uri()) +
1828 "?t=check&verify=false")
1829 return getPage(url, method="POST")
1830 d.addCallback(_do_check)
1831 def _got_results(out):
1832 self.failUnless("Not Healthy!" in out, out)
1833 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1834 self.failIf("Corrupt Shares" in out, out)
1835 d.addCallback(_got_results)
1837 # now make sure the webapi repairer can fix it
1838 def _do_repair(res):
1839 url = (self.webish_url +
1840 "uri/%s" % urllib.quote(self.node.get_uri()) +
1841 "?t=check&verify=false&repair=true")
1842 return getPage(url, method="POST")
1843 d.addCallback(_do_repair)
1844 def _got_repair_results(out):
1845 self.failUnless("Repair successful" in out)
1846 d.addCallback(_got_repair_results)
1847 d.addCallback(_do_check)
1848 def _got_postrepair_results(out):
1849 self.failIf("Not Healthy!" in out, out)
1850 self.failUnless("Recoverable Versions: 10*seq" in out)
1851 d.addCallback(_got_postrepair_results)
1855 class DeepCheckWeb(SystemTestMixin, unittest.TestCase):
1856 # construct a small directory tree (with one dir, one immutable file, one
1857 # mutable file, one LIT file, and a loop), and then check/examine it in
1860 def set_up_tree(self, ignored):
# Build the small test tree: a root dirnode containing a mutable file, a
# large immutable file, a small LIT file, and a u"loop" edge pointing back
# at the root.  The nodes and their URIs are stashed on self for later
# checks.
1862 c0 = self.clients[0]
1863 d = c0.create_empty_dirnode()
1864 def _created_root(n):
# NOTE(review): orig ~1865 (presumably `self.root = n`) is missing from
# this extraction; later methods do read self.root.
1866 self.root_uri = n.get_uri()
1867 d.addCallback(_created_root)
1868 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1869 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1870 def _created_mutable(n):
# NOTE(review): orig ~1871 (presumably `self.mutable = n`) is missing here.
1872 self.mutable_uri = n.get_uri()
1873 d.addCallback(_created_mutable)
1875 large = upload.Data("Lots of data\n" * 1000, None)
1876 d.addCallback(lambda ign: self.root.add_file(u"large", large))
1877 def _created_large(n):
# NOTE(review): orig ~1878 (presumably `self.large = n`) is missing here.
1879 self.large_uri = n.get_uri()
1880 d.addCallback(_created_large)
# 22 bytes: small enough to become a LIT file
1882 small = upload.Data("Small enough for a LIT", None)
1883 d.addCallback(lambda ign: self.root.add_file(u"small", small))
1884 def _created_small(n):
# NOTE(review): orig ~1885 (presumably `self.small = n`) is missing here.
1886 self.small_uri = n.get_uri()
1887 d.addCallback(_created_small)
# the loop: deep-traversal code must not recurse forever on it
1889 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
1892 def check_is_healthy(self, cr, n, where, incomplete=False):
# Assert that checker results `cr` for node `n` describe a fully healthy
# file: all 10 shares present, one recoverable version, nothing corrupt,
# every client holding shares and responding.  `where` tags assertion
# failures with the calling context.
1893 self.failUnless(ICheckerResults.providedBy(cr), where)
1894 self.failUnless(cr.is_healthy(), where)
1895 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
1897 self.failUnlessEqual(cr.get_storage_index_string(),
1898 base32.b2a(n.get_storage_index()), where)
# fewer than 10 clients means some server must hold more than one share
1899 needs_rebalancing = bool( len(self.clients) < 10 )
1901 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
# NOTE(review): orig ~1902 is missing from this extraction; `d` below is
# presumably the dict returned by cr.get_data() -- confirm against the
# full file.  The handling of the `incomplete` flag (likely guarding some
# of these checks) also falls in missing lines (~1900, ~1906, ~1910).
1903 self.failUnlessEqual(d["count-shares-good"], 10, where)
1904 self.failUnlessEqual(d["count-shares-needed"], 3, where)
1905 self.failUnlessEqual(d["count-shares-expected"], 10, where)
1907 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
1908 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
1909 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
1911 self.failUnlessEqual(sorted(d["servers-responding"]),
1912 sorted([c.nodeid for c in self.clients]),
1914 self.failUnless("sharemap" in d, where)
# every client nodeid must appear somewhere in the sharemap
1915 all_serverids = set()
1916 for (shareid, serverids) in d["sharemap"].items():
1917 all_serverids.update(serverids)
1918 self.failUnlessEqual(sorted(all_serverids),
1919 sorted([c.nodeid for c in self.clients]),
1922 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
1923 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
1924 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
    """Assert a check-and-repair result is healthy and needed no repair.

    Both the pre-repair and post-repair sections must satisfy
    check_is_healthy(), and no repair may have been attempted.  `where`
    tags assertion failures with the calling context.
    """
    self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
    pre = cr.get_pre_repair_results()
    self.failUnless(pre.is_healthy(), where)
    self.check_is_healthy(pre, n, where, incomplete)
    post = cr.get_post_repair_results()
    self.failUnless(post.is_healthy(), where)
    self.check_is_healthy(post, n, where, incomplete)
    # a healthy file must not have triggered any repair work
    self.failIf(cr.get_repair_attempted(), where)
1935 def deep_check_is_healthy(self, cr, num_healthy, where):
# Assert a deep-check result counts exactly `num_healthy` healthy objects.
1936 self.failUnless(IDeepCheckResults.providedBy(cr))
1937 self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
# NOTE(review): the continuation line (orig ~1938, presumably
# `num_healthy, where)`) is missing from this extraction.
1940 def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
# Assert a deep-check-and-repair pass found `num_healthy` healthy objects
# both before and after, and attempted zero repairs.
1941 self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
1942 c = cr.get_counters()
1943 self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
# NOTE(review): continuation lines (orig ~1944 and ~1946, presumably
# `num_healthy, where)`) are missing from this extraction.
1945 self.failUnlessEqual(c["count-objects-healthy-post-repair"],
1947 self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
1949 def test_good(self):
# End-to-end: start a grid, build the test tree, then validate deep-stats,
# local check/repair results, and the webapi views in sequence.
1950 self.basedir = self.mktemp()
1951 d = self.set_up_nodes()
1952 d.addCallback(self.set_up_tree)
1953 d.addCallback(self.do_stats)
1954 d.addCallback(self.do_test_good)
1955 d.addCallback(self.do_test_web)
# NOTE(review): the trailing `return d` (orig ~1956) is missing from this
# extraction; without it trial would not wait for the Deferred.
1958 def do_stats(self, ignored):
# Run deep_stats() on the root dirnode and validate the numbers.
1959 d = defer.succeed(None)
1960 d.addCallback(lambda ign: self.root.deep_stats())
1961 d.addCallback(self.check_stats)
# NOTE(review): the trailing `return d` (orig ~1962) is missing from this
# extraction.
1964 def check_stats(self, s):
# Validate the deep-stats dict for the tree built by set_up_tree:
# one directory and three files (1 immutable + 1 literal + 1 mutable).
1965 self.failUnlessEqual(s["count-directories"], 1)
1966 self.failUnlessEqual(s["count-files"], 3)
1967 self.failUnlessEqual(s["count-immutable-files"], 1)
1968 self.failUnlessEqual(s["count-literal-files"], 1)
1969 self.failUnlessEqual(s["count-mutable-files"], 1)
1970 # don't check directories: their size will vary
1971 # s["largest-directory"]
1972 # s["size-directories"]
# 4 children: mutable, large, small, and the loop edge
1973 self.failUnlessEqual(s["largest-directory-children"], 4)
1974 self.failUnlessEqual(s["largest-immutable-file"], 13000)
1975 # to re-use this function for both the local dirnode.deep_stats() and
1976 # the webapi t=deep-stats, we coerce the result into a list of
1977 # tuples. dirnode.deep_stats() returns a list of tuples, but JSON
1978 # only knows about lists., so t=deep-stats returns a list of lists.
1979 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
1980 self.failUnlessEqual(histogram, [(11, 31, 1),
# NOTE(review): the rest of the expected histogram (orig ~1981-1982,
# further (min, max, count) buckets closing this call) is missing from
# this extraction.
1983 self.failUnlessEqual(s["size-immutable-files"], 13000)
1984 self.failUnlessEqual(s["size-literal-files"], 22)
1986 def do_test_good(self, ignored):
# Run every local checking entry point (check, check(verify=True),
# check_and_repair, deep_check, deep_check_and_repair) against the tree
# and assert healthy results.  LIT files ("small") always return None.
1987 d = defer.succeed(None)
1988 # check the individual items
1989 d.addCallback(lambda ign: self.root.check())
1990 d.addCallback(self.check_is_healthy, self.root, "root")
1991 d.addCallback(lambda ign: self.mutable.check())
1992 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
1993 d.addCallback(lambda ign: self.large.check())
1994 d.addCallback(self.check_is_healthy, self.large, "large")
1995 d.addCallback(lambda ign: self.small.check())
1996 d.addCallback(self.failUnlessEqual, None, "small")
1998 # and again with verify=True
1999 d.addCallback(lambda ign: self.root.check(verify=True))
2000 d.addCallback(self.check_is_healthy, self.root, "root")
2001 d.addCallback(lambda ign: self.mutable.check(verify=True))
2002 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2003 d.addCallback(lambda ign: self.large.check(verify=True))
2004 d.addCallback(self.check_is_healthy, self.large, "large",
# NOTE(review): the continuation (orig ~2005, presumably
# `incomplete=True)`) is missing from this extraction; same for ~2026.
2006 d.addCallback(lambda ign: self.small.check(verify=True))
2007 d.addCallback(self.failUnlessEqual, None, "small")
2009 # and check_and_repair(), which should be a nop
2010 d.addCallback(lambda ign: self.root.check_and_repair())
2011 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2012 d.addCallback(lambda ign: self.mutable.check_and_repair())
2013 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2014 d.addCallback(lambda ign: self.large.check_and_repair())
2015 d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2016 d.addCallback(lambda ign: self.small.check_and_repair())
2017 d.addCallback(self.failUnlessEqual, None, "small")
2019 # check_and_repair(verify=True)
2020 d.addCallback(lambda ign: self.root.check_and_repair(verify=True))
2021 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2022 d.addCallback(lambda ign: self.mutable.check_and_repair(verify=True))
2023 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2024 d.addCallback(lambda ign: self.large.check_and_repair(verify=True))
2025 d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2027 d.addCallback(lambda ign: self.small.check_and_repair(verify=True))
2028 d.addCallback(self.failUnlessEqual, None, "small")
2031 # now deep-check the root, with various verify= and repair= options
# 3 = the objects deep-check can count: root, mutable, large (the loop is
# not revisited; whether the LIT file is counted is established elsewhere)
2032 d.addCallback(lambda ign: self.root.deep_check())
2033 d.addCallback(self.deep_check_is_healthy, 3, "root")
2034 d.addCallback(lambda ign: self.root.deep_check(verify=True))
2035 d.addCallback(self.deep_check_is_healthy, 3, "root")
2036 d.addCallback(lambda ign: self.root.deep_check_and_repair())
2037 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2038 d.addCallback(lambda ign: self.root.deep_check_and_repair(verify=True))
2039 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
# NOTE(review): the trailing `return d` (orig ~2040) is missing from this
# extraction.
2043 def web_json(self, n, **kwargs):
# POST to the webapi for node `n` with output=json and decode the reply.
2044 kwargs["output"] = "json"
2045 d = self.web(n, "POST", **kwargs)
2046 d.addCallback(self.decode_json)
# NOTE(review): the trailing `return d` (orig ~2047) is missing from this
# extraction.
2049 def decode_json(self, (s,url)):
# Parse the response body `s` as JSON; on failure, fail the test with the
# url included for context.  (py2 tuple-parameter syntax.)
# NOTE(review): the try/except wrapper (orig ~2050, ~2052) and the final
# `return data` (orig ~2054) are missing from this extraction -- as shown,
# self.fail() would run unconditionally, so the holes are certain.
2051 data = simplejson.loads(s)
2053 self.fail("%s: not JSON: '%s'" % (url, s))
2056 def web(self, n, method="GET", **kwargs):
2057 # returns (data, url)
# Build the webapi URL for node `n` with kwargs as query parameters and
# fetch it.  NOTE(review): parameter values are not urllib.quote()d; fine
# for the simple t=/verify=/repair= values used in this file.
2058 url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2059 + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2060 d = getPage(url, method=method)
2061 d.addCallback(lambda data: (data,url))
# NOTE(review): the trailing `return d` (orig ~2062) is missing from this
# extraction.
2064 def json_check_is_healthy(self, data, n, where, incomplete=False):
# JSON analogue of check_is_healthy(): assert the webapi t=check output
# for node `n` describes a fully healthy file.  Server ids appear here in
# base32 string form (idlib.nodeid_b2a), unlike the binary ids in the
# local-results checker.
2066 self.failUnlessEqual(data["storage-index"],
2067 base32.b2a(n.get_storage_index()), where)
# NOTE(review): orig ~2068 is missing from this extraction; `r` below is
# presumably `data["results"]` -- confirm against the full file.  The
# `incomplete` flag's handling also falls in missing lines (~2071, ~2076).
2069 self.failUnlessEqual(r["healthy"], True, where)
2070 needs_rebalancing = bool( len(self.clients) < 10 )
2072 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2073 self.failUnlessEqual(r["count-shares-good"], 10, where)
2074 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2075 self.failUnlessEqual(r["count-shares-expected"], 10, where)
2077 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2078 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2079 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2081 self.failUnlessEqual(sorted(r["servers-responding"]),
2082 sorted([idlib.nodeid_b2a(c.nodeid)
2083 for c in self.clients]), where)
2084 self.failUnless("sharemap" in r, where)
# every client should appear somewhere in the sharemap
2085 all_serverids = set()
2086 for (shareid, serverids_s) in r["sharemap"].items():
2087 all_serverids.update(serverids_s)
2088 self.failUnlessEqual(sorted(all_serverids),
2089 sorted([idlib.nodeid_b2a(c.nodeid)
2090 for c in self.clients]), where)
2091 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2092 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2093 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
    """Assert webapi check-and-repair JSON shows a healthy, unrepaired file.

    The storage index must match node `n`, no repair may have been
    attempted, and both the pre- and post-repair sections must satisfy
    json_check_is_healthy().
    """
    expected_si = base32.b2a(n.get_storage_index())
    self.failUnlessEqual(data["storage-index"], expected_si, where)
    self.failUnlessEqual(data["repair-attempted"], False, where)
    # pre-repair first, then post-repair, same as the checks they mirror
    for phase in ("pre-repair-results", "post-repair-results"):
        self.json_check_is_healthy(data[phase], n, where, incomplete)
def json_full_deepcheck_is_healthy(self, data, n, where):
    """Assert webapi t=deep-check JSON reports all 3 objects healthy.

    Checks the root storage index, the object/corruption counters, the
    corrupt/unhealthy lists, and finally the embedded deep-stats block.
    """
    root_si = base32.b2a(n.get_storage_index())
    self.failUnlessEqual(data["root-storage-index"], root_si, where)
    # counters and lists, in the same order the webapi documents them
    for key, expected in [("count-objects-checked", 3),
                          ("count-objects-healthy", 3),
                          ("count-objects-unhealthy", 0),
                          ("count-corrupt-shares", 0),
                          ("list-corrupt-shares", []),
                          ("list-unhealthy-files", [])]:
        self.failUnlessEqual(data[key], expected, where)
    self.json_check_stats(data["stats"], where)
2115 def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
# Assert webapi t=deep-check&repair=true JSON shows all 3 objects healthy
# before and after, with nothing corrupt and zero repairs attempted.
# (The gaps in this extraction -- orig ~2119/2123/2127/2131 -- appear to
# be blank separator lines; confirm against the full file.)
2116 self.failUnlessEqual(data["root-storage-index"],
2117 base32.b2a(n.get_storage_index()), where)
2118 self.failUnlessEqual(data["count-objects-checked"], 3, where)
# pre-repair counters
2120 self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2121 self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2122 self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
# post-repair counters
2124 self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2125 self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2126 self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2128 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2129 self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2130 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
# a healthy tree should trigger no repair activity at all
2132 self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2133 self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2134 self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
def json_check_lit(self, data, n, where):
    """Assert the JSON check results for a LIT file.

    LIT files carry their data inside the URI itself, so they have no
    storage index and are reported as trivially healthy.
    """
    si = data["storage-index"]
    self.failUnlessEqual(si, "", where)
    healthy = data["results"]["healthy"]
    self.failUnlessEqual(healthy, True, where)
def json_check_stats(self, data, where):
    """Validate webapi deep-stats output with the same assertions as the
    local dirnode.deep_stats() results.

    `where` is accepted for signature symmetry with the other json_check_*
    helpers but is not used: check_stats() raises with its own context.
    """
    self.check_stats(data)
2144 def do_test_web(self, ignored):
2145 d = defer.succeed(None)
2148 d.addCallback(lambda ign: self.web(self.root, t="deep-stats"))
2149 d.addCallback(self.decode_json)
2150 d.addCallback(self.json_check_stats, "deep-stats")
2153 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2154 d.addCallback(self.json_check_is_healthy, self.root, "root")
2155 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2156 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2157 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2158 d.addCallback(self.json_check_is_healthy, self.large, "large")
2159 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2160 d.addCallback(self.json_check_lit, self.small, "small")
2163 d.addCallback(lambda ign:
2164 self.web_json(self.root, t="check", verify="true"))
2165 d.addCallback(self.json_check_is_healthy, self.root, "root")
2166 d.addCallback(lambda ign:
2167 self.web_json(self.mutable, t="check", verify="true"))
2168 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2169 d.addCallback(lambda ign:
2170 self.web_json(self.large, t="check", verify="true"))
2171 d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2172 d.addCallback(lambda ign:
2173 self.web_json(self.small, t="check", verify="true"))
2174 d.addCallback(self.json_check_lit, self.small, "small")
2176 # check and repair, no verify
2177 d.addCallback(lambda ign:
2178 self.web_json(self.root, t="check", repair="true"))
2179 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2180 d.addCallback(lambda ign:
2181 self.web_json(self.mutable, t="check", repair="true"))
2182 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2183 d.addCallback(lambda ign:
2184 self.web_json(self.large, t="check", repair="true"))
2185 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2186 d.addCallback(lambda ign:
2187 self.web_json(self.small, t="check", repair="true"))
2188 d.addCallback(self.json_check_lit, self.small, "small")
2190 # check+verify+repair
2191 d.addCallback(lambda ign:
2192 self.web_json(self.root, t="check", repair="true", verify="true"))
2193 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2194 d.addCallback(lambda ign:
2195 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2196 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2197 d.addCallback(lambda ign:
2198 self.web_json(self.large, t="check", repair="true", verify="true"))
2199 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2200 d.addCallback(lambda ign:
2201 self.web_json(self.small, t="check", repair="true", verify="true"))
2202 d.addCallback(self.json_check_lit, self.small, "small")
2204 # now run a deep-check, with various verify= and repair= flags
2205 d.addCallback(lambda ign:
2206 self.web_json(self.root, t="deep-check"))
2207 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2208 d.addCallback(lambda ign:
2209 self.web_json(self.root, t="deep-check", verify="true"))
2210 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2211 d.addCallback(lambda ign:
2212 self.web_json(self.root, t="deep-check", repair="true"))
2213 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2214 d.addCallback(lambda ign:
2215 self.web_json(self.root, t="deep-check", verify="true", repair="true"))
2216 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2218 # now look at t=info
2219 d.addCallback(lambda ign: self.web(self.root, t="info"))
2220 # TODO: examine the output
2221 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2222 d.addCallback(lambda ign: self.web(self.large, t="info"))
2223 d.addCallback(lambda ign: self.web(self.small, t="info"))