1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28 MemoryConsumer, download_to_data
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
35 class CountingDataUploadable(upload.Data):
# Uploadable that counts how many bytes the uploader has consumed and can
# fire a Deferred once a configured byte count is exceeded -- used by the
# resumable-upload test to interrupt an upload mid-stream.
# NOTE(review): the initializer for the `bytes_read` counter is not visible
# in this listing -- presumably a `bytes_read = 0` class attribute.
37 interrupt_after = None
38 interrupt_after_d = None
40 def read(self, length):
# Count every byte handed out before delegating to upload.Data.read.
41 self.bytes_read += length
42 if self.interrupt_after is not None:
43 if self.bytes_read > self.interrupt_after:
# Clear the trigger first so the Deferred fires exactly once.
44 self.interrupt_after = None
45 self.interrupt_after_d.callback(self)
46 return upload.Data.read(self, length)
48 class GrabEverythingConsumer:
# Minimal IConsumer used to test download.ConsumerAdapter: it accepts any
# push producer and (in the full source) accumulates written data into a
# `contents` buffer checked by the caller.
# NOTE(review): several body lines (interface declaration, contents
# initialization, the write/unregister bodies) are missing from this listing.
54 def registerProducer(self, producer, streaming):
# Only push-style producers are expected here.
56 assert IPushProducer.providedBy(producer)
58 def write(self, data):
61 def unregisterProducer(self):
64 class SystemTest(SystemTestMixin, unittest.TestCase):
66 def test_connections(self):
# Bring up the standard grid plus one extra node, then verify that every
# client sees all numclients+1 peers, both in the raw peer list and in the
# "storage"-purpose permuted-peer ordering. The extra node is shut down
# whether or not the checks pass (addBoth).
67 self.basedir = "system/SystemTest/test_connections"
68 d = self.set_up_nodes()
69 self.extra_node = None
70 d.addCallback(lambda res: self.add_extra_node(self.numclients))
71 def _check(extra_node):
72 self.extra_node = extra_node
73 for c in self.clients:
74 all_peerids = list(c.get_all_peerids())
75 self.failUnlessEqual(len(all_peerids), self.numclients+1)
76 permuted_peers = list(c.get_permuted_peers("storage", "a"))
77 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
80 def _shutdown_extra_node(res):
# Runs on success or failure so the extra node never outlives the test.
82 return self.extra_node.stopService()
84 d.addBoth(_shutdown_extra_node)
86 test_connections.timeout = 300
87 # test_connections is subsumed by test_upload_and_download, and takes
88 # quite a while to run on a slow machine (because of all the TLS
89 # connections that must be established). If we ever rework the introducer
90 # code to such an extent that we're not sure if it works anymore, we can
91 # reinstate this test until it does.
def test_upload_and_download_random_key(self):
    """Exercise the full upload/download round-trip using a randomly
    generated encryption key (convergence=None)."""
    self.basedir = "system/SystemTest/test_upload_and_download_random_key"
    d = self._test_upload_and_download(convergence=None)
    return d
test_upload_and_download_random_key.timeout = 4800
def test_upload_and_download_convergent(self):
    """Exercise the full upload/download round-trip with convergent
    encryption (a fixed convergence secret)."""
    self.basedir = "system/SystemTest/test_upload_and_download_convergent"
    d = self._test_upload_and_download(convergence="some convergence string")
    return d
