1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28 MemoryConsumer, download_to_data
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    # An uploadable that counts how many bytes have been consumed through
    # read(), and can fire a Deferred once consumption passes a threshold.
    # Used to sever the helper connection partway through an upload.
    # NOTE(review): the initializer of the byte counter (bytes_read) is not
    # visible in this excerpt — presumably a class attribute set to 0.
    interrupt_after = None    # byte threshold after which to fire, or None
    interrupt_after_d = None  # Deferred fired (exactly once) at the threshold

    def read(self, length):
        # Count bytes handed out; when the running total first exceeds
        # interrupt_after, clear the threshold (so we fire only once) and
        # fire the interrupt Deferred with ourselves as the result.
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        # delegate the actual data production to the base class
        return upload.Data.read(self, length)
class GrabEverythingConsumer:
    # A minimal IConsumer that accumulates everything written to it.
    # NOTE(review): the bodies of write() and unregisterProducer(), and the
    # attribute holding the accumulated data (referenced elsewhere as
    # .contents), are elided from this excerpt.

    def registerProducer(self, producer, streaming):
        # we only ever expect a push producer here
        assert IPushProducer.providedBy(producer)

    def write(self, data):

    def unregisterProducer(self):
64 class SystemTest(SystemTestMixin, unittest.TestCase):
    def test_connections(self):
        """Bring up the grid plus one extra node and verify that every
        client sees all numclients+1 peers, both in the raw peer list and
        in the permuted-peer ordering used for share placement."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            # each client should see the original clients plus the extra one
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # always stop the extra node, even if the checks failed
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
94 def test_upload_and_download_random_key(self):
95 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96 return self._test_upload_and_download(convergence=None)
97 test_upload_and_download_random_key.timeout = 4800
99 def test_upload_and_download_convergent(self):
100 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101 return self._test_upload_and_download(convergence="some convergence string")
102 test_upload_and_download_convergent.timeout = 4800
    def _test_upload_and_download(self, convergence):
        """End-to-end exercise of immutable upload and download across the
        simulated grid: upload (twice), download via data/filename/filehandle/
        consumer APIs, filenode read() with offsets, bad-key and nonexistent
        URIs, helper uploads, duplicate-upload short-circuiting, and resumable
        (interrupted) helper uploads.

        `convergence` is either None (random key) or a convergence secret
        string (convergent encryption); several sub-checks branch on it.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # sanity: every client sees all the peers
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)
        # NOTE(review): the enclosing _do_upload definition is elided from
        # this excerpt; the lines below originally lived inside it.
        u = self.clients[0].getServiceNamed("uploader")
        # we crank the max segsize down to 1024b for the duration of this
        # test, so we can exercise multiple segments. It is important
        # that this is not a multiple of the segment size, so that the
        # tail segment is not the same length as the others. This actually
        # gets rounded up to 1025 to be a multiple of the number of
        # required shares (since we use 25 out of 100 FEC).
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            # stash the URI and grab client[1]'s downloader so later stages
            # download from a different node than the one that uploaded
            log.msg("upload finished: uri is %s" % (uri,))
            dl = self.clients[1].getServiceNamed("downloader")
        d.addCallback(_upload_done)
        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)
        def _download_to_data(res):
            # download as a single in-memory string
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)
        # download to a named file on disk
        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)
        # download to an already-open filehandle
        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)
        # download through the IConsumer adapter
        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)
        # exercise the filenode read() API: full reads, offset/size slices,
        # tail reads, and over-long size requests
        # NOTE(review): the enclosing _test_read definition is elided from
        # this excerpt; the lines below originally lived inside it.
        n = self.clients[1].create_node_from_uri(self.uri)
        d = download_to_data(n)
        def _read_done(data):
            self.failUnlessEqual(data, DATA)
        d.addCallback(_read_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=1, size=4))
        def _read_portion_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
        d.addCallback(_read_portion_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=2, size=None))
        def _read_tail_done(mc):
            # size=None means "read to the end"
            self.failUnlessEqual("".join(mc.chunks), DATA[2:])
        d.addCallback(_read_tail_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), size=len(DATA)+1000))
        def _read_too_much(mc):
            # over-long reads are truncated to the file size
            self.failUnlessEqual("".join(mc.chunks), DATA)
        d.addCallback(_read_too_much)
        d.addCallback(_test_read)
        def _test_bad_read(res):
            # flip a bit in the readkey: shares can't be located/decrypted
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)
        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)
        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)
        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            # upload through the helper node and verify round-trip
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)
        def _upload_duplicate_with_helper(res):
            # with convergence the second helper upload of the same data
            # should never start pushing ciphertext
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            self.failUnlessEqual(newdata, HELPER_DATA)
            self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                        "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        if convergence is not None:
            # duplicate-detection only makes sense with convergent encryption
            d.addCallback(_upload_duplicate_with_helper)
        def _upload_resumable(res):
            # interrupt a helper upload partway through, then retry with an
            # identical uploadable and check resumption behavior
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)
            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))
            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
            d = self.extra_node.upload(u1)
            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            # NOTE(review): the _interrupted errback definition line is elided
            # from this excerpt; the lines below originally lived inside it.
            f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
            # make sure we actually interrupted it before finishing the
            self.failUnless(u1.bytes_read < len(DATA),
                            "read %d out of %d total" % (u1.bytes_read,
            log.msg("waiting for reconnect", level=log.NOISY,
                    facility="tahoe.test.test_system")
            # now, we need to give the nodes a chance to notice that this
            # connection has gone away. When this happens, the storage
            # servers will be told to abort their uploads, removing the
            # partial shares. Unfortunately this involves TCP messages
            # going through the loopback interface, and we can't easily
            # predict how long that will take. If it were all local, we
            # could use fireEventually() to stall. Since we don't have
            # the right introduction hooks, the best we can do is use a
            # fixed delay. TODO: this is fragile.
            u1.interrupt_after_d.addCallback(self.stall, 2.0)
            return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)
            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)
            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())
            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))
            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched
                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)
    def _find_shares(self, basedir):
        """Walk `basedir` and collect every share file stored by the simulated
        nodes, returning (client_num, storage_index, filename, shnum) tuples.
        Fails the test if no shares are found at all."""
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            # shares live under .../clientN/storage/shares/$START/$SINDEX
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                # NOTE(review): only works for single-digit client numbers
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    # share files are named by their share number
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
        self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Unpack the mutable share stored in `filename`, corrupt the single
        field named by `which` (mostly by flipping one bit), repack, and write
        the share back in place. Used to exercise the retrieve-time
        validation/repair paths."""
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        # NOTE(review): some elif arms (e.g. for "R" and "IV") are elided
        # from this excerpt of the dispatch below.
        if which == "seqnum":
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            # corrupt one arbitrary node of the share hash chain
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # repack with (possibly corrupted) fields and overwrite in place
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """End-to-end mutable-file exercise: create an SDMF file, inspect a
        share with the 'debug dump-share' CLI, download from fresh nodes and
        other clients, overwrite twice, survive corruption of all but k
        shares, create empty files and directories, and verify the
        key-generator pool shrinks as keys are consumed."""
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            # NOTE(review): the _done callback's def line is elided from this
            # excerpt; the two lines below originally lived inside it.
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)
            # run the 'debug dump-share' CLI and verify its output fields
            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            # the pubkey size can vary by a byte, so the container might
            # be a bit larger on some runs.
            m = re.search(r'^ container_size: (\d+)$', output, re.M)
            container_size = int(m.group(1))
            self.failUnless(2037 <= container_size <= 2049, container_size)
            m = re.search(r'^ data_length: (\d+)$', output, re.M)
            data_length = int(m.group(1))
            self.failUnless(2037 <= data_length <= 2049, data_length)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                        base32.b2a(storage_index))
            self.failUnless(expected in output)
        except unittest.FailTest:
            # on failure, show the full CLI output to aid debugging
            print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.
        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            # nodes for the same URI should be cached/shared
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the contents and read them back
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                # read: this will trigger "pubkey doesn't match
                self._corrupt_mutable_share(filename, "pubkey")
                self._corrupt_mutable_share(filename, "encprivkey")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "seqnum")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "R")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "segsize")
                self._corrupt_mutable_share(filename, "share_hash_chain")
                self._corrupt_mutable_share(filename, "block_hash_tree")
                self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone
            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.
            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        # despite the corruption, the file should still read back correctly
        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            # basic dirnode operations: list, has_child, recursive set_node,
            # and manifest building
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            # poll until clients[3] has connected to the key generator
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # pool should have shrunk by exactly size_delta keys
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
713 def flip_bit(self, good):
714 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a syntactically valid CHK URI string that points at a file
        nobody holds shares for, by bit-flipping the readkey of `gooduri`."""
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        return u2.to_string()
727 # TODO: add a test which mangles the uri_extension_hash instead, and
728 # should fail due to not being able to get a valid uri_extension block.
729 # Also a test which sneakily mangles the uri_extension block to change
730 # some of the validation data, so it will fail in the post-download phase
731 # when the file's crypttext integrity check fails. Do the same thing for
732 # the key, which should cause the download to fail the post-download
733 # plaintext_hash check.
    def test_vdrive(self):
        """Build a small virtual-drive filesystem and run the battery of
        publish/check/web/CLI/control/checker sub-tests against it. Each
        `_test_*` / `_check_*` helper is chained onto one Deferred so they
        run sequentially against the same grid."""
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")
        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
    test_vdrive.timeout = 1100
    def _test_introweb(self, res):
        """Fetch the introducer's web page (HTML and ?t=json) and verify the
        advertised version plus announcement/subscription summaries."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        # NOTE(review): the _check callback's def/try lines are elided from
        # this excerpt; the assertions below originally lived inside them.
            self.failUnless("allmydata: %s" % str(allmydata.__version__)
            self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
            self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
            # dump the page to aid debugging before re-raising
            print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["subscription_summary"],
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
            self.failUnlessEqual(data["announcement_distinct_hosts"],
                                 {"storage": 1, "stub_client": 1})
        except unittest.FailTest:
            print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        """Create the root directory R, stash its URI, make R/subdir1, and
        upload self.data as R/subdir1/mydata567 (stashing its URI too)."""
        ut = upload.Data(self.data, convergence=None)
        # NOTE(review): the binding of c0 (presumably self.clients[0]) is
        # elided from this excerpt.
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            # re-create the node from its URI to exercise that path
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
    def _do_publish2(self, res):
        """Make R/subdir1/subdir2 and upload self.data as
        R/subdir1/subdir2/mydata992."""
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
    def log(self, res, *args, **kwargs):
        """Deferred-chain-friendly logger: emit a log message and pass `res`
        through to the next callback (the return of res is not visible in
        this excerpt)."""
        # print "MSG: %s RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        """Create the private directory P with P/personal/sekrit data, plus
        P/s2-rw and P/s2-ro links (read-write and read-only caps) to
        R/subdir1/subdir2."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            # NOTE(review): the _got_s2 callback's def line is elided from
            # this excerpt; the two d2 lines originally lived inside it.
            d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
            d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        """Verify R/subdir1/mydata567 round-trips, navigating one child at a
        time (the iterative API) from a different client."""
        # this one uses the iterative API
        # NOTE(review): the binding of c1 (presumably self.clients[1]) is
        # elided from this excerpt.
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        # NOTE(review): the _get_done def line is elided from this excerpt.
        self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        """Verify R/subdir1/mydata567 round-trips using the path-based API
        (get_child_at_path), and that node construction from a filenode's URI
        yields an equal node."""
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # nodes built from the same URI should compare equal
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
def _check_publish_private(self, resnode):
    """Verify the private tree from _do_publish_private: path lookups,
    read-only dirnode enforcement (mkdir/add/delete/set_uri/mv must raise
    NotMutableError), move_child_to, manifest size, and deep-stats values.

    NOTE(review): this excerpt is missing several source lines (the
    ``def get_path(path):`` header, the closing ``}`` of the ``expected``
    dict, a format-args continuation line, and ``return d1``/``return d``
    lines); transcribed as-is.
    """
    # this one uses the path-based API
    self._private_node = resnode

    d = self._private_node.get_child_at_path(u"personal")
    def _got_personal(personal):
        # remember the personal dirnode for later tests
        self._personal_node = personal
    d.addCallback(_got_personal)

    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
    # NOTE(review): the 'def get_path(path):' header is missing here; the
    # following return statement belongs to it.
        return self._private_node.get_child_at_path(path)

    d.addCallback(lambda res: get_path(u"personal/sekrit data"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
    d.addCallback(lambda res: get_path(u"s2-rw"))
    d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
    d.addCallback(lambda res: get_path(u"s2-ro"))
    def _got_s2ro(dirnode):
        # the read-only link must be mutable-but-readonly, and every
        # mutating operation on it must fail with NotMutableError
        self.failUnless(dirnode.is_mutable(), dirnode)
        self.failUnless(dirnode.is_readonly(), dirnode)
        d1 = defer.succeed(None)
        d1.addCallback(lambda res: dirnode.list())
        d1.addCallback(self.log, "dirnode.list")

        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

        d1.addCallback(self.log, "doing add_file(ro)")
        ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

        d1.addCallback(self.log, "doing get(ro)")
        d1.addCallback(lambda res: dirnode.get(u"mydata992"))
        d1.addCallback(lambda filenode:
                       self.failUnless(IFileNode.providedBy(filenode)))

        d1.addCallback(self.log, "doing delete(ro)")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

        d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

        personal = self._personal_node
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

        d1.addCallback(self.log, "doing move_child_to(ro)2")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

        d1.addCallback(self.log, "finished with _got_s2ro")
        # NOTE(review): 'return d1' appears to be missing here.
    d.addCallback(_got_s2ro)
    def _got_home(dummy):
        # exercise move_child_to in three directions, then check the
        # manifest and deep-stats of the resulting tree
        home = self._private_node
        personal = self._personal_node
        d1 = defer.succeed(None)
        d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
        d1.addCallback(lambda res:
                       personal.move_child_to(u"sekrit data",home,u"sekrit"))

        d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit", home, u"sekrit data"))

        d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit data", personal))

        d1.addCallback(lambda res: home.build_manifest().when_done())
        d1.addCallback(self.log, "manifest")
        # NOTE(review): the first lines of this expected-manifest comment
        # block are missing from the excerpt.
        #  P/personal/sekrit data
        #  P/s2-rw  (same as P/s2-ro)
        #  P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
        d1.addCallback(lambda res:
                       self.failUnlessEqual(len(res["manifest"]), 5))
        d1.addCallback(lambda res: home.start_deep_stats().when_done())
        def _check_stats(stats):
            expected = {"count-immutable-files": 1,
                        "count-mutable-files": 0,
                        "count-literal-files": 1,
                        "count-directories": 3,
                        "size-immutable-files": 112,
                        "size-literal-files": 23,
                        #"size-directories": 616, # varies
                        #"largest-directory": 616,
                        "largest-directory-children": 3,
                        "largest-immutable-file": 112,
            # NOTE(review): the closing '}' of `expected` (and possibly a
            # "count-files" entry) is missing from this excerpt.
            for k,v in expected.iteritems():
                self.failUnlessEqual(stats[k], v,
                                     "stats[%s] was %s, not %s" %
            # NOTE(review): the format-args continuation line is missing here.
            self.failUnless(stats["size-directories"] > 1300,
                            stats["size-directories"])
            self.failUnless(stats["largest-directory"] > 800,
                            stats["largest-directory"])
            self.failUnlessEqual(stats["size-files-histogram"],
                                 [ (11, 31, 1), (101, 316, 1) ])
        d1.addCallback(_check_stats)
        # NOTE(review): 'return d1' appears to be missing here.
    d.addCallback(_got_home)
def shouldFail(self, res, expected_failure, which, substring=None):
    """addBoth-style checker: if `res` is a Failure wrapping
    `expected_failure` (optionally containing `substring`), swallow it;
    otherwise fail the test, naming `which` operation was expected to raise.

    NOTE(review): this excerpt is missing the ``if substring is not None:``
    guard and the ``else:`` header around the two branches below —
    transcribed as-is.
    """
    if isinstance(res, Failure):
        res.trap(expected_failure)
        self.failUnless(substring in str(res),
                        "substring '%s' not in '%s'"
                        % (substring, str(res)))
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
    """Call `callable(*args, **kwargs)` and assert that it fails with
    `expected_failure` (optionally containing `substring` in its message).

    NOTE(review): this excerpt is missing the ``def _done(res):`` header,
    the ``if substring is not None:`` / ``else:`` lines, and the trailing
    ``d.addBoth(_done)`` / ``return d`` — transcribed as-is.
    """
    assert substring is None or isinstance(substring, str)
    d = defer.maybeDeferred(callable, *args, **kwargs)
    if isinstance(res, Failure):
        res.trap(expected_failure)
        self.failUnless(substring in str(res),
                        "substring '%s' not in '%s'"
                        % (substring, str(res)))
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of `data` to `urlpath` on our webish server;
    returns a Deferred firing with the response body."""
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET for `urlpath` on our webish server; returns a
    Deferred firing with the response body."""
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
    """POST a hand-built multipart/form-data body to `urlpath`, optionally
    via the helper node's webish server. Tuple-valued fields are treated as
    (filename, contents) file uploads.

    NOTE(review): this excerpt is missing the ``if use_helper:``/``else:``
    lines selecting the URL, the ``form = []`` initialisation, the
    separator-append lines between parts, the closing-boundary lines, and
    the second headers entry — transcribed as-is.
    """
    url = self.helper_webish_url + urlpath
    url = self.webish_url + urlpath
    sepbase = "boogabooga"
    sep = "--" + sepbase
    form.append('Content-Disposition: form-data; name="_charset"')
    form.append('UTF-8')
    for name, value in fields.iteritems():
        if isinstance(value, tuple):
            # (filename, contents) pair: emit as a file-upload part
            filename, value = value
            form.append('Content-Disposition: form-data; name="%s"; '
                        'filename="%s"' % (name, filename.encode("utf-8")))
            form.append('Content-Disposition: form-data; name="%s"' % name)
        form.append(str(value))
    body = "\r\n".join(form) + "\r\n"
    headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
    return getPage(url, method="POST", postdata=body,
                   headers=headers, followRedirect=followRedirect)
def _test_web(self, res):
    """Exercise the webapi/WUI end to end: welcome pages (with and without
    helper), directory listings, file GET (by path, by URI, by bogus URI),
    PUT (small, multi-segment, replace-in-place), unlinked POST uploads,
    the status/helper_status/statistics pages and their JSON forms.

    NOTE(review): this excerpt has dropped a number of source lines — the
    initial ``d = getPage(base)``, several closing parens of failUnless
    calls, the ``def _got_up``/``def _got_down`` headers, ``f.close()`` and
    ``now = time.time()`` in _got_helper_status, numeric continuation lines
    in _got_helper_status_json, and the trailing ``return d``. The body is
    transcribed as-is and will not run until restored.
    """
    base = self.webish_url
    public = "uri/" + self._root_directory_uri
    def _got_welcome(page):
        expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
        self.failUnless(expected in page,
                        "I didn't see the right 'connected storage servers'"
                        " message in: %s" % page
        expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
        self.failUnless(expected in page,
                        "I didn't see the right 'My nodeid' message "
        self.failUnless("Helper: 0 active uploads" in page)
    d.addCallback(_got_welcome)
    d.addCallback(self.log, "done with _got_welcome")

    # get the welcome page from the node that uses the helper too
    d.addCallback(lambda res: getPage(self.helper_webish_url))
    def _got_welcome_helper(page):
        self.failUnless("Connected to helper?: <span>yes</span>" in page,
        self.failUnless("Not running helper" in page)
    d.addCallback(_got_welcome_helper)

    d.addCallback(lambda res: getPage(base + public))
    d.addCallback(lambda res: getPage(base + public + "/subdir1"))
    def _got_subdir1(page):
        # there ought to be an href for our file
        self.failUnless(("<td>%d</td>" % len(self.data)) in page)
        self.failUnless(">mydata567</a>" in page)
    d.addCallback(_got_subdir1)
    d.addCallback(self.log, "done with _got_subdir1")
    d.addCallback(lambda res:
                  getPage(base + public + "/subdir1/mydata567"))
    def _got_data(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_data)

    # download from a URI embedded in a URL
    d.addCallback(self.log, "_get_from_uri")
    def _get_from_uri(res):
        return getPage(base + "uri/%s?filename=%s"
                       % (self.uri, "mydata567"))
    d.addCallback(_get_from_uri)
    def _got_from_uri(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_from_uri)

    # download from a URI embedded in a URL, second form
    d.addCallback(self.log, "_get_from_uri2")
    def _get_from_uri2(res):
        return getPage(base + "uri?uri=%s" % (self.uri,))
    d.addCallback(_get_from_uri2)
    d.addCallback(_got_from_uri)

    # download from a bogus URI, make sure we get a reasonable error
    d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
    def _get_from_bogus_uri(res):
        d1 = getPage(base + "uri/%s?filename=%s"
                     % (self.mangle_uri(self.uri), "mydata567"))
        # NOTE(review): the substring argument and 'return d1' are missing.
        d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
    d.addCallback(_get_from_bogus_uri)
    d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

    # upload a file with PUT
    d.addCallback(self.log, "about to try PUT")
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                       "new.txt contents"))
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "new.txt contents")
    # and again with something large enough to use multiple segments,
    # and hopefully trigger pauseProducing too
    d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                       "big" * 500000)) # 1.5MB
    d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
    d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

    # can we replace files in place?
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "NEWER contents")

    # test unlinked POST
    d.addCallback(lambda res: self.POST("uri", t="upload",
                                        file=("new.txt", "data" * 10000)))
    # and again using the helper, which exercises different upload-status
    d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                        file=("foo.txt", "data2" * 10000)))

    # check that the status page exists
    d.addCallback(lambda res: self.GET("status", followRedirect=True))
    def _got_status(res):
        # find an interesting upload and download to look at. LIT files
        # are not interesting.
        for ds in self.clients[0].list_all_download_statuses():
            if ds.get_size() > 200:
                self._down_status = ds.get_counter()
        for us in self.clients[0].list_all_upload_statuses():
            if us.get_size() > 200:
                self._up_status = us.get_counter()
        rs = self.clients[0].list_all_retrieve_statuses()[0]
        self._retrieve_status = rs.get_counter()
        ps = self.clients[0].list_all_publish_statuses()[0]
        self._publish_status = ps.get_counter()
        us = self.clients[0].list_all_mapupdate_statuses()[0]
        self._update_status = us.get_counter()

        # and that there are some upload- and download- status pages
        return self.GET("status/up-%d" % self._up_status)
    d.addCallback(_got_status)
    # NOTE(review): the 'def _got_up(res):' header is missing here.
        return self.GET("status/down-%d" % self._down_status)
    d.addCallback(_got_up)
    # NOTE(review): the 'def _got_down(res):' header is missing here.
        return self.GET("status/mapupdate-%d" % self._update_status)
    d.addCallback(_got_down)
    def _got_update(res):
        return self.GET("status/publish-%d" % self._publish_status)
    d.addCallback(_got_update)
    def _got_publish(res):
        return self.GET("status/retrieve-%d" % self._retrieve_status)
    d.addCallback(_got_publish)

    # check that the helper status page exists
    d.addCallback(lambda res:
                  self.GET("helper_status", followRedirect=True))
    def _got_helper_status(res):
        self.failUnless("Bytes Fetched:" in res)
        # touch a couple of files in the helper's working directory to
        # exercise more code paths
        workdir = os.path.join(self.getdir("client0"), "helper")
        incfile = os.path.join(workdir, "CHK_incoming", "spurious")
        f = open(incfile, "wb")
        f.write("small file")
        then = time.time() - 86400*3
        # NOTE(review): 'now = time.time()' and 'f.close()' lines are missing
        # from this excerpt; 'now' is otherwise unbound here.
        os.utime(incfile, (now, then))
        encfile = os.path.join(workdir, "CHK_encoding", "spurious")
        f = open(encfile, "wb")
        f.write("less small file")
        os.utime(encfile, (now, then))
    d.addCallback(_got_helper_status)
    # and that the json form exists
    d.addCallback(lambda res:
                  self.GET("helper_status?t=json", followRedirect=True))
    def _got_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
        self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
        self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
    d.addCallback(_got_helper_status_json)

    # and check that client[3] (which uses a helper but does not run one
    # itself) doesn't explode when you ask for its status
    d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
    def _got_non_helper_status(res):
        self.failUnless("Upload and Download Status" in res)
    d.addCallback(_got_non_helper_status)

    # or for helper status with t=json
    d.addCallback(lambda res:
                  getPage(self.helper_webish_url + "helper_status?t=json"))
    def _got_non_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data, {})
    d.addCallback(_got_non_helper_status_json)

    # see if the statistics page exists
    d.addCallback(lambda res: self.GET("statistics"))
    def _got_stats(res):
        self.failUnless("Node Statistics" in res)
        self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
    d.addCallback(_got_stats)
    d.addCallback(lambda res: self.GET("statistics?t=json"))
    def _got_stats_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
        self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
    d.addCallback(_got_stats_json)

    # TODO: mangle the second segment of a file, to test errors that
    # occur after we've already sent some good data, which uses a
    # different error path.

    # TODO: download a URI with a form
    # TODO: create a directory by using a form
    # TODO: upload by using a form on the directory page
    # url = base + "somedir/subdir1/freeform_post!!upload"
    # TODO: delete a file by using a button on the directory page
