1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28 MemoryConsumer, download_to_data
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    # An Uploadable that counts how many bytes have been consumed via
    # read(), and can fire a Deferred once a byte threshold is crossed --
    # used to interrupt an in-progress upload (e.g. to exercise resumption).
    # NOTE(review): the running counter (self.bytes_read) is initialized on
    # a line not visible in this view -- confirm it starts at 0.
    interrupt_after = None      # byte count after which interrupt_after_d fires
    interrupt_after_d = None    # Deferred fired exactly once at the threshold

    def read(self, length):
        # Tally consumed bytes before delegating to upload.Data.read().
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so the callback fires only once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class GrabEverythingConsumer:
    # Minimal consumer used as a download target: accepts a push producer
    # and accumulates everything written to it. Several method bodies are
    # not visible in this view (elided); only the producer assertion is.

    def registerProducer(self, producer, streaming):
        # downloads are expected to hand us a push-mode producer
        assert IPushProducer.providedBy(producer)

    def write(self, data):

    def unregisterProducer(self):
64 class SystemTest(SystemTestMixin, unittest.TestCase):
    def test_connections(self):
        # Bring up a grid, add one extra node, then verify that every
        # client sees all numclients+1 peers, both via get_all_peerids()
        # and via the permuted storage-peer list.
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # addBoth: stop the extra node on success *and* failure
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
94 def test_upload_and_download_random_key(self):
95 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96 return self._test_upload_and_download(convergence=None)
97 test_upload_and_download_random_key.timeout = 4800
99 def test_upload_and_download_convergent(self):
100 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101 return self._test_upload_and_download(convergence="some convergence string")
102 test_upload_and_download_convergent.timeout = 4800
    def _test_upload_and_download(self, convergence):
        """Shared body for the upload/download system tests.

        Uploads multi-segment data, downloads it back through several
        APIs (to data / filename / filehandle / consumer / read()),
        exercises failure paths (corrupted key, nonexistent URI), then
        uploads via a helper node, including an interrupted and resumed
        upload. `convergence` is a convergence secret string, or None
        for a random per-file key.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # every client should see the full set of simulated peers
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

            u = self.clients[0].getServiceNamed("uploader")

            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            log.msg("upload finished: uri is %s" % (uri,))
            # downloads below deliberately use a *different* client (client 1)
            dl = self.clients[1].getServiceNamed("downloader")
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        # download variant 2: write straight to a file on disk
        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        # download variant 3: write to an already-open filehandle
        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        # download variant 4: push into an IConsumer
        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

            # exercise the filenode read() API: whole file, a slice,
            # an offset tail, and an over-long size request
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                # size beyond EOF is clipped to the actual file length
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

        d.addCallback(_test_read)

        def _test_bad_read(res):
            # flip a bit in the readkey: the download must fail rather
            # than return corrupted plaintext
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            # sizelimit=0: this node stores nothing, it only uploads via helper
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            # a second helper upload of the same data: with convergence the
            # helper should short-circuit before the uploadable is used
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)

        d.addCallback(_upload_resumable)
    def _find_shares(self, basedir):
        # Walk the storage directories under `basedir` and collect one
        # (client_num, storage_index, filename, shnum) tuple per share file
        # found; the test fails if no shares exist at all.
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                # client dirs are named "clientN"; last char is the index
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    # share files are named by their share number
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
            self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        # Deliberately damage one field of an SDMF mutable share on disk.
        # `which` names the field to corrupt (e.g. "pubkey", "seqnum",
        # "segsize", "share_hash_chain", ...); most corruptions are a
        # single flipped bit via self.flip_bit().
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack the (possibly corrupted) fields and write the share back
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """System test for SDMF mutable files: create, dump with the debug
        CLI, retrieve from several clients, overwrite, survive share
        corruption, handle empty files and dirnodes, and verify the
        key-generator pool size bookkeeping."""
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here." # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless(" secrets are for nodeid: %s\n" % peerid
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless(" seqnum: 1\n" in output)
                self.failUnless(" required_shares: 3\n" in output)
                self.failUnless(" total_shares: 10\n" in output)
                self.failUnless(" segsize: 27\n" in output, (output, filename))
                self.failUnless(" datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                self.failUnless(" share_hash_chain: " in output)
                self.failUnless(" block_hash_tree: 1 nodes\n" in output)
                expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                # dump the CLI output to aid debugging before re-raising
                print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            # the nodemaker caches: same URI must yield the same node object
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                    # read: this will trigger "pubkey doesn't match
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        # retrieval must still succeed (k=3 intact shares remain)
        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            # a directory may safely contain a link to itself
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # the key generator pool should shrink by one key per mutable
            # file/dirnode created through clients[3]
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
714 def flip_bit(self, good):
715 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        # Return the string form of a CHK URI pointing at a file that was
        # never uploaded: flipping a bit in the key changes the storage
        # index, so no server holds shares for the result.
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        return u2.to_string()
728 # TODO: add a test which mangles the uri_extension_hash instead, and
729 # should fail due to not being able to get a valid uri_extension block.
730 # Also a test which sneakily mangles the uri_extension block to change
731 # some of the validation data, so it will fail in the post-download phase
732 # when the file's crypttext integrity check fails. Do the same thing for
733 # the key, which should cause the download to fail the post-download
734 # plaintext_hash check.
    def test_vdrive(self):
        """End-to-end virtual-drive test: publish a small directory tree,
        bounce a client, verify the published contents from several
        clients, exercise the web/control/CLI interfaces, then run the
        checker and collect stats."""
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._grab_stats)
    test_vdrive.timeout = 1100
    def _test_introweb(self, res):
        # Fetch the introducer's welcome page and sanity-check both the
        # HTML rendering and the ?t=json machine-readable rendering.
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the page body to aid debugging before re-raising
                print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
                self.failUnlessEqual(data["subscription_summary"],
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        # Create R/ and R/subdir1/mydata567 (contents: self.data), stashing
        # the root URI in self._root_directory_uri and the file's URI in
        # self.uri. NOTE(review): c0 is bound on a line not visible here --
        # presumably c0 = self.clients[0]; confirm.
        ut = upload.Data(self.data, convergence=None)
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
    def _do_publish2(self, res):
        # Add R/subdir1/subdir2/mydata992 beneath the subdir created by
        # _do_publish1 (self._subdir1_node), using the same test data.
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    def log(self, res, *args, **kwargs):
        # Pass-through logger for use inside Deferred chains: logs *args,
        # and presumably returns `res` so the chain's value is preserved
        # (the return statement is not visible in this view -- confirm).
        # print "MSG: %s RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        # Build a second, private directory tree P/ containing
        # P/personal/sekrit data, plus a read-write ("s2-rw") and a
        # read-only ("s2-ro") link to R/subdir1/subdir2.
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        # Verify R/subdir1/mydata567 from a different client, one get()
        # at a time. NOTE(review): c1 is bound on a line not visible here
        # -- presumably c1 = self.clients[1]; confirm.
        # this one uses the iterative API
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        # Verify the same published file via get_child_at_path(), and check
        # that the nodemaker returns the identical cached node for its URI.
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
# Exercise the private tree built by _do_publish_private:
#  - read back P/personal/"sekrit data" and compare with self.smalldata
#  - confirm s2-rw is mutable and s2-ro is a mutable-but-readonly dirnode
#  - confirm every mutating operation on the readonly dirnode raises
#    NotMutableError, and a missing child raises NoSuchChildError
#  - move children around under P, then check build_manifest() and
#    start_deep_stats() results against expected counts/sizes.
# Several structural lines (get_path definition, _got_s2ro/_got_home
# plumbing, expected-dict tail) are missing from this excerpt.
899 def _check_publish_private(self, resnode):
900 # this one uses the path-based API
901 self._private_node = resnode
903 d = self._private_node.get_child_at_path(u"personal")
904 def _got_personal(personal):
# remembered so later steps (and _test_checker) can reach P/personal
905 self._personal_node = personal
907 d.addCallback(_got_personal)
909 d.addCallback(lambda dirnode:
910 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
# helper: resolve a path relative to the private root (def line missing)
912 return self._private_node.get_child_at_path(path)
914 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
915 d.addCallback(lambda filenode: filenode.download_to_data())
916 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
917 d.addCallback(lambda res: get_path(u"s2-rw"))
918 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
919 d.addCallback(lambda res: get_path(u"s2-ro"))
920 def _got_s2ro(dirnode):
921 self.failUnless(dirnode.is_mutable(), dirnode)
922 self.failUnless(dirnode.is_readonly(), dirnode)
923 d1 = defer.succeed(None)
924 d1.addCallback(lambda res: dirnode.list())
925 d1.addCallback(self.log, "dirnode.list")
927 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
929 d1.addCallback(self.log, "doing add_file(ro)")
930 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
931 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
933 d1.addCallback(self.log, "doing get(ro)")
934 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
935 d1.addCallback(lambda filenode:
936 self.failUnless(IFileNode.providedBy(filenode)))
938 d1.addCallback(self.log, "doing delete(ro)")
939 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
941 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
943 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
945 personal = self._personal_node
946 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
948 d1.addCallback(self.log, "doing move_child_to(ro)2")
949 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
951 d1.addCallback(self.log, "finished with _got_s2ro")
953 d.addCallback(_got_s2ro)
954 def _got_home(dummy):
955 home = self._private_node
956 personal = self._personal_node
957 d1 = defer.succeed(None)
958 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
959 d1.addCallback(lambda res:
960 personal.move_child_to(u"sekrit data",home,u"sekrit"))
962 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
963 d1.addCallback(lambda res:
964 home.move_child_to(u"sekrit", home, u"sekrit data"))
966 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
967 d1.addCallback(lambda res:
# two-argument move keeps the child's original name in the target dir
968 home.move_child_to(u"sekrit data", personal))
970 d1.addCallback(lambda res: home.build_manifest().when_done())
971 d1.addCallback(self.log, "manifest")
975 # P/personal/sekrit data
976 # P/s2-rw (same as P/s2-ro)
977 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
978 d1.addCallback(lambda res:
979 self.failUnlessEqual(len(res["manifest"]), 5))
980 d1.addCallback(lambda res: home.start_deep_stats().when_done())
981 def _check_stats(stats):
982 expected = {"count-immutable-files": 1,
983 "count-mutable-files": 0,
984 "count-literal-files": 1,
986 "count-directories": 3,
987 "size-immutable-files": 112,
988 "size-literal-files": 23,
989 #"size-directories": 616, # varies
990 #"largest-directory": 616,
991 "largest-directory-children": 3,
992 "largest-immutable-file": 112,
994 for k,v in expected.iteritems():
995 self.failUnlessEqual(stats[k], v,
996 "stats[%s] was %s, not %s" %
# directory sizes vary with encoding details, so only lower bounds
998 self.failUnless(stats["size-directories"] > 1300,
999 stats["size-directories"])
1000 self.failUnless(stats["largest-directory"] > 800,
1001 stats["largest-directory"])
1002 self.failUnlessEqual(stats["size-files-histogram"],
1003 [ (11, 31, 1), (101, 316, 1) ])
1004 d1.addCallback(_check_stats)
1006 d.addCallback(_got_home)
# errback-style checker: used via d.addBoth(self.shouldFail, ...).  If
# `res` is a Failure it must wrap `expected_failure` (and contain
# `substring` if given); otherwise the test fails because no exception
# occurred.  The `if substring:`/`else:` scaffolding (original lines
# 1012/1016) is missing from this excerpt.
1009 def shouldFail(self, res, expected_failure, which, substring=None):
1010 if isinstance(res, Failure):
1011 res.trap(expected_failure)
1013 self.failUnless(substring in str(res),
1014 "substring '%s' not in '%s'"
1015 % (substring, str(res)))
1017 self.fail("%s was supposed to raise %s, not get '%s'" %
1018 (which, expected_failure, res))
# call-style checker: runs `callable(*args, **kwargs)` via maybeDeferred
# and asserts it fails with `expected_failure` (message containing
# `substring` if given).  The `done(res)` wrapper and the final
# addBoth/return lines (original 1023/1026/1030/1033-34) are missing
# from this excerpt.
1020 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1021 assert substring is None or isinstance(substring, str)
1022 d = defer.maybeDeferred(callable, *args, **kwargs)
1024 if isinstance(res, Failure):
1025 res.trap(expected_failure)
1027 self.failUnless(substring in str(res),
1028 "substring '%s' not in '%s'"
1029 % (substring, str(res)))
1031 self.fail("%s was supposed to raise %s, not get '%s'" %
1032 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """HTTP PUT *data* to the node's webapi at *urlpath*.

    Returns a Deferred that fires with the response body (via
    twisted.web.client.getPage).
    """
    target = self.webish_url + urlpath
    return getPage(target, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """HTTP GET *urlpath* from the node's webapi.

    Returns a Deferred firing with the page body; redirects are only
    followed when *followRedirect* is true.
    """
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
# Hand-rolled multipart/form-data POST to the webapi.  **fields values may
# be plain values or (filename, value) tuples for file uploads; the
# `use_helper` flag targets the helper node's webish port instead.  The
# if/else around the two url assignments and the form-list scaffolding
# (original 1045/1047/1051-52/...) are missing from this excerpt.
1044 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1046 url = self.helper_webish_url + urlpath
1048 url = self.webish_url + urlpath
1049 sepbase = "boogabooga"
1050 sep = "--" + sepbase
1053 form.append('Content-Disposition: form-data; name="_charset"')
1055 form.append('UTF-8')
1057 for name, value in fields.iteritems():
1058 if isinstance(value, tuple):
1059 filename, value = value
1060 form.append('Content-Disposition: form-data; name="%s"; '
1061 'filename="%s"' % (name, filename.encode("utf-8")))
1063 form.append('Content-Disposition: form-data; name="%s"' % name)
1065 form.append(str(value))
1068 body = "\r\n".join(form) + "\r\n"
1069 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1071 return getPage(url, method="POST", postdata=body,
1072 headers=headers, followRedirect=followRedirect)
# End-to-end webapi exercise: welcome pages (plain node and helper),
# directory listings, file downloads by path and by URI, error handling
# for a mangled URI, PUT uploads (small, multi-segment, replace-in-place),
# unlinked POST uploads (direct and via helper), the various status pages
# (upload/download/mapupdate/publish/retrieve, helper status, JSON forms),
# and the statistics pages.  Many structural lines (the initial
# `d = getPage(...)`, several closing parens, `now = time.time()`,
# expected-value tails) are missing from this excerpt.
1074 def _test_web(self, res):
1075 base = self.webish_url
1076 public = "uri/" + self._root_directory_uri
1078 def _got_welcome(page):
1079 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1080 self.failUnless(expected in page,
1081 "I didn't see the right 'connected storage servers'"
1082 " message in: %s" % page
1084 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1085 self.failUnless(expected in page,
1086 "I didn't see the right 'My nodeid' message "
1088 self.failUnless("Helper: 0 active uploads" in page)
1089 d.addCallback(_got_welcome)
1090 d.addCallback(self.log, "done with _got_welcome")
1092 # get the welcome page from the node that uses the helper too
1093 d.addCallback(lambda res: getPage(self.helper_webish_url))
1094 def _got_welcome_helper(page):
1095 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1097 self.failUnless("Not running helper" in page)
1098 d.addCallback(_got_welcome_helper)
1100 d.addCallback(lambda res: getPage(base + public))
1101 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1102 def _got_subdir1(page):
1103 # there ought to be an href for our file
1104 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1105 self.failUnless(">mydata567</a>" in page)
1106 d.addCallback(_got_subdir1)
1107 d.addCallback(self.log, "done with _got_subdir1")
1108 d.addCallback(lambda res:
1109 getPage(base + public + "/subdir1/mydata567"))
1110 def _got_data(page):
1111 self.failUnlessEqual(page, self.data)
1112 d.addCallback(_got_data)
1114 # download from a URI embedded in a URL
1115 d.addCallback(self.log, "_get_from_uri")
1116 def _get_from_uri(res):
1117 return getPage(base + "uri/%s?filename=%s"
1118 % (self.uri, "mydata567"))
1119 d.addCallback(_get_from_uri)
1120 def _got_from_uri(page):
1121 self.failUnlessEqual(page, self.data)
1122 d.addCallback(_got_from_uri)
1124 # download from a URI embedded in a URL, second form
1125 d.addCallback(self.log, "_get_from_uri2")
1126 def _get_from_uri2(res):
1127 return getPage(base + "uri?uri=%s" % (self.uri,))
1128 d.addCallback(_get_from_uri2)
1129 d.addCallback(_got_from_uri)
1131 # download from a bogus URI, make sure we get a reasonable error
1132 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1133 def _get_from_bogus_uri(res):
1134 d1 = getPage(base + "uri/%s?filename=%s"
1135 % (self.mangle_uri(self.uri), "mydata567"))
1136 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1139 d.addCallback(_get_from_bogus_uri)
1140 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1142 # upload a file with PUT
1143 d.addCallback(self.log, "about to try PUT")
1144 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1145 "new.txt contents"))
1146 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1147 d.addCallback(self.failUnlessEqual, "new.txt contents")
1148 # and again with something large enough to use multiple segments,
1149 # and hopefully trigger pauseProducing too
1150 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1151 "big" * 500000)) # 1.5MB
1152 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1153 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1155 # can we replace files in place?
1156 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1158 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1159 d.addCallback(self.failUnlessEqual, "NEWER contents")
1161 # test unlinked POST
1162 d.addCallback(lambda res: self.POST("uri", t="upload",
1163 file=("new.txt", "data" * 10000)))
1164 # and again using the helper, which exercises different upload-status
1166 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1167 file=("foo.txt", "data2" * 10000)))
1169 # check that the status page exists
1170 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1171 def _got_status(res):
1172 # find an interesting upload and download to look at. LIT files
1173 # are not interesting.
1174 for ds in self.clients[0].list_all_download_statuses():
1175 if ds.get_size() > 200:
1176 self._down_status = ds.get_counter()
1177 for us in self.clients[0].list_all_upload_statuses():
1178 if us.get_size() > 200:
1179 self._up_status = us.get_counter()
1180 rs = self.clients[0].list_all_retrieve_statuses()[0]
1181 self._retrieve_status = rs.get_counter()
1182 ps = self.clients[0].list_all_publish_statuses()[0]
1183 self._publish_status = ps.get_counter()
1184 us = self.clients[0].list_all_mapupdate_statuses()[0]
1185 self._update_status = us.get_counter()
1187 # and that there are some upload- and download- status pages
1188 return self.GET("status/up-%d" % self._up_status)
1189 d.addCallback(_got_status)
# NOTE(review): the `def _got_up(res):` / `def _got_down(res):` headers
# for the next two returns are missing from this excerpt.
1191 return self.GET("status/down-%d" % self._down_status)
1192 d.addCallback(_got_up)
1194 return self.GET("status/mapupdate-%d" % self._update_status)
1195 d.addCallback(_got_down)
1196 def _got_update(res):
1197 return self.GET("status/publish-%d" % self._publish_status)
1198 d.addCallback(_got_update)
1199 def _got_publish(res):
1200 return self.GET("status/retrieve-%d" % self._retrieve_status)
1201 d.addCallback(_got_publish)
1203 # check that the helper status page exists
1204 d.addCallback(lambda res:
1205 self.GET("helper_status", followRedirect=True))
1206 def _got_helper_status(res):
1207 self.failUnless("Bytes Fetched:" in res)
1208 # touch a couple of files in the helper's working directory to
1209 # exercise more code paths
1210 workdir = os.path.join(self.getdir("client0"), "helper")
1211 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1212 f = open(incfile, "wb")
1213 f.write("small file")
# backdate the spurious file 3 days so it counts as "old" in the stats
1215 then = time.time() - 86400*3
1217 os.utime(incfile, (now, then))
1218 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1219 f = open(encfile, "wb")
1220 f.write("less small file")
1222 os.utime(encfile, (now, then))
1223 d.addCallback(_got_helper_status)
1224 # and that the json form exists
1225 d.addCallback(lambda res:
1226 self.GET("helper_status?t=json", followRedirect=True))
1227 def _got_helper_status_json(res):
1228 data = simplejson.loads(res)
1229 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1231 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1232 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1233 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1235 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1236 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1237 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1239 d.addCallback(_got_helper_status_json)
1241 # and check that client[3] (which uses a helper but does not run one
1242 # itself) doesn't explode when you ask for its status
1243 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1244 def _got_non_helper_status(res):
1245 self.failUnless("Upload and Download Status" in res)
1246 d.addCallback(_got_non_helper_status)
1248 # or for helper status with t=json
1249 d.addCallback(lambda res:
1250 getPage(self.helper_webish_url + "helper_status?t=json"))
1251 def _got_non_helper_status_json(res):
1252 data = simplejson.loads(res)
1253 self.failUnlessEqual(data, {})
1254 d.addCallback(_got_non_helper_status_json)
1256 # see if the statistics page exists
1257 d.addCallback(lambda res: self.GET("statistics"))
1258 def _got_stats(res):
1259 self.failUnless("Node Statistics" in res)
1260 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1261 d.addCallback(_got_stats)
1262 d.addCallback(lambda res: self.GET("statistics?t=json"))
1263 def _got_stats_json(res):
1264 data = simplejson.loads(res)
1265 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1266 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1267 d.addCallback(_got_stats_json)
1269 # TODO: mangle the second segment of a file, to test errors that
1270 # occur after we've already sent some good data, which uses a
1271 # different error path.
1273 # TODO: download a URI with a form
1274 # TODO: create a directory by using a form
1275 # TODO: upload by using a form on the directory page
1276 # url = base + "somedir/subdir1/freeform_post!!upload"
1277 # TODO: delete a file by using a button on the directory page
# Exercise the CLI diagnostic tools: walk the storage directories for a
# CHK share file (magic 0x00000001), run `debug dump-share --offsets` on
# it and assert the dumped fields, then use `debug find-shares` and
# `debug catalog-shares` across all client nodedirs and count the
# results.  Loop-control lines and the `matching = [` head of the final
# list comprehension (original line 1351) are missing from this excerpt.
1281 def _test_runner(self, res):
1282 # exercise some of the diagnostic tools in runner.py
1285 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1286 if "storage" not in dirpath:
1290 pieces = dirpath.split(os.sep)
1291 if pieces[-4] == "storage" and pieces[-3] == "shares":
1292 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1293 # are sharefiles here
1294 filename = os.path.join(dirpath, filenames[0])
1295 # peek at the magic to see if it is a chk share
1296 magic = open(filename, "rb").read(4)
1297 if magic == '\x00\x00\x00\x01':
1300 self.fail("unable to find any uri_extension files in %s"
1302 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1304 out,err = StringIO(), StringIO()
1305 rc = runner.runner(["debug", "dump-share", "--offsets",
1307 stdout=out, stderr=err)
1308 output = out.getvalue()
1309 self.failUnlessEqual(rc, 0)
1311 # we only upload a single file, so we can assert some things about
1312 # its size and shares.
1313 self.failUnless(("share filename: %s" % filename) in output)
1314 self.failUnless("size: %d\n" % len(self.data) in output)
1315 self.failUnless("num_segments: 1\n" in output)
1316 # segment_size is always a multiple of needed_shares
1317 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1318 self.failUnless("total_shares: 10\n" in output)
1319 # keys which are supposed to be present
1320 for key in ("size", "num_segments", "segment_size",
1321 "needed_shares", "total_shares",
1322 "codec_name", "codec_params", "tail_codec_params",
1323 #"plaintext_hash", "plaintext_root_hash",
1324 "crypttext_hash", "crypttext_root_hash",
1325 "share_root_hash", "UEB_hash"):
1326 self.failUnless("%s: " % key in output, key)
1327 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1329 # now use its storage index to find the other shares using the
1330 # 'find-shares' tool
1331 sharedir, shnum = os.path.split(filename)
1332 storagedir, storage_index_s = os.path.split(sharedir)
1333 out,err = StringIO(), StringIO()
1334 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1335 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1336 rc = runner.runner(cmd, stdout=out, stderr=err)
1337 self.failUnlessEqual(rc, 0)
# 10 total shares spread over the grid's storage servers
1339 sharefiles = [sfn.strip() for sfn in out.readlines()]
1340 self.failUnlessEqual(len(sharefiles), 10)
1342 # also exercise the 'catalog-shares' tool
1343 out,err = StringIO(), StringIO()
1344 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1345 cmd = ["debug", "catalog-shares"] + nodedirs
1346 rc = runner.runner(cmd, stdout=out, stderr=err)
1347 self.failUnlessEqual(rc, 0)
1349 descriptions = [sfn.strip() for sfn in out.readlines()]
1350 self.failUnlessEqual(len(descriptions), 30)
1352 for line in descriptions
1353 if line.startswith("CHK %s " % storage_index_s)]
1354 self.failUnlessEqual(len(matching), 10)
# Connect to client[0]'s foolscap control port (private/control.furl) via
# the introducer's Tub and hand the remote reference to _test_control2.
# The trailing `return d` (original line 1366) is missing from this
# excerpt.
1356 def _test_control(self, res):
1357 # exercise the remote-control-the-client foolscap interfaces in
1358 # allmydata.control (mostly used for performance tests)
1359 c0 = self.clients[0]
1360 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1361 control_furl = open(control_furl_file, "r").read().strip()
1362 # it doesn't really matter which Tub we use to connect to the client,
1363 # so let's just use our IntroducerNode's
1364 d = self.introducer.tub.getReference(control_furl)
1365 d.addCallback(self._test_control2, control_furl_file)
# Use the control-port remote reference: upload the control.furl file
# itself, download it back to control.downfile, compare the bytes, then
# run speed_test and (on Linux) get_memory_usage plus
# measure_peer_response_time.  Some continuation lines and the `_check`
# def header are missing from this excerpt.
1367 def _test_control2(self, rref, filename):
1368 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1369 downfile = os.path.join(self.basedir, "control.downfile")
1370 d.addCallback(lambda uri:
1371 rref.callRemote("download_from_uri_to_file",
1374 self.failUnlessEqual(res, downfile)
1375 data = open(downfile, "r").read()
1376 expected_data = open(filename, "r").read()
1377 self.failUnlessEqual(data, expected_data)
1378 d.addCallback(_check)
1379 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
# get_memory_usage reads /proc, hence the linux2 guard
1380 if sys.platform == "linux2":
1381 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1382 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
# Drive the "tahoe" command-line interface end-to-end through _run_cli
# (deferToThread): alias handling (root_dir.cap compatibility,
# add-alias/list-aliases), mkdir/ls, put (file, stdin, mutable),
# get (stdout and to-file), rm, ls -l/--uri/--readonly-uri, mv, ln,
# cp in every direction (tahoe<->tahoe, disk<->tahoe, disk->disk,
# overwriting immutable and mutable targets), and recursive cp of a
# directory tree in both directions.  Many scaffolding lines (the
# nodeargs list head, files/datas loop head, os.mkdir calls, _cmp body)
# are missing from this excerpt.
1385 def _test_cli(self, res):
1386 # run various CLI commands (in a thread, since they use blocking
1389 private_uri = self._private_node.get_uri()
1390 some_uri = self._root_directory_uri
1391 client0_basedir = self.getdir("client0")
1394 "--node-directory", client0_basedir,
1396 TESTDATA = "I will not write the same thing over and over.\n" * 100
1398 d = defer.succeed(None)
1400 # for compatibility with earlier versions, private/root_dir.cap is
1401 # supposed to be treated as an alias named "tahoe:". Start by making
1402 # sure that works, before we add other aliases.
1404 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1405 f = open(root_file, "w")
1406 f.write(private_uri)
1409 def run(ignored, verb, *args, **kwargs):
1410 stdin = kwargs.get("stdin", "")
1411 newargs = [verb] + nodeargs + list(args)
1412 return self._run_cli(newargs, stdin=stdin)
1414 def _check_ls((out,err), expected_children, unexpected_children=[]):
1415 self.failUnlessEqual(err, "")
1416 for s in expected_children:
1417 self.failUnless(s in out, (s,out))
1418 for s in unexpected_children:
1419 self.failIf(s in out, (s,out))
1421 def _check_ls_root((out,err)):
1422 self.failUnless("personal" in out)
1423 self.failUnless("s2-ro" in out)
1424 self.failUnless("s2-rw" in out)
1425 self.failUnlessEqual(err, "")
1427 # this should reference private_uri
1428 d.addCallback(run, "ls")
1429 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1431 d.addCallback(run, "list-aliases")
1432 def _check_aliases_1((out,err)):
1433 self.failUnlessEqual(err, "")
1434 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1435 d.addCallback(_check_aliases_1)
1437 # now that that's out of the way, remove root_dir.cap and work with
1439 d.addCallback(lambda res: os.unlink(root_file))
1440 d.addCallback(run, "list-aliases")
1441 def _check_aliases_2((out,err)):
1442 self.failUnlessEqual(err, "")
1443 self.failUnlessEqual(out, "")
1444 d.addCallback(_check_aliases_2)
1446 d.addCallback(run, "mkdir")
1447 def _got_dir( (out,err) ):
1448 self.failUnless(uri.from_string_dirnode(out.strip()))
1450 d.addCallback(_got_dir)
1451 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1453 d.addCallback(run, "list-aliases")
1454 def _check_aliases_3((out,err)):
1455 self.failUnlessEqual(err, "")
1456 self.failUnless("tahoe: " in out)
1457 d.addCallback(_check_aliases_3)
1459 def _check_empty_dir((out,err)):
1460 self.failUnlessEqual(out, "")
1461 self.failUnlessEqual(err, "")
1462 d.addCallback(run, "ls")
1463 d.addCallback(_check_empty_dir)
1465 def _check_missing_dir((out,err)):
1466 # TODO: check that rc==2
1467 self.failUnlessEqual(out, "")
1468 self.failUnlessEqual(err, "No such file or directory\n")
1469 d.addCallback(run, "ls", "bogus")
1470 d.addCallback(_check_missing_dir)
# NOTE(review): the loop header that builds files[]/datas[] (original
# ~1472-1474) is missing from this excerpt.
1475 fn = os.path.join(self.basedir, "file%d" % i)
1477 data = "data to be uploaded: file%d\n" % i
1479 open(fn,"wb").write(data)
1481 def _check_stdout_against((out,err), filenum=None, data=None):
1482 self.failUnlessEqual(err, "")
1483 if filenum is not None:
1484 self.failUnlessEqual(out, datas[filenum])
1485 if data is not None:
1486 self.failUnlessEqual(out, data)
1488 # test all both forms of put: from a file, and from stdin
1490 d.addCallback(run, "put", files[0], "tahoe-file0")
1491 def _put_out((out,err)):
1492 self.failUnless("URI:LIT:" in out, out)
1493 self.failUnless("201 Created" in err, err)
1495 return run(None, "get", uri0)
1496 d.addCallback(_put_out)
1497 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1499 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1500 # tahoe put bar tahoe:FOO
1501 d.addCallback(run, "put", files[2], "tahoe:file2")
1502 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1503 def _check_put_mutable((out,err)):
1504 self._mutable_file3_uri = out.strip()
1505 d.addCallback(_check_put_mutable)
1506 d.addCallback(run, "get", "tahoe:file3")
1507 d.addCallback(_check_stdout_against, 3)
1510 STDIN_DATA = "This is the file to upload from stdin."
1511 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1512 # tahoe put tahoe:FOO
1513 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1514 stdin="Other file from stdin.")
1516 d.addCallback(run, "ls")
1517 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1518 "tahoe-file-stdin", "from-stdin"])
1519 d.addCallback(run, "ls", "subdir")
1520 d.addCallback(_check_ls, ["tahoe-file1"])
1523 d.addCallback(run, "mkdir", "subdir2")
1524 d.addCallback(run, "ls")
1525 # TODO: extract the URI, set an alias with it
1526 d.addCallback(_check_ls, ["subdir2"])
1528 # tahoe get: (to stdin and to a file)
1529 d.addCallback(run, "get", "tahoe-file0")
1530 d.addCallback(_check_stdout_against, 0)
1531 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1532 d.addCallback(_check_stdout_against, 1)
1533 outfile0 = os.path.join(self.basedir, "outfile0")
1534 d.addCallback(run, "get", "file2", outfile0)
1535 def _check_outfile0((out,err)):
1536 data = open(outfile0,"rb").read()
1537 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1538 d.addCallback(_check_outfile0)
# NOTE(review): outfile1 reuses the "outfile0" filename here — looks like
# a latent copy/paste slip in the original; get to outfile1 simply
# overwrites outfile0.  Confirm against upstream before changing.
1539 outfile1 = os.path.join(self.basedir, "outfile0")
1540 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1541 def _check_outfile1((out,err)):
1542 data = open(outfile1,"rb").read()
1543 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1544 d.addCallback(_check_outfile1)
1546 d.addCallback(run, "rm", "tahoe-file0")
1547 d.addCallback(run, "rm", "tahoe:file2")
1548 d.addCallback(run, "ls")
1549 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1551 d.addCallback(run, "ls", "-l")
1552 def _check_ls_l((out,err)):
1553 lines = out.split("\n")
1555 if "tahoe-file-stdin" in l:
1556 self.failUnless(l.startswith("-r-- "), l)
1557 self.failUnless(" %d " % len(STDIN_DATA) in l)
1559 self.failUnless(l.startswith("-rw- "), l) # mutable
1560 d.addCallback(_check_ls_l)
1562 d.addCallback(run, "ls", "--uri")
1563 def _check_ls_uri((out,err)):
1564 lines = out.split("\n")
1567 self.failUnless(self._mutable_file3_uri in l)
1568 d.addCallback(_check_ls_uri)
1570 d.addCallback(run, "ls", "--readonly-uri")
1571 def _check_ls_rouri((out,err)):
1572 lines = out.split("\n")
1575 rw_uri = self._mutable_file3_uri
1576 u = uri.from_string_mutable_filenode(rw_uri)
1577 ro_uri = u.get_readonly().to_string()
1578 self.failUnless(ro_uri in l)
1579 d.addCallback(_check_ls_rouri)
1582 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1583 d.addCallback(run, "ls")
1584 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1586 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1587 d.addCallback(run, "ls")
1588 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1590 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1591 d.addCallback(run, "ls")
1592 d.addCallback(_check_ls, ["file3", "file3-copy"])
1593 d.addCallback(run, "get", "tahoe:file3-copy")
1594 d.addCallback(_check_stdout_against, 3)
1596 # copy from disk into tahoe
1597 d.addCallback(run, "cp", files[4], "tahoe:file4")
1598 d.addCallback(run, "ls")
1599 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1600 d.addCallback(run, "get", "tahoe:file4")
1601 d.addCallback(_check_stdout_against, 4)
1603 # copy from tahoe into disk
1604 target_filename = os.path.join(self.basedir, "file-out")
1605 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1606 def _check_cp_out((out,err)):
1607 self.failUnless(os.path.exists(target_filename))
1608 got = open(target_filename,"rb").read()
1609 self.failUnlessEqual(got, datas[4])
1610 d.addCallback(_check_cp_out)
1612 # copy from disk to disk (silly case)
1613 target2_filename = os.path.join(self.basedir, "file-out-copy")
1614 d.addCallback(run, "cp", target_filename, target2_filename)
1615 def _check_cp_out2((out,err)):
1616 self.failUnless(os.path.exists(target2_filename))
1617 got = open(target2_filename,"rb").read()
1618 self.failUnlessEqual(got, datas[4])
1619 d.addCallback(_check_cp_out2)
1621 # copy from tahoe into disk, overwriting an existing file
1622 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1623 def _check_cp_out3((out,err)):
1624 self.failUnless(os.path.exists(target_filename))
1625 got = open(target_filename,"rb").read()
1626 self.failUnlessEqual(got, datas[3])
1627 d.addCallback(_check_cp_out3)
1629 # copy from disk into tahoe, overwriting an existing immutable file
1630 d.addCallback(run, "cp", files[5], "tahoe:file4")
1631 d.addCallback(run, "ls")
1632 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1633 d.addCallback(run, "get", "tahoe:file4")
1634 d.addCallback(_check_stdout_against, 5)
1636 # copy from disk into tahoe, overwriting an existing mutable file
1637 d.addCallback(run, "cp", files[5], "tahoe:file3")
1638 d.addCallback(run, "ls")
1639 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1640 d.addCallback(run, "get", "tahoe:file3")
1641 d.addCallback(_check_stdout_against, 5)
1643 # recursive copy: setup
1644 dn = os.path.join(self.basedir, "dir1")
1646 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1647 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1648 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1649 sdn2 = os.path.join(dn, "subdir2")
1651 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1652 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1654 # from disk into tahoe
1655 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1656 d.addCallback(run, "ls")
1657 d.addCallback(_check_ls, ["dir1"])
1658 d.addCallback(run, "ls", "dir1")
1659 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1660 ["rfile4", "rfile5"])
1661 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1662 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1663 ["rfile1", "rfile2", "rfile3"])
1664 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1665 d.addCallback(_check_stdout_against, data="rfile4")
1667 # and back out again
1668 dn_copy = os.path.join(self.basedir, "dir1-copy")
1669 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1670 def _check_cp_r_out((out,err)):
1672 old = open(os.path.join(dn, name), "rb").read()
1673 newfn = os.path.join(dn_copy, name)
1674 self.failUnless(os.path.exists(newfn))
1675 new = open(newfn, "rb").read()
1676 self.failUnlessEqual(old, new)
1680 _cmp(os.path.join("subdir2", "rfile4"))
1681 _cmp(os.path.join("subdir2", "rfile5"))
1682 d.addCallback(_check_cp_r_out)
1684 # and copy it a second time, which ought to overwrite the same files
1685 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1687 # and tahoe-to-tahoe
1688 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1689 d.addCallback(run, "ls")
1690 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1691 d.addCallback(run, "ls", "dir1-copy")
1692 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1693 ["rfile4", "rfile5"])
1694 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1695 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1696 ["rfile1", "rfile2", "rfile3"])
1697 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1698 d.addCallback(_check_stdout_against, data="rfile4")
1700 # and copy it a second time, which ought to overwrite the same files
1701 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1703 # tahoe_ls doesn't currently handle the error correctly: it tries to
1704 # JSON-parse a traceback.
1705 ## def _ls_missing(res):
1706 ##     argv = ["ls"] + nodeargs + ["bogus"]
1707 ##     return self._run_cli(argv)
1708 ## d.addCallback(_ls_missing)
1709 ## def _check_ls_missing((out,err)):
1712 ##     self.failUnlessEqual(err, "")
1713 ## d.addCallback(_check_ls_missing)
# Run the tahoe CLI in a worker thread (CLI calls block), feeding *stdin*
# and capturing stdout/stderr as strings.  The `_done` callback header
# and final `return d` (original 1723/1726) are missing from this excerpt.
1717 def _run_cli(self, argv, stdin=""):
1719 stdout, stderr = StringIO(), StringIO()
1720 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1721 stdin=StringIO(stdin),
1722 stdout=stdout, stderr=stderr)
1724 return stdout.getvalue(), stderr.getvalue()
1725 d.addCallback(_done)
# Run the checker over three node kinds: the mutable P/personal dirnode
# (healthy, with and without verify=True), a CHK FileNode large enough
# not to be LIT ("big file"), and a LiteralFileNode ("sekrit data",
# whose check() result is None since LIT files have no shares).
# Several `return d` / chaining lines are missing from this excerpt.
1728 def _test_checker(self, res):
1729 ut = upload.Data("too big to be literal" * 200, convergence=None)
1730 d = self._personal_node.add_file(u"big file", ut)
1732 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1733 def _check_dirnode_results(r):
1734 self.failUnless(r.is_healthy())
1735 d.addCallback(_check_dirnode_results)
1736 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1737 d.addCallback(_check_dirnode_results)
1739 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1740 def _got_chk_filenode(n):
1741 self.failUnless(isinstance(n, filenode.FileNode))
1742 d = n.check(Monitor())
1743 def _check_filenode_results(r):
1744 self.failUnless(r.is_healthy())
1745 d.addCallback(_check_filenode_results)
1746 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1747 d.addCallback(_check_filenode_results)
1749 d.addCallback(_got_chk_filenode)
1751 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1752 def _got_lit_filenode(n):
1753 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1754 d = n.check(Monitor())
1755 def _check_lit_filenode_results(r):
# LIT files carry their data in the URI, so check() returns None
1756 self.failUnlessEqual(r, None)
1757 d.addCallback(_check_lit_filenode_results)
1758 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1759 d.addCallback(_check_lit_filenode_results)
1761 d.addCallback(_got_lit_filenode)
1765 class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
# Synchronous CLI helper: only "debug" subcommands are allowed because
# they do not need a reactor; returns captured stdout as a string.
1767 def _run_cli(self, argv):
1768 stdout, stderr = StringIO(), StringIO()
1769 # this can only do synchronous operations
1770 assert argv[0] == "debug"
1771 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
# stderr is captured but intentionally discarded here
1772 return stdout.getvalue()
# Create an undamaged mutable file, then confirm the webapi verifier
# (t=check&verify=true) reports it healthy with all 10 shares.
# NOTE(review): the `def _created(n):` / `def _do_check(res):` headers
# and the `self.node = n` assignment are elided in this listing —
# verify against the full file.
1774 def test_good(self):
1775 self.basedir = self.mktemp()
1776 d = self.set_up_nodes()
1777 CONTENTS = "a little bit of data"
1778 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1781 si = self.node.get_storage_index()
1782 d.addCallback(_created)
1783 # now make sure the webapi verifier sees no problems
1785 url = (self.webish_url +
1786 "uri/%s" % urllib.quote(self.node.get_uri()) +
1787 "?t=check&verify=true")
1788 return getPage(url, method="POST")
1789 d.addCallback(_do_check)
1790 def _got_results(out):
# HTML response must report healthy and must not mention any damage
1791 self.failUnless("<span>Healthy : Healthy</span>" in out, out)
1792 self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1793 self.failIf("Not Healthy!" in out, out)
1794 self.failIf("Unhealthy" in out, out)
1795 self.failIf("Corrupt Shares" in out, out)
1796 d.addCallback(_got_results)
1797 d.addErrback(self.explain_web_error)
# Corrupt one share of a mutable file via the CLI debug command, then
# confirm the webapi verifier notices (9/10 shares, corrupt-share
# listed) and that t=check&repair=true fixes it back to 10 shares.
# NOTE(review): some interior lines (e.g. the `def _created(res):` and
# `def _do_check(res):` headers, a `for f in files` loop header) are
# elided in this listing.
1800 def test_corrupt(self):
1801 self.basedir = self.mktemp()
1802 d = self.set_up_nodes()
1803 CONTENTS = "a little bit of data"
1804 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1807 si = self.node.get_storage_index()
# locate this file's share files on client[1]'s disk
1808 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1809 self.clients[1].basedir])
1810 files = out.split("\n")
1811 # corrupt one of them, using the CLI debug command
1813 shnum = os.path.basename(f)
1814 nodeid = self.clients[1].nodeid
1815 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
# remember which share we damaged so later checks can look for it
1816 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1817 out = self._run_cli(["debug", "corrupt-share", files[0]])
1818 d.addCallback(_created)
1819 # now make sure the webapi verifier notices it
1821 url = (self.webish_url +
1822 "uri/%s" % urllib.quote(self.node.get_uri()) +
1823 "?t=check&verify=true")
1824 return getPage(url, method="POST")
1825 d.addCallback(_do_check)
1826 def _got_results(out):
1827 self.failUnless("Not Healthy!" in out, out)
1828 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1829 self.failUnless("Corrupt Shares:" in out, out)
1830 d.addCallback(_got_results)
1832 # now make sure the webapi repairer can fix it
1833 def _do_repair(res):
1834 url = (self.webish_url +
1835 "uri/%s" % urllib.quote(self.node.get_uri()) +
1836 "?t=check&verify=true&repair=true")
1837 return getPage(url, method="POST")
1838 d.addCallback(_do_repair)
1839 def _got_repair_results(out):
1840 self.failUnless("<div>Repair successful</div>" in out, out)
1841 d.addCallback(_got_repair_results)
# re-check after repair: should be healthy with a full share set again
1842 d.addCallback(_do_check)
1843 def _got_postrepair_results(out):
1844 self.failIf("Not Healthy!" in out, out)
1845 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1846 d.addCallback(_got_postrepair_results)
1847 d.addErrback(self.explain_web_error)
# Same shape as test_corrupt but deletes a share instead of corrupting
# it, and uses verify=false: the plain checker must still notice the
# missing share (9/10), and repair=true must restore it.
# NOTE(review): interior lines (the `def _created(res):` /
# `def _do_check(res):` headers, the share-deleting os.unlink call)
# are elided in this listing.
1851 def test_delete_share(self):
1852 self.basedir = self.mktemp()
1853 d = self.set_up_nodes()
1854 CONTENTS = "a little bit of data"
1855 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1858 si = self.node.get_storage_index()
1859 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1860 self.clients[1].basedir])
1861 files = out.split("\n")
1862 # corrupt one of them, using the CLI debug command
1864 shnum = os.path.basename(f)
1865 nodeid = self.clients[1].nodeid
1866 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1867 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1869 d.addCallback(_created)
1870 # now make sure the webapi checker notices it
# verify=false: a plain check is enough to see a missing share
1872 url = (self.webish_url +
1873 "uri/%s" % urllib.quote(self.node.get_uri()) +
1874 "?t=check&verify=false")
1875 return getPage(url, method="POST")
1876 d.addCallback(_do_check)
1877 def _got_results(out):
1878 self.failUnless("Not Healthy!" in out, out)
1879 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
# deletion (unlike corruption) must not be reported as corrupt shares
1880 self.failIf("Corrupt Shares" in out, out)
1881 d.addCallback(_got_results)
1883 # now make sure the webapi repairer can fix it
1884 def _do_repair(res):
1885 url = (self.webish_url +
1886 "uri/%s" % urllib.quote(self.node.get_uri()) +
1887 "?t=check&verify=false&repair=true")
1888 return getPage(url, method="POST")
1889 d.addCallback(_do_repair)
1890 def _got_repair_results(out):
1891 self.failUnless("Repair successful" in out)
1892 d.addCallback(_got_repair_results)
1893 d.addCallback(_do_check)
1894 def _got_postrepair_results(out):
1895 self.failIf("Not Healthy!" in out, out)
1896 self.failUnless("Recoverable Versions: 10*seq" in out)
1897 d.addCallback(_got_postrepair_results)
1898 d.addErrback(self.explain_web_error)
1903 class DeepCheckBase(SystemTestMixin, ErrorMixin):
# POST to the node's webapi with output=json forced, and decode the
# response body; fires with the parsed JSON structure.
# NOTE(review): the trailing `return d` is elided in this listing.
1905 def web_json(self, n, **kwargs):
1906 kwargs["output"] = "json"
1907 d = self.web(n, "POST", **kwargs)
1908 d.addCallback(self.decode_json)
# Parse a (body, url) pair as JSON; fail the test with the offending
# URL and body if the server returned something that is not JSON.
# NOTE(review): the try/except wrapper and `return data` lines are
# elided in this listing.
1911 def decode_json(self, (s,url)):
1913 data = simplejson.loads(s)
1915 self.fail("%s: not JSON: '%s'" % (url, s))
1918 def web(self, n, method="GET", **kwargs):
1919 # returns (data, url)
# build uri/<quoted-cap>?k=v&... from the node's URI plus kwargs
1920 url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
1921 + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
1922 d = getPage(url, method=method)
# keep the url alongside the body so failures can identify the request
1923 d.addCallback(lambda data: (data,url))
# Poll the /operations/<handle>?t=status page once per second until
# the JSON status reports finished=true, then fire with the status.
# NOTE(review): the getPage call, its callback wrapper, and the
# `return data` success path are elided in this listing.
1926 def wait_for_operation(self, ignored, ophandle):
1927 url = self.webish_url + "operations/" + ophandle
1928 url += "?t=status&output=JSON"
1932 data = simplejson.loads(res)
1934 self.fail("%s: not JSON: '%s'" % (url, res))
1935 if not data["finished"]:
# not done yet: stall briefly and poll the same handle again
1936 d = self.stall(delay=1.0)
1937 d.addCallback(self.wait_for_operation, ophandle)
# Fetch the final results page for a finished operation handle,
# JSON-decoding the body when output="json" was requested.
# NOTE(review): the url "?t=status" suffix, the getPage call, and the
# non-JSON return path are elided in this listing.
1943 def get_operation_results(self, ignored, ophandle, output=None):
1944 url = self.webish_url + "operations/" + ophandle
1947 url += "&output=" + output
1950 if output and output.lower() == "json":
1952 return simplejson.loads(res)
1954 self.fail("%s: not JSON: '%s'" % (url, res))
# Start a long-running webapi operation with a random ophandle, wait
# for it to finish, and fetch its results (optionally as JSON).
1959 def slow_web(self, n, output=None, **kwargs):
# 4 random bytes -> short base32 handle, unique enough for one test
1961 handle = base32.b2a(os.urandom(4))
1962 d = self.web(n, "POST", ophandle=handle, **kwargs)
1963 d.addCallback(self.wait_for_operation, handle)
1964 d.addCallback(self.get_operation_results, handle, output=output)
1968 class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
1969 # construct a small directory tree (with one dir, one immutable file, one
1970 # mutable file, one LIT file, and a loop), and then check/examine it in
# Build the small test tree: a root dirnode containing a mutable file,
# a large CHK file, two LIT files, and a "loop" edge back to root.
# Each created node and its URI are stashed on self for later checks.
# NOTE(review): the `self.root = n` / `self.mutable = n` style
# assignments inside the _created_* callbacks are elided in this
# listing.
1973 def set_up_tree(self, ignored):
1982 c0 = self.clients[0]
1983 d = c0.create_empty_dirnode()
1984 def _created_root(n):
1986 self.root_uri = n.get_uri()
1987 d.addCallback(_created_root)
1988 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1989 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1990 def _created_mutable(n):
1992 self.mutable_uri = n.get_uri()
1993 d.addCallback(_created_mutable)
# 13000 bytes: big enough to be a real CHK upload, not a LIT
1995 large = upload.Data("Lots of data\n" * 1000, None)
1996 d.addCallback(lambda ign: self.root.add_file(u"large", large))
1997 def _created_large(n):
1999 self.large_uri = n.get_uri()
2000 d.addCallback(_created_large)
2002 small = upload.Data("Small enough for a LIT", None)
2003 d.addCallback(lambda ign: self.root.add_file(u"small", small))
2004 def _created_small(n):
2006 self.small_uri = n.get_uri()
2007 d.addCallback(_created_small)
2009 small2 = upload.Data("Small enough for a LIT too", None)
2010 d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
2011 def _created_small2(n):
2013 self.small2_uri = n.get_uri()
2014 d.addCallback(_created_small2)
# self-loop: deep-traversal code must not recurse forever on this
2016 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
# Assert a single node's checker results describe a fully healthy
# object: right storage index, 10/10 good shares spread over all
# clients, no corruption, one recoverable version.
# NOTE(review): the `d = cr.get_data()` line and the guards that use
# the `incomplete` flag are elided in this listing.
2019 def check_is_healthy(self, cr, n, where, incomplete=False):
2020 self.failUnless(ICheckerResults.providedBy(cr), where)
2021 self.failUnless(cr.is_healthy(), where)
2022 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2024 self.failUnlessEqual(cr.get_storage_index_string(),
2025 base32.b2a(n.get_storage_index()), where)
# with fewer than 10 clients some server must hold >1 share
2026 needs_rebalancing = bool( len(self.clients) < 10 )
2028 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
2030 self.failUnlessEqual(d["count-shares-good"], 10, where)
2031 self.failUnlessEqual(d["count-shares-needed"], 3, where)
2032 self.failUnlessEqual(d["count-shares-expected"], 10, where)
2034 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2035 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2036 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2038 self.failUnlessEqual(sorted(d["servers-responding"]),
2039 sorted([c.nodeid for c in self.clients]),
# every client should hold at least one share of the object
2041 self.failUnless("sharemap" in d, where)
2042 all_serverids = set()
2043 for (shareid, serverids) in d["sharemap"].items():
2044 all_serverids.update(serverids)
2045 self.failUnlessEqual(sorted(all_serverids),
2046 sorted([c.nodeid for c in self.clients]),
2049 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2050 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2051 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
# A check-and-repair on a healthy object must report healthy both
# before and after, and must not have attempted any repair.
2054 def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2055 self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
2056 self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2057 self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2058 self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2059 self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
# healthy input: repair must have been a no-op
2060 self.failIf(cr.get_repair_attempted(), where)
# Deep-check results must count exactly num_healthy healthy objects.
# NOTE(review): the `num_healthy, where)` continuation line is elided
# in this listing.
2062 def deep_check_is_healthy(self, cr, num_healthy, where):
2063 self.failUnless(IDeepCheckResults.providedBy(cr))
2064 self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
# Deep check-and-repair on a healthy tree: all objects healthy before
# and after, and zero repairs attempted.
2067 def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2068 self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2069 c = cr.get_counters()
2070 self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2072 self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2074 self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
# Top-level healthy-tree test: build the tree, then run the stats,
# node-level check, webapi, and CLI sub-tests against it in sequence.
2076 def test_good(self):
2077 self.basedir = self.mktemp()
2078 d = self.set_up_nodes()
2079 d.addCallback(self.set_up_tree)
2080 d.addCallback(self.do_stats)
2081 d.addCallback(self.do_test_check_good)
2082 d.addCallback(self.do_test_web_good)
2083 d.addCallback(self.do_test_cli_good)
# translate web/CLI failures into readable trial errors
2084 d.addErrback(self.explain_web_error)
2085 d.addErrback(self.explain_error)
# Run deep-stats over the tree and validate the numbers.
# NOTE(review): the trailing `return d` is elided in this listing.
2088 def do_stats(self, ignored):
2089 d = defer.succeed(None)
2090 d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2091 d.addCallback(self.check_stats_good)
# Assert deep-stats match the tree built by set_up_tree: 1 directory,
# 1 mutable + 1 immutable + 2 LIT files, and the expected sizes.
2094 def check_stats_good(self, s):
2095 self.failUnlessEqual(s["count-directories"], 1)
2096 self.failUnlessEqual(s["count-files"], 4)
2097 self.failUnlessEqual(s["count-immutable-files"], 1)
2098 self.failUnlessEqual(s["count-literal-files"], 2)
2099 self.failUnlessEqual(s["count-mutable-files"], 1)
2100 # don't check directories: their size will vary
2101 # s["largest-directory"]
2102 # s["size-directories"]
2103 self.failUnlessEqual(s["largest-directory-children"], 5)
2104 self.failUnlessEqual(s["largest-immutable-file"], 13000)
2105 # to re-use this function for both the local
2106 # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2107 # coerce the result into a list of tuples. dirnode.start_deep_stats()
2108 # returns a list of tuples, but JSON only knows about lists., so
2109 # t=start-deep-stats returns a list of lists.
2110 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2111 self.failUnlessEqual(histogram, [(11, 31, 2),
2114 self.failUnlessEqual(s["size-immutable-files"], 13000)
2115 self.failUnlessEqual(s["size-literal-files"], 48)
# Drive every node-level checker API against the healthy tree:
# check(), check(verify=True), check_and_repair() with and without
# verify, deep-check variants on the root, and finally a deep-check
# that is cancelled mid-flight (must errback OperationCancelledError).
# LIT nodes (small/small2) always return None from check*.
# NOTE(review): several interior lines (the monitor.cancel() call, the
# `def _cancelled(f):` header, `return d` lines) are elided in this
# listing.
2117 def do_test_check_good(self, ignored):
2118 d = defer.succeed(None)
2119 # check the individual items
2120 d.addCallback(lambda ign: self.root.check(Monitor()))
2121 d.addCallback(self.check_is_healthy, self.root, "root")
2122 d.addCallback(lambda ign: self.mutable.check(Monitor()))
2123 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2124 d.addCallback(lambda ign: self.large.check(Monitor()))
2125 d.addCallback(self.check_is_healthy, self.large, "large")
2126 d.addCallback(lambda ign: self.small.check(Monitor()))
2127 d.addCallback(self.failUnlessEqual, None, "small")
2128 d.addCallback(lambda ign: self.small2.check(Monitor()))
2129 d.addCallback(self.failUnlessEqual, None, "small2")
2131 # and again with verify=True
2132 d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2133 d.addCallback(self.check_is_healthy, self.root, "root")
2134 d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2135 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2136 d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2137 d.addCallback(self.check_is_healthy, self.large, "large",
2139 d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2140 d.addCallback(self.failUnlessEqual, None, "small")
2141 d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
2142 d.addCallback(self.failUnlessEqual, None, "small2")
2144 # and check_and_repair(), which should be a nop
2145 d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2146 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2147 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2148 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2149 d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2150 d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2151 d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2152 d.addCallback(self.failUnlessEqual, None, "small")
2153 d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
2154 d.addCallback(self.failUnlessEqual, None, "small2")
2156 # check_and_repair(verify=True)
2157 d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2158 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2159 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2160 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2161 d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2162 d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2164 d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2165 d.addCallback(self.failUnlessEqual, None, "small")
2166 d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
2167 d.addCallback(self.failUnlessEqual, None, "small2")
2170 # now deep-check the root, with various verify= and repair= options
2171 d.addCallback(lambda ign:
2172 self.root.start_deep_check().when_done())
2173 d.addCallback(self.deep_check_is_healthy, 3, "root")
2174 d.addCallback(lambda ign:
2175 self.root.start_deep_check(verify=True).when_done())
2176 d.addCallback(self.deep_check_is_healthy, 3, "root")
2177 d.addCallback(lambda ign:
2178 self.root.start_deep_check_and_repair().when_done())
2179 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2180 d.addCallback(lambda ign:
2181 self.root.start_deep_check_and_repair(verify=True).when_done())
2182 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2184 # and finally, start a deep-check, but then cancel it.
2185 d.addCallback(lambda ign: self.root.start_deep_check())
2186 def _checking(monitor):
2188 d = monitor.when_done()
2189 # this should fire as soon as the next dirnode.list finishes.
2190 # TODO: add a counter to measure how many list() calls are made,
2191 # assert that no more than one gets to run before the cancel()
2193 def _finished_normally(res):
2194 self.fail("this was supposed to fail, not finish normally")
2196 f.trap(OperationCancelledError)
2197 d.addCallbacks(_finished_normally, _cancelled)
2199 d.addCallback(_checking)
# Validate the JSON form of a healthy check result (webapi output).
# Mirrors check_is_healthy, but server ids appear base32-encoded.
# NOTE(review): the `r = data["results"]` line and the incomplete-flag
# guards are elided in this listing.
2203 def json_check_is_healthy(self, data, n, where, incomplete=False):
2205 self.failUnlessEqual(data["storage-index"],
2206 base32.b2a(n.get_storage_index()), where)
2207 self.failUnless("summary" in data, (where, data))
2208 self.failUnlessEqual(data["summary"].lower(), "healthy",
2209 "%s: '%s'" % (where, data["summary"]))
2211 self.failUnlessEqual(r["healthy"], True, where)
2212 needs_rebalancing = bool( len(self.clients) < 10 )
2214 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2215 self.failUnlessEqual(r["count-shares-good"], 10, where)
2216 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2217 self.failUnlessEqual(r["count-shares-expected"], 10, where)
2219 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2220 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2221 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
# JSON serialization carries nodeids as base32 strings, not bytes
2223 self.failUnlessEqual(sorted(r["servers-responding"]),
2224 sorted([idlib.nodeid_b2a(c.nodeid)
2225 for c in self.clients]), where)
2226 self.failUnless("sharemap" in r, where)
2227 all_serverids = set()
2228 for (shareid, serverids_s) in r["sharemap"].items():
2229 all_serverids.update(serverids_s)
2230 self.failUnlessEqual(sorted(all_serverids),
2231 sorted([idlib.nodeid_b2a(c.nodeid)
2232 for c in self.clients]), where)
2233 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2234 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2235 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
# JSON check+repair on a healthy object: no repair attempted, and the
# pre- and post-repair sub-results both validate as healthy.
2237 def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2238 self.failUnlessEqual(data["storage-index"],
2239 base32.b2a(n.get_storage_index()), where)
2240 self.failUnlessEqual(data["repair-attempted"], False, where)
2241 self.json_check_is_healthy(data["pre-repair-results"],
2242 n, where, incomplete)
2243 self.json_check_is_healthy(data["post-repair-results"],
2244 n, where, incomplete)
# Validate the JSON deep-check summary for the healthy 3-object tree
# (root dirnode, mutable file, large file; LIT files are not counted).
2246 def json_full_deepcheck_is_healthy(self, data, n, where):
2247 self.failUnlessEqual(data["root-storage-index"],
2248 base32.b2a(n.get_storage_index()), where)
2249 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2250 self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2251 self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2252 self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2253 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2254 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
# deep-check output embeds deep-stats; reuse the stats validator
2255 self.json_check_stats_good(data["stats"], where)
# Validate the JSON deep-check-and-repair summary on the healthy
# tree: all 3 objects healthy pre and post, zero repairs attempted.
2257 def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2258 self.failUnlessEqual(data["root-storage-index"],
2259 base32.b2a(n.get_storage_index()), where)
2260 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2262 self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2263 self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2264 self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2266 self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2267 self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2268 self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2270 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2271 self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2272 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2274 self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2275 self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2276 self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
# LIT files have no shares: JSON check reports an empty storage index
# and is trivially healthy.
2279 def json_check_lit(self, data, n, where):
2280 self.failUnlessEqual(data["storage-index"], "", where)
2281 self.failUnlessEqual(data["results"]["healthy"], True, where)
# The JSON stats have the same shape as the local ones; delegate.
# NOTE(review): the `where` argument appears unused here — the elided
# signature continuation may accept it for callback compatibility.
2283 def json_check_stats_good(self, data, where):
2284 self.check_stats_good(data)
# Exercise the full webapi surface against the healthy tree:
# slow deep-stats, then t=check on every node with each combination
# of verify=/repair= flags, then the slow deep-check variants, and
# finally t=info pages. LIT files get the lit-specific validator.
# NOTE(review): `output="json"))` continuation lines and the trailing
# `return d` are elided in this listing.
2286 def do_test_web_good(self, ignored):
2287 d = defer.succeed(None)
2290 d.addCallback(lambda ign:
2291 self.slow_web(self.root,
2292 t="start-deep-stats", output="json"))
2293 d.addCallback(self.json_check_stats_good, "deep-stats")
# plain checks, no verify/repair
2296 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2297 d.addCallback(self.json_check_is_healthy, self.root, "root")
2298 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2299 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2300 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2301 d.addCallback(self.json_check_is_healthy, self.large, "large")
2302 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2303 d.addCallback(self.json_check_lit, self.small, "small")
2304 d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
2305 d.addCallback(self.json_check_lit, self.small2, "small2")
# check with verification
2308 d.addCallback(lambda ign:
2309 self.web_json(self.root, t="check", verify="true"))
2310 d.addCallback(self.json_check_is_healthy, self.root, "root+v")
2311 d.addCallback(lambda ign:
2312 self.web_json(self.mutable, t="check", verify="true"))
2313 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v")
2314 d.addCallback(lambda ign:
2315 self.web_json(self.large, t="check", verify="true"))
2316 d.addCallback(self.json_check_is_healthy, self.large, "large+v",
2318 d.addCallback(lambda ign:
2319 self.web_json(self.small, t="check", verify="true"))
2320 d.addCallback(self.json_check_lit, self.small, "small+v")
2321 d.addCallback(lambda ign:
2322 self.web_json(self.small2, t="check", verify="true"))
2323 d.addCallback(self.json_check_lit, self.small2, "small2+v")
2325 # check and repair, no verify
2326 d.addCallback(lambda ign:
2327 self.web_json(self.root, t="check", repair="true"))
2328 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r")
2329 d.addCallback(lambda ign:
2330 self.web_json(self.mutable, t="check", repair="true"))
2331 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r")
2332 d.addCallback(lambda ign:
2333 self.web_json(self.large, t="check", repair="true"))
2334 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r")
2335 d.addCallback(lambda ign:
2336 self.web_json(self.small, t="check", repair="true"))
2337 d.addCallback(self.json_check_lit, self.small, "small+r")
2338 d.addCallback(lambda ign:
2339 self.web_json(self.small2, t="check", repair="true"))
2340 d.addCallback(self.json_check_lit, self.small2, "small2+r")
2342 # check+verify+repair
2343 d.addCallback(lambda ign:
2344 self.web_json(self.root, t="check", repair="true", verify="true"))
2345 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr")
2346 d.addCallback(lambda ign:
2347 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2348 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr")
2349 d.addCallback(lambda ign:
2350 self.web_json(self.large, t="check", repair="true", verify="true"))
2351 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True)
2352 d.addCallback(lambda ign:
2353 self.web_json(self.small, t="check", repair="true", verify="true"))
2354 d.addCallback(self.json_check_lit, self.small, "small+vr")
2355 d.addCallback(lambda ign:
2356 self.web_json(self.small2, t="check", repair="true", verify="true"))
2357 d.addCallback(self.json_check_lit, self.small2, "small2+vr")
2359 # now run a deep-check, with various verify= and repair= flags
2360 d.addCallback(lambda ign:
2361 self.slow_web(self.root, t="start-deep-check", output="json"))
2362 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d")
2363 d.addCallback(lambda ign:
2364 self.slow_web(self.root, t="start-deep-check", verify="true",
2366 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv")
2367 d.addCallback(lambda ign:
2368 self.slow_web(self.root, t="start-deep-check", repair="true",
2370 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr")
2371 d.addCallback(lambda ign:
2372 self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2373 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr")
2375 # now look at t=info
2376 d.addCallback(lambda ign: self.web(self.root, t="info"))
2377 # TODO: examine the output
2378 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2379 d.addCallback(lambda ign: self.web(self.large, t="info"))
2380 d.addCallback(lambda ign: self.web(self.small, t="info"))
2381 d.addCallback(lambda ign: self.web(self.small2, t="info"))
# Threaded CLI helper (same shape as SystemTest._run_cli): fires with
# (stdout, stderr) strings once the command completes.
# NOTE(review): the `def _done(res):` header and trailing `return d`
# are elided in this listing.
2385 def _run_cli(self, argv, stdin=""):
2387 stdout, stderr = StringIO(), StringIO()
2388 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
2389 stdin=StringIO(stdin),
2390 stdout=stdout, stderr=stderr)
2392 return stdout.getvalue(), stderr.getvalue()
2393 d.addCallback(_done)
# Validate the CLI against the healthy tree: "manifest" (cap + path
# per line), "manifest --storage-index" (SI-only lines, LIT files
# omitted), "stats" in human-readable form, and "stats" as JSON.
# NOTE(review): interior lines (the manifest root argument, the
# caps-collecting loop header/body, `--raw` style flags, `return d`)
# are elided in this listing.
2396 def do_test_cli_good(self, ignored):
2397 basedir = self.getdir("client0")
2398 d = self._run_cli(["manifest",
2399 "--node-directory", basedir,
2401 def _check((out,err)):
# 5 entries: root, mutable, large, small, small2 (loop not repeated)
2402 lines = [l for l in out.split("\n") if l]
2403 self.failUnlessEqual(len(lines), 5)
2407 cap, path = l.split(None, 1)
2412 self.failUnless(self.root.get_uri() in caps)
2413 self.failUnlessEqual(caps[self.root.get_uri()], "")
2414 self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
2415 self.failUnlessEqual(caps[self.large.get_uri()], "large")
2416 self.failUnlessEqual(caps[self.small.get_uri()], "small")
2417 self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
2418 d.addCallback(_check)
2420 d.addCallback(lambda res:
2421 self._run_cli(["manifest",
2422 "--node-directory", basedir,
2423 "--storage-index", self.root_uri]))
2424 def _check2((out,err)):
# only share-bearing objects have an SI: LIT files are excluded
2425 lines = [l for l in out.split("\n") if l]
2426 self.failUnlessEqual(len(lines), 3)
2427 self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
2428 self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
2429 self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
2430 d.addCallback(_check2)
2432 d.addCallback(lambda res:
2433 self._run_cli(["stats",
2434 "--node-directory", basedir,
2436 def _check3((out,err)):
2437 lines = [l.strip() for l in out.split("\n") if l]
2438 self.failUnless("count-immutable-files: 1" in lines)
2439 self.failUnless("count-mutable-files: 1" in lines)
2440 self.failUnless("count-literal-files: 2" in lines)
2441 self.failUnless("count-files: 4" in lines)
2442 self.failUnless("count-directories: 1" in lines)
2443 self.failUnless("size-immutable-files: 13000 (13.00 kB, 12.70 kiB)" in lines, lines)
2444 self.failUnless("size-literal-files: 48" in lines)
2445 self.failUnless(" 11-31 : 2 (31 B, 31 B)".strip() in lines)
2446 self.failUnless("10001-31622 : 1 (31.62 kB, 30.88 kiB)".strip() in lines)
2447 d.addCallback(_check3)
2449 d.addCallback(lambda res:
2450 self._run_cli(["stats",
2451 "--node-directory", basedir,
2454 def _check4((out,err)):
2455 data = simplejson.loads(out)
2456 self.failUnlessEqual(data["count-immutable-files"], 1)
2457 self.failUnlessEqual(data["count-immutable-files"], 1)
2458 self.failUnlessEqual(data["count-mutable-files"], 1)
2459 self.failUnlessEqual(data["count-literal-files"], 2)
2460 self.failUnlessEqual(data["count-files"], 4)
2461 self.failUnlessEqual(data["count-directories"], 1)
2462 self.failUnlessEqual(data["size-immutable-files"], 13000)
2463 self.failUnlessEqual(data["size-literal-files"], 48)
# JSON histogram rows arrive as lists, not tuples
2464 self.failUnless([11,31,2] in data["size-files-histogram"])
2465 self.failUnless([10001,31622,1] in data["size-files-histogram"])
2466 d.addCallback(_check4)
2471 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
2474 self.basedir = self.mktemp()
2475 d = self.set_up_nodes()
2476 d.addCallback(self.set_up_damaged_tree)
2477 d.addCallback(self.do_test_check_bad)
2478 d.addCallback(self.do_test_deepcheck_bad)
2479 d.addCallback(self.do_test_web_bad)
2480 d.addErrback(self.explain_web_error)
2481 d.addErrback(self.explain_error)
# Build a tree of deliberately-damaged children under a fresh root:
# for each of mutable/large, a good copy plus missing-shares,
# corrupt-shares, and unrecoverable variants (see create_mangled).
# NOTE(review): interior lines (the `self.root = n` assignment, the
# `self.nodes = {}` initializer, `return d`) are elided in this
# listing.
2486 def set_up_damaged_tree(self, ignored):
2491 # mutable-missing-shares
2492 # mutable-corrupt-shares
2493 # mutable-unrecoverable
2495 # large-missing-shares
2496 # large-corrupt-shares
2497 # large-unrecoverable
2501 c0 = self.clients[0]
2502 d = c0.create_empty_dirnode()
2503 def _created_root(n):
2505 self.root_uri = n.get_uri()
2506 d.addCallback(_created_root)
2507 d.addCallback(self.create_mangled, "mutable-good")
2508 d.addCallback(self.create_mangled, "mutable-missing-shares")
2509 d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2510 d.addCallback(self.create_mangled, "mutable-unrecoverable")
2511 d.addCallback(self.create_mangled, "large-good")
2512 d.addCallback(self.create_mangled, "large-missing-shares")
2513 d.addCallback(self.create_mangled, "large-corrupt-shares")
2514 d.addCallback(self.create_mangled, "large-unrecoverable")
# Create one child named "<nodetype>-<mangletype>" under self.root,
# stash its node in self.nodes[name], then apply the requested damage:
# good (none), missing-shares, corrupt-shares, or unrecoverable.
# NOTE(review): the `return node` inside _stash_node and the final
# `return d` are elided in this listing.
2519 def create_mangled(self, ignored, name):
# the name encodes both what to build and how to damage it
2520 nodetype, mangletype = name.split("-", 1)
2521 if nodetype == "mutable":
2522 d = self.clients[0].create_mutable_file("mutable file contents")
2523 d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2524 elif nodetype == "large":
2525 large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2526 d = self.root.add_file(unicode(name), large)
2527 elif nodetype == "small":
2528 small = upload.Data("Small enough for a LIT", None)
2529 d = self.root.add_file(unicode(name), small)
2531 def _stash_node(node):
2532 self.nodes[name] = node
2534 d.addCallback(_stash_node)
2536 if mangletype == "good":
2538 elif mangletype == "missing-shares":
2539 d.addCallback(self._delete_some_shares)
2540 elif mangletype == "corrupt-shares":
2541 d.addCallback(self._corrupt_some_shares)
2543 assert mangletype == "unrecoverable"
2544 d.addCallback(self._delete_most_shares)
# Synchronous CLI helper for "debug" subcommands only (no reactor
# needed); returns captured stdout.
2548 def _run_cli(self, argv):
2549 stdout, stderr = StringIO(), StringIO()
2550 # this can only do synchronous operations
2551 assert argv[0] == "debug"
2552 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2553 return stdout.getvalue()
# Return the filesystem paths of every share of `node` across all
# client basedirs, via the "debug find-shares" CLI command.
2555 def _find_shares(self, node):
2556 si = node.get_storage_index()
2557 out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2558 [c.basedir for c in self.clients])
2559 files = out.split("\n")
# drop the empty string left by the trailing newline
2560 return [f for f in files if f]
# Delete two shares: leaves the file unhealthy but still recoverable
# (8 of 10 shares remain, only 3 are needed).
2562 def _delete_some_shares(self, node):
2563 shares = self._find_shares(node)
2564 os.unlink(shares[0])
2565 os.unlink(shares[1])
# Corrupt two shares in place: the file stays recoverable, but a
# verifying check should report the corruption.
2567 def _corrupt_some_shares(self, node):
2568 shares = self._find_shares(node)
2569 self._run_cli(["debug", "corrupt-share", shares[0]])
2570 self._run_cli(["debug", "corrupt-share", shares[1]])
# Delete all shares but the first, leaving fewer than the 3 needed:
# the file becomes unrecoverable.
# NOTE(review): the os.unlink(share) loop body is elided in this
# listing.
2572 def _delete_most_shares(self, node):
2573 shares = self._find_shares(node)
2574 for share in shares[1:]:
# Undamaged object: healthy, recoverable, exactly one recoverable
# version. NOTE(review): the `d = cr.get_data()` line and the
# trailing `return cr` are elided in this listing.
2578 def check_is_healthy(self, cr, where):
2579 self.failUnless(ICheckerResults.providedBy(cr), where)
2580 self.failUnless(cr.is_healthy(), where)
2581 self.failUnless(cr.is_recoverable(), where)
2583 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2584 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2587 def check_is_missing_shares(self, cr, where):
2588 self.failUnless(ICheckerResults.providedBy(cr), where)
2589 self.failIf(cr.is_healthy(), where)
2590 self.failUnless(cr.is_recoverable(), where)
2592 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2593 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2596 def check_has_corrupt_shares(self, cr, where):
2597 # by "corrupt-shares" we mean the file is still recoverable
2598 self.failUnless(ICheckerResults.providedBy(cr), where)
2600 self.failIf(cr.is_healthy(), where)
2601 self.failUnless(cr.is_recoverable(), where)
2603 self.failUnless(d["count-shares-good"] < 10, where)
2604 self.failUnless(d["count-corrupt-shares"], where)
2605 self.failUnless(d["list-corrupt-shares"], where)
2608 def check_is_unrecoverable(self, cr, where):
2609 self.failUnless(ICheckerResults.providedBy(cr), where)
2611 self.failIf(cr.is_healthy(), where)
2612 self.failIf(cr.is_recoverable(), where)
2613 self.failUnless(d["count-shares-good"] < d["count-shares-needed"],
2615 self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2616 self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2619 def do_test_check_bad(self, ignored):
2620 d = defer.succeed(None)
2622 # check the individual items, without verification. This will not
2623 # detect corrupt shares.
2624 def _check(which, checker):
2625 d = self.nodes[which].check(Monitor())
2626 d.addCallback(checker, which + "--check")
2629 d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2630 d.addCallback(lambda ign: _check("mutable-missing-shares",
2631 self.check_is_missing_shares))
2632 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2633 self.check_is_healthy))
2634 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2635 self.check_is_unrecoverable))
2636 d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2637 d.addCallback(lambda ign: _check("large-missing-shares",
2638 self.check_is_missing_shares))
2639 d.addCallback(lambda ign: _check("large-corrupt-shares",
2640 self.check_is_healthy))
2641 d.addCallback(lambda ign: _check("large-unrecoverable",
2642 self.check_is_unrecoverable))
2644 # and again with verify=True, which *does* detect corrupt shares.
2645 def _checkv(which, checker):
2646 d = self.nodes[which].check(Monitor(), verify=True)
2647 d.addCallback(checker, which + "--check-and-verify")
2650 d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2651 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2652 self.check_is_missing_shares))
2653 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2654 self.check_has_corrupt_shares))
2655 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2656 self.check_is_unrecoverable))
2657 d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2658 # disabled pending immutable verifier
2659 #d.addCallback(lambda ign: _checkv("large-missing-shares",
2660 # self.check_is_missing_shares))
2661 #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2662 # self.check_has_corrupt_shares))
2663 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2664 self.check_is_unrecoverable))
2668 def do_test_deepcheck_bad(self, ignored):
2669 d = defer.succeed(None)
2671 # now deep-check the root, with various verify= and repair= options
2672 d.addCallback(lambda ign:
2673 self.root.start_deep_check().when_done())
2675 self.failUnless(IDeepCheckResults.providedBy(cr))
2676 c = cr.get_counters()
2677 self.failUnlessEqual(c["count-objects-checked"], 9)
2678 self.failUnlessEqual(c["count-objects-healthy"], 5)
2679 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2680 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2681 d.addCallback(_check1)
2683 d.addCallback(lambda ign:
2684 self.root.start_deep_check(verify=True).when_done())
2686 self.failUnless(IDeepCheckResults.providedBy(cr))
2687 c = cr.get_counters()
2688 self.failUnlessEqual(c["count-objects-checked"], 9)
2689 # until we have a real immutable verifier, these counts will be
2691 #self.failUnlessEqual(c["count-objects-healthy"], 3)
2692 #self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2693 self.failUnlessEqual(c["count-objects-healthy"], 5) # todo
2694 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2695 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2696 d.addCallback(_check2)
2700 def json_is_healthy(self, data, where):
2702 self.failUnless(r["healthy"], where)
2703 self.failUnless(r["recoverable"], where)
2704 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2705 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2707 def json_is_missing_shares(self, data, where):
2709 self.failIf(r["healthy"], where)
2710 self.failUnless(r["recoverable"], where)
2711 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2712 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2714 def json_has_corrupt_shares(self, data, where):
2715 # by "corrupt-shares" we mean the file is still recoverable
2717 self.failIf(r["healthy"], where)
2718 self.failUnless(r["recoverable"], where)
2719 self.failUnless(r["count-shares-good"] < 10, where)
2720 self.failUnless(r["count-corrupt-shares"], where)
2721 self.failUnless(r["list-corrupt-shares"], where)
2723 def json_is_unrecoverable(self, data, where):
2725 self.failIf(r["healthy"], where)
2726 self.failIf(r["recoverable"], where)
2727 self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2729 self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2730 self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
2732 def do_test_web_bad(self, ignored):
2733 d = defer.succeed(None)
2736 def _check(which, checker):
2737 d = self.web_json(self.nodes[which], t="check")
2738 d.addCallback(checker, which + "--webcheck")
2741 d.addCallback(lambda ign: _check("mutable-good",
2742 self.json_is_healthy))
2743 d.addCallback(lambda ign: _check("mutable-missing-shares",
2744 self.json_is_missing_shares))
2745 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2746 self.json_is_healthy))
2747 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2748 self.json_is_unrecoverable))
2749 d.addCallback(lambda ign: _check("large-good",
2750 self.json_is_healthy))
2751 d.addCallback(lambda ign: _check("large-missing-shares",
2752 self.json_is_missing_shares))
2753 d.addCallback(lambda ign: _check("large-corrupt-shares",
2754 self.json_is_healthy))
2755 d.addCallback(lambda ign: _check("large-unrecoverable",
2756 self.json_is_unrecoverable))
2759 def _checkv(which, checker):
2760 d = self.web_json(self.nodes[which], t="check", verify="true")
2761 d.addCallback(checker, which + "--webcheck-and-verify")
2764 d.addCallback(lambda ign: _checkv("mutable-good",
2765 self.json_is_healthy))
2766 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2767 self.json_is_missing_shares))
2768 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2769 self.json_has_corrupt_shares))
2770 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2771 self.json_is_unrecoverable))
2772 d.addCallback(lambda ign: _checkv("large-good",
2773 self.json_is_healthy))
2774 # disabled pending immutable verifier
2775 #d.addCallback(lambda ign: _checkv("large-missing-shares",
2776 # self.json_is_missing_shares))
2777 #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2778 # self.json_has_corrupt_shares))
2779 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2780 self.json_is_unrecoverable))