test_upload_and_download_convergent.timeout = 4800
104 def _test_upload_and_download(self, convergence):
# Shared body for the two upload/download tests. Uploads multi-segment
# data, downloads it back through several APIs (to_data, to_filename,
# to_filehandle, IConsumer, filenode.read), exercises bad-key and
# nonexistent-URI failure paths, then runs the helper-based upload tests
# including interrupted/resumed uploads.
# NOTE(review): several callback definitions (_do_upload, _interrupted,
# various _check helpers) are missing from this listing.
105 # we use 4000 bytes of data, which will result in about 400k written
106 # to disk among all our simulated nodes
107 DATA = "Some data to upload\n" * 200
108 d = self.set_up_nodes()
109 def _check_connections(res):
110 for c in self.clients:
111 all_peerids = list(c.get_all_peerids())
112 self.failUnlessEqual(len(all_peerids), self.numclients)
113 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114 self.failUnlessEqual(len(permuted_peers), self.numclients)
115 d.addCallback(_check_connections)
119 u = self.clients[0].getServiceNamed("uploader")
121 # we crank the max segsize down to 1024b for the duration of this
122 # test, so we can exercise multiple segments. It is important
123 # that this is not a multiple of the segment size, so that the
124 # tail segment is not the same length as the others. This actually
125 # gets rounded up to 1025 to be a multiple of the number of
126 # required shares (since we use 25 out of 100 FEC).
127 up = upload.Data(DATA, convergence=convergence)
128 up.max_segment_size = 1024
131 d.addCallback(_do_upload)
132 def _upload_done(results):
134 log.msg("upload finished: uri is %s" % (uri,))
136 dl = self.clients[1].getServiceNamed("downloader")
138 d.addCallback(_upload_done)
140 def _upload_again(res):
141 # Upload again. If using convergent encryption then this ought to be
142 # short-circuited, however with the way we currently generate URIs
143 # (i.e. because they include the roothash), we have to do all of the
144 # encoding work, and only get to save on the upload part.
145 log.msg("UPLOADING AGAIN")
146 up = upload.Data(DATA, convergence=convergence)
147 up.max_segment_size = 1024
148 d1 = self.uploader.upload(up)
149 d.addCallback(_upload_again)
151 def _download_to_data(res):
152 log.msg("DOWNLOADING")
153 return self.downloader.download_to_data(self.uri)
154 d.addCallback(_download_to_data)
155 def _download_to_data_done(data):
156 log.msg("download finished")
157 self.failUnlessEqual(data, DATA)
158 d.addCallback(_download_to_data_done)
160 target_filename = os.path.join(self.basedir, "download.target")
161 def _download_to_filename(res):
162 return self.downloader.download_to_filename(self.uri,
164 d.addCallback(_download_to_filename)
165 def _download_to_filename_done(res):
166 newdata = open(target_filename, "rb").read()
167 self.failUnlessEqual(newdata, DATA)
168 d.addCallback(_download_to_filename_done)
170 target_filename2 = os.path.join(self.basedir, "download.target2")
171 def _download_to_filehandle(res):
172 fh = open(target_filename2, "wb")
173 return self.downloader.download_to_filehandle(self.uri, fh)
174 d.addCallback(_download_to_filehandle)
175 def _download_to_filehandle_done(fh):
177 newdata = open(target_filename2, "rb").read()
178 self.failUnlessEqual(newdata, DATA)
179 d.addCallback(_download_to_filehandle_done)
# Download through the IConsumer interface via ConsumerAdapter.
181 consumer = GrabEverythingConsumer()
182 ct = download.ConsumerAdapter(consumer)
183 d.addCallback(lambda res:
184 self.downloader.download(self.uri, ct))
185 def _download_to_consumer_done(ign):
186 self.failUnlessEqual(consumer.contents, DATA)
187 d.addCallback(_download_to_consumer_done)
# Exercise filenode.read() with various offset/size combinations.
190 n = self.clients[1].create_node_from_uri(self.uri)
191 d = download_to_data(n)
192 def _read_done(data):
193 self.failUnlessEqual(data, DATA)
194 d.addCallback(_read_done)
195 d.addCallback(lambda ign:
196 n.read(MemoryConsumer(), offset=1, size=4))
197 def _read_portion_done(mc):
198 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
199 d.addCallback(_read_portion_done)
200 d.addCallback(lambda ign:
201 n.read(MemoryConsumer(), offset=2, size=None))
202 def _read_tail_done(mc):
203 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
204 d.addCallback(_read_tail_done)
205 d.addCallback(lambda ign:
206 n.read(MemoryConsumer(), size=len(DATA)+1000))
207 def _read_too_much(mc):
# Reading past the end should just return everything, not fail.
208 self.failUnlessEqual("".join(mc.chunks), DATA)
209 d.addCallback(_read_too_much)
212 d.addCallback(_test_read)
214 def _test_bad_read(res):
# Corrupt the readkey: shares are still locatable but undecryptable.
215 bad_u = uri.from_string_filenode(self.uri)
216 bad_u.key = self.flip_bit(bad_u.key)
217 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
218 # this should cause an error during download
220 d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
222 bad_n.read, MemoryConsumer(), offset=2)
224 d.addCallback(_test_bad_read)
226 def _download_nonexistent_uri(res):
227 baduri = self.mangle_uri(self.uri)
228 log.msg("about to download non-existent URI", level=log.UNUSUAL,
229 facility="tahoe.tests")
230 d1 = self.downloader.download_to_data(baduri)
231 def _baduri_should_fail(res):
232 log.msg("finished downloading non-existend URI",
233 level=log.UNUSUAL, facility="tahoe.tests")
234 self.failUnless(isinstance(res, Failure))
235 self.failUnless(res.check(NotEnoughSharesError),
236 "expected NotEnoughSharesError, got %s" % res)
237 # TODO: files that have zero peers should get a special kind
238 # of NotEnoughSharesError, which can be used to suggest that
239 # the URI might be wrong or that they've never uploaded the
240 # file in the first place.
241 d1.addBoth(_baduri_should_fail)
243 d.addCallback(_download_nonexistent_uri)
245 # add a new node, which doesn't accept shares, and only uses the
247 d.addCallback(lambda res: self.add_extra_node(self.numclients,
249 add_to_sparent=True))
250 def _added(extra_node):
251 self.extra_node = extra_node
# sizelimit=0 keeps this node from accepting any shares itself.
252 extra_node.getServiceNamed("storage").sizelimit = 0
253 d.addCallback(_added)
255 HELPER_DATA = "Data that needs help to upload" * 1000
256 def _upload_with_helper(res):
257 u = upload.Data(HELPER_DATA, convergence=convergence)
258 d = self.extra_node.upload(u)
259 def _uploaded(results):
261 return self.downloader.download_to_data(uri)
262 d.addCallback(_uploaded)
264 self.failUnlessEqual(newdata, HELPER_DATA)
265 d.addCallback(_check)
267 d.addCallback(_upload_with_helper)
269 def _upload_duplicate_with_helper(res):
270 u = upload.Data(HELPER_DATA, convergence=convergence)
271 u.debug_stash_RemoteEncryptedUploadable = True
272 d = self.extra_node.upload(u)
273 def _uploaded(results):
275 return self.downloader.download_to_data(uri)
276 d.addCallback(_uploaded)
278 self.failUnlessEqual(newdata, HELPER_DATA)
279 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
280 "uploadable started uploading, should have been avoided")
281 d.addCallback(_check)
# Duplicate-upload short-circuiting only applies with convergent keys.
283 if convergence is not None:
284 d.addCallback(_upload_duplicate_with_helper)
286 def _upload_resumable(res):
287 DATA = "Data that needs help to upload and gets interrupted" * 1000
288 u1 = CountingDataUploadable(DATA, convergence=convergence)
289 u2 = CountingDataUploadable(DATA, convergence=convergence)
291 # we interrupt the connection after about 5kB by shutting down
292 # the helper, then restarting it.
293 u1.interrupt_after = 5000
294 u1.interrupt_after_d = defer.Deferred()
295 u1.interrupt_after_d.addCallback(lambda res:
296 self.bounce_client(0))
298 # sneak into the helper and reduce its chunk size, so that our
299 # debug_interrupt will sever the connection on about the fifth
300 # chunk fetched. This makes sure that we've started to write the
301 # new shares before we abandon them, which exercises the
302 # abort/delete-partial-share code. TODO: find a cleaner way to do
303 # this. I know that this will affect later uses of the helper in
304 # this same test run, but I'm not currently worried about it.
305 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
307 d = self.extra_node.upload(u1)
309 def _should_not_finish(res):
310 self.fail("interrupted upload should have failed, not finished"
311 " with result %s" % (res,))
# NOTE(review): the `def _interrupted(f):` line is missing from this
# listing; the following lines are its body.
313 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
315 # make sure we actually interrupted it before finishing the
317 self.failUnless(u1.bytes_read < len(DATA),
318 "read %d out of %d total" % (u1.bytes_read,
321 log.msg("waiting for reconnect", level=log.NOISY,
322 facility="tahoe.test.test_system")
323 # now, we need to give the nodes a chance to notice that this
324 # connection has gone away. When this happens, the storage
325 # servers will be told to abort their uploads, removing the
326 # partial shares. Unfortunately this involves TCP messages
327 # going through the loopback interface, and we can't easily
328 # predict how long that will take. If it were all local, we
329 # could use fireEventually() to stall. Since we don't have
330 # the right introduction hooks, the best we can do is use a
331 # fixed delay. TODO: this is fragile.
332 u1.interrupt_after_d.addCallback(self.stall, 2.0)
333 return u1.interrupt_after_d
334 d.addCallbacks(_should_not_finish, _interrupted)
336 def _disconnected(res):
337 # check to make sure the storage servers aren't still hanging
338 # on to the partial share: their incoming/ directories should
340 log.msg("disconnected", level=log.NOISY,
341 facility="tahoe.test.test_system")
342 for i in range(self.numclients):
343 incdir = os.path.join(self.getdir("client%d" % i),
344 "storage", "shares", "incoming")
345 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
346 d.addCallback(_disconnected)
348 # then we need to give the reconnector a chance to
349 # reestablish the connection to the helper.
350 d.addCallback(lambda res:
351 log.msg("wait_for_connections", level=log.NOISY,
352 facility="tahoe.test.test_system"))
353 d.addCallback(lambda res: self.wait_for_connections())
356 d.addCallback(lambda res:
357 log.msg("uploading again", level=log.NOISY,
358 facility="tahoe.test.test_system"))
359 d.addCallback(lambda res: self.extra_node.upload(u2))
361 def _uploaded(results):
363 log.msg("Second upload complete", level=log.NOISY,
364 facility="tahoe.test.test_system")
366 # this is really bytes received rather than sent, but it's
367 # convenient and basically measures the same thing
368 bytes_sent = results.ciphertext_fetched
370 # We currently don't support resumption of upload if the data is
371 # encrypted with a random key. (Because that would require us
372 # to store the key locally and re-use it on the next upload of
373 # this file, which isn't a bad thing to do, but we currently
375 if convergence is not None:
376 # Make sure we did not have to read the whole file the
377 # second time around.
378 self.failUnless(bytes_sent < len(DATA),
379 "resumption didn't save us any work:"
380 " read %d bytes out of %d total" %
381 (bytes_sent, len(DATA)))
383 # Make sure we did have to read the whole file the second
384 # time around -- because the one that we partially uploaded
385 # earlier was encrypted with a different random key.
386 self.failIf(bytes_sent < len(DATA),
387 "resumption saved us some work even though we were using random keys:"
388 " read %d bytes out of %d total" %
389 (bytes_sent, len(DATA)))
390 return self.downloader.download_to_data(uri)
391 d.addCallback(_uploaded)
394 self.failUnlessEqual(newdata, DATA)
395 # If using convergent encryption, then also check that the
396 # helper has removed the temp file from its directories.
397 if convergence is not None:
398 basedir = os.path.join(self.getdir("client0"), "helper")
399 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
400 self.failUnlessEqual(files, [])
401 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
402 self.failUnlessEqual(files, [])
403 d.addCallback(_check)
405 d.addCallback(_upload_resumable)
409 def _find_shares(self, basedir):
# Walk basedir looking for share files stored under
# .../clientN/storage/shares/$START/$SINDEX, collecting tuples of
# (client_num, storage_index, filename, shnum); fails the test if no
# share files are found at all.
# NOTE(review): the accumulator list, the append call, and the return
# statement are missing from this listing.
411 for (dirpath, dirnames, filenames) in os.walk(basedir):
412 if "storage" not in dirpath:
416 pieces = dirpath.split(os.sep)
417 if pieces[-4] == "storage" and pieces[-3] == "shares":
418 # we're sitting in .../storage/shares/$START/$SINDEX , and there
419 # are sharefiles here
420 assert pieces[-5].startswith("client")
# client dirs are named "client0".."clientN"; last char is the number
421 client_num = int(pieces[-5][-1])
422 storage_index_s = pieces[-1]
423 storage_index = storage.si_a2b(storage_index_s)
424 for sharename in filenames:
425 shnum = int(sharename)
426 filename = os.path.join(dirpath, sharename)
427 data = (client_num, storage_index, filename, shnum)
430 self.fail("unable to find any share files in %s" % basedir)
433 def _corrupt_mutable_share(self, filename, which):
# Unpack the mutable (SDMF) share stored in `filename`, corrupt the
# single field named by `which` (usually by flipping one bit), then
# repack and write the share back in place.
434 msf = storage.MutableShareFile(filename)
435 datav = msf.readv([ (0, 1000000) ])
436 final_share = datav[0]
437 assert len(final_share) < 1000000 # ought to be truncated
438 pieces = mutable_layout.unpack_share(final_share)
439 (seqnum, root_hash, IV, k, N, segsize, datalen,
440 verification_key, signature, share_hash_chain, block_hash_tree,
441 share_data, enc_privkey) = pieces
# Corrupt exactly one field, selected by name.
# NOTE(review): the bodies of the "seqnum" and "R" branches, and parts
# of the repack call, are missing from this listing.
443 if which == "seqnum":
446 root_hash = self.flip_bit(root_hash)
448 IV = self.flip_bit(IV)
449 elif which == "segsize":
450 segsize = segsize + 15
451 elif which == "pubkey":
452 verification_key = self.flip_bit(verification_key)
453 elif which == "signature":
454 signature = self.flip_bit(signature)
455 elif which == "share_hash_chain":
456 nodenum = share_hash_chain.keys()[0]
457 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
458 elif which == "block_hash_tree":
459 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
460 elif which == "share_data":
461 share_data = self.flip_bit(share_data)
462 elif which == "encprivkey":
463 enc_privkey = self.flip_bit(enc_privkey)
465 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
467 final_share = mutable_layout.pack_share(prefix,
474 msf.writev( [(0, final_share)], None)
477 def test_mutable(self):
# End-to-end mutable-file test: create an SDMF file, inspect a share via
# the "debug dump-share" CLI, download it from fresh nodes on several
# clients, overwrite it twice, corrupt 7 of the 10 shares and verify the
# 3-of-10 download still succeeds, then check empty files, dirnodes, and
# the key-generator pool accounting.
478 self.basedir = "system/SystemTest/test_mutable"
479 DATA = "initial contents go here." # 25 bytes % 3 != 0
480 NEWDATA = "new contents yay"
481 NEWERDATA = "this is getting old"
483 d = self.set_up_nodes(use_key_generator=True)
485 def _create_mutable(res):
487 log.msg("starting create_mutable_file")
488 d1 = c.create_mutable_file(DATA)
490 log.msg("DONE: %s" % (res,))
491 self._mutable_node_1 = res
493 d1.addCallback(_done)
495 d.addCallback(_create_mutable)
497 def _test_debug(res):
498 # find a share. It is important to run this while there is only
499 # one slot in the grid.
500 shares = self._find_shares(self.basedir)
501 (client_num, storage_index, filename, shnum) = shares[0]
502 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
504 log.msg(" for clients[%d]" % client_num)
506 out,err = StringIO(), StringIO()
507 rc = runner.runner(["debug", "dump-share", "--offsets",
509 stdout=out, stderr=err)
510 output = out.getvalue()
511 self.failUnlessEqual(rc, 0)
# On failure of any assertion below, dump the CLI output for debugging
# (the except-clause lines are missing from this listing).
513 self.failUnless("Mutable slot found:\n" in output)
514 self.failUnless("share_type: SDMF\n" in output)
515 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
516 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
517 self.failUnless(" num_extra_leases: 0\n" in output)
518 # the pubkey size can vary by a byte, so the container might
519 # be a bit larger on some runs.
520 m = re.search(r'^ container_size: (\d+)$', output, re.M)
522 container_size = int(m.group(1))
523 self.failUnless(2037 <= container_size <= 2049, container_size)
524 m = re.search(r'^ data_length: (\d+)$', output, re.M)
526 data_length = int(m.group(1))
527 self.failUnless(2037 <= data_length <= 2049, data_length)
528 self.failUnless(" secrets are for nodeid: %s\n" % peerid
530 self.failUnless(" SDMF contents:\n" in output)
531 self.failUnless(" seqnum: 1\n" in output)
532 self.failUnless(" required_shares: 3\n" in output)
533 self.failUnless(" total_shares: 10\n" in output)
534 self.failUnless(" segsize: 27\n" in output, (output, filename))
535 self.failUnless(" datalen: 25\n" in output)
536 # the exact share_hash_chain nodes depends upon the sharenum,
537 # and is more of a hassle to compute than I want to deal with
539 self.failUnless(" share_hash_chain: " in output)
540 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
541 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
542 base32.b2a(storage_index))
543 self.failUnless(expected in output)
544 except unittest.FailTest:
546 print "dump-share output was:"
549 d.addCallback(_test_debug)
553 # first, let's see if we can use the existing node to retrieve the
554 # contents. This allows it to use the cached pubkey and maybe the
555 # latest-known sharemap.
557 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
558 def _check_download_1(res):
559 self.failUnlessEqual(res, DATA)
560 # now we see if we can retrieve the data from a new node,
561 # constructed using the URI of the original one. We do this test
562 # on the same client that uploaded the data.
563 uri = self._mutable_node_1.get_uri()
564 log.msg("starting retrieve1")
565 newnode = self.clients[0].create_node_from_uri(uri)
566 newnode_2 = self.clients[0].create_node_from_uri(uri)
# Node caching: the same URI must map to the identical node object.
567 self.failUnlessIdentical(newnode, newnode_2)
568 return newnode.download_best_version()
569 d.addCallback(_check_download_1)
571 def _check_download_2(res):
572 self.failUnlessEqual(res, DATA)
573 # same thing, but with a different client
574 uri = self._mutable_node_1.get_uri()
575 newnode = self.clients[1].create_node_from_uri(uri)
576 log.msg("starting retrieve2")
577 d1 = newnode.download_best_version()
578 d1.addCallback(lambda res: (res, newnode))
580 d.addCallback(_check_download_2)
582 def _check_download_3((res, newnode)):
583 self.failUnlessEqual(res, DATA)
585 log.msg("starting replace1")
586 d1 = newnode.overwrite(NEWDATA)
587 d1.addCallback(lambda res: newnode.download_best_version())
589 d.addCallback(_check_download_3)
591 def _check_download_4(res):
592 self.failUnlessEqual(res, NEWDATA)
593 # now create an even newer node and replace the data on it. This
594 # new node has never been used for download before.
595 uri = self._mutable_node_1.get_uri()
596 newnode1 = self.clients[2].create_node_from_uri(uri)
597 newnode2 = self.clients[3].create_node_from_uri(uri)
598 self._newnode3 = self.clients[3].create_node_from_uri(uri)
599 log.msg("starting replace2")
600 d1 = newnode1.overwrite(NEWERDATA)
601 d1.addCallback(lambda res: newnode2.download_best_version())
603 d.addCallback(_check_download_4)
605 def _check_download_5(res):
606 log.msg("finished replace2")
607 self.failUnlessEqual(res, NEWERDATA)
608 d.addCallback(_check_download_5)
610 def _corrupt_shares(res):
611 # run around and flip bits in all but k of the shares, to test
613 shares = self._find_shares(self.basedir)
614 ## sort by share number
615 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
616 where = dict([ (shnum, filename)
617 for (client_num, storage_index, filename, shnum)
619 assert len(where) == 10 # this test is designed for 3-of-10
620 for shnum, filename in where.items():
621 # shares 7,8,9 are left alone. read will check
622 # (share_hash_chain, block_hash_tree, share_data). New
623 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
624 # segsize, signature).
# NOTE(review): the per-shnum dispatch conditions (if shnum == 0:,
# elif shnum == 1:, ...) are missing from this listing.
626 # read: this will trigger "pubkey doesn't match
628 self._corrupt_mutable_share(filename, "pubkey")
629 self._corrupt_mutable_share(filename, "encprivkey")
631 # triggers "signature is invalid"
632 self._corrupt_mutable_share(filename, "seqnum")
634 # triggers "signature is invalid"
635 self._corrupt_mutable_share(filename, "R")
637 # triggers "signature is invalid"
638 self._corrupt_mutable_share(filename, "segsize")
640 self._corrupt_mutable_share(filename, "share_hash_chain")
642 self._corrupt_mutable_share(filename, "block_hash_tree")
644 self._corrupt_mutable_share(filename, "share_data")
645 # other things to correct: IV, signature
646 # 7,8,9 are left alone
648 # note that initial_query_count=5 means that we'll hit the
649 # first 5 servers in effectively random order (based upon
650 # response time), so we won't necessarily ever get a "pubkey
651 # doesn't match fingerprint" error (if we hit shnum>=1 before
652 # shnum=0, we pull the pubkey from there). To get repeatable
653 # specific failures, we need to set initial_query_count=1,
654 # but of course that will change the sequencing behavior of
655 # the retrieval process. TODO: find a reasonable way to make
656 # this a parameter, probably when we expand this test to test
657 # for one failure mode at a time.
659 # when we retrieve this, we should get three signature
660 # failures (where we've mangled seqnum, R, and segsize). The
662 d.addCallback(_corrupt_shares)
664 d.addCallback(lambda res: self._newnode3.download_best_version())
665 d.addCallback(_check_download_5)
667 def _check_empty_file(res):
668 # make sure we can create empty files, this usually screws up the
670 d1 = self.clients[2].create_mutable_file("")
671 d1.addCallback(lambda newnode: newnode.download_best_version())
672 d1.addCallback(lambda res: self.failUnlessEqual("", res))
674 d.addCallback(_check_empty_file)
676 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
677 def _created_dirnode(dnode):
678 log.msg("_created_dirnode(%s)" % (dnode,))
680 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
681 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
682 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
# A directory may contain itself; make sure recursion doesn't break it.
683 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
684 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
685 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
686 d1.addCallback(lambda res: dnode.build_manifest().when_done())
687 d1.addCallback(lambda manifest:
688 self.failUnlessEqual(len(manifest), 1))
690 d.addCallback(_created_dirnode)
692 def wait_for_c3_kg_conn():
693 return self.clients[3]._key_generator is not None
694 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
696 def check_kg_poolsize(junk, size_delta):
# Each key taken from the generator pool shifts the pool size by one;
# size_delta tracks the cumulative number of keys consumed so far.
697 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
698 self.key_generator_svc.key_generator.pool_size + size_delta)
700 d.addCallback(check_kg_poolsize, 0)
701 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
702 d.addCallback(check_kg_poolsize, -1)
703 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
704 d.addCallback(check_kg_poolsize, -2)
705 # use_helper induces use of clients[3], which is the using-key_gen client
706 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
707 d.addCallback(check_kg_poolsize, -3)
710 # The default 120 second timeout went off when running it under valgrind
711 # on my old Windows laptop, so I'm bumping up the timeout.
712 test_mutable.timeout = 240
def flip_bit(self, good):
    """Return a copy of the string `good` with the lowest bit of its
    final byte inverted -- used to corrupt keys and hashes in tests."""
    flipped_last = chr(ord(good[-1]) ^ 0x01)
    return good[:-1] + flipped_last
717 def mangle_uri(self, gooduri):
718 # change the key, which changes the storage index, which means we'll
719 # be asking about the wrong file, so nobody will have any shares
720 u = IFileURI(gooduri)
721 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
722 uri_extension_hash=u.uri_extension_hash,
723 needed_shares=u.needed_shares,
724 total_shares=u.total_shares,
# NOTE(review): the final constructor argument line (presumably
# size=u.size followed by the closing paren) is missing from this listing.
726 return u2.to_string()
728 # TODO: add a test which mangles the uri_extension_hash instead, and
729 # should fail due to not being able to get a valid uri_extension block.
730 # Also a test which sneakily mangles the uri_extension block to change
731 # some of the validation data, so it will fail in the post-download phase
732 # when the file's crypttext integrity check fails. Do the same thing for
733 # the key, which should cause the download to fail the post-download
734 # plaintext_hash check.
736 def test_vdrive(self):
# Umbrella test for the virtual-drive layer: publishes a small directory
# tree, bounces a client, re-checks the tree, publishes a private tree,
# then runs the web, control, CLI, and checker sub-tests in sequence.
737 self.basedir = "system/SystemTest/test_vdrive"
738 self.data = LARGE_DATA
739 d = self.set_up_nodes(use_stats_gatherer=True)
740 d.addCallback(self._test_introweb)
741 d.addCallback(self.log, "starting publish")
742 d.addCallback(self._do_publish1)
743 d.addCallback(self._test_runner)
744 d.addCallback(self._do_publish2)
745 # at this point, we have the following filesystem (where "R" denotes
746 # self._root_directory_uri):
749 # R/subdir1/mydata567
751 # R/subdir1/subdir2/mydata992
# Bounce client0 to verify published state survives a node restart.
753 d.addCallback(lambda res: self.bounce_client(0))
754 d.addCallback(self.log, "bounced client0")
756 d.addCallback(self._check_publish1)
757 d.addCallback(self.log, "did _check_publish1")
758 d.addCallback(self._check_publish2)
759 d.addCallback(self.log, "did _check_publish2")
760 d.addCallback(self._do_publish_private)
761 d.addCallback(self.log, "did _do_publish_private")
762 # now we also have (where "P" denotes a new dir):
763 # P/personal/sekrit data
764 # P/s2-rw -> /subdir1/subdir2/
765 # P/s2-ro -> /subdir1/subdir2/ (read-only)
766 d.addCallback(self._check_publish_private)
767 d.addCallback(self.log, "did _check_publish_private")
768 d.addCallback(self._test_web)
769 d.addCallback(self._test_control)
770 d.addCallback(self._test_cli)
771 # P now has four top-level children:
772 # P/personal/sekrit data
775 # P/test_put/ (empty)
776 d.addCallback(self._test_checker)
777 d.addCallback(self._grab_stats)
779 test_vdrive.timeout = 1100
781 def _test_introweb(self, res):
# Fetch the introducer's web status page (HTML and ?t=json forms) and
# verify the version banner and announcement/subscription summaries.
782 d = getPage(self.introweb_url, method="GET", followRedirect=True)
# NOTE(review): the `def _check(res):` line and its try: are missing
# from this listing; the following assertions are its body.
785 self.failUnless("allmydata: %s" % str(allmydata.__version__)
787 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
788 self.failUnless("Subscription Summary: storage: 5" in res)
789 except unittest.FailTest:
791 print "GET %s output was:" % self.introweb_url
794 d.addCallback(_check)
795 d.addCallback(lambda res:
796 getPage(self.introweb_url + "?t=json",
797 method="GET", followRedirect=True))
798 def _check_json(res):
799 data = simplejson.loads(res)
801 self.failUnlessEqual(data["subscription_summary"],
803 self.failUnlessEqual(data["announcement_summary"],
804 {"storage": 5, "stub_client": 5})
805 except unittest.FailTest:
807 print "GET %s?t=json output was:" % self.introweb_url
810 d.addCallback(_check_json)
813 def _do_publish1(self, res):
# Create the root directory (stashing its URI), make subdir1 under it,
# and upload self.data as subdir1/mydata567, stashing the file's URI.
814 ut = upload.Data(self.data, convergence=None)
# NOTE(review): the assignment of c0 (presumably c0 = self.clients[0])
# is missing from this listing.
816 d = c0.create_empty_dirnode()
817 def _made_root(new_dirnode):
818 self._root_directory_uri = new_dirnode.get_uri()
819 return c0.create_node_from_uri(self._root_directory_uri)
820 d.addCallback(_made_root)
821 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
822 def _made_subdir1(subdir1_node):
823 self._subdir1_node = subdir1_node
824 d1 = subdir1_node.add_file(u"mydata567", ut)
825 d1.addCallback(self.log, "publish finished")
826 def _stash_uri(filenode):
827 self.uri = filenode.get_uri()
828 d1.addCallback(_stash_uri)
830 d.addCallback(_made_subdir1)
833 def _do_publish2(self, res):
# Extend the tree from _do_publish1: create subdir1/subdir2 and upload
# self.data again as subdir2/mydata992.
834 ut = upload.Data(self.data, convergence=None)
835 d = self._subdir1_node.create_empty_directory(u"subdir2")
836 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
839 def log(self, res, *args, **kwargs):
# Deferred-chain-friendly logger: emits a log message and (in the full
# source) passes `res` through unchanged so it can sit in a callback chain.
840 # print "MSG: %s RES: %s" % (msg, args)
841 log.msg(*args, **kwargs)
844 def _do_publish_private(self, res):
# Build the private tree "P": P/personal/sekrit data, plus P/s2-rw and
# P/s2-ro pointing at R/subdir1/subdir2 (read-write and read-only caps).
# The Deferred fires with the new private dirnode.
845 self.smalldata = "sssh, very secret stuff"
846 ut = upload.Data(self.smalldata, convergence=None)
847 d = self.clients[0].create_empty_dirnode()
848 d.addCallback(self.log, "GOT private directory")
849 def _got_new_dir(privnode):
850 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
851 d1 = privnode.create_empty_directory(u"personal")
852 d1.addCallback(self.log, "made P/personal")
853 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
854 d1.addCallback(self.log, "made P/personal/sekrit data")
855 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
# NOTE(review): the `def _got_s2(s2node):` line is missing from this
# listing; the following two lines are its body.
857 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
858 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
860 d1.addCallback(_got_s2)
# Fire the outer Deferred with the private node itself.
861 d1.addCallback(lambda res: privnode)
863 d.addCallback(_got_new_dir)
866 def _check_publish1(self, res):
# Verify the published tree using the step-by-step (iterative) dirnode
# API: root -> subdir1 -> mydata567, then compare downloaded bytes.
867 # this one uses the iterative API
# NOTE(review): the assignment of c1 (presumably c1 = self.clients[1])
# is missing from this listing.
869 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
870 d.addCallback(self.log, "check_publish1 got /")
871 d.addCallback(lambda root: root.get(u"subdir1"))
872 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
873 d.addCallback(lambda filenode: filenode.download_to_data())
874 d.addCallback(self.log, "get finished")
# NOTE(review): the `def _get_done(data):` line is missing from this
# listing; the following assertion is its body.
876 self.failUnlessEqual(data, self.data)
877 d.addCallback(_get_done)
880 def _check_publish2(self, res):
# Verify the same published tree via the path-based API
# (get_child_at_path), and confirm node caching: re-creating a node from
# a file's URI yields the identical object.
881 # this one uses the path-based API
882 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
883 d = rootnode.get_child_at_path(u"subdir1")
884 d.addCallback(lambda dirnode:
885 self.failUnless(IDirectoryNode.providedBy(dirnode)))
886 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
887 d.addCallback(lambda filenode: filenode.download_to_data())
888 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
890 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
891 def _got_filenode(filenode):
892 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
# Same URI must map to the same cached node object.
893 assert fnode == filenode
894 d.addCallback(_got_filenode)
# Exercise the private tree built by _do_publish_private:
#  - path-based reads of P/personal/sekrit data, P/s2-rw, P/s2-ro
#  - `_got_s2ro`: every mutating operation on the read-only alias must
#    raise NotMutableError (mkdir, add_file, delete, set_uri, move in
#    either direction), while reads (list/get) still work
#  - `_got_home`: rename/move files around via move_child_to, then check
#    the manifest size and the deep-stats counters
# NOTE(review): the `def get_path(path):` helper header (used at lines
# 912-917) and the closing braces of the `expected` dict are elided in
# this listing — confirm against the full file.
897 def _check_publish_private(self, resnode):
898 # this one uses the path-based API
899 self._private_node = resnode
901 d = self._private_node.get_child_at_path(u"personal")
902 def _got_personal(personal):
903 self._personal_node = personal
905 d.addCallback(_got_personal)
907 d.addCallback(lambda dirnode:
908 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
910 return self._private_node.get_child_at_path(path)
912 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
913 d.addCallback(lambda filenode: filenode.download_to_data())
914 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
915 d.addCallback(lambda res: get_path(u"s2-rw"))
916 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
917 d.addCallback(lambda res: get_path(u"s2-ro"))
918 def _got_s2ro(dirnode):
919 self.failUnless(dirnode.is_mutable(), dirnode)
920 self.failUnless(dirnode.is_readonly(), dirnode)
921 d1 = defer.succeed(None)
922 d1.addCallback(lambda res: dirnode.list())
923 d1.addCallback(self.log, "dirnode.list")
# every write operation below must fail with NotMutableError on the
# read-only dirnode
925 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
927 d1.addCallback(self.log, "doing add_file(ro)")
928 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
929 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
931 d1.addCallback(self.log, "doing get(ro)")
932 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
933 d1.addCallback(lambda filenode:
934 self.failUnless(IFileNode.providedBy(filenode)))
936 d1.addCallback(self.log, "doing delete(ro)")
937 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
939 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
941 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
943 personal = self._personal_node
944 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
946 d1.addCallback(self.log, "doing move_child_to(ro)2")
947 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
949 d1.addCallback(self.log, "finished with _got_s2ro")
951 d.addCallback(_got_s2ro)
952 def _got_home(dummy):
953 home = self._private_node
954 personal = self._personal_node
955 d1 = defer.succeed(None)
# shuffle the small file around with move_child_to, ending back at
# P/personal/sekrit data, then inspect the manifest and deep-stats
956 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
957 d1.addCallback(lambda res:
958 personal.move_child_to(u"sekrit data",home,u"sekrit"))
960 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
961 d1.addCallback(lambda res:
962 home.move_child_to(u"sekrit", home, u"sekrit data"))
964 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
965 d1.addCallback(lambda res:
966 home.move_child_to(u"sekrit data", personal))
968 d1.addCallback(lambda res: home.build_manifest().when_done())
969 d1.addCallback(self.log, "manifest")
973 # P/personal/sekrit data
974 # P/s2-rw (same as P/s2-ro)
975 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
976 d1.addCallback(lambda manifest:
977 self.failUnlessEqual(len(manifest), 5))
978 d1.addCallback(lambda res: home.start_deep_stats().when_done())
979 def _check_stats(stats):
980 expected = {"count-immutable-files": 1,
981 "count-mutable-files": 0,
982 "count-literal-files": 1,
984 "count-directories": 3,
985 "size-immutable-files": 112,
986 "size-literal-files": 23,
987 #"size-directories": 616, # varies
988 #"largest-directory": 616,
989 "largest-directory-children": 3,
990 "largest-immutable-file": 112,
992 for k,v in expected.iteritems():
993 self.failUnlessEqual(stats[k], v,
994 "stats[%s] was %s, not %s" %
# directory sizes vary run to run, so only lower bounds are asserted
996 self.failUnless(stats["size-directories"] > 1300,
997 stats["size-directories"])
998 self.failUnless(stats["largest-directory"] > 800,
999 stats["largest-directory"])
1000 self.failUnlessEqual(stats["size-files-histogram"],
1001 [ (11, 31, 1), (101, 316, 1) ])
1002 d1.addCallback(_check_stats)
1004 d.addCallback(_got_home)
# addBoth-style failure checker: `res` is either a Failure (expected path —
# trap the expected exception type and optionally require `substring` in its
# string form) or a plain result, in which case the test fails because the
# operation `which` should have raised.
# NOTE(review): the `if substring:` guard and the `else:` separating the two
# branches appear elided in this listing — confirm against the full file.
1007 def shouldFail(self, res, expected_failure, which, substring=None):
1008 if isinstance(res, Failure):
1009 res.trap(expected_failure)
1011 self.failUnless(substring in str(res),
1012 "substring '%s' not in '%s'"
1013 % (substring, str(res)))
1015 self.fail("%s was supposed to raise %s, not get '%s'" %
1016 (which, expected_failure, res))
# Like shouldFail, but takes the callable itself: runs
# `callable(*args, **kwargs)` via maybeDeferred and asserts it fails with
# `expected_failure` (optionally containing `substring` in its message).
# `which` is a human-readable label for the failure report.
# NOTE(review): the inner `def done(res):` header, the `else:` branch marker,
# the `d.addBoth(done)` hookup, and `return d` are elided in this listing.
1018 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1019 assert substring is None or isinstance(substring, str)
1020 d = defer.maybeDeferred(callable, *args, **kwargs)
1022 if isinstance(res, Failure):
1023 res.trap(expected_failure)
1025 self.failUnless(substring in str(res),
1026 "substring '%s' not in '%s'"
1027 % (substring, str(res)))
1029 self.fail("%s was supposed to raise %s, not get '%s'" %
1030 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """HTTP PUT `data` to the client's webapi at `urlpath`; returns a Deferred."""
    target = "%s%s" % (self.webish_url, urlpath)
    return getPage(target, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """HTTP GET `urlpath` from the client's webapi; returns a Deferred."""
    target = "%s%s" % (self.webish_url, urlpath)
    return getPage(target, method="GET", followRedirect=followRedirect)
# HTTP POST of a hand-built multipart/form-data body to the webapi.
# `fields` values may be plain values, or (filename, contents) tuples for
# file-upload fields.  `use_helper` selects the helper node's webapi URL
# instead of the primary one.
# NOTE(review): the `if use_helper:`/`else:` lines around the url choice,
# the `form = []` initializer, the inter-field separator appends, and the
# closing of the `headers` dict are elided in this listing — confirm.
1042 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1044 url = self.helper_webish_url + urlpath
1046 url = self.webish_url + urlpath
1047 sepbase = "boogabooga"
1048 sep = "--" + sepbase
1051 form.append('Content-Disposition: form-data; name="_charset"')
1053 form.append('UTF-8')
1055 for name, value in fields.iteritems():
1056 if isinstance(value, tuple):
1057 filename, value = value
1058 form.append('Content-Disposition: form-data; name="%s"; '
1059 'filename="%s"' % (name, filename.encode("utf-8")))
1061 form.append('Content-Disposition: form-data; name="%s"' % name)
1063 form.append(str(value))
1066 body = "\r\n".join(form) + "\r\n"
1067 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1069 return getPage(url, method="POST", postdata=body,
1070 headers=headers, followRedirect=followRedirect)
# End-to-end exercise of the webapi/WUI over HTTP:
#  - welcome pages (primary node and helper node)
#  - directory listing and file download via /uri/<dircap> paths
#  - download via ?filename= and ?uri= forms, plus a mangled-URI error case
#  - PUT (small, large-multisegment, and replace-in-place), unlinked POST
#    uploads (direct and via helper)
#  - per-operation status pages (up/down/mapupdate/publish/retrieve),
#    helper status (HTML and JSON), and the node statistics pages
# NOTE(review): this listing elides several lines (the creation of `d`,
# assertion-message closers, the `def _got_up/_got_down` headers around
# lines 1189-1193, the `now = time.time()` binding used by os.utime, and
# expected values split across elided continuations) — confirm upstream.
1072 def _test_web(self, res):
1073 base = self.webish_url
1074 public = "uri/" + self._root_directory_uri
1076 def _got_welcome(page):
1077 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1078 self.failUnless(expected in page,
1079 "I didn't see the right 'connected storage servers'"
1080 " message in: %s" % page
1082 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1083 self.failUnless(expected in page,
1084 "I didn't see the right 'My nodeid' message "
1086 self.failUnless("Helper: 0 active uploads" in page)
1087 d.addCallback(_got_welcome)
1088 d.addCallback(self.log, "done with _got_welcome")
1090 # get the welcome page from the node that uses the helper too
1091 d.addCallback(lambda res: getPage(self.helper_webish_url))
1092 def _got_welcome_helper(page):
1093 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1095 self.failUnless("Not running helper" in page)
1096 d.addCallback(_got_welcome_helper)
1098 d.addCallback(lambda res: getPage(base + public))
1099 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1100 def _got_subdir1(page):
1101 # there ought to be an href for our file
1102 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1103 self.failUnless(">mydata567</a>" in page)
1104 d.addCallback(_got_subdir1)
1105 d.addCallback(self.log, "done with _got_subdir1")
1106 d.addCallback(lambda res:
1107 getPage(base + public + "/subdir1/mydata567"))
1108 def _got_data(page):
1109 self.failUnlessEqual(page, self.data)
1110 d.addCallback(_got_data)
1112 # download from a URI embedded in a URL
1113 d.addCallback(self.log, "_get_from_uri")
1114 def _get_from_uri(res):
1115 return getPage(base + "uri/%s?filename=%s"
1116 % (self.uri, "mydata567"))
1117 d.addCallback(_get_from_uri)
1118 def _got_from_uri(page):
1119 self.failUnlessEqual(page, self.data)
1120 d.addCallback(_got_from_uri)
1122 # download from a URI embedded in a URL, second form
1123 d.addCallback(self.log, "_get_from_uri2")
1124 def _get_from_uri2(res):
1125 return getPage(base + "uri?uri=%s" % (self.uri,))
1126 d.addCallback(_get_from_uri2)
1127 d.addCallback(_got_from_uri)
1129 # download from a bogus URI, make sure we get a reasonable error
1130 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1131 def _get_from_bogus_uri(res):
1132 d1 = getPage(base + "uri/%s?filename=%s"
1133 % (self.mangle_uri(self.uri), "mydata567"))
1134 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1137 d.addCallback(_get_from_bogus_uri)
1138 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1140 # upload a file with PUT
1141 d.addCallback(self.log, "about to try PUT")
1142 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1143 "new.txt contents"))
1144 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1145 d.addCallback(self.failUnlessEqual, "new.txt contents")
1146 # and again with something large enough to use multiple segments,
1147 # and hopefully trigger pauseProducing too
1148 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1149 "big" * 500000)) # 1.5MB
1150 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1151 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1153 # can we replace files in place?
1154 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1156 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1157 d.addCallback(self.failUnlessEqual, "NEWER contents")
1159 # test unlinked POST
1160 d.addCallback(lambda res: self.POST("uri", t="upload",
1161 file=("new.txt", "data" * 10000)))
1162 # and again using the helper, which exercises different upload-status
1164 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1165 file=("foo.txt", "data2" * 10000)))
1167 # check that the status page exists
1168 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1169 def _got_status(res):
1170 # find an interesting upload and download to look at. LIT files
1171 # are not interesting.
1172 for ds in self.clients[0].list_all_download_statuses():
1173 if ds.get_size() > 200:
1174 self._down_status = ds.get_counter()
1175 for us in self.clients[0].list_all_upload_statuses():
1176 if us.get_size() > 200:
1177 self._up_status = us.get_counter()
1178 rs = self.clients[0].list_all_retrieve_statuses()[0]
1179 self._retrieve_status = rs.get_counter()
1180 ps = self.clients[0].list_all_publish_statuses()[0]
1181 self._publish_status = ps.get_counter()
1182 us = self.clients[0].list_all_mapupdate_statuses()[0]
1183 self._update_status = us.get_counter()
1185 # and that there are some upload- and download- status pages
1186 return self.GET("status/up-%d" % self._up_status)
1187 d.addCallback(_got_status)
1189 return self.GET("status/down-%d" % self._down_status)
1190 d.addCallback(_got_up)
1192 return self.GET("status/mapupdate-%d" % self._update_status)
1193 d.addCallback(_got_down)
1194 def _got_update(res):
1195 return self.GET("status/publish-%d" % self._publish_status)
1196 d.addCallback(_got_update)
1197 def _got_publish(res):
1198 return self.GET("status/retrieve-%d" % self._retrieve_status)
1199 d.addCallback(_got_publish)
1201 # check that the helper status page exists
1202 d.addCallback(lambda res:
1203 self.GET("helper_status", followRedirect=True))
1204 def _got_helper_status(res):
1205 self.failUnless("Bytes Fetched:" in res)
1206 # touch a couple of files in the helper's working directory to
1207 # exercise more code paths
1208 workdir = os.path.join(self.getdir("client0"), "helper")
1209 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1210 f = open(incfile, "wb")
1211 f.write("small file")
1213 then = time.time() - 86400*3
1215 os.utime(incfile, (now, then))
1216 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1217 f = open(encfile, "wb")
1218 f.write("less small file")
1220 os.utime(encfile, (now, then))
1221 d.addCallback(_got_helper_status)
1222 # and that the json form exists
1223 d.addCallback(lambda res:
1224 self.GET("helper_status?t=json", followRedirect=True))
1225 def _got_helper_status_json(res):
1226 data = simplejson.loads(res)
1227 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1229 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1230 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1231 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1233 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1234 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1235 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1237 d.addCallback(_got_helper_status_json)
1239 # and check that client[3] (which uses a helper but does not run one
1240 # itself) doesn't explode when you ask for its status
1241 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1242 def _got_non_helper_status(res):
1243 self.failUnless("Upload and Download Status" in res)
1244 d.addCallback(_got_non_helper_status)
1246 # or for helper status with t=json
1247 d.addCallback(lambda res:
1248 getPage(self.helper_webish_url + "helper_status?t=json"))
1249 def _got_non_helper_status_json(res):
1250 data = simplejson.loads(res)
1251 self.failUnlessEqual(data, {})
1252 d.addCallback(_got_non_helper_status_json)
1254 # see if the statistics page exists
1255 d.addCallback(lambda res: self.GET("statistics"))
1256 def _got_stats(res):
1257 self.failUnless("Node Statistics" in res)
1258 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1259 d.addCallback(_got_stats)
1260 d.addCallback(lambda res: self.GET("statistics?t=json"))
1261 def _got_stats_json(res):
1262 data = simplejson.loads(res)
1263 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1264 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1265 d.addCallback(_got_stats_json)
1267 # TODO: mangle the second segment of a file, to test errors that
1268 # occur after we've already sent some good data, which uses a
1269 # different error path.
1271 # TODO: download a URI with a form
1272 # TODO: create a directory by using a form
1273 # TODO: upload by using a form on the directory page
1274 # url = base + "somedir/subdir1/freeform_post!!upload"
1275 # TODO: delete a file by using a button on the directory page
# Exercise the CLI diagnostic tools against the shares on disk:
#  1. walk the test's basedir to find one CHK sharefile (magic 00 00 00 01)
#  2. `debug dump-share --offsets` on it; assert the dumped size/segment/
#     share parameters and required UEB keys appear in the output
#  3. `debug find-shares <SI> <nodedirs...>`; expect all 10 shares
#  4. `debug catalog-shares <nodedirs...>`; expect 30 description lines
#     total, 10 of which match this file's storage index
# NOTE(review): several lines are elided in this listing (the `continue`
# after the "storage" check, a `break` after a share is found, the
# `out.seek(0)` calls before readlines(), and the head of the `matching =
# [line ...` list comprehension at line 1350) — confirm upstream.
1279 def _test_runner(self, res):
1280 # exercise some of the diagnostic tools in runner.py
1283 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1284 if "storage" not in dirpath:
1288 pieces = dirpath.split(os.sep)
1289 if pieces[-4] == "storage" and pieces[-3] == "shares":
1290 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1291 # are sharefiles here
1292 filename = os.path.join(dirpath, filenames[0])
1293 # peek at the magic to see if it is a chk share
1294 magic = open(filename, "rb").read(4)
1295 if magic == '\x00\x00\x00\x01':
1298 self.fail("unable to find any uri_extension files in %s"
1300 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1302 out,err = StringIO(), StringIO()
1303 rc = runner.runner(["debug", "dump-share", "--offsets",
1305 stdout=out, stderr=err)
1306 output = out.getvalue()
1307 self.failUnlessEqual(rc, 0)
1309 # we only upload a single file, so we can assert some things about
1310 # its size and shares.
1311 self.failUnless(("share filename: %s" % filename) in output)
1312 self.failUnless("size: %d\n" % len(self.data) in output)
1313 self.failUnless("num_segments: 1\n" in output)
1314 # segment_size is always a multiple of needed_shares
1315 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1316 self.failUnless("total_shares: 10\n" in output)
1317 # keys which are supposed to be present
1318 for key in ("size", "num_segments", "segment_size",
1319 "needed_shares", "total_shares",
1320 "codec_name", "codec_params", "tail_codec_params",
1321 #"plaintext_hash", "plaintext_root_hash",
1322 "crypttext_hash", "crypttext_root_hash",
1323 "share_root_hash", "UEB_hash"):
1324 self.failUnless("%s: " % key in output, key)
1325 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1327 # now use its storage index to find the other shares using the
1328 # 'find-shares' tool
1329 sharedir, shnum = os.path.split(filename)
1330 storagedir, storage_index_s = os.path.split(sharedir)
1331 out,err = StringIO(), StringIO()
1332 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1333 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1334 rc = runner.runner(cmd, stdout=out, stderr=err)
1335 self.failUnlessEqual(rc, 0)
1337 sharefiles = [sfn.strip() for sfn in out.readlines()]
1338 self.failUnlessEqual(len(sharefiles), 10)
1340 # also exercise the 'catalog-shares' tool
1341 out,err = StringIO(), StringIO()
1342 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1343 cmd = ["debug", "catalog-shares"] + nodedirs
1344 rc = runner.runner(cmd, stdout=out, stderr=err)
1345 self.failUnlessEqual(rc, 0)
1347 descriptions = [sfn.strip() for sfn in out.readlines()]
1348 self.failUnlessEqual(len(descriptions), 30)
1350 for line in descriptions
1351 if line.startswith("CHK %s " % storage_index_s)]
1352 self.failUnlessEqual(len(matching), 10)
# Connect to client 0's private control port (control.furl) via a foolscap
# Tub and hand the remote reference to _test_control2.
# NOTE(review): the trailing `return d` appears elided in this listing.
1354 def _test_control(self, res):
1355 # exercise the remote-control-the-client foolscap interfaces in
1356 # allmydata.control (mostly used for performance tests)
1357 c0 = self.clients[0]
1358 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1359 control_furl = open(control_furl_file, "r").read().strip()
1360 # it doesn't really matter which Tub we use to connect to the client,
1361 # so let's just use our IntroducerNode's
1362 d = self.introducer.tub.getReference(control_furl)
1363 d.addCallback(self._test_control2, control_furl_file)
# Drive the control-port remote API through `rref`: upload the furl file
# itself, download it back to control.downfile, verify round-trip contents,
# then poke speed_test / memory / peer-response-time probes (the memory
# probe only on Linux, where it is implemented).
# NOTE(review): the `def _check(res):` header (whose body starts at line
# 1372) and the trailing `return d` are elided in this listing — confirm.
1365 def _test_control2(self, rref, filename):
1366 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1367 downfile = os.path.join(self.basedir, "control.downfile")
1368 d.addCallback(lambda uri:
1369 rref.callRemote("download_from_uri_to_file",
1372 self.failUnlessEqual(res, downfile)
1373 data = open(downfile, "r").read()
1374 expected_data = open(filename, "r").read()
1375 self.failUnlessEqual(data, expected_data)
1376 d.addCallback(_check)
1377 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1378 if sys.platform == "linux2":
1379 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1380 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
# Run the `tahoe` command-line tool (via _run_cli in a thread) through its
# major verbs against the live grid: alias handling (root_dir.cap
# compatibility, add-alias/list-aliases), put/get (file, stdin, mutable),
# ls (plain/-l/--uri/--readonly-uri), mkdir, rm, mv, ln, cp (tahoe<->disk,
# overwrite, recursive), and error handling for a missing path.
# NOTE(review): this listing elides many lines — the `nodeargs` list head,
# `f.close()` calls, the file/data fixture loop head (`files = []` /
# `datas = []` around lines 1473-1477), `uri0 = out.strip()` before line
# 1493, several loop heads (`for l in lines:` / `else:`), and the `_cmp`
# helper header inside _check_cp_r_out — confirm against the full file.
1383 def _test_cli(self, res):
1384 # run various CLI commands (in a thread, since they use blocking
1387 private_uri = self._private_node.get_uri()
1388 some_uri = self._root_directory_uri
1389 client0_basedir = self.getdir("client0")
1392 "--node-directory", client0_basedir,
1394 TESTDATA = "I will not write the same thing over and over.\n" * 100
1396 d = defer.succeed(None)
1398 # for compatibility with earlier versions, private/root_dir.cap is
1399 # supposed to be treated as an alias named "tahoe:". Start by making
1400 # sure that works, before we add other aliases.
1402 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1403 f = open(root_file, "w")
1404 f.write(private_uri)
1407 def run(ignored, verb, *args, **kwargs):
1408 stdin = kwargs.get("stdin", "")
1409 newargs = [verb] + nodeargs + list(args)
1410 return self._run_cli(newargs, stdin=stdin)
1412 def _check_ls((out,err), expected_children, unexpected_children=[]):
1413 self.failUnlessEqual(err, "")
1414 for s in expected_children:
1415 self.failUnless(s in out, (s,out))
1416 for s in unexpected_children:
1417 self.failIf(s in out, (s,out))
1419 def _check_ls_root((out,err)):
1420 self.failUnless("personal" in out)
1421 self.failUnless("s2-ro" in out)
1422 self.failUnless("s2-rw" in out)
1423 self.failUnlessEqual(err, "")
1425 # this should reference private_uri
1426 d.addCallback(run, "ls")
1427 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1429 d.addCallback(run, "list-aliases")
1430 def _check_aliases_1((out,err)):
1431 self.failUnlessEqual(err, "")
1432 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1433 d.addCallback(_check_aliases_1)
1435 # now that that's out of the way, remove root_dir.cap and work with
1437 d.addCallback(lambda res: os.unlink(root_file))
1438 d.addCallback(run, "list-aliases")
1439 def _check_aliases_2((out,err)):
1440 self.failUnlessEqual(err, "")
1441 self.failUnlessEqual(out, "")
1442 d.addCallback(_check_aliases_2)
1444 d.addCallback(run, "mkdir")
1445 def _got_dir( (out,err) ):
1446 self.failUnless(uri.from_string_dirnode(out.strip()))
1448 d.addCallback(_got_dir)
1449 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1451 d.addCallback(run, "list-aliases")
1452 def _check_aliases_3((out,err)):
1453 self.failUnlessEqual(err, "")
1454 self.failUnless("tahoe: " in out)
1455 d.addCallback(_check_aliases_3)
1457 def _check_empty_dir((out,err)):
1458 self.failUnlessEqual(out, "")
1459 self.failUnlessEqual(err, "")
1460 d.addCallback(run, "ls")
1461 d.addCallback(_check_empty_dir)
1463 def _check_missing_dir((out,err)):
1464 # TODO: check that rc==2
1465 self.failUnlessEqual(out, "")
1466 self.failUnlessEqual(err, "No such file or directory\n")
1467 d.addCallback(run, "ls", "bogus")
1468 d.addCallback(_check_missing_dir)
# create local fixture files file0..fileN with distinct contents
1473 fn = os.path.join(self.basedir, "file%d" % i)
1475 data = "data to be uploaded: file%d\n" % i
1477 open(fn,"wb").write(data)
1479 def _check_stdout_against((out,err), filenum=None, data=None):
1480 self.failUnlessEqual(err, "")
1481 if filenum is not None:
1482 self.failUnlessEqual(out, datas[filenum])
1483 if data is not None:
1484 self.failUnlessEqual(out, data)
1486 # test all both forms of put: from a file, and from stdin
1488 d.addCallback(run, "put", files[0], "tahoe-file0")
1489 def _put_out((out,err)):
1490 self.failUnless("URI:LIT:" in out, out)
1491 self.failUnless("201 Created" in err, err)
1493 return run(None, "get", uri0)
1494 d.addCallback(_put_out)
1495 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1497 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1498 # tahoe put bar tahoe:FOO
1499 d.addCallback(run, "put", files[2], "tahoe:file2")
1500 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1501 def _check_put_mutable((out,err)):
1502 self._mutable_file3_uri = out.strip()
1503 d.addCallback(_check_put_mutable)
1504 d.addCallback(run, "get", "tahoe:file3")
1505 d.addCallback(_check_stdout_against, 3)
1508 STDIN_DATA = "This is the file to upload from stdin."
1509 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1510 # tahoe put tahoe:FOO
1511 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1512 stdin="Other file from stdin.")
1514 d.addCallback(run, "ls")
1515 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1516 "tahoe-file-stdin", "from-stdin"])
1517 d.addCallback(run, "ls", "subdir")
1518 d.addCallback(_check_ls, ["tahoe-file1"])
1521 d.addCallback(run, "mkdir", "subdir2")
1522 d.addCallback(run, "ls")
1523 # TODO: extract the URI, set an alias with it
1524 d.addCallback(_check_ls, ["subdir2"])
1526 # tahoe get: (to stdin and to a file)
1527 d.addCallback(run, "get", "tahoe-file0")
1528 d.addCallback(_check_stdout_against, 0)
1529 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1530 d.addCallback(_check_stdout_against, 1)
1531 outfile0 = os.path.join(self.basedir, "outfile0")
1532 d.addCallback(run, "get", "file2", outfile0)
1533 def _check_outfile0((out,err)):
1534 data = open(outfile0,"rb").read()
1535 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1536 d.addCallback(_check_outfile0)
# NOTE(review): outfile1 is built from the string "outfile0", so this
# second `get` overwrites the first output file; likely intended to be
# "outfile1" — harmless here, but confirm upstream.
1537 outfile1 = os.path.join(self.basedir, "outfile0")
1538 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1539 def _check_outfile1((out,err)):
1540 data = open(outfile1,"rb").read()
1541 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1542 d.addCallback(_check_outfile1)
1544 d.addCallback(run, "rm", "tahoe-file0")
1545 d.addCallback(run, "rm", "tahoe:file2")
1546 d.addCallback(run, "ls")
1547 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1549 d.addCallback(run, "ls", "-l")
1550 def _check_ls_l((out,err)):
1551 lines = out.split("\n")
1553 if "tahoe-file-stdin" in l:
1554 self.failUnless(l.startswith("-r-- "), l)
1555 self.failUnless(" %d " % len(STDIN_DATA) in l)
1557 self.failUnless(l.startswith("-rw- "), l) # mutable
1558 d.addCallback(_check_ls_l)
1560 d.addCallback(run, "ls", "--uri")
1561 def _check_ls_uri((out,err)):
1562 lines = out.split("\n")
1565 self.failUnless(self._mutable_file3_uri in l)
1566 d.addCallback(_check_ls_uri)
1568 d.addCallback(run, "ls", "--readonly-uri")
1569 def _check_ls_rouri((out,err)):
1570 lines = out.split("\n")
1573 rw_uri = self._mutable_file3_uri
1574 u = uri.from_string_mutable_filenode(rw_uri)
1575 ro_uri = u.get_readonly().to_string()
1576 self.failUnless(ro_uri in l)
1577 d.addCallback(_check_ls_rouri)
1580 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1581 d.addCallback(run, "ls")
1582 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1584 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1585 d.addCallback(run, "ls")
1586 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1588 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1589 d.addCallback(run, "ls")
1590 d.addCallback(_check_ls, ["file3", "file3-copy"])
1591 d.addCallback(run, "get", "tahoe:file3-copy")
1592 d.addCallback(_check_stdout_against, 3)
1594 # copy from disk into tahoe
1595 d.addCallback(run, "cp", files[4], "tahoe:file4")
1596 d.addCallback(run, "ls")
1597 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1598 d.addCallback(run, "get", "tahoe:file4")
1599 d.addCallback(_check_stdout_against, 4)
1601 # copy from tahoe into disk
1602 target_filename = os.path.join(self.basedir, "file-out")
1603 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1604 def _check_cp_out((out,err)):
1605 self.failUnless(os.path.exists(target_filename))
1606 got = open(target_filename,"rb").read()
1607 self.failUnlessEqual(got, datas[4])
1608 d.addCallback(_check_cp_out)
1610 # copy from disk to disk (silly case)
1611 target2_filename = os.path.join(self.basedir, "file-out-copy")
1612 d.addCallback(run, "cp", target_filename, target2_filename)
1613 def _check_cp_out2((out,err)):
1614 self.failUnless(os.path.exists(target2_filename))
1615 got = open(target2_filename,"rb").read()
1616 self.failUnlessEqual(got, datas[4])
1617 d.addCallback(_check_cp_out2)
1619 # copy from tahoe into disk, overwriting an existing file
1620 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1621 def _check_cp_out3((out,err)):
1622 self.failUnless(os.path.exists(target_filename))
1623 got = open(target_filename,"rb").read()
1624 self.failUnlessEqual(got, datas[3])
1625 d.addCallback(_check_cp_out3)
1627 # copy from disk into tahoe, overwriting an existing immutable file
1628 d.addCallback(run, "cp", files[5], "tahoe:file4")
1629 d.addCallback(run, "ls")
1630 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1631 d.addCallback(run, "get", "tahoe:file4")
1632 d.addCallback(_check_stdout_against, 5)
1634 # copy from disk into tahoe, overwriting an existing mutable file
1635 d.addCallback(run, "cp", files[5], "tahoe:file3")
1636 d.addCallback(run, "ls")
1637 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1638 d.addCallback(run, "get", "tahoe:file3")
1639 d.addCallback(_check_stdout_against, 5)
1641 # recursive copy: setup
1642 dn = os.path.join(self.basedir, "dir1")
1644 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1645 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1646 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1647 sdn2 = os.path.join(dn, "subdir2")
1649 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1650 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1652 # from disk into tahoe
1653 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1654 d.addCallback(run, "ls")
1655 d.addCallback(_check_ls, ["dir1"])
1656 d.addCallback(run, "ls", "dir1")
1657 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1658 ["rfile4", "rfile5"])
1659 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1660 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1661 ["rfile1", "rfile2", "rfile3"])
1662 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1663 d.addCallback(_check_stdout_against, data="rfile4")
1665 # and back out again
1666 dn_copy = os.path.join(self.basedir, "dir1-copy")
1667 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1668 def _check_cp_r_out((out,err)):
1670 old = open(os.path.join(dn, name), "rb").read()
1671 newfn = os.path.join(dn_copy, name)
1672 self.failUnless(os.path.exists(newfn))
1673 new = open(newfn, "rb").read()
1674 self.failUnlessEqual(old, new)
1678 _cmp(os.path.join("subdir2", "rfile4"))
1679 _cmp(os.path.join("subdir2", "rfile5"))
1680 d.addCallback(_check_cp_r_out)
1682 # and copy it a second time, which ought to overwrite the same files
1683 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1685 # and tahoe-to-tahoe
1686 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1687 d.addCallback(run, "ls")
1688 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1689 d.addCallback(run, "ls", "dir1-copy")
1690 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1691 ["rfile4", "rfile5"])
1692 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1693 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1694 ["rfile1", "rfile2", "rfile3"])
1695 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1696 d.addCallback(_check_stdout_against, data="rfile4")
1698 # and copy it a second time, which ought to overwrite the same files
1699 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1701 # tahoe_ls doesn't currently handle the error correctly: it tries to
1702 # JSON-parse a traceback.
1703 ## def _ls_missing(res):
1704 ## argv = ["ls"] + nodeargs + ["bogus"]
1705 ## return self._run_cli(argv)
1706 ## d.addCallback(_ls_missing)
1707 ## def _check_ls_missing((out,err)):
1710 ## self.failUnlessEqual(err, "")
1711 ## d.addCallback(_check_ls_missing)
# Run one `tahoe` CLI invocation in a worker thread (the CLI uses blocking
# HTTP), feeding it `stdin` and capturing stdout/stderr.  The deferred
# fires with the (stdout, stderr) string pair.
# NOTE(review): the `def _done(res):` header and trailing `return d` are
# elided in this listing — confirm against the full file.
1715 def _run_cli(self, argv, stdin=""):
1717 stdout, stderr = StringIO(), StringIO()
1718 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1719 stdin=StringIO(stdin),
1720 stdout=stdout, stderr=stderr)
1722 return stdout.getvalue(), stderr.getvalue()
1723 d.addCallback(_done)
# Exercise the checker API on three node types:
#  - the personal dirnode itself (check and check+verify -> healthy)
#  - a CHK FileNode ("big file", large enough not to be LIT) -> healthy
#  - a LiteralFileNode ("sekrit data") -> check returns None (nothing to
#    check for LIT files)
# NOTE(review): the inner `return d` lines of _got_chk_filenode /
# _got_lit_filenode and the method's final `return d` appear elided.
1726 def _test_checker(self, res):
1727 ut = upload.Data("too big to be literal" * 200, convergence=None)
1728 d = self._personal_node.add_file(u"big file", ut)
1730 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1731 def _check_dirnode_results(r):
1732 self.failUnless(r.is_healthy())
1733 d.addCallback(_check_dirnode_results)
1734 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1735 d.addCallback(_check_dirnode_results)
1737 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1738 def _got_chk_filenode(n):
1739 self.failUnless(isinstance(n, filenode.FileNode))
1740 d = n.check(Monitor())
1741 def _check_filenode_results(r):
1742 self.failUnless(r.is_healthy())
1743 d.addCallback(_check_filenode_results)
1744 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1745 d.addCallback(_check_filenode_results)
1747 d.addCallback(_got_chk_filenode)
1749 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1750 def _got_lit_filenode(n):
1751 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1752 d = n.check(Monitor())
1753 def _check_lit_filenode_results(r):
1754 self.failUnlessEqual(r, None)
1755 d.addCallback(_check_lit_filenode_results)
1756 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1757 d.addCallback(_check_lit_filenode_results)
1759 d.addCallback(_got_lit_filenode)
class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):

    def _run_cli(self, argv):
        """Run a 'tahoe debug' CLI command synchronously and return its stdout.

        Only debug subcommands may be run this way: they do not need the
        reactor, so runner() can be invoked directly in this thread.
        """
        assert argv[0] == "debug"
        captured_out = StringIO()
        captured_err = StringIO()
        runner.runner(argv, run_by_human=False,
                      stdout=captured_out, stderr=captured_err)
        return captured_out.getvalue()
    def test_good(self):
        """Create a healthy mutable file and confirm the webapi verifier agrees."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        # NOTE(review): a 'def _created(node):' callback (presumably stashing
        # 'self.node = node') appears to be missing from this excerpt.
            si = self.node.get_storage_index()
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        # NOTE(review): a 'def _do_check(res):' header appears to be missing
        # here; the following lines read like its body.
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<span>Healthy : Healthy</span>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        d.addErrback(self.explain_web_error)
        # NOTE(review): final 'return d' appears to be missing from this excerpt.
    def test_corrupt(self):
        """Corrupt one share, confirm the verifier notices, then repair via webapi."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        # NOTE(review): a 'def _created(node):' callback header (stashing
        # self.node) appears to be missing from this excerpt.
            si = self.node.get_storage_index()
            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                                 self.clients[1].basedir])
            files = out.split("\n")
            # corrupt one of them, using the CLI debug command
            # NOTE(review): an assignment like 'f = files[0]' appears to be
            # missing here; 'f' is otherwise unbound.
            shnum = os.path.basename(f)
            nodeid = self.clients[1].nodeid
            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
            out = self._run_cli(["debug", "corrupt-share", files[0]])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
        # NOTE(review): 'def _do_check(res):' appears to be missing here.
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)
        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        # re-check after repair: the file should be healthy again
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)
        # NOTE(review): final 'return d' appears to be missing from this excerpt.
    def test_delete_share(self):
        """Delete one share, confirm the (non-verifying) checker notices, then repair."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        # NOTE(review): a 'def _created(node):' callback header appears to be
        # missing from this excerpt.
            si = self.node.get_storage_index()
            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                                 self.clients[1].basedir])
            files = out.split("\n")
            # corrupt one of them, using the CLI debug command
            # NOTE(review): despite the comment above, this test deletes a
            # share; an 'f = files[0]' assignment and the line that actually
            # removes the share file appear to be missing from this excerpt.
            shnum = os.path.basename(f)
            nodeid = self.clients[1].nodeid
            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
        d.addCallback(_created)
        # now make sure the webapi checker notices it
        # NOTE(review): 'def _do_check(res):' appears to be missing here.
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            # deletion (unlike corruption) leaves no corrupt shares behind
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)
        # NOTE(review): final 'return d' appears to be missing from this excerpt.
class DeepCheckBase(SystemTestMixin, ErrorMixin):
    # Shared webapi helpers for the deep-check test cases below.

    def web_json(self, n, **kwargs):
        """POST a webapi request for node n with output=json; the Deferred
        fires with the decoded JSON structure."""
        kwargs["output"] = "json"
        d = self.web(n, "POST", **kwargs)
        d.addCallback(self.decode_json)
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def decode_json(self, (s,url)):
        """Parse webapi response body s as JSON; fail the test (mentioning
        url) if it is not valid JSON."""
        # NOTE(review): the try/except ValueError wrapper and 'return data'
        # appear to be missing from this excerpt -- as shown, the fail() call
        # would run unconditionally.
        data = simplejson.loads(s)
        self.fail("%s: not JSON: '%s'" % (url, s))
    def web(self, n, method="GET", **kwargs):
        # returns (data, url)
        """Issue a webapi request against node n's URI; kwargs become query
        parameters. The Deferred fires with (response-body, url)."""
        url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
               + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
        d = getPage(url, method=method)
        d.addCallback(lambda data: (data,url))
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def wait_for_operation(self, ignored, ophandle):
        """Poll the operations/<ophandle> status page until the long-running
        operation reports finished, stalling 1s between polls."""
        url = self.webish_url + "operations/" + ophandle
        url += "?t=status&output=JSON"
        # NOTE(review): the getPage(url) call and its result-callback header
        # (with the usual try/except ValueError around the JSON parse) appear
        # to be missing from this excerpt.
            data = simplejson.loads(res)
            self.fail("%s: not JSON: '%s'" % (url, res))
            if not data["finished"]:
                # not done yet: stall briefly, then poll again (recursively)
                d = self.stall(delay=1.0)
                d.addCallback(self.wait_for_operation, ophandle)
    def get_operation_results(self, ignored, ophandle, output=None):
        """Fetch the final results of operation ophandle; if output is
        'json', the Deferred fires with the decoded structure."""
        url = self.webish_url + "operations/" + ophandle
        # NOTE(review): lines appear to be missing here (the '?t=status'
        # query suffix and an 'if output:' guard before the next line).
        url += "&output=" + output
        # NOTE(review): the getPage call and its callback header appear to be
        # missing here as well.
            if output and output.lower() == "json":
                return simplejson.loads(res)
            self.fail("%s: not JSON: '%s'" % (url, res))
    def slow_web(self, n, output=None, **kwargs):
        """Start a long-running webapi operation on n under a random ophandle,
        wait for it to finish, then fetch its results."""
        handle = base32.b2a(os.urandom(4))
        d = self.web(n, "POST", ophandle=handle, **kwargs)
        d.addCallback(self.wait_for_operation, handle)
        d.addCallback(self.get_operation_results, handle, output=output)
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
    # construct a small directory tree (with one dir, one immutable file, one
    # mutable file, one LIT file, and a loop), and then check/examine it in
    # (comment truncated in this excerpt)

    def set_up_tree(self, ignored):
        """Build the test tree: root dir, a mutable file, a large CHK file,
        two LIT files, and a 'loop' edge pointing back at root."""
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _created_root(n):
            # NOTE(review): 'self.root = n' appears to be missing here --
            # later methods reference self.root.
            self.root_uri = n.get_uri()
        d.addCallback(_created_root)
        d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
        d.addCallback(lambda n: self.root.set_node(u"mutable", n))
        def _created_mutable(n):
            # NOTE(review): 'self.mutable = n' appears to be missing here.
            self.mutable_uri = n.get_uri()
        d.addCallback(_created_mutable)
        # large enough to be a real CHK file (not a LIT)
        large = upload.Data("Lots of data\n" * 1000, None)
        d.addCallback(lambda ign: self.root.add_file(u"large", large))
        def _created_large(n):
            # NOTE(review): 'self.large = n' appears to be missing here.
            self.large_uri = n.get_uri()
        d.addCallback(_created_large)
        small = upload.Data("Small enough for a LIT", None)
        d.addCallback(lambda ign: self.root.add_file(u"small", small))
        def _created_small(n):
            # NOTE(review): 'self.small = n' appears to be missing here.
            self.small_uri = n.get_uri()
        d.addCallback(_created_small)
        small2 = upload.Data("Small enough for a LIT too", None)
        d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
        def _created_small2(n):
            # NOTE(review): 'self.small2 = n' appears to be missing here.
            self.small2_uri = n.get_uri()
        d.addCallback(_created_small2)
        # create a cycle: root contains itself under the name "loop"
        d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def check_is_healthy(self, cr, n, where, incomplete=False):
        """Assert that checker results cr for node n describe a fully healthy
        file: 10 good shares, 3-of-10 encoding, no corruption."""
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failUnless(cr.is_healthy(), where)
        # NOTE(review): continuation lines (e.g. the trailing 'where)') and
        # the 'd = cr.get_data()' assignment appear to be missing from this
        # excerpt.
        self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
        self.failUnlessEqual(cr.get_storage_index_string(),
                             base32.b2a(n.get_storage_index()), where)
        # with fewer than 10 servers, 10 shares cannot be perfectly balanced
        needs_rebalancing = bool( len(self.clients) < 10 )
        self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
        self.failUnlessEqual(d["count-shares-good"], 10, where)
        self.failUnlessEqual(d["count-shares-needed"], 3, where)
        self.failUnlessEqual(d["count-shares-expected"], 10, where)
        self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(d["list-corrupt-shares"], [], where)
        self.failUnlessEqual(sorted(d["servers-responding"]),
                             sorted([c.nodeid for c in self.clients]),
        self.failUnless("sharemap" in d, where)
        all_serverids = set()
        for (shareid, serverids) in d["sharemap"].items():
            all_serverids.update(serverids)
        self.failUnlessEqual(sorted(all_serverids),
                             sorted([c.nodeid for c in self.clients]),
        self.failUnlessEqual(d["count-wrong-shares"], 0, where)
        self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2052 def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2053 self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
2054 self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2055 self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2056 self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2057 self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2058 self.failIf(cr.get_repair_attempted(), where)
    def deep_check_is_healthy(self, cr, num_healthy, where):
        """Assert a deep-check result counted exactly num_healthy healthy objects."""
        self.failUnless(IDeepCheckResults.providedBy(cr))
        self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
        # NOTE(review): the 'num_healthy, where)' continuation appears to be
        # missing from this excerpt.
    def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
        """Assert deep-check-and-repair counters show num_healthy healthy
        objects both pre- and post-repair, with zero repairs attempted."""
        self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
        c = cr.get_counters()
        self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
        # NOTE(review): continuation 'num_healthy, where)' appears missing.
        self.failUnlessEqual(c["count-objects-healthy-post-repair"],
        # NOTE(review): continuation 'num_healthy, where)' appears missing.
        self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
    def test_good(self):
        """End-to-end: build a healthy tree, then verify it via local checks,
        the webapi, and the CLI."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        d.addCallback(self.set_up_tree)
        d.addCallback(self.do_stats)
        d.addCallback(self.do_test_check_good)
        d.addCallback(self.do_test_web_good)
        d.addCallback(self.do_test_cli_good)
        d.addErrback(self.explain_web_error)
        d.addErrback(self.explain_error)
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def do_stats(self, ignored):
        """Run start_deep_stats() on the root and validate the numbers."""
        d = defer.succeed(None)
        d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
        d.addCallback(self.check_stats_good)
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def check_stats_good(self, s):
        """Assert the deep-stats dict s matches the tree built by set_up_tree
        (1 dir, 1 immutable, 2 LIT, 1 mutable)."""
        self.failUnlessEqual(s["count-directories"], 1)
        self.failUnlessEqual(s["count-files"], 4)
        self.failUnlessEqual(s["count-immutable-files"], 1)
        self.failUnlessEqual(s["count-literal-files"], 2)
        self.failUnlessEqual(s["count-mutable-files"], 1)
        # don't check directories: their size will vary
        # s["largest-directory"]
        # s["size-directories"]
        self.failUnlessEqual(s["largest-directory-children"], 5)
        self.failUnlessEqual(s["largest-immutable-file"], 13000)
        # to re-use this function for both the local
        # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
        # coerce the result into a list of tuples. dirnode.start_deep_stats()
        # returns a list of tuples, but JSON only knows about lists., so
        # t=start-deep-stats returns a list of lists.
        histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
        self.failUnlessEqual(histogram, [(11, 31, 2),
        # NOTE(review): the remaining histogram buckets and the closing
        # bracket appear to be missing from this excerpt.
        self.failUnlessEqual(s["size-immutable-files"], 13000)
        self.failUnlessEqual(s["size-literal-files"], 48)
    def do_test_check_good(self, ignored):
        """Run check(), check(verify=True), check_and_repair() (both modes)
        on every node, then deep-check the root, then start-and-cancel a
        deep-check."""
        d = defer.succeed(None)
        # check the individual items
        d.addCallback(lambda ign: self.root.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.large, "large")
        # LIT files have no shares, so their check() result is None
        d.addCallback(lambda ign: self.small.check(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small2")
        # and again with verify=True
        d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
        # NOTE(review): the 'incomplete=True)' continuation for the next call
        # appears to be missing from this excerpt.
        d.addCallback(self.check_is_healthy, self.large, "large",
        d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small2")
        # and check_and_repair(), which should be a nop
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small2")
        # check_and_repair(verify=True)
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
        # NOTE(review): the 'incomplete=True)' continuation appears missing.
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small2")
        # now deep-check the root, with various verify= and repair= options
        d.addCallback(lambda ign:
                      self.root.start_deep_check().when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check(verify=True).when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair().when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair(verify=True).when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        # and finally, start a deep-check, but then cancel it.
        d.addCallback(lambda ign: self.root.start_deep_check())
        def _checking(monitor):
            # NOTE(review): the 'monitor.cancel()' call appears to be missing
            # from this excerpt.
            d = monitor.when_done()
            # this should fire as soon as the next dirnode.list finishes.
            # TODO: add a counter to measure how many list() calls are made,
            # assert that no more than one gets to run before the cancel()
            def _finished_normally(res):
                self.fail("this was supposed to fail, not finish normally")
            # NOTE(review): the 'def _cancelled(f):' header appears to be
            # missing here.
                f.trap(OperationCancelledError)
            d.addCallbacks(_finished_normally, _cancelled)
        d.addCallback(_checking)
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def json_check_is_healthy(self, data, n, where, incomplete=False):
        """Assert a JSON t=check response for node n describes a healthy
        10-share, 3-of-10 file with no corruption."""
        self.failUnlessEqual(data["storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        # NOTE(review): an assignment like 'r = data["results"]' appears to
        # be missing from this excerpt; 'r' is otherwise unbound.
        self.failUnlessEqual(r["healthy"], True, where)
        # fewer than 10 servers cannot perfectly balance 10 shares
        needs_rebalancing = bool( len(self.clients) < 10 )
        self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
        self.failUnlessEqual(r["count-shares-good"], 10, where)
        self.failUnlessEqual(r["count-shares-needed"], 3, where)
        self.failUnlessEqual(r["count-shares-expected"], 10, where)
        self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(r["list-corrupt-shares"], [], where)
        self.failUnlessEqual(sorted(r["servers-responding"]),
                             sorted([idlib.nodeid_b2a(c.nodeid)
                                     for c in self.clients]), where)
        self.failUnless("sharemap" in r, where)
        all_serverids = set()
        for (shareid, serverids_s) in r["sharemap"].items():
            all_serverids.update(serverids_s)
        self.failUnlessEqual(sorted(all_serverids),
                             sorted([idlib.nodeid_b2a(c.nodeid)
                                     for c in self.clients]), where)
        self.failUnlessEqual(r["count-wrong-shares"], 0, where)
        self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2232 def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2233 self.failUnlessEqual(data["storage-index"],
2234 base32.b2a(n.get_storage_index()), where)
2235 self.failUnlessEqual(data["repair-attempted"], False, where)
2236 self.json_check_is_healthy(data["pre-repair-results"],
2237 n, where, incomplete)
2238 self.json_check_is_healthy(data["post-repair-results"],
2239 n, where, incomplete)
2241 def json_full_deepcheck_is_healthy(self, data, n, where):
2242 self.failUnlessEqual(data["root-storage-index"],
2243 base32.b2a(n.get_storage_index()), where)
2244 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2245 self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2246 self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2247 self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2248 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2249 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2250 self.json_check_stats_good(data["stats"], where)
2252 def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2253 self.failUnlessEqual(data["root-storage-index"],
2254 base32.b2a(n.get_storage_index()), where)
2255 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2257 self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2258 self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2259 self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2261 self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2262 self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2263 self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2265 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2266 self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2267 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2269 self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2270 self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2271 self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2274 def json_check_lit(self, data, n, where):
2275 self.failUnlessEqual(data["storage-index"], "", where)
2276 self.failUnlessEqual(data["results"]["healthy"], True, where)
2278 def json_check_stats_good(self, data, where):
2279 self.check_stats_good(data)
    def do_test_web_good(self, ignored):
        """Repeat the deep-stats / check / check+verify / check+repair /
        deep-check matrix through the webapi, then hit t=info on each node."""
        d = defer.succeed(None)
        # deep-stats via the webapi (long-running operation)
        d.addCallback(lambda ign:
                      self.slow_web(self.root,
                                    t="start-deep-stats", output="json"))
        d.addCallback(self.json_check_stats_good, "deep-stats")
        # check, no verify
        d.addCallback(lambda ign: self.web_json(self.root, t="check"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.web_json(self.large, t="check"))
        d.addCallback(self.json_check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.web_json(self.small, t="check"))
        d.addCallback(self.json_check_lit, self.small, "small")
        d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
        d.addCallback(self.json_check_lit, self.small2, "small2")
        # check with verify=true
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")
        d.addCallback(lambda ign:
                      self.web_json(self.small2, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small2, "small2")
        # check and repair, no verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small, "small")
        d.addCallback(lambda ign:
                      self.web_json(self.small2, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small2, "small2")
        # check+verify+repair
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")
        d.addCallback(lambda ign:
                      self.web_json(self.small2, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small2, "small2")
        # now run a deep-check, with various verify= and repair= flags
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true",
        # NOTE(review): the 'output="json"))' continuation appears to be
        # missing from this excerpt.
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", repair="true",
        # NOTE(review): the 'output="json"))' continuation appears to be
        # missing here as well.
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
        # now look at t=info
        d.addCallback(lambda ign: self.web(self.root, t="info"))
        # TODO: examine the output
        d.addCallback(lambda ign: self.web(self.mutable, t="info"))
        d.addCallback(lambda ign: self.web(self.large, t="info"))
        d.addCallback(lambda ign: self.web(self.small, t="info"))
        d.addCallback(lambda ign: self.web(self.small2, t="info"))
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def _run_cli(self, argv, stdin=""):
        """Run a tahoe CLI command in a worker thread; the Deferred fires
        with (stdout, stderr) strings."""
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdin=StringIO(stdin),
                                  stdout=stdout, stderr=stderr)
        # NOTE(review): a 'def _done(res):' callback header and a trailing
        # 'return d' appear to have been lost from this excerpt -- the next
        # two lines read like the body of that callback.
        return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
    def do_test_cli_good(self, ignored):
        """Verify the 'manifest' and 'stats' CLI commands against the tree."""
        basedir = self.getdir("client0")
        d = self._run_cli(["manifest",
                           "--node-directory", basedir,
        # NOTE(review): the root argument and closing brackets for this CLI
        # invocation appear to be missing from this excerpt.
        def _check((out,err)):
            lines = [l for l in out.split("\n") if l]
            # root + mutable + large + small + small2 = 5 manifest entries
            self.failUnlessEqual(len(lines), 5)
            # NOTE(review): the loop header and dict-building lines for
            # 'caps' appear to be missing around here.
                cap, path = l.split(None, 1)
            self.failUnless(self.root.get_uri() in caps)
            self.failUnlessEqual(caps[self.root.get_uri()], "")
            self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
            self.failUnlessEqual(caps[self.large.get_uri()], "large")
            self.failUnlessEqual(caps[self.small.get_uri()], "small")
            self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
        d.addCallback(_check)
        d.addCallback(lambda res:
                      self._run_cli(["manifest",
                                     "--node-directory", basedir,
                                     "--storage-index", self.root_uri]))
        def _check2((out,err)):
            lines = [l for l in out.split("\n") if l]
            # only SI-bearing nodes appear: root, mutable, large (not the LITs)
            self.failUnlessEqual(len(lines), 3)
            self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
            self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
            self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
        d.addCallback(_check2)
        d.addCallback(lambda res:
                      self._run_cli(["stats",
                                     "--node-directory", basedir,
        # NOTE(review): the root argument and closing brackets appear to be
        # missing from this excerpt.
        def _check3((out,err)):
            lines = [l.strip() for l in out.split("\n") if l]
            self.failUnless("count-immutable-files: 1" in lines)
            self.failUnless("count-mutable-files: 1" in lines)
            self.failUnless("count-literal-files: 2" in lines)
            self.failUnless("count-files: 4" in lines)
            self.failUnless("count-directories: 1" in lines)
            self.failUnless("size-immutable-files: 13000" in lines)
            self.failUnless("size-literal-files: 48" in lines)
            self.failUnless(" 11-31 : 2".strip() in lines)
            self.failUnless("10001-31622 : 1".strip() in lines)
        d.addCallback(_check3)
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
    # Deep-check tests against a deliberately damaged tree.
    # NOTE(review): a test-method header (e.g. 'def test_bad(self):') appears
    # to be missing from this excerpt; the following lines read like its body.
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        d.addCallback(self.set_up_damaged_tree)
        d.addCallback(self.do_test_check_bad)
        d.addCallback(self.do_test_deepcheck_bad)
        d.addCallback(self.do_test_web_bad)
        d.addErrback(self.explain_web_error)
        d.addErrback(self.explain_error)
    def set_up_damaged_tree(self, ignored):
        """Build a root dir and populate it with mutable/large files in four
        damage states each: good, missing-shares, corrupt-shares,
        unrecoverable."""
        # NOTE(review): several comment and setup lines (e.g. initialization
        # of self.nodes) appear to be missing from this excerpt.
        # mutable-missing-shares
        # mutable-corrupt-shares
        # mutable-unrecoverable
        # large-missing-shares
        # large-corrupt-shares
        # large-unrecoverable
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _created_root(n):
            # NOTE(review): 'self.root = n' appears to be missing here.
            self.root_uri = n.get_uri()
        d.addCallback(_created_root)
        d.addCallback(self.create_mangled, "mutable-good")
        d.addCallback(self.create_mangled, "mutable-missing-shares")
        d.addCallback(self.create_mangled, "mutable-corrupt-shares")
        d.addCallback(self.create_mangled, "mutable-unrecoverable")
        d.addCallback(self.create_mangled, "large-good")
        d.addCallback(self.create_mangled, "large-missing-shares")
        d.addCallback(self.create_mangled, "large-corrupt-shares")
        d.addCallback(self.create_mangled, "large-unrecoverable")
        # NOTE(review): a final 'return d' appears to be missing from this
        # excerpt.
    def create_mangled(self, ignored, name):
        """Create one node named '<nodetype>-<mangletype>' under self.root
        and apply the damage that mangletype describes."""
        nodetype, mangletype = name.split("-", 1)
        if nodetype == "mutable":
            d = self.clients[0].create_mutable_file("mutable file contents")
            d.addCallback(lambda n: self.root.set_node(unicode(name), n))
        elif nodetype == "large":
            # big enough to be a CHK file rather than a LIT
            large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
            d = self.root.add_file(unicode(name), large)
        elif nodetype == "small":
            small = upload.Data("Small enough for a LIT", None)
            d = self.root.add_file(unicode(name), small)
        def _stash_node(node):
            self.nodes[name] = node
            # NOTE(review): a 'return node' appears to be missing here, so
            # the later mangling callbacks can receive the node.
        d.addCallback(_stash_node)
        # NOTE(review): the bodies of some branches below appear to be
        # missing from this excerpt (e.g. a 'pass' under "good", the 'else:'
        # before the unrecoverable assert, and a final 'return d').
        if mangletype == "good":
        elif mangletype == "missing-shares":
            d.addCallback(self._delete_some_shares)
        elif mangletype == "corrupt-shares":
            d.addCallback(self._corrupt_some_shares)
            assert mangletype == "unrecoverable"
            d.addCallback(self._delete_most_shares)
2523 def _run_cli(self, argv):
2524 stdout, stderr = StringIO(), StringIO()
2525 # this can only do synchronous operations
2526 assert argv[0] == "debug"
2527 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2528 return stdout.getvalue()
2530 def _find_shares(self, node):
2531 si = node.get_storage_index()
2532 out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2533 [c.basedir for c in self.clients])
2534 files = out.split("\n")
2535 return [f for f in files if f]
2537 def _delete_some_shares(self, node):
2538 shares = self._find_shares(node)
2539 os.unlink(shares[0])
2540 os.unlink(shares[1])
2542 def _corrupt_some_shares(self, node):
2543 shares = self._find_shares(node)
2544 self._run_cli(["debug", "corrupt-share", shares[0]])
2545 self._run_cli(["debug", "corrupt-share", shares[1]])
    def _delete_most_shares(self, node):
        """Delete all but one share, leaving fewer than the 3 needed: the
        file becomes unrecoverable."""
        shares = self._find_shares(node)
        for share in shares[1:]:
            # NOTE(review): the loop body (presumably os.unlink(share))
            # appears to be missing from this excerpt.
    def check_is_healthy(self, cr, where):
        """Assert checker results cr show a healthy, recoverable file with
        exactly one recoverable version."""
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failUnless(cr.is_healthy(), where)
        self.failUnless(cr.is_recoverable(), where)
        # NOTE(review): an assignment like 'd = cr.get_data()' appears to be
        # missing from this excerpt; 'd' is otherwise unbound.
        self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
    def check_is_missing_shares(self, cr, where):
        """Assert cr shows an unhealthy-but-recoverable file (shares were
        deleted, but at least 3 remain)."""
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failIf(cr.is_healthy(), where)
        self.failUnless(cr.is_recoverable(), where)
        # NOTE(review): an assignment like 'd = cr.get_data()' appears to be
        # missing from this excerpt.
        self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
    def check_has_corrupt_shares(self, cr, where):
        # by "corrupt-shares" we mean the file is still recoverable
        """Assert cr reports corrupt shares: unhealthy, still recoverable,
        with a non-empty corrupt-shares list."""
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failIf(cr.is_healthy(), where)
        self.failUnless(cr.is_recoverable(), where)
        # NOTE(review): an assignment like 'd = cr.get_data()' appears to be
        # missing from this excerpt.
        self.failUnless(d["count-shares-good"] < 10, where)
        self.failUnless(d["count-corrupt-shares"], where)
        self.failUnless(d["list-corrupt-shares"], where)
    def check_is_unrecoverable(self, cr, where):
        """Assert cr shows an unrecoverable file: fewer good shares than
        needed, no recoverable versions."""
        self.failUnless(ICheckerResults.providedBy(cr), where)
        # NOTE(review): an assignment like 'd = cr.get_data()' appears to be
        # missing from this excerpt.
        self.failIf(cr.is_healthy(), where)
        self.failIf(cr.is_recoverable(), where)
        self.failUnless(d["count-shares-good"] < d["count-shares-needed"],
        # NOTE(review): the 'where)' continuation for the line above appears
        # to be missing here.
        self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2594 def do_test_check_bad(self, ignored):
2595 d = defer.succeed(None)
2597 # check the individual items, without verification. This will not
2598 # detect corrupt shares.
2599 def _check(which, checker):
2600 d = self.nodes[which].check(Monitor())
2601 d.addCallback(checker, which + "--check")
2604 d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2605 d.addCallback(lambda ign: _check("mutable-missing-shares",
2606 self.check_is_missing_shares))
2607 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2608 self.check_is_healthy))
2609 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2610 self.check_is_unrecoverable))
2611 d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2612 d.addCallback(lambda ign: _check("large-missing-shares",
2613 self.check_is_missing_shares))
2614 d.addCallback(lambda ign: _check("large-corrupt-shares",
2615 self.check_is_healthy))
2616 d.addCallback(lambda ign: _check("large-unrecoverable",
2617 self.check_is_unrecoverable))
2619 # and again with verify=True, which *does* detect corrupt shares.
2620 def _checkv(which, checker):
2621 d = self.nodes[which].check(Monitor(), verify=True)
2622 d.addCallback(checker, which + "--check-and-verify")
2625 d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2626 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2627 self.check_is_missing_shares))
2628 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2629 self.check_has_corrupt_shares))
2630 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2631 self.check_is_unrecoverable))
2632 d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2633 # disabled pending immutable verifier
2634 #d.addCallback(lambda ign: _checkv("large-missing-shares",
2635 # self.check_is_missing_shares))
2636 #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2637 # self.check_has_corrupt_shares))
2638 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2639 self.check_is_unrecoverable))
2643 def do_test_deepcheck_bad(self, ignored):
2644 d = defer.succeed(None)
2646 # now deep-check the root, with various verify= and repair= options
2647 d.addCallback(lambda ign:
2648 self.root.start_deep_check().when_done())
2650 self.failUnless(IDeepCheckResults.providedBy(cr))
2651 c = cr.get_counters()
2652 self.failUnlessEqual(c["count-objects-checked"], 9)
2653 self.failUnlessEqual(c["count-objects-healthy"], 5)
2654 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2655 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2656 d.addCallback(_check1)
2658 d.addCallback(lambda ign:
2659 self.root.start_deep_check(verify=True).when_done())
2661 self.failUnless(IDeepCheckResults.providedBy(cr))
2662 c = cr.get_counters()
2663 self.failUnlessEqual(c["count-objects-checked"], 9)
2664 # until we have a real immutable verifier, these counts will be
2666 #self.failUnlessEqual(c["count-objects-healthy"], 3)
2667 #self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2668 self.failUnlessEqual(c["count-objects-healthy"], 5) # todo
2669 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2670 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2671 d.addCallback(_check2)
2675 def json_is_healthy(self, data, where):
2677 self.failUnless(r["healthy"], where)
2678 self.failUnless(r["recoverable"], where)
2679 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2680 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2682 def json_is_missing_shares(self, data, where):
2684 self.failIf(r["healthy"], where)
2685 self.failUnless(r["recoverable"], where)
2686 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2687 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2689 def json_has_corrupt_shares(self, data, where):
2690 # by "corrupt-shares" we mean the file is still recoverable
2692 self.failIf(r["healthy"], where)
2693 self.failUnless(r["recoverable"], where)
2694 self.failUnless(r["count-shares-good"] < 10, where)
2695 self.failUnless(r["count-corrupt-shares"], where)
2696 self.failUnless(r["list-corrupt-shares"], where)
2698 def json_is_unrecoverable(self, data, where):
2700 self.failIf(r["healthy"], where)
2701 self.failIf(r["recoverable"], where)
2702 self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2704 self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2705 self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
2707 def do_test_web_bad(self, ignored):
2708 d = defer.succeed(None)
2711 def _check(which, checker):
2712 d = self.web_json(self.nodes[which], t="check")
2713 d.addCallback(checker, which + "--webcheck")
2716 d.addCallback(lambda ign: _check("mutable-good",
2717 self.json_is_healthy))
2718 d.addCallback(lambda ign: _check("mutable-missing-shares",
2719 self.json_is_missing_shares))
2720 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2721 self.json_is_healthy))
2722 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2723 self.json_is_unrecoverable))
2724 d.addCallback(lambda ign: _check("large-good",
2725 self.json_is_healthy))
2726 d.addCallback(lambda ign: _check("large-missing-shares",
2727 self.json_is_missing_shares))
2728 d.addCallback(lambda ign: _check("large-corrupt-shares",
2729 self.json_is_healthy))
2730 d.addCallback(lambda ign: _check("large-unrecoverable",
2731 self.json_is_unrecoverable))
2734 def _checkv(which, checker):
2735 d = self.web_json(self.nodes[which], t="check", verify="true")
2736 d.addCallback(checker, which + "--webcheck-and-verify")
2739 d.addCallback(lambda ign: _checkv("mutable-good",
2740 self.json_is_healthy))
2741 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2742 self.json_is_missing_shares))
2743 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2744 self.json_has_corrupt_shares))
2745 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2746 self.json_is_unrecoverable))
2747 d.addCallback(lambda ign: _checkv("large-good",
2748 self.json_is_healthy))
2749 # disabled pending immutable verifier
2750 #d.addCallback(lambda ign: _checkv("large-missing-shares",
2751 # self.json_is_missing_shares))
2752 #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2753 # self.json_has_corrupt_shares))
2754 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2755 self.json_is_unrecoverable))