def _test_runner(self, res):
    """Exercise the 'tahoe debug' CLI tools (dump-share, find-shares,
    catalog-shares) against the shares created by the earlier upload.

    NOTE(review): this excerpt is missing several source lines — the
    ``continue``/``break`` bodies in the share-search loop, the filename
    argument line of the dump-share invocation, the ``% self.basedir``
    continuation of the fail message, probable ``out.seek(0)`` calls before
    ``out.readlines()``, and the ``matching = [line`` list-comprehension
    head. Transcribed as-is.
    """
    # exercise some of the diagnostic tools in runner.py

    # find a share: walk every node's storage directory until we hit a
    # sharefile whose magic marks it as a CHK (immutable) share
    for (dirpath, dirnames, filenames) in os.walk(self.basedir):
        if "storage" not in dirpath:
        pieces = dirpath.split(os.sep)
        if pieces[-4] == "storage" and pieces[-3] == "shares":
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            filename = os.path.join(dirpath, filenames[0])
            # peek at the magic to see if it is a chk share
            magic = open(filename, "rb").read(4)
            if magic == '\x00\x00\x00\x01':
        self.fail("unable to find any uri_extension files in %s"
    log.msg("test_system.SystemTest._test_runner using %s" % filename)

    out,err = StringIO(), StringIO()
    rc = runner.runner(["debug", "dump-share", "--offsets",
                       stdout=out, stderr=err)
    output = out.getvalue()
    self.failUnlessEqual(rc, 0)

    # we only upload a single file, so we can assert some things about
    # its size and shares.
    self.failUnless(("share filename: %s" % filename) in output)
    self.failUnless("size: %d\n" % len(self.data) in output)
    self.failUnless("num_segments: 1\n" in output)
    # segment_size is always a multiple of needed_shares
    self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
    self.failUnless("total_shares: 10\n" in output)
    # keys which are supposed to be present
    for key in ("size", "num_segments", "segment_size",
                "needed_shares", "total_shares",
                "codec_name", "codec_params", "tail_codec_params",
                #"plaintext_hash", "plaintext_root_hash",
                "crypttext_hash", "crypttext_root_hash",
                "share_root_hash", "UEB_hash"):
        self.failUnless("%s: " % key in output, key)
    self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

    # now use its storage index to find the other shares using the
    # 'find-shares' tool
    sharedir, shnum = os.path.split(filename)
    storagedir, storage_index_s = os.path.split(sharedir)
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["debug", "find-shares", storage_index_s] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    sharefiles = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(sharefiles), 10)

    # also exercise the 'catalog-shares' tool
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["debug", "catalog-shares"] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    descriptions = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(descriptions), 30)
    # NOTE(review): the 'matching = [line' head of this comprehension is
    # missing from the excerpt.
        for line in descriptions
        if line.startswith("CHK %s " % storage_index_s)]
    self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
    """Connect to client0's control-port furl and hand the remote
    reference to _test_control2.

    NOTE(review): a trailing ``return d`` appears to be missing from this
    excerpt — confirm against the full source.
    """
    # exercise the remote-control-the-client foolscap interfaces in
    # allmydata.control (mostly used for performance tests)
    c0 = self.clients[0]
    control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
    control_furl = open(control_furl_file, "r").read().strip()
    # it doesn't really matter which Tub we use to connect to the client,
    # so let's just use our IntroducerNode's
    d = self.introducer.tub.getReference(control_furl)
    d.addCallback(self._test_control2, control_furl_file)
def _test_control2(self, rref, filename):
    """Drive the control-port remote interface: upload `filename`, download
    it back, compare contents, then run speed_test / memory / response-time
    probes (memory probe is Linux-only).

    NOTE(review): this excerpt is missing the continuation arguments of the
    download_from_uri_to_file call, the ``def _check(res):`` header, and the
    trailing ``return d`` — transcribed as-is.
    """
    d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
    downfile = os.path.join(self.basedir, "control.downfile")
    d.addCallback(lambda uri:
                  rref.callRemote("download_from_uri_to_file",
    # NOTE(review): the 'def _check(res):' header is missing here.
        self.failUnlessEqual(res, downfile)
        data = open(downfile, "r").read()
        expected_data = open(filename, "r").read()
        self.failUnlessEqual(data, expected_data)
    d.addCallback(_check)
    d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
    if sys.platform == "linux2":
        d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
    d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
def _test_cli(self, res):
    """Exercise the 'tahoe' CLI end to end against client0: aliases,
    mkdir/ls, put/get (file, stdin, mutable), rm, mv, ln, cp (all four
    disk/tahoe direction combinations, plus recursive -r copies).

    NOTE(review): this excerpt is missing several source lines — the
    ``nodeargs = [`` list head and its closing ``]``, ``f.close()`` after
    writing root_dir.cap, the ``files = []/datas = []/for i in range(...):``
    setup of the test-file loop, ``uri0 = ...`` in _put_out, probable
    ``return out.strip()`` in _got_dir, the ``for l in lines:`` headers and
    ``else:`` branches inside _check_ls_l/_check_ls_uri/_check_ls_rouri,
    ``os.mkdir(dn)``/``os.mkdir(sdn2)`` in the recursive-copy setup, the
    ``def _cmp(name):`` header and the rfile1-3 _cmp calls in
    _check_cp_r_out, and the trailing ``return d``. Transcribed as-is.
    """
    # run various CLI commands (in a thread, since they use blocking
    private_uri = self._private_node.get_uri()
    some_uri = self._root_directory_uri
    client0_basedir = self.getdir("client0")

        "--node-directory", client0_basedir,
    TESTDATA = "I will not write the same thing over and over.\n" * 100

    d = defer.succeed(None)

    # for compatibility with earlier versions, private/root_dir.cap is
    # supposed to be treated as an alias named "tahoe:". Start by making
    # sure that works, before we add other aliases.

    root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
    f = open(root_file, "w")
    f.write(private_uri)

    def run(ignored, verb, *args, **kwargs):
        # helper: run one CLI verb with the standard node args
        stdin = kwargs.get("stdin", "")
        newargs = [verb] + nodeargs + list(args)
        return self._run_cli(newargs, stdin=stdin)

    def _check_ls((out,err), expected_children, unexpected_children=[]):
        self.failUnlessEqual(err, "")
        for s in expected_children:
            self.failUnless(s in out, (s,out))
        for s in unexpected_children:
            self.failIf(s in out, (s,out))

    def _check_ls_root((out,err)):
        self.failUnless("personal" in out)
        self.failUnless("s2-ro" in out)
        self.failUnless("s2-rw" in out)
        self.failUnlessEqual(err, "")

    # this should reference private_uri
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

    d.addCallback(run, "list-aliases")
    def _check_aliases_1((out,err)):
        self.failUnlessEqual(err, "")
        self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
    d.addCallback(_check_aliases_1)

    # now that that's out of the way, remove root_dir.cap and work with
    d.addCallback(lambda res: os.unlink(root_file))
    d.addCallback(run, "list-aliases")
    def _check_aliases_2((out,err)):
        self.failUnlessEqual(err, "")
        self.failUnlessEqual(out, "")
    d.addCallback(_check_aliases_2)

    d.addCallback(run, "mkdir")
    def _got_dir( (out,err) ):
        self.failUnless(uri.from_string_dirnode(out.strip()))
    d.addCallback(_got_dir)
    d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

    d.addCallback(run, "list-aliases")
    def _check_aliases_3((out,err)):
        self.failUnlessEqual(err, "")
        self.failUnless("tahoe: " in out)
    d.addCallback(_check_aliases_3)

    def _check_empty_dir((out,err)):
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "")
    d.addCallback(run, "ls")
    d.addCallback(_check_empty_dir)

    def _check_missing_dir((out,err)):
        # TODO: check that rc==2
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "No such file or directory\n")
    d.addCallback(run, "ls", "bogus")
    d.addCallback(_check_missing_dir)

    # NOTE(review): the loop header and files/datas accumulator setup for
    # these three lines is missing from the excerpt.
        fn = os.path.join(self.basedir, "file%d" % i)
        data = "data to be uploaded: file%d\n" % i
        open(fn,"wb").write(data)

    def _check_stdout_against((out,err), filenum=None, data=None):
        self.failUnlessEqual(err, "")
        if filenum is not None:
            self.failUnlessEqual(out, datas[filenum])
        if data is not None:
            self.failUnlessEqual(out, data)

    # test all both forms of put: from a file, and from stdin
    d.addCallback(run, "put", files[0], "tahoe-file0")
    def _put_out((out,err)):
        self.failUnless("URI:LIT:" in out, out)
        self.failUnless("201 Created" in err, err)
        return run(None, "get", uri0)
    d.addCallback(_put_out)
    d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

    d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
    # tahoe put bar tahoe:FOO
    d.addCallback(run, "put", files[2], "tahoe:file2")
    d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
    def _check_put_mutable((out,err)):
        self._mutable_file3_uri = out.strip()
    d.addCallback(_check_put_mutable)
    d.addCallback(run, "get", "tahoe:file3")
    d.addCallback(_check_stdout_against, 3)

    STDIN_DATA = "This is the file to upload from stdin."
    d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
    # tahoe put tahoe:FOO
    d.addCallback(run, "put", "-", "tahoe:from-stdin",
                  stdin="Other file from stdin.")

    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                              "tahoe-file-stdin", "from-stdin"])
    d.addCallback(run, "ls", "subdir")
    d.addCallback(_check_ls, ["tahoe-file1"])

    d.addCallback(run, "mkdir", "subdir2")
    d.addCallback(run, "ls")
    # TODO: extract the URI, set an alias with it
    d.addCallback(_check_ls, ["subdir2"])

    # tahoe get: (to stdin and to a file)
    d.addCallback(run, "get", "tahoe-file0")
    d.addCallback(_check_stdout_against, 0)
    d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
    d.addCallback(_check_stdout_against, 1)
    outfile0 = os.path.join(self.basedir, "outfile0")
    d.addCallback(run, "get", "file2", outfile0)
    def _check_outfile0((out,err)):
        data = open(outfile0,"rb").read()
        self.failUnlessEqual(data, "data to be uploaded: file2\n")
    d.addCallback(_check_outfile0)
    outfile1 = os.path.join(self.basedir, "outfile0")
    d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
    def _check_outfile1((out,err)):
        data = open(outfile1,"rb").read()
        self.failUnlessEqual(data, "data to be uploaded: file1\n")
    d.addCallback(_check_outfile1)

    d.addCallback(run, "rm", "tahoe-file0")
    d.addCallback(run, "rm", "tahoe:file2")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

    d.addCallback(run, "ls", "-l")
    def _check_ls_l((out,err)):
        lines = out.split("\n")
        # NOTE(review): the 'for l in lines:' header and an 'elif/else'
        # branch line are missing from this excerpt.
            if "tahoe-file-stdin" in l:
                self.failUnless(l.startswith("-r-- "), l)
                self.failUnless(" %d " % len(STDIN_DATA) in l)
                self.failUnless(l.startswith("-rw- "), l) # mutable
    d.addCallback(_check_ls_l)

    d.addCallback(run, "ls", "--uri")
    def _check_ls_uri((out,err)):
        lines = out.split("\n")
        # NOTE(review): loop/condition lines selecting the file3 row are
        # missing from this excerpt.
                self.failUnless(self._mutable_file3_uri in l)
    d.addCallback(_check_ls_uri)

    d.addCallback(run, "ls", "--readonly-uri")
    def _check_ls_rouri((out,err)):
        lines = out.split("\n")
        # NOTE(review): loop/condition lines selecting the file3 row are
        # missing from this excerpt.
                rw_uri = self._mutable_file3_uri
                u = uri.from_string_mutable_filenode(rw_uri)
                ro_uri = u.get_readonly().to_string()
                self.failUnless(ro_uri in l)
    d.addCallback(_check_ls_rouri)

    d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

    d.addCallback(run, "ln", "tahoe-moved", "newlink")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

    d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy"])
    d.addCallback(run, "get", "tahoe:file3-copy")
    d.addCallback(_check_stdout_against, 3)

    # copy from disk into tahoe
    d.addCallback(run, "cp", files[4], "tahoe:file4")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file4")
    d.addCallback(_check_stdout_against, 4)

    # copy from tahoe into disk
    target_filename = os.path.join(self.basedir, "file-out")
    d.addCallback(run, "cp", "tahoe:file4", target_filename)
    def _check_cp_out((out,err)):
        self.failUnless(os.path.exists(target_filename))
        got = open(target_filename,"rb").read()
        self.failUnlessEqual(got, datas[4])
    d.addCallback(_check_cp_out)

    # copy from disk to disk (silly case)
    target2_filename = os.path.join(self.basedir, "file-out-copy")
    d.addCallback(run, "cp", target_filename, target2_filename)
    def _check_cp_out2((out,err)):
        self.failUnless(os.path.exists(target2_filename))
        got = open(target2_filename,"rb").read()
        self.failUnlessEqual(got, datas[4])
    d.addCallback(_check_cp_out2)

    # copy from tahoe into disk, overwriting an existing file
    d.addCallback(run, "cp", "tahoe:file3", target_filename)
    def _check_cp_out3((out,err)):
        self.failUnless(os.path.exists(target_filename))
        got = open(target_filename,"rb").read()
        self.failUnlessEqual(got, datas[3])
    d.addCallback(_check_cp_out3)

    # copy from disk into tahoe, overwriting an existing immutable file
    d.addCallback(run, "cp", files[5], "tahoe:file4")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file4")
    d.addCallback(_check_stdout_against, 5)

    # copy from disk into tahoe, overwriting an existing mutable file
    d.addCallback(run, "cp", files[5], "tahoe:file3")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file3")
    d.addCallback(_check_stdout_against, 5)

    # recursive copy: setup
    dn = os.path.join(self.basedir, "dir1")
    open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
    open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
    open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
    sdn2 = os.path.join(dn, "subdir2")
    open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
    open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

    # from disk into tahoe
    d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["dir1"])
    d.addCallback(run, "ls", "dir1")
    d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                  ["rfile4", "rfile5"])
    d.addCallback(run, "ls", "tahoe:dir1/subdir2")
    d.addCallback(_check_ls, ["rfile4", "rfile5"],
                  ["rfile1", "rfile2", "rfile3"])
    d.addCallback(run, "get", "dir1/subdir2/rfile4")
    d.addCallback(_check_stdout_against, data="rfile4")

    # and back out again
    dn_copy = os.path.join(self.basedir, "dir1-copy")
    d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
    def _check_cp_r_out((out,err)):
        # NOTE(review): the 'def _cmp(name):' header (and the _cmp calls for
        # rfile1-3) are missing from this excerpt.
            old = open(os.path.join(dn, name), "rb").read()
            newfn = os.path.join(dn_copy, name)
            self.failUnless(os.path.exists(newfn))
            new = open(newfn, "rb").read()
            self.failUnlessEqual(old, new)
        _cmp(os.path.join("subdir2", "rfile4"))
        _cmp(os.path.join("subdir2", "rfile5"))
    d.addCallback(_check_cp_r_out)

    # and copy it a second time, which ought to overwrite the same files
    d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

    # and tahoe-to-tahoe
    d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["dir1", "dir1-copy"])
    d.addCallback(run, "ls", "dir1-copy")
    d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                  ["rfile4", "rfile5"])
    d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
    d.addCallback(_check_ls, ["rfile4", "rfile5"],
                  ["rfile1", "rfile2", "rfile3"])
    d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
    d.addCallback(_check_stdout_against, data="rfile4")

    # and copy it a second time, which ought to overwrite the same files
    d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

    # tahoe_ls doesn't currently handle the error correctly: it tries to
    # JSON-parse a traceback.
    ## def _ls_missing(res):
    ##     argv = ["ls"] + nodeargs + ["bogus"]
    ##     return self._run_cli(argv)
    ## d.addCallback(_ls_missing)
    ## def _check_ls_missing((out,err)):
    ##     self.failUnlessEqual(err, "")
    ## d.addCallback(_check_ls_missing)
def _run_cli(self, argv, stdin=""):
    """Run one CLI command in a worker thread (the CLI uses blocking HTTP);
    the Deferred fires with the (stdout_text, stderr_text) pair.

    NOTE(review): this excerpt is missing the ``def _done(res):`` header
    above the return statement and the trailing ``return d`` — transcribed
    as-is.
    """
    stdout, stderr = StringIO(), StringIO()
    d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                              stdin=StringIO(stdin),
                              stdout=stdout, stderr=stderr)
        return stdout.getvalue(), stderr.getvalue()
    d.addCallback(_done)
def _test_checker(self, res):
    """Run the checker (with and without verify=True) against the personal
    dirnode, a CHK filenode, and a LIT filenode (LIT checks return None).

    NOTE(review): this excerpt appears to be missing the ``return d`` lines
    inside _got_chk_filenode/_got_lit_filenode and at the end of the method
    — transcribed as-is.
    """
    ut = upload.Data("too big to be literal" * 200, convergence=None)
    d = self._personal_node.add_file(u"big file", ut)

    d.addCallback(lambda res: self._personal_node.check(Monitor()))
    def _check_dirnode_results(r):
        self.failUnless(r.is_healthy())
    d.addCallback(_check_dirnode_results)
    d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
    d.addCallback(_check_dirnode_results)

    d.addCallback(lambda res: self._personal_node.get(u"big file"))
    def _got_chk_filenode(n):
        self.failUnless(isinstance(n, filenode.FileNode))
        d = n.check(Monitor())
        def _check_filenode_results(r):
            self.failUnless(r.is_healthy())
        d.addCallback(_check_filenode_results)
        d.addCallback(lambda res: n.check(Monitor(), verify=True))
        d.addCallback(_check_filenode_results)
    d.addCallback(_got_chk_filenode)

    d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
    def _got_lit_filenode(n):
        self.failUnless(isinstance(n, filenode.LiteralFileNode))
        d = n.check(Monitor())
        def _check_lit_filenode_results(r):
            # LIT files carry their data in the cap itself, so check()
            # has nothing to verify and returns None
            self.failUnlessEqual(r, None)
        d.addCallback(_check_lit_filenode_results)
        d.addCallback(lambda res: n.check(Monitor(), verify=True))
        d.addCallback(_check_lit_filenode_results)
    d.addCallback(_got_lit_filenode)
1763 class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
1765 def _run_cli(self, argv):
1766 stdout, stderr = StringIO(), StringIO()
1767 # this can only do synchronous operations
1768 assert argv[0] == "debug"
1769 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1770 return stdout.getvalue()
1772 def test_good(self):
1773 self.basedir = self.mktemp()
1774 d = self.set_up_nodes()
1775 CONTENTS = "a little bit of data"
1776 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1779 si = self.node.get_storage_index()
1780 d.addCallback(_created)
1781 # now make sure the webapi verifier sees no problems
1783 url = (self.webish_url +
1784 "uri/%s" % urllib.quote(self.node.get_uri()) +
1785 "?t=check&verify=true")
1786 return getPage(url, method="POST")
1787 d.addCallback(_do_check)
1788 def _got_results(out):
1789 self.failUnless("<span>Healthy : Healthy</span>" in out, out)
1790 self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1791 self.failIf("Not Healthy!" in out, out)
1792 self.failIf("Unhealthy" in out, out)
1793 self.failIf("Corrupt Shares" in out, out)
1794 d.addCallback(_got_results)
1795 d.addErrback(self.explain_web_error)
1798 def test_corrupt(self):
1799 self.basedir = self.mktemp()
1800 d = self.set_up_nodes()
1801 CONTENTS = "a little bit of data"
1802 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1805 si = self.node.get_storage_index()
1806 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1807 self.clients[1].basedir])
1808 files = out.split("\n")
1809 # corrupt one of them, using the CLI debug command
1811 shnum = os.path.basename(f)
1812 nodeid = self.clients[1].nodeid
1813 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1814 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1815 out = self._run_cli(["debug", "corrupt-share", files[0]])
1816 d.addCallback(_created)
1817 # now make sure the webapi verifier notices it
1819 url = (self.webish_url +
1820 "uri/%s" % urllib.quote(self.node.get_uri()) +
1821 "?t=check&verify=true")
1822 return getPage(url, method="POST")
1823 d.addCallback(_do_check)
1824 def _got_results(out):
1825 self.failUnless("Not Healthy!" in out, out)
1826 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1827 self.failUnless("Corrupt Shares:" in out, out)
1828 d.addCallback(_got_results)
1830 # now make sure the webapi repairer can fix it
1831 def _do_repair(res):
1832 url = (self.webish_url +
1833 "uri/%s" % urllib.quote(self.node.get_uri()) +
1834 "?t=check&verify=true&repair=true")
1835 return getPage(url, method="POST")
1836 d.addCallback(_do_repair)
1837 def _got_repair_results(out):
1838 self.failUnless("<div>Repair successful</div>" in out, out)
1839 d.addCallback(_got_repair_results)
1840 d.addCallback(_do_check)
1841 def _got_postrepair_results(out):
1842 self.failIf("Not Healthy!" in out, out)
1843 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1844 d.addCallback(_got_postrepair_results)
1845 d.addErrback(self.explain_web_error)
1849 def test_delete_share(self):
1850 self.basedir = self.mktemp()
1851 d = self.set_up_nodes()
1852 CONTENTS = "a little bit of data"
1853 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1856 si = self.node.get_storage_index()
1857 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1858 self.clients[1].basedir])
1859 files = out.split("\n")
1860 # corrupt one of them, using the CLI debug command
1862 shnum = os.path.basename(f)
1863 nodeid = self.clients[1].nodeid
1864 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1865 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1867 d.addCallback(_created)
1868 # now make sure the webapi checker notices it
1870 url = (self.webish_url +
1871 "uri/%s" % urllib.quote(self.node.get_uri()) +
1872 "?t=check&verify=false")
1873 return getPage(url, method="POST")
1874 d.addCallback(_do_check)
1875 def _got_results(out):
1876 self.failUnless("Not Healthy!" in out, out)
1877 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1878 self.failIf("Corrupt Shares" in out, out)
1879 d.addCallback(_got_results)
1881 # now make sure the webapi repairer can fix it
1882 def _do_repair(res):
1883 url = (self.webish_url +
1884 "uri/%s" % urllib.quote(self.node.get_uri()) +
1885 "?t=check&verify=false&repair=true")
1886 return getPage(url, method="POST")
1887 d.addCallback(_do_repair)
1888 def _got_repair_results(out):
1889 self.failUnless("Repair successful" in out)
1890 d.addCallback(_got_repair_results)
1891 d.addCallback(_do_check)
1892 def _got_postrepair_results(out):
1893 self.failIf("Not Healthy!" in out, out)
1894 self.failUnless("Recoverable Versions: 10*seq" in out)
1895 d.addCallback(_got_postrepair_results)
1896 d.addErrback(self.explain_web_error)
1901 class DeepCheckBase(SystemTestMixin, ErrorMixin):
1903 def web_json(self, n, **kwargs):
1904 kwargs["output"] = "json"
1905 d = self.web(n, "POST", **kwargs)
1906 d.addCallback(self.decode_json)
1909 def decode_json(self, (s,url)):
1911 data = simplejson.loads(s)
1913 self.fail("%s: not JSON: '%s'" % (url, s))
1916 def web(self, n, method="GET", **kwargs):
1917 # returns (data, url)
1918 url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
1919 + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
1920 d = getPage(url, method=method)
1921 d.addCallback(lambda data: (data,url))
1924 def wait_for_operation(self, ignored, ophandle):
1925 url = self.webish_url + "operations/" + ophandle
1926 url += "?t=status&output=JSON"
1930 data = simplejson.loads(res)
1932 self.fail("%s: not JSON: '%s'" % (url, res))
1933 if not data["finished"]:
1934 d = self.stall(delay=1.0)
1935 d.addCallback(self.wait_for_operation, ophandle)
1941 def get_operation_results(self, ignored, ophandle, output=None):
1942 url = self.webish_url + "operations/" + ophandle
1945 url += "&output=" + output
1948 if output and output.lower() == "json":
1950 return simplejson.loads(res)
1952 self.fail("%s: not JSON: '%s'" % (url, res))
1957 def slow_web(self, n, output=None, **kwargs):
1959 handle = base32.b2a(os.urandom(4))
1960 d = self.web(n, "POST", ophandle=handle, **kwargs)
1961 d.addCallback(self.wait_for_operation, handle)
1962 d.addCallback(self.get_operation_results, handle, output=output)
1966 class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
1967 # construct a small directory tree (with one dir, one immutable file, one
1968 # mutable file, one LIT file, and a loop), and then check/examine it in
1971 def set_up_tree(self, ignored):
1980 c0 = self.clients[0]
1981 d = c0.create_empty_dirnode()
1982 def _created_root(n):
1984 self.root_uri = n.get_uri()
1985 d.addCallback(_created_root)
1986 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1987 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1988 def _created_mutable(n):
1990 self.mutable_uri = n.get_uri()
1991 d.addCallback(_created_mutable)
1993 large = upload.Data("Lots of data\n" * 1000, None)
1994 d.addCallback(lambda ign: self.root.add_file(u"large", large))
1995 def _created_large(n):
1997 self.large_uri = n.get_uri()
1998 d.addCallback(_created_large)
2000 small = upload.Data("Small enough for a LIT", None)
2001 d.addCallback(lambda ign: self.root.add_file(u"small", small))
2002 def _created_small(n):
2004 self.small_uri = n.get_uri()
2005 d.addCallback(_created_small)
2007 small2 = upload.Data("Small enough for a LIT too", None)
2008 d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
2009 def _created_small2(n):
2011 self.small2_uri = n.get_uri()
2012 d.addCallback(_created_small2)
2014 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
2017 def check_is_healthy(self, cr, n, where, incomplete=False):
2018 self.failUnless(ICheckerResults.providedBy(cr), where)
2019 self.failUnless(cr.is_healthy(), where)
2020 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2022 self.failUnlessEqual(cr.get_storage_index_string(),
2023 base32.b2a(n.get_storage_index()), where)
2024 needs_rebalancing = bool( len(self.clients) < 10 )
2026 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
2028 self.failUnlessEqual(d["count-shares-good"], 10, where)
2029 self.failUnlessEqual(d["count-shares-needed"], 3, where)
2030 self.failUnlessEqual(d["count-shares-expected"], 10, where)
2032 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2033 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2034 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2036 self.failUnlessEqual(sorted(d["servers-responding"]),
2037 sorted([c.nodeid for c in self.clients]),
2039 self.failUnless("sharemap" in d, where)
2040 all_serverids = set()
2041 for (shareid, serverids) in d["sharemap"].items():
2042 all_serverids.update(serverids)
2043 self.failUnlessEqual(sorted(all_serverids),
2044 sorted([c.nodeid for c in self.clients]),
2047 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2048 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2049 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2052 def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2053 self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
2054 self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2055 self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2056 self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2057 self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2058 self.failIf(cr.get_repair_attempted(), where)
2060 def deep_check_is_healthy(self, cr, num_healthy, where):
2061 self.failUnless(IDeepCheckResults.providedBy(cr))
2062 self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
2065 def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2066 self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2067 c = cr.get_counters()
2068 self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2070 self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2072 self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
2074 def test_good(self):
2075 self.basedir = self.mktemp()
2076 d = self.set_up_nodes()
2077 d.addCallback(self.set_up_tree)
2078 d.addCallback(self.do_stats)
2079 d.addCallback(self.do_test_check_good)
2080 d.addCallback(self.do_test_web_good)
2081 d.addCallback(self.do_test_cli_good)
2082 d.addErrback(self.explain_web_error)
2083 d.addErrback(self.explain_error)
2086 def do_stats(self, ignored):
2087 d = defer.succeed(None)
2088 d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2089 d.addCallback(self.check_stats_good)
2092 def check_stats_good(self, s):
2093 self.failUnlessEqual(s["count-directories"], 1)
2094 self.failUnlessEqual(s["count-files"], 4)
2095 self.failUnlessEqual(s["count-immutable-files"], 1)
2096 self.failUnlessEqual(s["count-literal-files"], 2)
2097 self.failUnlessEqual(s["count-mutable-files"], 1)
2098 # don't check directories: their size will vary
2099 # s["largest-directory"]
2100 # s["size-directories"]
2101 self.failUnlessEqual(s["largest-directory-children"], 5)
2102 self.failUnlessEqual(s["largest-immutable-file"], 13000)
2103 # to re-use this function for both the local
2104 # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2105 # coerce the result into a list of tuples. dirnode.start_deep_stats()
2106 # returns a list of tuples, but JSON only knows about lists., so
2107 # t=start-deep-stats returns a list of lists.
2108 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2109 self.failUnlessEqual(histogram, [(11, 31, 2),
2112 self.failUnlessEqual(s["size-immutable-files"], 13000)
2113 self.failUnlessEqual(s["size-literal-files"], 48)
2115 def do_test_check_good(self, ignored):
2116 d = defer.succeed(None)
2117 # check the individual items
2118 d.addCallback(lambda ign: self.root.check(Monitor()))
2119 d.addCallback(self.check_is_healthy, self.root, "root")
2120 d.addCallback(lambda ign: self.mutable.check(Monitor()))
2121 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2122 d.addCallback(lambda ign: self.large.check(Monitor()))
2123 d.addCallback(self.check_is_healthy, self.large, "large")
2124 d.addCallback(lambda ign: self.small.check(Monitor()))
2125 d.addCallback(self.failUnlessEqual, None, "small")
2126 d.addCallback(lambda ign: self.small2.check(Monitor()))
2127 d.addCallback(self.failUnlessEqual, None, "small2")
2129 # and again with verify=True
2130 d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2131 d.addCallback(self.check_is_healthy, self.root, "root")
2132 d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2133 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2134 d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2135 d.addCallback(self.check_is_healthy, self.large, "large",
2137 d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2138 d.addCallback(self.failUnlessEqual, None, "small")
2139 d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
2140 d.addCallback(self.failUnlessEqual, None, "small2")
2142 # and check_and_repair(), which should be a nop
2143 d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2144 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2145 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2146 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2147 d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2148 d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2149 d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2150 d.addCallback(self.failUnlessEqual, None, "small")
2151 d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
2152 d.addCallback(self.failUnlessEqual, None, "small2")
2154 # check_and_repair(verify=True)
2155 d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2156 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2157 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2158 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2159 d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2160 d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2162 d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2163 d.addCallback(self.failUnlessEqual, None, "small")
2164 d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
2165 d.addCallback(self.failUnlessEqual, None, "small2")
2168 # now deep-check the root, with various verify= and repair= options
2169 d.addCallback(lambda ign:
2170 self.root.start_deep_check().when_done())
2171 d.addCallback(self.deep_check_is_healthy, 3, "root")
2172 d.addCallback(lambda ign:
2173 self.root.start_deep_check(verify=True).when_done())
2174 d.addCallback(self.deep_check_is_healthy, 3, "root")
2175 d.addCallback(lambda ign:
2176 self.root.start_deep_check_and_repair().when_done())
2177 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2178 d.addCallback(lambda ign:
2179 self.root.start_deep_check_and_repair(verify=True).when_done())
2180 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2182 # and finally, start a deep-check, but then cancel it.
2183 d.addCallback(lambda ign: self.root.start_deep_check())
2184 def _checking(monitor):
2186 d = monitor.when_done()
2187 # this should fire as soon as the next dirnode.list finishes.
2188 # TODO: add a counter to measure how many list() calls are made,
2189 # assert that no more than one gets to run before the cancel()
2191 def _finished_normally(res):
2192 self.fail("this was supposed to fail, not finish normally")
2194 f.trap(OperationCancelledError)
2195 d.addCallbacks(_finished_normally, _cancelled)
2197 d.addCallback(_checking)
2201 def json_check_is_healthy(self, data, n, where, incomplete=False):
2203 self.failUnlessEqual(data["storage-index"],
2204 base32.b2a(n.get_storage_index()), where)
2205 self.failUnless("summary" in data, (where, data))
2206 self.failUnlessEqual(data["summary"].lower(), "healthy",
2207 "%s: '%s'" % (where, data["summary"]))
2209 self.failUnlessEqual(r["healthy"], True, where)
2210 needs_rebalancing = bool( len(self.clients) < 10 )
2212 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2213 self.failUnlessEqual(r["count-shares-good"], 10, where)
2214 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2215 self.failUnlessEqual(r["count-shares-expected"], 10, where)
2217 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2218 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2219 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2221 self.failUnlessEqual(sorted(r["servers-responding"]),
2222 sorted([idlib.nodeid_b2a(c.nodeid)
2223 for c in self.clients]), where)
2224 self.failUnless("sharemap" in r, where)
2225 all_serverids = set()
2226 for (shareid, serverids_s) in r["sharemap"].items():
2227 all_serverids.update(serverids_s)
2228 self.failUnlessEqual(sorted(all_serverids),
2229 sorted([idlib.nodeid_b2a(c.nodeid)
2230 for c in self.clients]), where)
2231 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2232 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2233 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2235 def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2236 self.failUnlessEqual(data["storage-index"],
2237 base32.b2a(n.get_storage_index()), where)
2238 self.failUnlessEqual(data["repair-attempted"], False, where)
2239 self.json_check_is_healthy(data["pre-repair-results"],
2240 n, where, incomplete)
2241 self.json_check_is_healthy(data["post-repair-results"],
2242 n, where, incomplete)
2244 def json_full_deepcheck_is_healthy(self, data, n, where):
2245 self.failUnlessEqual(data["root-storage-index"],
2246 base32.b2a(n.get_storage_index()), where)
2247 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2248 self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2249 self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2250 self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2251 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2252 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2253 self.json_check_stats_good(data["stats"], where)
2255 def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2256 self.failUnlessEqual(data["root-storage-index"],
2257 base32.b2a(n.get_storage_index()), where)
2258 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2260 self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2261 self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2262 self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2264 self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2265 self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2266 self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2268 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2269 self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2270 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2272 self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2273 self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2274 self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2277 def json_check_lit(self, data, n, where):
2278 self.failUnlessEqual(data["storage-index"], "", where)
2279 self.failUnlessEqual(data["results"]["healthy"], True, where)
2281 def json_check_stats_good(self, data, where):
2282 self.check_stats_good(data)
2284 def do_test_web_good(self, ignored):
2285 d = defer.succeed(None)
2288 d.addCallback(lambda ign:
2289 self.slow_web(self.root,
2290 t="start-deep-stats", output="json"))
2291 d.addCallback(self.json_check_stats_good, "deep-stats")
2294 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2295 d.addCallback(self.json_check_is_healthy, self.root, "root")
2296 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2297 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2298 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2299 d.addCallback(self.json_check_is_healthy, self.large, "large")
2300 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2301 d.addCallback(self.json_check_lit, self.small, "small")
2302 d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
2303 d.addCallback(self.json_check_lit, self.small2, "small2")
2306 d.addCallback(lambda ign:
2307 self.web_json(self.root, t="check", verify="true"))
2308 d.addCallback(self.json_check_is_healthy, self.root, "root+v")
2309 d.addCallback(lambda ign:
2310 self.web_json(self.mutable, t="check", verify="true"))
2311 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v")
2312 d.addCallback(lambda ign:
2313 self.web_json(self.large, t="check", verify="true"))
2314 d.addCallback(self.json_check_is_healthy, self.large, "large+v",
2316 d.addCallback(lambda ign:
2317 self.web_json(self.small, t="check", verify="true"))
2318 d.addCallback(self.json_check_lit, self.small, "small+v")
2319 d.addCallback(lambda ign:
2320 self.web_json(self.small2, t="check", verify="true"))
2321 d.addCallback(self.json_check_lit, self.small2, "small2+v")
2323 # check and repair, no verify
2324 d.addCallback(lambda ign:
2325 self.web_json(self.root, t="check", repair="true"))
2326 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r")
2327 d.addCallback(lambda ign:
2328 self.web_json(self.mutable, t="check", repair="true"))
2329 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r")
2330 d.addCallback(lambda ign:
2331 self.web_json(self.large, t="check", repair="true"))
2332 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r")
2333 d.addCallback(lambda ign:
2334 self.web_json(self.small, t="check", repair="true"))
2335 d.addCallback(self.json_check_lit, self.small, "small+r")
2336 d.addCallback(lambda ign:
2337 self.web_json(self.small2, t="check", repair="true"))
2338 d.addCallback(self.json_check_lit, self.small2, "small2+r")
2340 # check+verify+repair
2341 d.addCallback(lambda ign:
2342 self.web_json(self.root, t="check", repair="true", verify="true"))
2343 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr")
2344 d.addCallback(lambda ign:
2345 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2346 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr")
2347 d.addCallback(lambda ign:
2348 self.web_json(self.large, t="check", repair="true", verify="true"))
2349 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True)
2350 d.addCallback(lambda ign:
2351 self.web_json(self.small, t="check", repair="true", verify="true"))
2352 d.addCallback(self.json_check_lit, self.small, "small+vr")
2353 d.addCallback(lambda ign:
2354 self.web_json(self.small2, t="check", repair="true", verify="true"))
2355 d.addCallback(self.json_check_lit, self.small2, "small2+vr")
2357 # now run a deep-check, with various verify= and repair= flags
2358 d.addCallback(lambda ign:
2359 self.slow_web(self.root, t="start-deep-check", output="json"))
2360 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d")
2361 d.addCallback(lambda ign:
2362 self.slow_web(self.root, t="start-deep-check", verify="true",
2364 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv")
2365 d.addCallback(lambda ign:
2366 self.slow_web(self.root, t="start-deep-check", repair="true",
2368 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr")
2369 d.addCallback(lambda ign:
2370 self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2371 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr")
2373 # now look at t=info
2374 d.addCallback(lambda ign: self.web(self.root, t="info"))
2375 # TODO: examine the output
2376 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2377 d.addCallback(lambda ign: self.web(self.large, t="info"))
2378 d.addCallback(lambda ign: self.web(self.small, t="info"))
2379 d.addCallback(lambda ign: self.web(self.small2, t="info"))
2383 def _run_cli(self, argv, stdin=""):
2385 stdout, stderr = StringIO(), StringIO()
2386 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
2387 stdin=StringIO(stdin),
2388 stdout=stdout, stderr=stderr)
2390 return stdout.getvalue(), stderr.getvalue()
2391 d.addCallback(_done)
2394 def do_test_cli_good(self, ignored):
2395 basedir = self.getdir("client0")
2396 d = self._run_cli(["manifest",
2397 "--node-directory", basedir,
2399 def _check((out,err)):
2400 self.failUnlessEqual(err, "")
2401 lines = [l for l in out.split("\n") if l]
2402 self.failUnlessEqual(len(lines), 5)
2406 cap, path = l.split(None, 1)
2411 self.failUnless(self.root.get_uri() in caps)
2412 self.failUnlessEqual(caps[self.root.get_uri()], "")
2413 self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
2414 self.failUnlessEqual(caps[self.large.get_uri()], "large")
2415 self.failUnlessEqual(caps[self.small.get_uri()], "small")
2416 self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
2417 d.addCallback(_check)
2419 d.addCallback(lambda res:
2420 self._run_cli(["manifest",
2421 "--node-directory", basedir,
2422 "--storage-index", self.root_uri]))
2423 def _check2((out,err)):
2424 self.failUnlessEqual(err, "")
2425 lines = [l for l in out.split("\n") if l]
2426 self.failUnlessEqual(len(lines), 3)
2427 self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
2428 self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
2429 self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
2430 d.addCallback(_check2)
2432 d.addCallback(lambda res:
2433 self._run_cli(["manifest",
2434 "--node-directory", basedir,
2435 "--raw", self.root_uri]))
2436 def _check2r((out,err)):
2437 self.failUnlessEqual(err, "")
2438 data = simplejson.loads(out)
2439 sis = data["storage-index"]
2440 self.failUnlessEqual(len(sis), 3)
2441 self.failUnless(base32.b2a(self.root.get_storage_index()) in sis)
2442 self.failUnless(base32.b2a(self.mutable.get_storage_index()) in sis)
2443 self.failUnless(base32.b2a(self.large.get_storage_index()) in sis)
2444 self.failUnlessEqual(data["stats"]["count-files"], 4)
2445 self.failUnlessEqual(data["origin"],
2446 base32.b2a(self.root.get_storage_index()))
2447 verifycaps = data["verifycaps"]
2448 self.failUnlessEqual(len(verifycaps), 3)
2449 self.failUnless(self.root.get_verifier().to_string() in verifycaps)
2450 self.failUnless(self.mutable.get_verifier().to_string() in verifycaps)
2451 self.failUnless(self.large.get_verifier().to_string() in verifycaps)
2452 d.addCallback(_check2r)
2454 d.addCallback(lambda res:
2455 self._run_cli(["stats",
2456 "--node-directory", basedir,
2458 def _check3((out,err)):
2459 lines = [l.strip() for l in out.split("\n") if l]
2460 self.failUnless("count-immutable-files: 1" in lines)
2461 self.failUnless("count-mutable-files: 1" in lines)
2462 self.failUnless("count-literal-files: 2" in lines)
2463 self.failUnless("count-files: 4" in lines)
2464 self.failUnless("count-directories: 1" in lines)
2465 self.failUnless("size-immutable-files: 13000 (13.00 kB, 12.70 kiB)" in lines, lines)
2466 self.failUnless("size-literal-files: 48" in lines)
2467 self.failUnless(" 11-31 : 2 (31 B, 31 B)".strip() in lines)
2468 self.failUnless("10001-31622 : 1 (31.62 kB, 30.88 kiB)".strip() in lines)
2469 d.addCallback(_check3)
2471 d.addCallback(lambda res:
2472 self._run_cli(["stats",
2473 "--node-directory", basedir,
2476 def _check4((out,err)):
2477 data = simplejson.loads(out)
2478 self.failUnlessEqual(data["count-immutable-files"], 1)
2479 self.failUnlessEqual(data["count-immutable-files"], 1)
2480 self.failUnlessEqual(data["count-mutable-files"], 1)
2481 self.failUnlessEqual(data["count-literal-files"], 2)
2482 self.failUnlessEqual(data["count-files"], 4)
2483 self.failUnlessEqual(data["count-directories"], 1)
2484 self.failUnlessEqual(data["size-immutable-files"], 13000)
2485 self.failUnlessEqual(data["size-literal-files"], 48)
2486 self.failUnless([11,31,2] in data["size-files-histogram"])
2487 self.failUnless([10001,31622,1] in data["size-files-histogram"])
2488 d.addCallback(_check4)
2493 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
2496 self.basedir = self.mktemp()
2497 d = self.set_up_nodes()
2498 d.addCallback(self.set_up_damaged_tree)
2499 d.addCallback(self.do_test_check_bad)
2500 d.addCallback(self.do_test_deepcheck_bad)
2501 d.addCallback(self.do_test_web_bad)
2502 d.addErrback(self.explain_web_error)
2503 d.addErrback(self.explain_error)
2508 def set_up_damaged_tree(self, ignored):
2513 # mutable-missing-shares
2514 # mutable-corrupt-shares
2515 # mutable-unrecoverable
2517 # large-missing-shares
2518 # large-corrupt-shares
2519 # large-unrecoverable
2523 c0 = self.clients[0]
2524 d = c0.create_empty_dirnode()
2525 def _created_root(n):
2527 self.root_uri = n.get_uri()
2528 d.addCallback(_created_root)
2529 d.addCallback(self.create_mangled, "mutable-good")
2530 d.addCallback(self.create_mangled, "mutable-missing-shares")
2531 d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2532 d.addCallback(self.create_mangled, "mutable-unrecoverable")
2533 d.addCallback(self.create_mangled, "large-good")
2534 d.addCallback(self.create_mangled, "large-missing-shares")
2535 d.addCallback(self.create_mangled, "large-corrupt-shares")
2536 d.addCallback(self.create_mangled, "large-unrecoverable")
2541 def create_mangled(self, ignored, name):
2542 nodetype, mangletype = name.split("-", 1)
2543 if nodetype == "mutable":
2544 d = self.clients[0].create_mutable_file("mutable file contents")
2545 d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2546 elif nodetype == "large":
2547 large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2548 d = self.root.add_file(unicode(name), large)
2549 elif nodetype == "small":
2550 small = upload.Data("Small enough for a LIT", None)
2551 d = self.root.add_file(unicode(name), small)
2553 def _stash_node(node):
2554 self.nodes[name] = node
2556 d.addCallback(_stash_node)
2558 if mangletype == "good":
2560 elif mangletype == "missing-shares":
2561 d.addCallback(self._delete_some_shares)
2562 elif mangletype == "corrupt-shares":
2563 d.addCallback(self._corrupt_some_shares)
2565 assert mangletype == "unrecoverable"
2566 d.addCallback(self._delete_most_shares)
# Run a `tahoe debug ...` CLI command in-process (no subprocess, no
# reactor) and return whatever it wrote to stdout.  Only the synchronous
# "debug" subcommands are safe to invoke this way, hence the assert.
2570 def _run_cli(self, argv):
2571 stdout, stderr = StringIO(), StringIO()
2572 # this can only do synchronous operations
2573 assert argv[0] == "debug"
2574 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2575 return stdout.getvalue()
# Return the filesystem paths of all share files for `node`, found by
# running `debug find-shares <SI>` across every client's base directory
# (each test client also runs a storage server in these system tests).
# The CLI prints one path per line; empty lines are filtered out.
2577 def _find_shares(self, node):
2578 si = node.get_storage_index()
2579 out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2580 [c.basedir for c in self.clients])
2581 files = out.split("\n")
2582 return [f for f in files if f]
# Delete two of the node's share files: enough to make the file
# unhealthy, but few enough that it stays recoverable (presumably the
# default 3-of-10 encoding — TODO confirm against the test setup).
2584 def _delete_some_shares(self, node):
2585 shares = self._find_shares(node)
2586 os.unlink(shares[0])
2587 os.unlink(shares[1])
# Corrupt (rather than delete) two of the node's share files via the
# `debug corrupt-share` CLI: a plain check won't notice, but a
# verify=True check should flag them as corrupt.
2589 def _corrupt_some_shares(self, node):
2590 shares = self._find_shares(node)
2591 self._run_cli(["debug", "corrupt-share", shares[0]])
2592 self._run_cli(["debug", "corrupt-share", shares[1]])
# Delete all but one share file, leaving the file unrecoverable (one
# share is fewer than shares-needed).  NOTE(review): the loop body
# (line 2597+, presumably `os.unlink(share)`) is elided from this view.
2594 def _delete_most_shares(self, node):
2595 shares = self._find_shares(node)
2596 for share in shares[1:]:
# Assert that checker results `cr` describe a healthy, recoverable file
# with exactly one recoverable version.  `where` labels the failing
# assertion.  NOTE(review): line 2604 (presumably `d = cr.get_data()`,
# which the assertions below read) is elided from this view.
2600 def check_is_healthy(self, cr, where):
2601 self.failUnless(ICheckerResults.providedBy(cr), where)
2602 self.failUnless(cr.is_healthy(), where)
2603 self.failUnless(cr.is_recoverable(), where)
2605 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2606 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
# Assert that `cr` describes a file that is unhealthy (shares missing)
# but still recoverable.  NOTE(review): line 2613 (presumably
# `d = cr.get_data()`) is elided from this view.
2609 def check_is_missing_shares(self, cr, where):
2610 self.failUnless(ICheckerResults.providedBy(cr), where)
2611 self.failIf(cr.is_healthy(), where)
2612 self.failUnless(cr.is_recoverable(), where)
2614 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2615 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
# Assert that `cr` reports corrupt shares: unhealthy but recoverable,
# with a nonzero corrupt-share count and a non-empty corrupt-share list.
# NOTE(review): lines 2621 and 2624 (presumably `d = cr.get_data()` and
# a blank/comment line) are elided from this view.
2618 def check_has_corrupt_shares(self, cr, where):
2619 # by "corrupt-shares" we mean the file is still recoverable
2620 self.failUnless(ICheckerResults.providedBy(cr), where)
2622 self.failIf(cr.is_healthy(), where)
2623 self.failUnless(cr.is_recoverable(), where)
2625 self.failUnless(d["count-shares-good"] < 10, where)
2626 self.failUnless(d["count-corrupt-shares"], where)
2627 self.failUnless(d["list-corrupt-shares"], where)
# Assert that `cr` describes an unrecoverable file: fewer good shares
# than needed, zero recoverable versions, one unrecoverable version.
# NOTE(review): lines 2632 (presumably `d = cr.get_data()`) and 2636
# (the `where)` continuation of the failUnless call at 2635) are elided
# from this view.
2630 def check_is_unrecoverable(self, cr, where):
2631 self.failUnless(ICheckerResults.providedBy(cr), where)
2633 self.failIf(cr.is_healthy(), where)
2634 self.failIf(cr.is_recoverable(), where)
2635 self.failUnless(d["count-shares-good"] < d["count-shares-needed"],
2637 self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2638 self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
# Run node.check() on every node created by set_up_damaged_tree, first
# without verification and then with verify=True, asserting the expected
# health verdict for each.  Key property tested: corrupt-share damage is
# invisible to a plain check (reported healthy) but detected by verify.
# NOTE(review): sampled listing — lines 2643, 2649-2650 (presumably
# `return d` inside _check), 2665, 2670-2671 (likewise for _checkv), and
# the method's trailing `return d` are elided from this view.
2641 def do_test_check_bad(self, ignored):
2642 d = defer.succeed(None)
2644 # check the individual items, without verification. This will not
2645 # detect corrupt shares.
2646 def _check(which, checker):
2647 d = self.nodes[which].check(Monitor())
2648 d.addCallback(checker, which + "--check")
2651 d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2652 d.addCallback(lambda ign: _check("mutable-missing-shares",
2653 self.check_is_missing_shares))
# Without verify, corrupt shares go unnoticed: expected "healthy".
2654 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2655 self.check_is_healthy))
2656 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2657 self.check_is_unrecoverable))
2658 d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2659 d.addCallback(lambda ign: _check("large-missing-shares",
2660 self.check_is_missing_shares))
2661 d.addCallback(lambda ign: _check("large-corrupt-shares",
2662 self.check_is_healthy))
2663 d.addCallback(lambda ign: _check("large-unrecoverable",
2664 self.check_is_unrecoverable))
2666 # and again with verify=True, which *does* detect corrupt shares.
2667 def _checkv(which, checker):
2668 d = self.nodes[which].check(Monitor(), verify=True)
2669 d.addCallback(checker, which + "--check-and-verify")
2672 d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2673 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2674 self.check_is_missing_shares))
# With verify, mutable corruption IS expected to be reported.
2675 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2676 self.check_has_corrupt_shares))
2677 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2678 self.check_is_unrecoverable))
2679 d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2680 # disabled pending immutable verifier
2681 #d.addCallback(lambda ign: _checkv("large-missing-shares",
2682 # self.check_is_missing_shares))
2683 #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2684 # self.check_has_corrupt_shares))
2685 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2686 self.check_is_unrecoverable))
# Deep-check the damaged tree from its root, without and then with
# verify=True, asserting the aggregate object counters (9 objects
# checked: the root dir plus 8 damaged children).  The verify=True
# counters are temporarily identical to the plain-check ones until a
# real immutable verifier exists (see the commented-out expectations).
# NOTE(review): sampled listing — lines 2692, 2696 and 2707 (the
# `def _check1(cr):` / `def _check2(cr):` headers for the callbacks
# whose bodies are visible below), 2704, 2712, and the method's trailing
# `return d` are elided from this view.
2690 def do_test_deepcheck_bad(self, ignored):
2691 d = defer.succeed(None)
2693 # now deep-check the root, with various verify= and repair= options
2694 d.addCallback(lambda ign:
2695 self.root.start_deep_check().when_done())
2697 self.failUnless(IDeepCheckResults.providedBy(cr))
2698 c = cr.get_counters()
2699 self.failUnlessEqual(c["count-objects-checked"], 9)
2700 self.failUnlessEqual(c["count-objects-healthy"], 5)
2701 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2702 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2703 d.addCallback(_check1)
2705 d.addCallback(lambda ign:
2706 self.root.start_deep_check(verify=True).when_done())
2708 self.failUnless(IDeepCheckResults.providedBy(cr))
2709 c = cr.get_counters()
2710 self.failUnlessEqual(c["count-objects-checked"], 9)
2711 # until we have a real immutable verifier, these counts will be
2713 #self.failUnlessEqual(c["count-objects-healthy"], 3)
2714 #self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2715 self.failUnlessEqual(c["count-objects-healthy"], 5) # todo
2716 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2717 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2718 d.addCallback(_check2)
# JSON-flavored twin of check_is_healthy: assert that the webapi check
# results in `data` describe a healthy, recoverable file.  NOTE(review):
# line 2723 (presumably `r = simplejson.loads(data)[...]`, which the
# assertions read) is elided from this view.
2722 def json_is_healthy(self, data, where):
2724 self.failUnless(r["healthy"], where)
2725 self.failUnless(r["recoverable"], where)
2726 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2727 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
# JSON-flavored twin of check_is_missing_shares: unhealthy but still
# recoverable.  NOTE(review): line 2730 (presumably the
# `r = simplejson.loads(data)[...]` parse step) is elided from this view.
2729 def json_is_missing_shares(self, data, where):
2731 self.failIf(r["healthy"], where)
2732 self.failUnless(r["recoverable"], where)
2733 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2734 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
# JSON-flavored twin of check_has_corrupt_shares: unhealthy but
# recoverable, with corrupt shares counted and listed.  NOTE(review):
# line 2738 (presumably the `r = simplejson.loads(data)[...]` parse
# step) is elided from this view.
2736 def json_has_corrupt_shares(self, data, where):
2737 # by "corrupt-shares" we mean the file is still recoverable
2739 self.failIf(r["healthy"], where)
2740 self.failUnless(r["recoverable"], where)
2741 self.failUnless(r["count-shares-good"] < 10, where)
2742 self.failUnless(r["count-corrupt-shares"], where)
2743 self.failUnless(r["list-corrupt-shares"], where)
# JSON-flavored twin of check_is_unrecoverable: neither healthy nor
# recoverable, zero recoverable versions.  NOTE(review): lines 2746
# (presumably the `r = simplejson.loads(data)[...]` parse step) and 2750
# (the `where)` continuation of the failUnless call at 2749) are elided
# from this view.
2745 def json_is_unrecoverable(self, data, where):
2747 self.failIf(r["healthy"], where)
2748 self.failIf(r["recoverable"], where)
2749 self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2751 self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2752 self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
# Web-API twin of do_test_check_bad: hit each damaged node with
# `t=check` (then `t=check&verify=true`) via self.web_json and assert
# the same health verdicts using the json_* helpers.  As with the direct
# checks, plain t=check must report corrupt-share nodes as healthy,
# while verify=true must detect mutable corruption.  NOTE(review):
# sampled listing — lines 2756-2757, 2761-2762, 2779-2780, 2784-2785,
# and everything after 2802 (presumably blank lines, the `return d` of
# each nested helper, and the method's trailing `return d`) are elided
# from this view.
2754 def do_test_web_bad(self, ignored):
2755 d = defer.succeed(None)
2758 def _check(which, checker):
2759 d = self.web_json(self.nodes[which], t="check")
2760 d.addCallback(checker, which + "--webcheck")
2763 d.addCallback(lambda ign: _check("mutable-good",
2764 self.json_is_healthy))
2765 d.addCallback(lambda ign: _check("mutable-missing-shares",
2766 self.json_is_missing_shares))
# Without verify, corrupt shares go unnoticed: expected "healthy".
2767 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2768 self.json_is_healthy))
2769 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2770 self.json_is_unrecoverable))
2771 d.addCallback(lambda ign: _check("large-good",
2772 self.json_is_healthy))
2773 d.addCallback(lambda ign: _check("large-missing-shares",
2774 self.json_is_missing_shares))
2775 d.addCallback(lambda ign: _check("large-corrupt-shares",
2776 self.json_is_healthy))
2777 d.addCallback(lambda ign: _check("large-unrecoverable",
2778 self.json_is_unrecoverable))
2781 def _checkv(which, checker):
2782 d = self.web_json(self.nodes[which], t="check", verify="true")
2783 d.addCallback(checker, which + "--webcheck-and-verify")
2786 d.addCallback(lambda ign: _checkv("mutable-good",
2787 self.json_is_healthy))
2788 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2789 self.json_is_missing_shares))
# With verify, mutable corruption IS expected to be reported.
2790 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2791 self.json_has_corrupt_shares))
2792 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2793 self.json_is_unrecoverable))
2794 d.addCallback(lambda ign: _checkv("large-good",
2795 self.json_is_healthy))
2796 # disabled pending immutable verifier
2797 #d.addCallback(lambda ign: _checkv("large-missing-shares",
2798 # self.json_is_missing_shares))
2799 #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2800 # self.json_has_corrupt_shares))
2801 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2802 self.json_is_unrecoverable))