1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage
12 from allmydata.immutable import download, filenode, offloaded, upload
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28 MemoryConsumer, download_to_data
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
# An upload.Data subclass that counts how many bytes have been read from it,
# and can fire a Deferred once a byte threshold is crossed -- used by the
# resumable-upload test to interrupt the helper connection mid-upload.
# NOTE(review): this is a sampled listing; the line initializing
# self.bytes_read (original line 36, presumably "bytes_read = 0") is missing
# from this view -- confirm against the full file.
35 class CountingDataUploadable(upload.Data):

# interrupt_after: byte count after which interrupt_after_d fires (None = never).
37 interrupt_after = None
# interrupt_after_d: Deferred fired (with self) once the threshold is crossed.
38 interrupt_after_d = None

40 def read(self, length):
41 self.bytes_read += length
42 if self.interrupt_after is not None:
43 if self.bytes_read > self.interrupt_after:
# Clear the threshold first so the Deferred fires at most once.
44 self.interrupt_after = None
45 self.interrupt_after_d.callback(self)
# Delegate the actual read to the parent class unchanged.
46 return upload.Data.read(self, length)
# Minimal IConsumer implementation that accumulates all bytes written to it;
# used to exercise the downloader's consumer/producer interface.
# NOTE(review): sampled listing -- the `implements(IConsumer)` declaration,
# the `contents` initializer, and the bodies of write()/unregisterProducer()
# (original lines 49-53, 55, 59-62) are missing from this view.
48 class GrabEverythingConsumer:

54 def registerProducer(self, producer, streaming):
# The download machinery is expected to hand us a push producer.
56 assert IPushProducer.providedBy(producer)

58 def write(self, data):

61 def unregisterProducer(self):
# End-to-end system tests: spin up a small grid of real client nodes and
# exercise upload/download, mutable files, the vdrive, helper, etc.
64 class SystemTest(SystemTestMixin, unittest.TestCase):

# Verify that after adding one extra node, every client sees numclients+1
# peers, both in the raw peerid list and in the permuted-peer ordering.
66 def test_connections(self):
67 self.basedir = "system/SystemTest/test_connections"
68 d = self.set_up_nodes()
69 self.extra_node = None
70 d.addCallback(lambda res: self.add_extra_node(self.numclients))
71 def _check(extra_node):
72 self.extra_node = extra_node
73 for c in self.clients:
74 all_peerids = list(c.get_all_peerids())
75 self.failUnlessEqual(len(all_peerids), self.numclients+1)
76 permuted_peers = list(c.get_permuted_peers("storage", "a"))
77 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
# NOTE(review): sampled listing -- the addCallback(_check) hookup (orig.
# lines 78-79) and the final `return d` (orig. line 85) are missing here.
80 def _shutdown_extra_node(res):
82 return self.extra_node.stopService()
# addBoth: shut the extra node down on success *or* failure.
84 d.addBoth(_shutdown_extra_node)
86 test_connections.timeout = 300
87 # test_connections is subsumed by test_upload_and_download, and takes
88 # quite a while to run on a slow machine (because of all the TLS
89 # connections that must be established). If we ever rework the introducer
90 # code to such an extent that we're not sure if it works anymore, we can
91 # reinstate this test until it does.
# Variant of the upload/download test with convergence=None, i.e. each
# upload uses a fresh random encryption key (no convergent dedup).
94 def test_upload_and_download_random_key(self):
95 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96 return self._test_upload_and_download(convergence=None)
# trial per-test timeout; this test runs a whole grid, hence the large value.
97 test_upload_and_download_random_key.timeout = 4800
# Variant with a fixed convergence secret, so identical plaintexts map to
# the same storage index (enables the duplicate-upload short-circuit checks).
99 def test_upload_and_download_convergent(self):
100 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101 return self._test_upload_and_download(convergence="some convergence string")
102 test_upload_and_download_convergent.timeout = 4800
# Core immutable-file system test, parameterized by convergence secret.
# Exercises, in one long Deferred chain: upload, re-upload, download to
# data/filename/filehandle/consumer, filenode.read() slicing, corrupted-key
# and nonexistent-URI failure paths, helper-assisted upload, duplicate
# upload short-circuiting, interrupted/resumed upload, and remote stats.
# NOTE(review): sampled listing -- many interior lines are missing (e.g. the
# _do_upload definition around orig. 129-130, `return d` statements, several
# callback hookups); do not treat this view as the complete method.
104 def _test_upload_and_download(self, convergence):
105 # we use 4000 bytes of data, which will result in about 400k written
106 # to disk among all our simulated nodes
107 DATA = "Some data to upload\n" * 200
108 d = self.set_up_nodes()
109 def _check_connections(res):
110 for c in self.clients:
111 all_peerids = list(c.get_all_peerids())
112 self.failUnlessEqual(len(all_peerids), self.numclients)
113 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114 self.failUnlessEqual(len(permuted_peers), self.numclients)
115 d.addCallback(_check_connections)
119 u = self.clients[0].getServiceNamed("uploader")
121 # we crank the max segsize down to 1024b for the duration of this
122 # test, so we can exercise multiple segments. It is important
123 # that this is not a multiple of the segment size, so that the
124 # tail segment is not the same length as the others. This actualy
125 # gets rounded up to 1025 to be a multiple of the number of
126 # required shares (since we use 25 out of 100 FEC).
127 up = upload.Data(DATA, convergence=convergence)
128 up.max_segment_size = 1024
131 d.addCallback(_do_upload)
132 def _upload_done(results):
134 log.msg("upload finished: uri is %s" % (theuri,))
136 assert isinstance(self.uri, str), self.uri
137 dl = self.clients[1].getServiceNamed("downloader")
139 d.addCallback(_upload_done)
141 def _upload_again(res):
142 # Upload again. If using convergent encryption then this ought to be
143 # short-circuited, however with the way we currently generate URIs
144 # (i.e. because they include the roothash), we have to do all of the
145 # encoding work, and only get to save on the upload part.
146 log.msg("UPLOADING AGAIN")
147 up = upload.Data(DATA, convergence=convergence)
148 up.max_segment_size = 1024
149 d1 = self.uploader.upload(up)
150 d.addCallback(_upload_again)
# --- download through the three convenience APIs, verifying round-trip ---
152 def _download_to_data(res):
153 log.msg("DOWNLOADING")
154 return self.downloader.download_to_data(self.uri)
155 d.addCallback(_download_to_data)
156 def _download_to_data_done(data):
157 log.msg("download finished")
158 self.failUnlessEqual(data, DATA)
159 d.addCallback(_download_to_data_done)
161 target_filename = os.path.join(self.basedir, "download.target")
162 def _download_to_filename(res):
163 return self.downloader.download_to_filename(self.uri,
165 d.addCallback(_download_to_filename)
166 def _download_to_filename_done(res):
167 newdata = open(target_filename, "rb").read()
168 self.failUnlessEqual(newdata, DATA)
169 d.addCallback(_download_to_filename_done)
171 target_filename2 = os.path.join(self.basedir, "download.target2")
172 def _download_to_filehandle(res):
173 fh = open(target_filename2, "wb")
174 return self.downloader.download_to_filehandle(self.uri, fh)
175 d.addCallback(_download_to_filehandle)
176 def _download_to_filehandle_done(fh):
178 newdata = open(target_filename2, "rb").read()
179 self.failUnlessEqual(newdata, DATA)
180 d.addCallback(_download_to_filehandle_done)
# --- download through the IConsumer adapter ---
182 consumer = GrabEverythingConsumer()
183 ct = download.ConsumerAdapter(consumer)
184 d.addCallback(lambda res:
185 self.downloader.download(self.uri, ct))
186 def _download_to_consumer_done(ign):
187 self.failUnlessEqual(consumer.contents, DATA)
188 d.addCallback(_download_to_consumer_done)
# --- filenode.read() with offset/size combinations (inside _test_read,
# whose `def` line is missing from this sampled view) ---
191 n = self.clients[1].create_node_from_uri(self.uri)
192 d = download_to_data(n)
193 def _read_done(data):
194 self.failUnlessEqual(data, DATA)
195 d.addCallback(_read_done)
196 d.addCallback(lambda ign:
197 n.read(MemoryConsumer(), offset=1, size=4))
198 def _read_portion_done(mc):
199 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
200 d.addCallback(_read_portion_done)
201 d.addCallback(lambda ign:
202 n.read(MemoryConsumer(), offset=2, size=None))
203 def _read_tail_done(mc):
204 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
205 d.addCallback(_read_tail_done)
206 d.addCallback(lambda ign:
# Asking for more than the file holds should just return the whole file.
207 n.read(MemoryConsumer(), size=len(DATA)+1000))
208 def _read_too_much(mc):
209 self.failUnlessEqual("".join(mc.chunks), DATA)
210 d.addCallback(_read_too_much)
213 d.addCallback(_test_read)
# --- failure paths: flipped key bit, then a URI for shares nobody holds ---
215 def _test_bad_read(res):
216 bad_u = uri.from_string_filenode(self.uri)
217 bad_u.key = self.flip_bit(bad_u.key)
218 bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
219 # this should cause an error during download
221 d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
223 bad_n.read, MemoryConsumer(), offset=2)
225 d.addCallback(_test_bad_read)
227 def _download_nonexistent_uri(res):
228 baduri = self.mangle_uri(self.uri)
229 log.msg("about to download non-existent URI", level=log.UNUSUAL,
230 facility="tahoe.tests")
231 d1 = self.downloader.download_to_data(baduri)
232 def _baduri_should_fail(res):
233 log.msg("finished downloading non-existend URI",
234 level=log.UNUSUAL, facility="tahoe.tests")
235 self.failUnless(isinstance(res, Failure))
236 self.failUnless(res.check(NotEnoughSharesError),
237 "expected NotEnoughSharesError, got %s" % res)
238 # TODO: files that have zero peers should get a special kind
239 # of NotEnoughSharesError, which can be used to suggest that
240 # the URI might be wrong or that they've never uploaded the
241 # file in the first place.
242 d1.addBoth(_baduri_should_fail)
244 d.addCallback(_download_nonexistent_uri)
246 # add a new node, which doesn't accept shares, and only uses the
248 d.addCallback(lambda res: self.add_extra_node(self.numclients,
250 add_to_sparent=True))
251 def _added(extra_node):
252 self.extra_node = extra_node
253 d.addCallback(_added)
# --- helper-assisted uploads ---
255 HELPER_DATA = "Data that needs help to upload" * 1000
256 def _upload_with_helper(res):
257 u = upload.Data(HELPER_DATA, convergence=convergence)
258 d = self.extra_node.upload(u)
259 def _uploaded(results):
261 return self.downloader.download_to_data(uri)
262 d.addCallback(_uploaded)
264 self.failUnlessEqual(newdata, HELPER_DATA)
265 d.addCallback(_check)
267 d.addCallback(_upload_with_helper)
269 def _upload_duplicate_with_helper(res):
270 u = upload.Data(HELPER_DATA, convergence=convergence)
271 u.debug_stash_RemoteEncryptedUploadable = True
272 d = self.extra_node.upload(u)
273 def _uploaded(results):
275 return self.downloader.download_to_data(uri)
276 d.addCallback(_uploaded)
278 self.failUnlessEqual(newdata, HELPER_DATA)
# With convergence, the second upload of identical data should never
# have started pushing ciphertext at all.
279 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
280 "uploadable started uploading, should have been avoided")
281 d.addCallback(_check)
# Dedup short-circuit only applies with a convergence secret.
283 if convergence is not None:
284 d.addCallback(_upload_duplicate_with_helper)
# --- interrupted upload + resumption via the helper ---
286 def _upload_resumable(res):
287 DATA = "Data that needs help to upload and gets interrupted" * 1000
288 u1 = CountingDataUploadable(DATA, convergence=convergence)
289 u2 = CountingDataUploadable(DATA, convergence=convergence)
291 # we interrupt the connection after about 5kB by shutting down
292 # the helper, then restartingit.
293 u1.interrupt_after = 5000
294 u1.interrupt_after_d = defer.Deferred()
295 u1.interrupt_after_d.addCallback(lambda res:
296 self.bounce_client(0))
298 # sneak into the helper and reduce its chunk size, so that our
299 # debug_interrupt will sever the connection on about the fifth
300 # chunk fetched. This makes sure that we've started to write the
301 # new shares before we abandon them, which exercises the
302 # abort/delete-partial-share code. TODO: find a cleaner way to do
303 # this. I know that this will affect later uses of the helper in
304 # this same test run, but I'm not currently worried about it.
305 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
307 d = self.extra_node.upload(u1)
309 def _should_not_finish(res):
310 self.fail("interrupted upload should have failed, not finished"
311 " with result %s" % (res,))
# (inside _interrupted, whose `def` line is missing from this view)
313 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
315 # make sure we actually interrupted it before finishing the
317 self.failUnless(u1.bytes_read < len(DATA),
318 "read %d out of %d total" % (u1.bytes_read,
321 log.msg("waiting for reconnect", level=log.NOISY,
322 facility="tahoe.test.test_system")
323 # now, we need to give the nodes a chance to notice that this
324 # connection has gone away. When this happens, the storage
325 # servers will be told to abort their uploads, removing the
326 # partial shares. Unfortunately this involves TCP messages
327 # going through the loopback interface, and we can't easily
328 # predict how long that will take. If it were all local, we
329 # could use fireEventually() to stall. Since we don't have
330 # the right introduction hooks, the best we can do is use a
331 # fixed delay. TODO: this is fragile.
332 u1.interrupt_after_d.addCallback(self.stall, 2.0)
333 return u1.interrupt_after_d
334 d.addCallbacks(_should_not_finish, _interrupted)
336 def _disconnected(res):
337 # check to make sure the storage servers aren't still hanging
338 # on to the partial share: their incoming/ directories should
340 log.msg("disconnected", level=log.NOISY,
341 facility="tahoe.test.test_system")
342 for i in range(self.numclients):
343 incdir = os.path.join(self.getdir("client%d" % i),
344 "storage", "shares", "incoming")
345 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
346 d.addCallback(_disconnected)
348 # then we need to give the reconnector a chance to
349 # reestablish the connection to the helper.
350 d.addCallback(lambda res:
351 log.msg("wait_for_connections", level=log.NOISY,
352 facility="tahoe.test.test_system"))
353 d.addCallback(lambda res: self.wait_for_connections())
356 d.addCallback(lambda res:
357 log.msg("uploading again", level=log.NOISY,
358 facility="tahoe.test.test_system"))
359 d.addCallback(lambda res: self.extra_node.upload(u2))
361 def _uploaded(results):
363 log.msg("Second upload complete", level=log.NOISY,
364 facility="tahoe.test.test_system")
366 # this is really bytes received rather than sent, but it's
367 # convenient and basically measures the same thing
368 bytes_sent = results.ciphertext_fetched
370 # We currently don't support resumption of upload if the data is
371 # encrypted with a random key. (Because that would require us
372 # to store the key locally and re-use it on the next upload of
373 # this file, which isn't a bad thing to do, but we currently
375 if convergence is not None:
376 # Make sure we did not have to read the whole file the
377 # second time around .
378 self.failUnless(bytes_sent < len(DATA),
379 "resumption didn't save us any work:"
380 " read %d bytes out of %d total" %
381 (bytes_sent, len(DATA)))
383 # Make sure we did have to read the whole file the second
384 # time around -- because the one that we partially uploaded
385 # earlier was encrypted with a different random key.
386 self.failIf(bytes_sent < len(DATA),
387 "resumption saved us some work even though we were using random keys:"
388 " read %d bytes out of %d total" %
389 (bytes_sent, len(DATA)))
390 return self.downloader.download_to_data(uri)
391 d.addCallback(_uploaded)
394 self.failUnlessEqual(newdata, DATA)
395 # If using convergent encryption, then also check that the
396 # helper has removed the temp file from its directories.
397 if convergence is not None:
398 basedir = os.path.join(self.getdir("client0"), "helper")
399 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
400 self.failUnlessEqual(files, [])
401 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
402 self.failUnlessEqual(files, [])
403 d.addCallback(_check)
405 d.addCallback(_upload_resumable)
# --- remote stats interface ---
407 def _grab_stats(ignored):
408 # the StatsProvider doesn't normally publish a FURL:
409 # instead it passes a live reference to the StatsGatherer
410 # (if and when it connects). To exercise the remote stats
411 # interface, we manually publish client0's StatsProvider
412 # and use client1 to query it.
413 sp = self.clients[0].stats_provider
414 sp_furl = self.clients[0].tub.registerReference(sp)
415 d = self.clients[1].tub.getReference(sp_furl)
416 d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
417 def _got_stats(stats):
419 #from pprint import pprint
422 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
423 c = stats["counters"]
424 self.failUnless("storage_server.allocate" in c)
425 d.addCallback(_got_stats)
427 d.addCallback(_grab_stats)
# Walk basedir looking for stored share files; for each one found, record a
# (client_num, storage_index, filename, shnum) tuple. Fails the test if no
# shares exist at all.
# NOTE(review): sampled listing -- the accumulator initialization, the
# `continue` under the "storage" check, the append of `data`, and the
# `return` of the collected list (orig. lines 432, 435-437, 450-451, 453)
# are missing from this view.
431 def _find_shares(self, basedir):
433 for (dirpath, dirnames, filenames) in os.walk(basedir):
434 if "storage" not in dirpath:
438 pieces = dirpath.split(os.sep)
439 if pieces[-4] == "storage" and pieces[-3] == "shares":
440 # we're sitting in .../storage/shares/$START/$SINDEX , and there
441 # are sharefiles here
# The directory above "storage" is named "clientN"; N gives us the
# client number (assumes a single trailing digit).
442 assert pieces[-5].startswith("client")
443 client_num = int(pieces[-5][-1])
444 storage_index_s = pieces[-1]
445 storage_index = storage.si_a2b(storage_index_s)
446 for sharename in filenames:
447 shnum = int(sharename)
448 filename = os.path.join(dirpath, sharename)
449 data = (client_num, storage_index, filename, shnum)
452 self.fail("unable to find any share files in %s" % basedir)
# Deliberately corrupt one field of a mutable (SDMF) share file on disk, so
# the retrieval code's integrity checks can be exercised. `which` selects
# the field to mangle; most corruptions are a single flipped bit.
# NOTE(review): sampled listing -- the "seqnum" mutation body (orig. 466),
# the elif lines for "R"/"IV" (orig. 467, 469), and most of the
# pack_prefix/pack_share argument lists (orig. 488, 490-495) are missing.
455 def _corrupt_mutable_share(self, filename, which):
456 msf = storage.MutableShareFile(filename)
457 datav = msf.readv([ (0, 1000000) ])
458 final_share = datav[0]
459 assert len(final_share) < 1000000 # ought to be truncated
# Unpack into the SDMF field tuple so individual fields can be mangled.
460 pieces = mutable_layout.unpack_share(final_share)
461 (seqnum, root_hash, IV, k, N, segsize, datalen,
462 verification_key, signature, share_hash_chain, block_hash_tree,
463 share_data, enc_privkey) = pieces

465 if which == "seqnum":
468 root_hash = self.flip_bit(root_hash)
470 IV = self.flip_bit(IV)
471 elif which == "segsize":
472 segsize = segsize + 15
473 elif which == "pubkey":
474 verification_key = self.flip_bit(verification_key)
475 elif which == "signature":
476 signature = self.flip_bit(signature)
477 elif which == "share_hash_chain":
# Mangle an arbitrary node of the hash chain (first key in dict order).
478 nodenum = share_hash_chain.keys()[0]
479 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
480 elif which == "block_hash_tree":
481 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
482 elif which == "share_data":
483 share_data = self.flip_bit(share_data)
484 elif which == "encprivkey":
485 enc_privkey = self.flip_bit(enc_privkey)

# Re-pack the (possibly mangled) fields and write the share back in place.
487 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
489 final_share = mutable_layout.pack_share(prefix,
496 msf.writev( [(0, final_share)], None)
# System test for mutable (SDMF) files: create, debug-dump a share via the
# CLI, download through fresh nodes on several clients, overwrite twice,
# corrupt 7 of 10 shares and confirm retrieval still works, handle empty
# files, exercise dirnodes, and verify the key-generator pool drains.
# NOTE(review): sampled listing -- numerous interior lines are missing
# (callback `def` lines, try: statements, `return d`, argument tails);
# treat structure as indicative only.
499 def test_mutable(self):
500 self.basedir = "system/SystemTest/test_mutable"
501 DATA = "initial contents go here." # 25 bytes % 3 != 0
502 NEWDATA = "new contents yay"
503 NEWERDATA = "this is getting old"

505 d = self.set_up_nodes(use_key_generator=True)

507 def _create_mutable(res):
509 log.msg("starting create_mutable_file")
510 d1 = c.create_mutable_file(DATA)
512 log.msg("DONE: %s" % (res,))
513 self._mutable_node_1 = res
515 d1.addCallback(_done)
517 d.addCallback(_create_mutable)

519 def _test_debug(res):
520 # find a share. It is important to run this while there is only
521 # one slot in the grid.
522 shares = self._find_shares(self.basedir)
523 (client_num, storage_index, filename, shnum) = shares[0]
524 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
526 log.msg(" for clients[%d]" % client_num)
# Drive the "tahoe debug dump-share" CLI in-process and capture output.
528 out,err = StringIO(), StringIO()
529 rc = runner.runner(["debug", "dump-share", "--offsets",
531 stdout=out, stderr=err)
532 output = out.getvalue()
533 self.failUnlessEqual(rc, 0)
535 self.failUnless("Mutable slot found:\n" in output)
536 self.failUnless("share_type: SDMF\n" in output)
537 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
538 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
539 self.failUnless(" num_extra_leases: 0\n" in output)
540 # the pubkey size can vary by a byte, so the container might
541 # be a bit larger on some runs.
542 m = re.search(r'^ container_size: (\d+)$', output, re.M)
544 container_size = int(m.group(1))
545 self.failUnless(2037 <= container_size <= 2049, container_size)
546 m = re.search(r'^ data_length: (\d+)$', output, re.M)
548 data_length = int(m.group(1))
549 self.failUnless(2037 <= data_length <= 2049, data_length)
550 self.failUnless(" secrets are for nodeid: %s\n" % peerid
552 self.failUnless(" SDMF contents:\n" in output)
553 self.failUnless(" seqnum: 1\n" in output)
554 self.failUnless(" required_shares: 3\n" in output)
555 self.failUnless(" total_shares: 10\n" in output)
556 self.failUnless(" segsize: 27\n" in output, (output, filename))
557 self.failUnless(" datalen: 25\n" in output)
558 # the exact share_hash_chain nodes depends upon the sharenum,
559 # and is more of a hassle to compute than I want to deal with
561 self.failUnless(" share_hash_chain: " in output)
562 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
563 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
564 base32.b2a(storage_index))
565 self.failUnless(expected in output)
# On assertion failure, dump the CLI output to aid debugging.
566 except unittest.FailTest:
568 print "dump-share output was:"
571 d.addCallback(_test_debug)

575 # first, let's see if we can use the existing node to retrieve the
576 # contents. This allows it to use the cached pubkey and maybe the
577 # latest-known sharemap.
579 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
580 def _check_download_1(res):
581 self.failUnlessEqual(res, DATA)
582 # now we see if we can retrieve the data from a new node,
583 # constructed using the URI of the original one. We do this test
584 # on the same client that uploaded the data.
585 uri = self._mutable_node_1.get_uri()
586 log.msg("starting retrieve1")
587 newnode = self.clients[0].create_node_from_uri(uri)
588 newnode_2 = self.clients[0].create_node_from_uri(uri)
# Same client + same URI must yield the same cached node object.
589 self.failUnlessIdentical(newnode, newnode_2)
590 return newnode.download_best_version()
591 d.addCallback(_check_download_1)

593 def _check_download_2(res):
594 self.failUnlessEqual(res, DATA)
595 # same thing, but with a different client
596 uri = self._mutable_node_1.get_uri()
597 newnode = self.clients[1].create_node_from_uri(uri)
598 log.msg("starting retrieve2")
599 d1 = newnode.download_best_version()
600 d1.addCallback(lambda res: (res, newnode))
602 d.addCallback(_check_download_2)

# Python 2 tuple-unpacking parameter (removed in Python 3).
604 def _check_download_3((res, newnode)):
605 self.failUnlessEqual(res, DATA)
607 log.msg("starting replace1")
608 d1 = newnode.overwrite(NEWDATA)
609 d1.addCallback(lambda res: newnode.download_best_version())
611 d.addCallback(_check_download_3)

613 def _check_download_4(res):
614 self.failUnlessEqual(res, NEWDATA)
615 # now create an even newer node and replace the data on it. This
616 # new node has never been used for download before.
617 uri = self._mutable_node_1.get_uri()
618 newnode1 = self.clients[2].create_node_from_uri(uri)
619 newnode2 = self.clients[3].create_node_from_uri(uri)
620 self._newnode3 = self.clients[3].create_node_from_uri(uri)
621 log.msg("starting replace2")
622 d1 = newnode1.overwrite(NEWERDATA)
623 d1.addCallback(lambda res: newnode2.download_best_version())
625 d.addCallback(_check_download_4)

627 def _check_download_5(res):
628 log.msg("finished replace2")
629 self.failUnlessEqual(res, NEWERDATA)
630 d.addCallback(_check_download_5)

632 def _corrupt_shares(res):
633 # run around and flip bits in all but k of the shares, to test
635 shares = self._find_shares(self.basedir)
636 ## sort by share number
637 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
638 where = dict([ (shnum, filename)
639 for (client_num, storage_index, filename, shnum)
641 assert len(where) == 10 # this test is designed for 3-of-10
642 for shnum, filename in where.items():
643 # shares 7,8,9 are left alone. read will check
644 # (share_hash_chain, block_hash_tree, share_data). New
645 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
646 # segsize, signature).
648 # read: this will trigger "pubkey doesn't match
650 self._corrupt_mutable_share(filename, "pubkey")
651 self._corrupt_mutable_share(filename, "encprivkey")
653 # triggers "signature is invalid"
654 self._corrupt_mutable_share(filename, "seqnum")
656 # triggers "signature is invalid"
657 self._corrupt_mutable_share(filename, "R")
659 # triggers "signature is invalid"
660 self._corrupt_mutable_share(filename, "segsize")
662 self._corrupt_mutable_share(filename, "share_hash_chain")
664 self._corrupt_mutable_share(filename, "block_hash_tree")
666 self._corrupt_mutable_share(filename, "share_data")
667 # other things to correct: IV, signature
668 # 7,8,9 are left alone

670 # note that initial_query_count=5 means that we'll hit the
671 # first 5 servers in effectively random order (based upon
672 # response time), so we won't necessarily ever get a "pubkey
673 # doesn't match fingerprint" error (if we hit shnum>=1 before
674 # shnum=0, we pull the pubkey from there). To get repeatable
675 # specific failures, we need to set initial_query_count=1,
676 # but of course that will change the sequencing behavior of
677 # the retrieval process. TODO: find a reasonable way to make
678 # this a parameter, probably when we expand this test to test
679 # for one failure mode at a time.

681 # when we retrieve this, we should get three signature
682 # failures (where we've mangled seqnum, R, and segsize). The
684 d.addCallback(_corrupt_shares)

# With only shares 7,8,9 intact (k=3), download must still succeed.
686 d.addCallback(lambda res: self._newnode3.download_best_version())
687 d.addCallback(_check_download_5)

689 def _check_empty_file(res):
690 # make sure we can create empty files, this usually screws up the
692 d1 = self.clients[2].create_mutable_file("")
693 d1.addCallback(lambda newnode: newnode.download_best_version())
694 d1.addCallback(lambda res: self.failUnlessEqual("", res))
696 d.addCallback(_check_empty_file)

698 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
699 def _created_dirnode(dnode):
700 log.msg("_created_dirnode(%s)" % (dnode,))
702 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
703 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
704 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
# A directory containing itself must not break the manifest walk.
705 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
706 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
707 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
708 d1.addCallback(lambda res: dnode.build_manifest().when_done())
709 d1.addCallback(lambda res:
710 self.failUnlessEqual(len(res["manifest"]), 1))
712 d.addCallback(_created_dirnode)

# Poll until client3's key-generator connection is established.
714 def wait_for_c3_kg_conn():
715 return self.clients[3]._key_generator is not None
716 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

# Each mutable-file/dirnode creation consumes one key from the pool.
718 def check_kg_poolsize(junk, size_delta):
719 self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
720 self.key_generator_svc.key_generator.pool_size + size_delta)

722 d.addCallback(check_kg_poolsize, 0)
723 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
724 d.addCallback(check_kg_poolsize, -1)
725 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
726 d.addCallback(check_kg_poolsize, -2)
727 # use_helper induces use of clients[3], which is the using-key_gen client
728 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
729 d.addCallback(check_kg_poolsize, -3)

732 # The default 120 second timeout went off when running it under valgrind
733 # on my old Windows laptop, so I'm bumping up the timeout.
734 test_mutable.timeout = 240
# Return `good` with the low-order bit of its final byte inverted -- the
# canonical "corrupt one bit" helper used by the bad-read/mangle tests.
# (Python 2: operates on a byte string via ord/chr.)
736 def flip_bit(self, good):
737 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
# Build a syntactically-valid CHK URI whose key (and therefore storage
# index) differs from gooduri's, so no server will hold any shares for it.
# NOTE(review): sampled listing -- the `size=` argument to CHKFileURI
# (orig. line 747) is missing from this view.
739 def mangle_uri(self, gooduri):
740 # change the key, which changes the storage index, which means we'll
741 # be asking about the wrong file, so nobody will have any shares
742 u = IFileURI(gooduri)
743 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
744 uri_extension_hash=u.uri_extension_hash,
745 needed_shares=u.needed_shares,
746 total_shares=u.total_shares,
748 return u2.to_string()

750 # TODO: add a test which mangles the uri_extension_hash instead, and
751 # should fail due to not being able to get a valid uri_extension block.
752 # Also a test which sneakily mangles the uri_extension block to change
753 # some of the validation data, so it will fail in the post-download phase
754 # when the file's crypttext integrity check fails. Do the same thing for
755 # the key, which should cause the download to fail the post-download
756 # plaintext_hash check.
# Master vdrive test: publishes a small directory tree, bounces a client,
# re-verifies the published data, then fans out into the introducer-web,
# runner/CLI, webapi, control-port, and checker sub-tests below.
758 def test_vdrive(self):
759 self.basedir = "system/SystemTest/test_vdrive"
760 self.data = LARGE_DATA
761 d = self.set_up_nodes(use_stats_gatherer=True)
762 d.addCallback(self._test_introweb)
763 d.addCallback(self.log, "starting publish")
764 d.addCallback(self._do_publish1)
765 d.addCallback(self._test_runner)
766 d.addCallback(self._do_publish2)
767 # at this point, we have the following filesystem (where "R" denotes
768 # self._root_directory_uri):
771 # R/subdir1/mydata567
773 # R/subdir1/subdir2/mydata992
# Restart client0 to prove published data survives a node bounce.
775 d.addCallback(lambda res: self.bounce_client(0))
776 d.addCallback(self.log, "bounced client0")
778 d.addCallback(self._check_publish1)
779 d.addCallback(self.log, "did _check_publish1")
780 d.addCallback(self._check_publish2)
781 d.addCallback(self.log, "did _check_publish2")
782 d.addCallback(self._do_publish_private)
783 d.addCallback(self.log, "did _do_publish_private")
784 # now we also have (where "P" denotes a new dir):
785 # P/personal/sekrit data
786 # P/s2-rw -> /subdir1/subdir2/
787 # P/s2-ro -> /subdir1/subdir2/ (read-only)
788 d.addCallback(self._check_publish_private)
789 d.addCallback(self.log, "did _check_publish_private")
790 d.addCallback(self._test_web)
791 d.addCallback(self._test_control)
792 d.addCallback(self._test_cli)
793 # P now has four top-level children:
794 # P/personal/sekrit data
797 # P/test_put/ (empty)
798 d.addCallback(self._test_checker)
800 test_vdrive.timeout = 1100
# Fetch the introducer's web status page (HTML, then ?t=json) and verify
# version string, announcement and subscription summaries.
# NOTE(review): sampled listing -- the `def _check(res):`/`try:` scaffolding
# and `return d` (orig. 804-805, 811, 813-814, 821, 829, 831-834) are
# missing from this view.
802 def _test_introweb(self, res):
803 d = getPage(self.introweb_url, method="GET", followRedirect=True)
806 self.failUnless("allmydata: %s" % str(allmydata.__version__)
808 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
809 self.failUnless("Subscription Summary: storage: 5" in res)
# On failure, dump the fetched page to help diagnose (Python 2 print).
810 except unittest.FailTest:
812 print "GET %s output was:" % self.introweb_url
815 d.addCallback(_check)
816 d.addCallback(lambda res:
817 getPage(self.introweb_url + "?t=json",
818 method="GET", followRedirect=True))
819 def _check_json(res):
820 data = simplejson.loads(res)
822 self.failUnlessEqual(data["subscription_summary"],
824 self.failUnlessEqual(data["announcement_summary"],
825 {"storage": 5, "stub_client": 5})
# All test nodes run on one host, so exactly 1 distinct host each.
826 self.failUnlessEqual(data["announcement_distinct_hosts"],
827 {"storage": 1, "stub_client": 1})
828 except unittest.FailTest:
830 print "GET %s?t=json output was:" % self.introweb_url
833 d.addCallback(_check_json)
# Create the root directory R, then R/subdir1, then upload self.data as
# R/subdir1/mydata567, stashing the root URI and file URI for later checks.
# NOTE(review): sampled listing -- `c0 = self.clients[0]` (orig. ~838),
# `return d1` and the final `return d` (orig. 853, 855) are missing here.
836 def _do_publish1(self, res):
837 ut = upload.Data(self.data, convergence=None)
839 d = c0.create_empty_dirnode()
840 def _made_root(new_dirnode):
841 self._root_directory_uri = new_dirnode.get_uri()
# Re-create the node from its URI so later steps use the URI path.
842 return c0.create_node_from_uri(self._root_directory_uri)
843 d.addCallback(_made_root)
844 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
845 def _made_subdir1(subdir1_node):
846 self._subdir1_node = subdir1_node
847 d1 = subdir1_node.add_file(u"mydata567", ut)
848 d1.addCallback(self.log, "publish finished")
849 def _stash_uri(filenode):
850 self.uri = filenode.get_uri()
851 assert isinstance(self.uri, str), (self.uri, filenode)
852 d1.addCallback(_stash_uri)
854 d.addCallback(_made_subdir1)
# Extend the tree with R/subdir1/subdir2/mydata992 (fresh upload of the
# same data). NOTE(review): the trailing `return d` (orig. 861) is missing
# from this sampled view.
857 def _do_publish2(self, res):
858 ut = upload.Data(self.data, convergence=None)
859 d = self._subdir1_node.create_empty_directory(u"subdir2")
860 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
# Deferred-chain-friendly logger: forwards *args/**kwargs to foolscap's
# log.msg. NOTE(review): the `return res` pass-through (orig. 866) is
# missing from this sampled view -- confirm against the full file.
863 def log(self, res, *args, **kwargs):
864 # print "MSG: %s RES: %s" % (msg, args)
865 log.msg(*args, **kwargs)
# Build the private tree P: P/personal/sekrit data, plus P/s2-rw and
# P/s2-ro links to R/subdir1/subdir2 (read-write and read-only caps).
# NOTE(review): sampled listing -- the `def _got_s2(s2node):` line,
# `return d2`/`return d1` and the final `return d` (orig. 880, 883, 886,
# 888) are missing from this view.
868 def _do_publish_private(self, res):
869 self.smalldata = "sssh, very secret stuff"
870 ut = upload.Data(self.smalldata, convergence=None)
871 d = self.clients[0].create_empty_dirnode()
872 d.addCallback(self.log, "GOT private directory")
873 def _got_new_dir(privnode):
874 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
875 d1 = privnode.create_empty_directory(u"personal")
876 d1.addCallback(self.log, "made P/personal")
877 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
878 d1.addCallback(self.log, "made P/personal/sekrit data")
879 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
881 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
882 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
884 d1.addCallback(_got_s2)
# Pass the private node itself onward for _check_publish_private.
885 d1.addCallback(lambda res: privnode)
887 d.addCallback(_got_new_dir)
def _check_publish1(self, res):
    """Read back the _do_publish1 tree one child at a time (the
    iterative get() API) and verify the file contents."""
    # this one uses the iterative API
    # NOTE(review): the binding of 'c1' (presumably self.clients[1]) is
    # elided from this excerpt.
    d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
    d.addCallback(self.log, "check_publish1 got /")
    d.addCallback(lambda root: root.get(u"subdir1"))
    d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(self.log, "get finished")
    # NOTE(review): the 'def _get_done(data):' header is elided from this
    # excerpt; the next line is its body.
        self.failUnlessEqual(data, self.data)
    d.addCallback(_get_done)
    # [trailing 'return d' elided from this excerpt]
def _check_publish2(self, res):
    """Read back the same tree via the path-based get_child_at_path API
    on client1, and confirm node-identity caching (fnode == filenode)."""
    # this one uses the path-based API
    rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
    d = rootnode.get_child_at_path(u"subdir1")
    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode)))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
    d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
    def _got_filenode(filenode):
        # creating a node from the same URI should yield an equal node
        fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
        assert fnode == filenode
    d.addCallback(_got_filenode)
    # [trailing 'return d' elided from this excerpt]
def _check_publish_private(self, resnode):
    """Verify the private tree from _do_publish_private: contents of
    P/personal, mutability/read-only behavior of s2-rw vs s2-ro,
    move_child_to in and out of read-only dirs, manifest size, and
    deep-stats counters."""
    # this one uses the path-based API
    self._private_node = resnode

    d = self._private_node.get_child_at_path(u"personal")
    def _got_personal(personal):
        self._personal_node = personal
        # ['return personal' (or similar) elided from this excerpt]
    d.addCallback(_got_personal)

    d.addCallback(lambda dirnode:
                  self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
    # NOTE(review): the 'def get_path(path):' helper header is elided from
    # this excerpt; the next line is its body.
        return self._private_node.get_child_at_path(path)

    d.addCallback(lambda res: get_path(u"personal/sekrit data"))
    d.addCallback(lambda filenode: filenode.download_to_data())
    d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
    d.addCallback(lambda res: get_path(u"s2-rw"))
    d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
    d.addCallback(lambda res: get_path(u"s2-ro"))
    def _got_s2ro(dirnode):
        # s2-ro is a read-only view of a mutable directory: every write
        # attempt must fail with NotMutableError
        self.failUnless(dirnode.is_mutable(), dirnode)
        self.failUnless(dirnode.is_readonly(), dirnode)
        d1 = defer.succeed(None)
        d1.addCallback(lambda res: dirnode.list())
        d1.addCallback(self.log, "dirnode.list")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
        d1.addCallback(self.log, "doing add_file(ro)")
        ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
        d1.addCallback(self.log, "doing get(ro)")
        d1.addCallback(lambda res: dirnode.get(u"mydata992"))
        d1.addCallback(lambda filenode:
                       self.failUnless(IFileNode.providedBy(filenode)))
        d1.addCallback(self.log, "doing delete(ro)")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
        d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
        personal = self._personal_node
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
        d1.addCallback(self.log, "doing move_child_to(ro)2")
        d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
        d1.addCallback(self.log, "finished with _got_s2ro")
        # ['return d1' elided from this excerpt]
    d.addCallback(_got_s2ro)
    def _got_home(dummy):
        # exercise move_child_to between the (writable) private dirs
        home = self._private_node
        personal = self._personal_node
        d1 = defer.succeed(None)
        d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
        d1.addCallback(lambda res:
                       personal.move_child_to(u"sekrit data",home,u"sekrit"))
        d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit", home, u"sekrit data"))
        d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
        d1.addCallback(lambda res:
                       home.move_child_to(u"sekrit data", personal))
        d1.addCallback(lambda res: home.build_manifest().when_done())
        d1.addCallback(self.log, "manifest")
        # expected manifest entries (list partially elided in excerpt):
        #  P/personal/sekrit data
        #  P/s2-rw (same as P/s2-ro)
        #  P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
        d1.addCallback(lambda res:
                       self.failUnlessEqual(len(res["manifest"]), 5))
        d1.addCallback(lambda res: home.start_deep_stats().when_done())
        def _check_stats(stats):
            expected = {"count-immutable-files": 1,
                        "count-mutable-files": 0,
                        "count-literal-files": 1,
                        "count-directories": 3,
                        "size-immutable-files": 112,
                        "size-literal-files": 23,
                        #"size-directories": 616, # varies
                        #"largest-directory": 616,
                        "largest-directory-children": 3,
                        "largest-immutable-file": 112,
            # [dict-closing line elided from this excerpt]
            for k,v in expected.iteritems():
                self.failUnlessEqual(stats[k], v,
                                     "stats[%s] was %s, not %s" %
                # [format-argument line elided from this excerpt]
            self.failUnless(stats["size-directories"] > 1300,
                            stats["size-directories"])
            self.failUnless(stats["largest-directory"] > 800,
                            stats["largest-directory"])
            self.failUnlessEqual(stats["size-files-histogram"],
                                 [ (11, 31, 1), (101, 316, 1) ])
        d1.addCallback(_check_stats)
        # ['return d1' elided from this excerpt]
    d.addCallback(_got_home)
    # [trailing 'return d' elided from this excerpt]
def shouldFail(self, res, expected_failure, which, substring=None):
    """addBoth-style errback checker: 'res' must be a Failure wrapping
    expected_failure (and containing 'substring' if given); a non-Failure
    result means the expected exception never happened, so fail the test.
    'which' labels the operation in the failure message."""
    if isinstance(res, Failure):
        res.trap(expected_failure)
        # NOTE(review): an 'if substring:' guard line is elided from this
        # excerpt before the following check.
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
    # NOTE(review): the 'else:' line is elided from this excerpt.
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
    """Invoke callable(*args, **kwargs) via maybeDeferred and require it
    to fail with expected_failure; 'substring' (if not None) must appear
    in the failure's string form. Returns a Deferred."""
    assert substring is None or isinstance(substring, str)
    d = defer.maybeDeferred(callable, *args, **kwargs)
    # NOTE(review): the 'def _done(res):' header is elided from this
    # excerpt; the following lines are its body.
        if isinstance(res, Failure):
            res.trap(expected_failure)
            # ['if substring:' guard elided from this excerpt]
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        # ['else:' line elided from this excerpt]
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    # [d.addBoth(_done) / 'return d' elided from this excerpt]
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of 'data' to 'urlpath' on our webish server.

    Returns the Deferred from twisted.web.client.getPage, which fires
    with the response body.
    """
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET for 'urlpath' on our webish server.

    Returns the Deferred from twisted.web.client.getPage, which fires
    with the response body. Redirects are followed only when requested.
    """
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
    """POST a hand-built multipart/form-data body assembled from
    'fields' to 'urlpath'. Tuple-valued fields become file uploads
    (filename, contents); others are sent as str(value). When
    use_helper is True the request goes to the helper-using node's
    webish port instead of client0's."""
    # NOTE(review): the 'if use_helper:' / 'else:' lines are elided from
    # this excerpt around the two url assignments below.
        url = self.helper_webish_url + urlpath
        url = self.webish_url + urlpath
    sepbase = "boogabooga"
    sep = "--" + sepbase
    # ['form = []' and separator-append lines elided from this excerpt]
    form.append('Content-Disposition: form-data; name="_charset"')
    # [blank-line append elided from this excerpt]
    form.append('UTF-8')
    # [separator-append elided from this excerpt]
    for name, value in fields.iteritems():
        if isinstance(value, tuple):
            # (filename, contents) pairs are sent as file uploads
            filename, value = value
            form.append('Content-Disposition: form-data; name="%s"; '
                        'filename="%s"' % (name, filename.encode("utf-8")))
        # ['else:' line elided from this excerpt]
            form.append('Content-Disposition: form-data; name="%s"' % name)
        # [blank-line append elided from this excerpt]
        form.append(str(value))
    # [closing-separator lines elided from this excerpt]
    body = "\r\n".join(form) + "\r\n"
    headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
    # [dict-closing line elided from this excerpt]
    return getPage(url, method="POST", postdata=body,
                   headers=headers, followRedirect=followRedirect)
def _test_web(self, res):
    """Exercise the webish (HTTP) frontend: welcome pages, directory
    listings, GET by path and by URI, PUT (small and multi-segment),
    unlinked POST uploads (direct and via helper), the per-operation
    status pages, helper status (HTML and JSON), and node statistics."""
    base = self.webish_url
    public = "uri/" + self._root_directory_uri
    # NOTE(review): the initial 'd = getPage(...)' line is elided from
    # this excerpt.
    def _got_welcome(page):
        expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
        self.failUnless(expected in page,
                        "I didn't see the right 'connected storage servers'"
                        " message in: %s" % page
        # [call-closing line elided from this excerpt]
        expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
        self.failUnless(expected in page,
                        "I didn't see the right 'My nodeid' message "
        # [continuation/closing line elided from this excerpt]
        self.failUnless("Helper: 0 active uploads" in page)
    d.addCallback(_got_welcome)
    d.addCallback(self.log, "done with _got_welcome")

    # get the welcome page from the node that uses the helper too
    d.addCallback(lambda res: getPage(self.helper_webish_url))
    def _got_welcome_helper(page):
        self.failUnless("Connected to helper?: <span>yes</span>" in page,
        # [message/closing line elided from this excerpt]
        self.failUnless("Not running helper" in page)
    d.addCallback(_got_welcome_helper)

    d.addCallback(lambda res: getPage(base + public))
    d.addCallback(lambda res: getPage(base + public + "/subdir1"))
    def _got_subdir1(page):
        # there ought to be an href for our file
        self.failUnless(("<td>%d</td>" % len(self.data)) in page)
        self.failUnless(">mydata567</a>" in page)
    d.addCallback(_got_subdir1)
    d.addCallback(self.log, "done with _got_subdir1")
    d.addCallback(lambda res:
                  getPage(base + public + "/subdir1/mydata567"))
    def _got_data(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_data)

    # download from a URI embedded in a URL
    d.addCallback(self.log, "_get_from_uri")
    def _get_from_uri(res):
        return getPage(base + "uri/%s?filename=%s"
                       % (self.uri, "mydata567"))
    d.addCallback(_get_from_uri)
    def _got_from_uri(page):
        self.failUnlessEqual(page, self.data)
    d.addCallback(_got_from_uri)

    # download from a URI embedded in a URL, second form
    d.addCallback(self.log, "_get_from_uri2")
    def _get_from_uri2(res):
        return getPage(base + "uri?uri=%s" % (self.uri,))
    d.addCallback(_get_from_uri2)
    d.addCallback(_got_from_uri)

    # download from a bogus URI, make sure we get a reasonable error
    d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
    def _get_from_bogus_uri(res):
        d1 = getPage(base + "uri/%s?filename=%s"
                     % (self.mangle_uri(self.uri), "mydata567"))
        d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        # [substring argument / 'return d1' elided from this excerpt]
    d.addCallback(_get_from_bogus_uri)
    d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

    # upload a file with PUT
    d.addCallback(self.log, "about to try PUT")
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                       "new.txt contents"))
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "new.txt contents")
    # and again with something large enough to use multiple segments,
    # and hopefully trigger pauseProducing too
    d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                       "big" * 500000)) # 1.5MB
    d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
    d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

    # can we replace files in place?
    d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
    # [second PUT argument line elided from this excerpt]
    d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
    d.addCallback(self.failUnlessEqual, "NEWER contents")

    # test unlinked POST
    d.addCallback(lambda res: self.POST("uri", t="upload",
                                        file=("new.txt", "data" * 10000)))
    # and again using the helper, which exercises different upload-status
    d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                        file=("foo.txt", "data2" * 10000)))

    # check that the status page exists
    d.addCallback(lambda res: self.GET("status", followRedirect=True))
    def _got_status(res):
        # find an interesting upload and download to look at. LIT files
        # are not interesting.
        for ds in self.clients[0].list_all_download_statuses():
            if ds.get_size() > 200:
                self._down_status = ds.get_counter()
        for us in self.clients[0].list_all_upload_statuses():
            if us.get_size() > 200:
                self._up_status = us.get_counter()
        rs = list(self.clients[0].list_all_retrieve_statuses())[0]
        self._retrieve_status = rs.get_counter()
        ps = list(self.clients[0].list_all_publish_statuses())[0]
        self._publish_status = ps.get_counter()
        us = list(self.clients[0].list_all_mapupdate_statuses())[0]
        self._update_status = us.get_counter()

        # and that there are some upload- and download- status pages
        return self.GET("status/up-%d" % self._up_status)
    d.addCallback(_got_status)
    # NOTE(review): the 'def _got_up(res):' header is elided from this
    # excerpt; the next line is its body.
        return self.GET("status/down-%d" % self._down_status)
    d.addCallback(_got_up)
    # NOTE(review): the 'def _got_down(res):' header is elided from this
    # excerpt; the next line is its body.
        return self.GET("status/mapupdate-%d" % self._update_status)
    d.addCallback(_got_down)
    def _got_update(res):
        return self.GET("status/publish-%d" % self._publish_status)
    d.addCallback(_got_update)
    def _got_publish(res):
        return self.GET("status/retrieve-%d" % self._retrieve_status)
    d.addCallback(_got_publish)

    # check that the helper status page exists
    d.addCallback(lambda res:
                  self.GET("helper_status", followRedirect=True))
    def _got_helper_status(res):
        self.failUnless("Bytes Fetched:" in res)
        # touch a couple of files in the helper's working directory to
        # exercise more code paths
        workdir = os.path.join(self.getdir("client0"), "helper")
        incfile = os.path.join(workdir, "CHK_incoming", "spurious")
        f = open(incfile, "wb")
        f.write("small file")
        # [f.close() / 'now = time.time()' lines elided from this excerpt]
        then = time.time() - 86400*3
        # [line elided from this excerpt]
        os.utime(incfile, (now, then))
        encfile = os.path.join(workdir, "CHK_encoding", "spurious")
        f = open(encfile, "wb")
        f.write("less small file")
        # [f.close() line elided from this excerpt]
        os.utime(encfile, (now, then))
    d.addCallback(_got_helper_status)
    # and that the json form exists
    d.addCallback(lambda res:
                  self.GET("helper_status?t=json", followRedirect=True))
    def _got_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
        # [expected-value line elided from this excerpt]
        self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
        self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
        # [expected-value line elided from this excerpt]
        self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
        self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        # [expected-value line elided from this excerpt]
    d.addCallback(_got_helper_status_json)

    # and check that client[3] (which uses a helper but does not run one
    # itself) doesn't explode when you ask for its status
    d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
    def _got_non_helper_status(res):
        self.failUnless("Upload and Download Status" in res)
    d.addCallback(_got_non_helper_status)

    # or for helper status with t=json
    d.addCallback(lambda res:
                  getPage(self.helper_webish_url + "helper_status?t=json"))
    def _got_non_helper_status_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data, {})
    d.addCallback(_got_non_helper_status_json)

    # see if the statistics page exists
    d.addCallback(lambda res: self.GET("statistics"))
    def _got_stats(res):
        self.failUnless("Node Statistics" in res)
        self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
    d.addCallback(_got_stats)
    d.addCallback(lambda res: self.GET("statistics?t=json"))
    def _got_stats_json(res):
        data = simplejson.loads(res)
        self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
        self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
    d.addCallback(_got_stats_json)

    # TODO: mangle the second segment of a file, to test errors that
    # occur after we've already sent some good data, which uses a
    # different error path.

    # TODO: download a URI with a form
    # TODO: create a directory by using a form
    # TODO: upload by using a form on the directory page
    # url = base + "somedir/subdir1/freeform_post!!upload"
    # TODO: delete a file by using a button on the directory page
    # [trailing 'return d' elided from this excerpt]
def _test_runner(self, res):
    """Exercise the 'tahoe debug' diagnostic subcommands against the
    shares written so far: dump-share (field checks on the one CHK
    file), find-shares (expects 10 shares), and catalog-shares
    (expects 30 descriptions, 10 matching our storage index)."""
    # exercise some of the diagnostic tools in runner.py

    # NOTE(review): the search-setup lines before the walk are elided
    # from this excerpt.
    for (dirpath, dirnames, filenames) in os.walk(self.basedir):
        if "storage" not in dirpath:
            # ['continue' line elided from this excerpt]
        pieces = dirpath.split(os.sep)
        if pieces[-4] == "storage" and pieces[-3] == "shares":
            # we're sitting in .../storage/shares/$START/$SINDEX , and there
            # are sharefiles here
            filename = os.path.join(dirpath, filenames[0])
            # peek at the magic to see if it is a chk share
            magic = open(filename, "rb").read(4)
            if magic == '\x00\x00\x00\x01':
                # ['break' lines elided from this excerpt]
    # NOTE(review): the 'else:' guard before the fail() is elided.
        self.fail("unable to find any uri_extension files in %s"
    # [format-argument line elided from this excerpt]
    log.msg("test_system.SystemTest._test_runner using %s" % filename)

    out,err = StringIO(), StringIO()
    rc = runner.runner(["debug", "dump-share", "--offsets",
                        # [filename argument line elided from this excerpt]
                       stdout=out, stderr=err)
    output = out.getvalue()
    self.failUnlessEqual(rc, 0)

    # we only upload a single file, so we can assert some things about
    # its size and shares.
    self.failUnless(("share filename: %s" % filename) in output)
    self.failUnless("size: %d\n" % len(self.data) in output)
    self.failUnless("num_segments: 1\n" in output)
    # segment_size is always a multiple of needed_shares
    self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
    self.failUnless("total_shares: 10\n" in output)
    # keys which are supposed to be present
    for key in ("size", "num_segments", "segment_size",
                "needed_shares", "total_shares",
                "codec_name", "codec_params", "tail_codec_params",
                #"plaintext_hash", "plaintext_root_hash",
                "crypttext_hash", "crypttext_root_hash",
                "share_root_hash", "UEB_hash"):
        self.failUnless("%s: " % key in output, key)
    self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

    # now use its storage index to find the other shares using the
    # 'find-shares' tool
    sharedir, shnum = os.path.split(filename)
    storagedir, storage_index_s = os.path.split(sharedir)
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["debug", "find-shares", storage_index_s] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    # [out.seek(0) line elided from this excerpt]
    sharefiles = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(sharefiles), 10)

    # also exercise the 'catalog-shares' tool
    out,err = StringIO(), StringIO()
    nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
    cmd = ["debug", "catalog-shares"] + nodedirs
    rc = runner.runner(cmd, stdout=out, stderr=err)
    self.failUnlessEqual(rc, 0)
    # [out.seek(0) line elided from this excerpt]
    descriptions = [sfn.strip() for sfn in out.readlines()]
    self.failUnlessEqual(len(descriptions), 30)
    # NOTE(review): the head of the 'matching = [...' list comprehension
    # is elided from this excerpt.
            for line in descriptions
            if line.startswith("CHK %s " % storage_index_s)]
    self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
    """Connect to client0's control.furl over foolscap and hand the
    RemoteReference to _test_control2."""
    # exercise the remote-control-the-client foolscap interfaces in
    # allmydata.control (mostly used for performance tests)
    c0 = self.clients[0]
    control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
    control_furl = open(control_furl_file, "r").read().strip()
    # it doesn't really matter which Tub we use to connect to the client,
    # so let's just use our IntroducerNode's
    d = self.introducer.tub.getReference(control_furl)
    d.addCallback(self._test_control2, control_furl_file)
    # [trailing 'return d' elided from this excerpt]
def _test_control2(self, rref, filename):
    """Second half of _test_control: round-trip 'filename' through
    upload_from_file_to_uri / download_from_uri_to_file, then invoke the
    speed_test, get_memory_usage (Linux only) and
    measure_peer_response_time probes."""
    d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
    downfile = os.path.join(self.basedir, "control.downfile")
    d.addCallback(lambda uri:
                  rref.callRemote("download_from_uri_to_file",
    # NOTE(review): the callRemote argument lines and the
    # 'def _check(res):' header are elided from this excerpt; the next
    # four lines are _check's body.
        self.failUnlessEqual(res, downfile)
        data = open(downfile, "r").read()
        expected_data = open(filename, "r").read()
        self.failUnlessEqual(data, expected_data)
    d.addCallback(_check)
    d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
    if sys.platform == "linux2":
        d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
    d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    # [trailing 'return d' elided from this excerpt]
def _test_cli(self, res):
    """Drive the 'tahoe' command-line tool against client0 (via
    _run_cli, in a thread, since the CLI blocks): aliases and the
    legacy root_dir.cap file, ls/mkdir/put/get/rm/mv/ln, tahoe-to-disk
    and disk-to-tahoe cp (including recursive -r and overwrites)."""
    # run various CLI commands (in a thread, since they use blocking
    # [comment continuation elided from this excerpt]
    private_uri = self._private_node.get_uri()
    some_uri = self._root_directory_uri
    client0_basedir = self.getdir("client0")

    # NOTE(review): the head of the 'nodeargs = [' assignment is elided
    # from this excerpt.
        "--node-directory", client0_basedir,
    # [list-closing line elided from this excerpt]
    TESTDATA = "I will not write the same thing over and over.\n" * 100

    d = defer.succeed(None)

    # for compatibility with earlier versions, private/root_dir.cap is
    # supposed to be treated as an alias named "tahoe:". Start by making
    # sure that works, before we add other aliases.

    root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
    f = open(root_file, "w")
    f.write(private_uri)
    # [f.close() line elided from this excerpt]

    def run(ignored, verb, *args, **kwargs):
        # helper: run one 'tahoe <verb>' command against client0
        stdin = kwargs.get("stdin", "")
        newargs = [verb] + nodeargs + list(args)
        return self._run_cli(newargs, stdin=stdin)

    def _check_ls((out,err), expected_children, unexpected_children=[]):
        self.failUnlessEqual(err, "")
        for s in expected_children:
            self.failUnless(s in out, (s,out))
        for s in unexpected_children:
            self.failIf(s in out, (s,out))

    def _check_ls_root((out,err)):
        self.failUnless("personal" in out)
        self.failUnless("s2-ro" in out)
        self.failUnless("s2-rw" in out)
        self.failUnlessEqual(err, "")

    # this should reference private_uri
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

    d.addCallback(run, "list-aliases")
    def _check_aliases_1((out,err)):
        self.failUnlessEqual(err, "")
        self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
    d.addCallback(_check_aliases_1)

    # now that that's out of the way, remove root_dir.cap and work with
    # [comment continuation elided from this excerpt]
    d.addCallback(lambda res: os.unlink(root_file))
    d.addCallback(run, "list-aliases")
    def _check_aliases_2((out,err)):
        self.failUnlessEqual(err, "")
        self.failUnlessEqual(out, "")
    d.addCallback(_check_aliases_2)

    d.addCallback(run, "mkdir")
    def _got_dir( (out,err) ):
        self.failUnless(uri.from_string_dirnode(out.strip()))
        # ['return out.strip()' (or similar) elided from this excerpt]
    d.addCallback(_got_dir)
    d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

    d.addCallback(run, "list-aliases")
    def _check_aliases_3((out,err)):
        self.failUnlessEqual(err, "")
        self.failUnless("tahoe: " in out)
    d.addCallback(_check_aliases_3)

    def _check_empty_dir((out,err)):
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "")
    d.addCallback(run, "ls")
    d.addCallback(_check_empty_dir)

    def _check_missing_dir((out,err)):
        # TODO: check that rc==2
        self.failUnlessEqual(out, "")
        self.failUnlessEqual(err, "No such file or directory\n")
    d.addCallback(run, "ls", "bogus")
    d.addCallback(_check_missing_dir)

    # NOTE(review): the setup lines (files/datas lists and the loop
    # header creating local test files) are elided from this excerpt;
    # the next three lines are the loop body.
        fn = os.path.join(self.basedir, "file%d" % i)
        data = "data to be uploaded: file%d\n" % i
        open(fn,"wb").write(data)

    def _check_stdout_against((out,err), filenum=None, data=None):
        self.failUnlessEqual(err, "")
        if filenum is not None:
            self.failUnlessEqual(out, datas[filenum])
        if data is not None:
            self.failUnlessEqual(out, data)

    # test all both forms of put: from a file, and from stdin
    d.addCallback(run, "put", files[0], "tahoe-file0")
    def _put_out((out,err)):
        self.failUnless("URI:LIT:" in out, out)
        self.failUnless("201 Created" in err, err)
        # ['uri0 = out.strip()' (or similar) elided from this excerpt]
        return run(None, "get", uri0)
    d.addCallback(_put_out)
    d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

    d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
    # tahoe put bar tahoe:FOO
    d.addCallback(run, "put", files[2], "tahoe:file2")
    d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
    def _check_put_mutable((out,err)):
        self._mutable_file3_uri = out.strip()
    d.addCallback(_check_put_mutable)
    d.addCallback(run, "get", "tahoe:file3")
    d.addCallback(_check_stdout_against, 3)

    # [comment line elided from this excerpt]
    STDIN_DATA = "This is the file to upload from stdin."
    d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
    # tahoe put tahoe:FOO
    d.addCallback(run, "put", "-", "tahoe:from-stdin",
                  stdin="Other file from stdin.")

    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                              "tahoe-file-stdin", "from-stdin"])
    d.addCallback(run, "ls", "subdir")
    d.addCallback(_check_ls, ["tahoe-file1"])

    d.addCallback(run, "mkdir", "subdir2")
    d.addCallback(run, "ls")
    # TODO: extract the URI, set an alias with it
    d.addCallback(_check_ls, ["subdir2"])

    # tahoe get: (to stdin and to a file)
    d.addCallback(run, "get", "tahoe-file0")
    d.addCallback(_check_stdout_against, 0)
    d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
    d.addCallback(_check_stdout_against, 1)
    outfile0 = os.path.join(self.basedir, "outfile0")
    d.addCallback(run, "get", "file2", outfile0)
    def _check_outfile0((out,err)):
        data = open(outfile0,"rb").read()
        self.failUnlessEqual(data, "data to be uploaded: file2\n")
    d.addCallback(_check_outfile0)
    outfile1 = os.path.join(self.basedir, "outfile0")
    d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
    def _check_outfile1((out,err)):
        data = open(outfile1,"rb").read()
        self.failUnlessEqual(data, "data to be uploaded: file1\n")
    d.addCallback(_check_outfile1)

    d.addCallback(run, "rm", "tahoe-file0")
    d.addCallback(run, "rm", "tahoe:file2")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

    d.addCallback(run, "ls", "-l")
    def _check_ls_l((out,err)):
        lines = out.split("\n")
        # NOTE(review): the 'for l in lines:' header is elided from this
        # excerpt; the following lines are the loop body.
            if "tahoe-file-stdin" in l:
                self.failUnless(l.startswith("-r-- "), l)
                self.failUnless(" %d " % len(STDIN_DATA) in l)
            # ['if "file3" in l:' (or similar) elided from this excerpt]
                self.failUnless(l.startswith("-rw- "), l) # mutable
    d.addCallback(_check_ls_l)

    d.addCallback(run, "ls", "--uri")
    def _check_ls_uri((out,err)):
        lines = out.split("\n")
        # [loop/condition header lines elided from this excerpt]
            self.failUnless(self._mutable_file3_uri in l)
    d.addCallback(_check_ls_uri)

    d.addCallback(run, "ls", "--readonly-uri")
    def _check_ls_rouri((out,err)):
        lines = out.split("\n")
        # [loop/condition header lines elided from this excerpt]
            rw_uri = self._mutable_file3_uri
            u = uri.from_string_mutable_filenode(rw_uri)
            ro_uri = u.get_readonly().to_string()
            self.failUnless(ro_uri in l)
    d.addCallback(_check_ls_rouri)

    d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

    d.addCallback(run, "ln", "tahoe-moved", "newlink")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

    d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy"])
    d.addCallback(run, "get", "tahoe:file3-copy")
    d.addCallback(_check_stdout_against, 3)

    # copy from disk into tahoe
    d.addCallback(run, "cp", files[4], "tahoe:file4")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file4")
    d.addCallback(_check_stdout_against, 4)

    # copy from tahoe into disk
    target_filename = os.path.join(self.basedir, "file-out")
    d.addCallback(run, "cp", "tahoe:file4", target_filename)
    def _check_cp_out((out,err)):
        self.failUnless(os.path.exists(target_filename))
        got = open(target_filename,"rb").read()
        self.failUnlessEqual(got, datas[4])
    d.addCallback(_check_cp_out)

    # copy from disk to disk (silly case)
    target2_filename = os.path.join(self.basedir, "file-out-copy")
    d.addCallback(run, "cp", target_filename, target2_filename)
    def _check_cp_out2((out,err)):
        self.failUnless(os.path.exists(target2_filename))
        got = open(target2_filename,"rb").read()
        self.failUnlessEqual(got, datas[4])
    d.addCallback(_check_cp_out2)

    # copy from tahoe into disk, overwriting an existing file
    d.addCallback(run, "cp", "tahoe:file3", target_filename)
    def _check_cp_out3((out,err)):
        self.failUnless(os.path.exists(target_filename))
        got = open(target_filename,"rb").read()
        self.failUnlessEqual(got, datas[3])
    d.addCallback(_check_cp_out3)

    # copy from disk into tahoe, overwriting an existing immutable file
    d.addCallback(run, "cp", files[5], "tahoe:file4")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file4")
    d.addCallback(_check_stdout_against, 5)

    # copy from disk into tahoe, overwriting an existing mutable file
    d.addCallback(run, "cp", files[5], "tahoe:file3")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
    d.addCallback(run, "get", "tahoe:file3")
    d.addCallback(_check_stdout_against, 5)

    # recursive copy: setup
    dn = os.path.join(self.basedir, "dir1")
    # [os.makedirs(dn) line elided from this excerpt]
    open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
    open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
    open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
    sdn2 = os.path.join(dn, "subdir2")
    # [os.makedirs(sdn2) line elided from this excerpt]
    open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
    open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

    # from disk into tahoe
    d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["dir1"])
    d.addCallback(run, "ls", "dir1")
    d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                  ["rfile4", "rfile5"])
    d.addCallback(run, "ls", "tahoe:dir1/subdir2")
    d.addCallback(_check_ls, ["rfile4", "rfile5"],
                  ["rfile1", "rfile2", "rfile3"])
    d.addCallback(run, "get", "dir1/subdir2/rfile4")
    d.addCallback(_check_stdout_against, data="rfile4")

    # and back out again
    dn_copy = os.path.join(self.basedir, "dir1-copy")
    d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
    def _check_cp_r_out((out,err)):
        # NOTE(review): the 'def _cmp(name):' helper header is elided
        # from this excerpt; the next five lines are its body, and the
        # _cmp calls for the top-level rfiles are also elided.
            old = open(os.path.join(dn, name), "rb").read()
            newfn = os.path.join(dn_copy, name)
            self.failUnless(os.path.exists(newfn))
            new = open(newfn, "rb").read()
            self.failUnlessEqual(old, new)
        _cmp(os.path.join("subdir2", "rfile4"))
        _cmp(os.path.join("subdir2", "rfile5"))
    d.addCallback(_check_cp_r_out)

    # and copy it a second time, which ought to overwrite the same files
    d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

    # and tahoe-to-tahoe
    d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
    d.addCallback(run, "ls")
    d.addCallback(_check_ls, ["dir1", "dir1-copy"])
    d.addCallback(run, "ls", "dir1-copy")
    d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                  ["rfile4", "rfile5"])
    d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
    d.addCallback(_check_ls, ["rfile4", "rfile5"],
                  ["rfile1", "rfile2", "rfile3"])
    d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
    d.addCallback(_check_stdout_against, data="rfile4")

    # and copy it a second time, which ought to overwrite the same files
    d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

    # tahoe_ls doesn't currently handle the error correctly: it tries to
    # JSON-parse a traceback.
    ## def _ls_missing(res):
    ##     argv = ["ls"] + nodeargs + ["bogus"]
    ##     return self._run_cli(argv)
    ## d.addCallback(_ls_missing)
    ## def _check_ls_missing((out,err)):
    ##     [body lines elided from this excerpt]
    ##     self.failUnlessEqual(err, "")
    ## d.addCallback(_check_ls_missing)
    # [trailing 'return d' elided from this excerpt]
# Run a (blocking) CLI command in a worker thread; the returned Deferred
# fires with the (stdout, stderr) text once the command completes.
# NOTE(review): this listing is missing source lines 1745 (presumably
# "def _done(res):") and 1748 (presumably "return d") -- confirm against
# the complete file.
1739 def _run_cli(self, argv, stdin=""):
1741 stdout, stderr = StringIO(), StringIO()
# deferToThread because runner.runner blocks; stdin/stdout/stderr are
# captured in in-memory StringIO buffers
1742 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1743 stdin=StringIO(stdin),
1744 stdout=stdout, stderr=stderr)
1746 return stdout.getvalue(), stderr.getvalue()
1747 d.addCallback(_done)
# Exercise the checker API on three node types: a dirnode, a CHK file
# (too big to be literal), and a LIT file, with and without verify=True.
# NOTE(review): the listing omits several lines (1753, 1760, 1770, 1772,
# 1782, and the trailing "return d" lines) -- confirm against the full file.
1750 def _test_checker(self, res):
# upload enough data to force a CHK file rather than a LIT uri
1751 ut = upload.Data("too big to be literal" * 200, convergence=None)
1752 d = self._personal_node.add_file(u"big file", ut)
# a healthy dirnode must report healthy from both check() and
# check(verify=True)
1754 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1755 def _check_dirnode_results(r):
1756 self.failUnless(r.is_healthy())
1757 d.addCallback(_check_dirnode_results)
1758 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1759 d.addCallback(_check_dirnode_results)
# same for the CHK filenode we just uploaded
1761 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1762 def _got_chk_filenode(n):
1763 self.failUnless(isinstance(n, filenode.FileNode))
1764 d = n.check(Monitor())
1765 def _check_filenode_results(r):
1766 self.failUnless(r.is_healthy())
1767 d.addCallback(_check_filenode_results)
1768 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1769 d.addCallback(_check_filenode_results)
1771 d.addCallback(_got_chk_filenode)
# LIT files have no shares, so check() returns None rather than results
1773 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1774 def _got_lit_filenode(n):
1775 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1776 d = n.check(Monitor())
1777 def _check_lit_filenode_results(r):
1778 self.failUnlessEqual(r, None)
1779 d.addCallback(_check_lit_filenode_results)
1780 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1781 d.addCallback(_check_lit_filenode_results)
1783 d.addCallback(_got_lit_filenode)
1787 class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
1789 def _run_cli(self, argv):
1790 stdout, stderr = StringIO(), StringIO()
1791 # this can only do synchronous operations
1792 assert argv[0] == "debug"
1793 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1794 return stdout.getvalue()
# Create an undamaged mutable file, then assert that the webapi verifier
# (t=check&verify=true) reports it fully healthy.
# NOTE(review): the listing omits lines 1801-1802 (presumably
# "def _created(node): self.node = node"), 1806 ("def _do_check(res):")
# and 1820 ("return d") -- confirm against the full file.
1796 def test_good(self):
1797 self.basedir = self.mktemp()
1798 d = self.set_up_nodes()
1799 CONTENTS = "a little bit of data"
1800 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1803 si = self.node.get_storage_index()
1804 d.addCallback(_created)
1805 # now make sure the webapi verifier sees no problems
1807 url = (self.webish_url +
1808 "uri/%s" % urllib.quote(self.node.get_uri()) +
1809 "?t=check&verify=true")
1810 return getPage(url, method="POST")
1811 d.addCallback(_do_check)
# the check results page is HTML; assert on its marker strings
1812 def _got_results(out):
1813 self.failUnless("<span>Healthy : Healthy</span>" in out, out)
1814 self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1815 self.failIf("Not Healthy!" in out, out)
1816 self.failIf("Unhealthy" in out, out)
1817 self.failIf("Corrupt Shares" in out, out)
1818 d.addCallback(_got_results)
1819 d.addErrback(self.explain_web_error)
# Corrupt one share of a mutable file with the 'debug corrupt-share' CLI,
# assert the webapi verifier notices, then assert repair=true fixes it.
# NOTE(review): the listing omits lines 1827-1828 (the "_created" def),
# 1834 (presumably "f = files[0]"), 1842 ("def _do_check(res):") and
# 1870 ("return d") -- confirm against the full file.
1822 def test_corrupt(self):
1823 self.basedir = self.mktemp()
1824 d = self.set_up_nodes()
1825 CONTENTS = "a little bit of data"
1826 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1829 si = self.node.get_storage_index()
# locate this file's shares on client 1 via the debug CLI
1830 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1831 self.clients[1].basedir])
1832 files = out.split("\n")
1833 # corrupt one of them, using the CLI debug command
1835 shnum = os.path.basename(f)
1836 nodeid = self.clients[1].nodeid
1837 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
# remember which share was damaged so results can name it later
1838 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1839 out = self._run_cli(["debug", "corrupt-share", files[0]])
1840 d.addCallback(_created)
1841 # now make sure the webapi verifier notices it
1843 url = (self.webish_url +
1844 "uri/%s" % urllib.quote(self.node.get_uri()) +
1845 "?t=check&verify=true")
1846 return getPage(url, method="POST")
1847 d.addCallback(_do_check)
1848 def _got_results(out):
1849 self.failUnless("Not Healthy!" in out, out)
1850 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1851 self.failUnless("Corrupt Shares:" in out, out)
1852 d.addCallback(_got_results)
1854 # now make sure the webapi repairer can fix it
1855 def _do_repair(res):
1856 url = (self.webish_url +
1857 "uri/%s" % urllib.quote(self.node.get_uri()) +
1858 "?t=check&verify=true&repair=true")
1859 return getPage(url, method="POST")
1860 d.addCallback(_do_repair)
1861 def _got_repair_results(out):
1862 self.failUnless("<div>Repair successful</div>" in out, out)
1863 d.addCallback(_got_repair_results)
# re-check after repair: all 10 shares should be back
1864 d.addCallback(_do_check)
1865 def _got_postrepair_results(out):
1866 self.failIf("Not Healthy!" in out, out)
1867 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1868 d.addCallback(_got_postrepair_results)
1869 d.addErrback(self.explain_web_error)
# Delete one share of a mutable file, assert the (non-verifying) webapi
# checker notices the missing share, then assert repair=true restores it.
# NOTE(review): the listing omits lines 1878-1879 (the "_created" def),
# 1885 (presumably "f = files[0]"), 1890 (presumably the os.unlink call),
# 1893 ("def _do_check(res):") and 1921 ("return d") -- confirm.
1873 def test_delete_share(self):
1874 self.basedir = self.mktemp()
1875 d = self.set_up_nodes()
1876 CONTENTS = "a little bit of data"
1877 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1880 si = self.node.get_storage_index()
1881 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1882 self.clients[1].basedir])
1883 files = out.split("\n")
1884 # corrupt one of them, using the CLI debug command
1886 shnum = os.path.basename(f)
1887 nodeid = self.clients[1].nodeid
1888 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1889 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1891 d.addCallback(_created)
1892 # now make sure the webapi checker notices it
# verify=false: a plain check is enough to spot a missing share
1894 url = (self.webish_url +
1895 "uri/%s" % urllib.quote(self.node.get_uri()) +
1896 "?t=check&verify=false")
1897 return getPage(url, method="POST")
1898 d.addCallback(_do_check)
1899 def _got_results(out):
1900 self.failUnless("Not Healthy!" in out, out)
1901 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
# deletion (unlike corruption) must NOT be reported as corrupt shares
1902 self.failIf("Corrupt Shares" in out, out)
1903 d.addCallback(_got_results)
1905 # now make sure the webapi repairer can fix it
1906 def _do_repair(res):
1907 url = (self.webish_url +
1908 "uri/%s" % urllib.quote(self.node.get_uri()) +
1909 "?t=check&verify=false&repair=true")
1910 return getPage(url, method="POST")
1911 d.addCallback(_do_repair)
1912 def _got_repair_results(out):
1913 self.failUnless("Repair successful" in out)
1914 d.addCallback(_got_repair_results)
1915 d.addCallback(_do_check)
1916 def _got_postrepair_results(out):
1917 self.failIf("Not Healthy!" in out, out)
1918 self.failUnless("Recoverable Versions: 10*seq" in out)
1919 d.addCallback(_got_postrepair_results)
1920 d.addErrback(self.explain_web_error)
1925 class DeepCheckBase(SystemTestMixin, ErrorMixin):
# POST to the node's webapi with output=json forced, returning a Deferred
# that fires with the decoded JSON structure.
# NOTE(review): line 1931 (presumably "return d") is missing from this
# listing -- confirm against the full file.
1927 def web_json(self, n, **kwargs):
1928 kwargs["output"] = "json"
1929 d = self.web(n, "POST", **kwargs)
1930 d.addCallback(self.decode_json)
# Parse the (body, url) pair produced by web(); fail the test with the
# offending URL and body if the body is not valid JSON.
# NOTE(review): the listing omits lines 1934/1936/1938 -- presumably the
# "try:"/"except ValueError:"/"return data" framing. Python 2 tuple
# parameter unpacking is used in the signature.
1933 def decode_json(self, (s,url)):
1935 data = simplejson.loads(s)
1937 self.fail("%s: not JSON: '%s'" % (url, s))
# Fetch the webapi page for node n with the given query arguments; the
# Deferred fires with a (body, url) pair so error messages can cite the URL.
# NOTE(review): line 1946 (presumably "return d") is missing -- confirm.
1940 def web(self, n, method="GET", **kwargs):
1941 # returns (data, url)
# NOTE(review): kwargs values are interpolated without urllib.quote --
# callers appear to pass only URL-safe values; verify before reuse.
1942 url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
1943 + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
1944 d = getPage(url, method=method)
1945 d.addCallback(lambda data: (data,url))
# Poll the /operations/<ophandle> status page (JSON) once per second until
# the long-running operation reports finished, then pass the data through.
# NOTE(review): the listing omits lines 1951-1953 (presumably the getPage
# call and its callback header), 1955-1956 (the JSON error branch) and
# 1960-1963 (the finished branch / return) -- confirm against the full file.
1948 def wait_for_operation(self, ignored, ophandle):
1949 url = self.webish_url + "operations/" + ophandle
1950 url += "?t=status&output=JSON"
1954 data = simplejson.loads(res)
1956 self.fail("%s: not JSON: '%s'" % (url, res))
1957 if not data["finished"]:
# not done yet: stall one second and recurse
1958 d = self.stall(delay=1.0)
1959 d.addCallback(self.wait_for_operation, ophandle)
# Fetch the final results page of a finished operation; when output="json"
# is requested the body is decoded before being returned.
# NOTE(review): the listing omits lines 1967-1968, 1970-1971, 1973, 1975
# and 1977-1979 (the t=status query, getPage call, try/except framing and
# the non-JSON return path) -- confirm against the full file.
1965 def get_operation_results(self, ignored, ophandle, output=None):
1966 url = self.webish_url + "operations/" + ophandle
1969 url += "&output=" + output
1972 if output and output.lower() == "json":
1974 return simplejson.loads(res)
1976 self.fail("%s: not JSON: '%s'" % (url, res))
# Start a long-running webapi operation (e.g. deep-check) with a random
# ophandle, poll until finished, and return its final results.
# NOTE(review): lines 1982 and 1987-1988 (presumably "return d") are
# missing from this listing -- confirm against the full file.
1981 def slow_web(self, n, output=None, **kwargs):
# a short random handle is enough to keep concurrent tests apart
1983 handle = base32.b2a(os.urandom(4))
1984 d = self.web(n, "POST", ophandle=handle, **kwargs)
1985 d.addCallback(self.wait_for_operation, handle)
1986 d.addCallback(self.get_operation_results, handle, output=output)
1990 class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
1991 # construct a small directory tree (with one dir, one immutable file, one
1992 # mutable file, one LIT file, and a loop), and then check/examine it in
# Build the small test tree used by the deep-check tests: a root dirnode
# holding one mutable file, one large CHK file, two LIT files, and a
# "loop" edge pointing back at the root.
# NOTE(review): the listing omits the lines that stash the created nodes
# (presumably "self.root = n", "self.mutable = n", "self.large = n",
# "self.small = n", "self.small2 = n" plus the returns at 2007, 2013,
# 2020, 2027, 2034 and the trailing "return d") -- confirm.
1995 def set_up_tree(self, ignored):
2004 c0 = self.clients[0]
2005 d = c0.create_empty_dirnode()
2006 def _created_root(n):
2008 self.root_uri = n.get_uri()
2009 d.addCallback(_created_root)
# one mutable file
2010 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
2011 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
2012 def _created_mutable(n):
2014 self.mutable_uri = n.get_uri()
2015 d.addCallback(_created_mutable)
# one immutable CHK file, large enough not to be a LIT uri
2017 large = upload.Data("Lots of data\n" * 1000, None)
2018 d.addCallback(lambda ign: self.root.add_file(u"large", large))
2019 def _created_large(n):
2021 self.large_uri = n.get_uri()
2022 d.addCallback(_created_large)
# two LIT files (small enough to live entirely inside their URIs)
2024 small = upload.Data("Small enough for a LIT", None)
2025 d.addCallback(lambda ign: self.root.add_file(u"small", small))
2026 def _created_small(n):
2028 self.small_uri = n.get_uri()
2029 d.addCallback(_created_small)
2031 small2 = upload.Data("Small enough for a LIT too", None)
2032 d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
2033 def _created_small2(n):
2035 self.small2_uri = n.get_uri()
2036 d.addCallback(_created_small2)
# a cycle: deep-traversal code must not loop forever on it
2038 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
# Assert that ICheckResults 'cr' for node 'n' describes a fully healthy
# file: all 10 shares present on our clients, nothing corrupt.
# NOTE(review): the listing omits lines 2045, 2049, 2051 (presumably
# "d = cr.get_data()" -- the 'd' dict used below), 2055, 2059, 2062 and
# 2069-2070 -- confirm against the full file.
2041 def check_is_healthy(self, cr, n, where, incomplete=False):
2042 self.failUnless(ICheckResults.providedBy(cr), where)
2043 self.failUnless(cr.is_healthy(), where)
2044 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2046 self.failUnlessEqual(cr.get_storage_index_string(),
2047 base32.b2a(n.get_storage_index()), where)
# with fewer than 10 clients, some server must hold >1 share
2048 needs_rebalancing = bool( len(self.clients) < 10 )
2050 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, str((where, cr, cr.get_data())))
2052 self.failUnlessEqual(d["count-shares-good"], 10, where)
2053 self.failUnlessEqual(d["count-shares-needed"], 3, where)
2054 self.failUnlessEqual(d["count-shares-expected"], 10, where)
2056 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2057 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2058 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2060 self.failUnlessEqual(sorted(d["servers-responding"]),
2061 sorted([c.nodeid for c in self.clients]),
# every client should appear somewhere in the sharemap
2063 self.failUnless("sharemap" in d, str((where, d)))
2064 all_serverids = set()
2065 for (shareid, serverids) in d["sharemap"].items():
2066 all_serverids.update(serverids)
2067 self.failUnlessEqual(sorted(all_serverids),
2068 sorted([c.nodeid for c in self.clients]),
2071 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2072 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2073 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2076 def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2077 self.failUnless(ICheckAndRepairResults.providedBy(cr), (where, cr))
2078 self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2079 self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2080 self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2081 self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2082 self.failIf(cr.get_repair_attempted(), where)
# Assert a deep-check result found exactly num_healthy healthy objects.
# NOTE(review): line 2087 (the continuation "num_healthy, where)") is
# missing from this listing -- confirm against the full file.
2084 def deep_check_is_healthy(self, cr, num_healthy, where):
2085 self.failUnless(IDeepCheckResults.providedBy(cr))
2086 self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
# Assert a deep-check-and-repair result found num_healthy healthy objects
# both before and after repair, and attempted no repairs.
# NOTE(review): lines 2093 and 2095 (the "num_healthy, where)"
# continuations) are missing from this listing -- confirm.
2089 def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2090 self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2091 c = cr.get_counters()
2092 self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2094 self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2096 self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
# End-to-end happy path: build the tree, then run the stats, checker,
# webapi and CLI variants against it, all expecting "healthy" everywhere.
# NOTE(review): line 2108 (presumably "return d") is missing -- confirm.
2098 def test_good(self):
2099 self.basedir = self.mktemp()
2100 d = self.set_up_nodes()
2101 d.addCallback(self.set_up_tree)
2102 d.addCallback(self.do_stats)
2103 d.addCallback(self.do_test_check_good)
2104 d.addCallback(self.do_test_web_good)
2105 d.addCallback(self.do_test_cli_good)
# unwrap web/CLI failures into readable trial errors
2106 d.addErrback(self.explain_web_error)
2107 d.addErrback(self.explain_error)
# Run a deep-stats traversal from the root and validate the aggregates.
# NOTE(review): line 2114 (presumably "return d") is missing -- confirm.
2110 def do_stats(self, ignored):
2111 d = defer.succeed(None)
2112 d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2113 d.addCallback(self.check_stats_good)
# Validate the deep-stats dictionary against the known tree layout built
# by set_up_tree (1 dir, 1 CHK file, 2 LIT files, 1 mutable file).
# NOTE(review): lines 2134-2135 (the rest of the expected histogram list,
# presumably "(10001, 31622, 1)])") are missing from this listing.
2116 def check_stats_good(self, s):
2117 self.failUnlessEqual(s["count-directories"], 1)
2118 self.failUnlessEqual(s["count-files"], 4)
2119 self.failUnlessEqual(s["count-immutable-files"], 1)
2120 self.failUnlessEqual(s["count-literal-files"], 2)
2121 self.failUnlessEqual(s["count-mutable-files"], 1)
2122 # don't check directories: their size will vary
2123 # s["largest-directory"]
2124 # s["size-directories"]
2125 self.failUnlessEqual(s["largest-directory-children"], 5)
2126 self.failUnlessEqual(s["largest-immutable-file"], 13000)
2127 # to re-use this function for both the local
2128 # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2129 # coerce the result into a list of tuples. dirnode.start_deep_stats()
2130 # returns a list of tuples, but JSON only knows about lists., so
2131 # t=start-deep-stats returns a list of lists.
2132 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2133 self.failUnlessEqual(histogram, [(11, 31, 2),
2136 self.failUnlessEqual(s["size-immutable-files"], 13000)
2137 self.failUnlessEqual(s["size-literal-files"], 48)
# Drive the local (non-web) checker API over every node in the tree:
# plain check, check(verify=True), check_and_repair, the deep-check
# variants from the root, and finally a deep-check that gets cancelled.
# NOTE(review): the listing omits lines 2208 (presumably
# "self.monitor = monitor"), 2213, 2216 (presumably "def _cancelled(f):"),
# 2219 ("return d" of _checking) and 2221-2222 ("return d") -- confirm.
2139 def do_test_check_good(self, ignored):
2140 d = defer.succeed(None)
2141 # check the individual items
2142 d.addCallback(lambda ign: self.root.check(Monitor()))
2143 d.addCallback(self.check_is_healthy, self.root, "root")
2144 d.addCallback(lambda ign: self.mutable.check(Monitor()))
2145 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2146 d.addCallback(lambda ign: self.large.check(Monitor()))
2147 d.addCallback(self.check_is_healthy, self.large, "large")
# LIT files have no shares: check() returns None
2148 d.addCallback(lambda ign: self.small.check(Monitor()))
2149 d.addCallback(self.failUnlessEqual, None, "small")
2150 d.addCallback(lambda ign: self.small2.check(Monitor()))
2151 d.addCallback(self.failUnlessEqual, None, "small2")
2153 # and again with verify=True
2154 d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2155 d.addCallback(self.check_is_healthy, self.root, "root")
2156 d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2157 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2158 d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2159 d.addCallback(self.check_is_healthy, self.large, "large", incomplete=True)
2160 d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2161 d.addCallback(self.failUnlessEqual, None, "small")
2162 d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
2163 d.addCallback(self.failUnlessEqual, None, "small2")
2165 # and check_and_repair(), which should be a nop
2166 d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2167 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2168 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2169 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2170 #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2171 #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2172 #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2173 #TODO d.addCallback(self.failUnlessEqual, None, "small")
2174 #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
2175 #TODO d.addCallback(self.failUnlessEqual, None, "small2")
2177 # check_and_repair(verify=True)
2178 d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2179 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2180 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2181 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2182 #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2183 #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2184 #TODO incomplete=True)
2185 #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2186 #TODO d.addCallback(self.failUnlessEqual, None, "small")
2187 #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
2188 #TODO d.addCallback(self.failUnlessEqual, None, "small2")
2191 # now deep-check the root, with various verify= and repair= options
# the "loop" edge must not inflate the object count past 3
2192 d.addCallback(lambda ign:
2193 self.root.start_deep_check().when_done())
2194 d.addCallback(self.deep_check_is_healthy, 3, "root")
2195 d.addCallback(lambda ign:
2196 self.root.start_deep_check(verify=True).when_done())
2197 d.addCallback(self.deep_check_is_healthy, 3, "root")
2198 d.addCallback(lambda ign:
2199 self.root.start_deep_check_and_repair().when_done())
2200 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2201 d.addCallback(lambda ign:
2202 self.root.start_deep_check_and_repair(verify=True).when_done())
2203 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2205 # and finally, start a deep-check, but then cancel it.
2206 d.addCallback(lambda ign: self.root.start_deep_check())
2207 def _checking(monitor):
2209 d = monitor.when_done()
2210 # this should fire as soon as the next dirnode.list finishes.
2211 # TODO: add a counter to measure how many list() calls are made,
2212 # assert that no more than one gets to run before the cancel()
2214 def _finished_normally(res):
2215 self.fail("this was supposed to fail, not finish normally")
2217 f.trap(OperationCancelledError)
2218 d.addCallbacks(_finished_normally, _cancelled)
2220 d.addCallback(_checking)
# JSON-side twin of check_is_healthy(): validate the webapi t=check JSON
# response for node n against the expected healthy 3-of-10 layout.
# NOTE(review): the listing omits lines 2225, 2231 (presumably
# 'r = data["results"]' -- the 'r' dict used below), 2234, 2239, 2243 --
# confirm against the full file.
2224 def json_check_is_healthy(self, data, n, where, incomplete=False):
2226 self.failUnlessEqual(data["storage-index"],
2227 base32.b2a(n.get_storage_index()), where)
2228 self.failUnless("summary" in data, (where, data))
2229 self.failUnlessEqual(data["summary"].lower(), "healthy",
2230 "%s: '%s'" % (where, data["summary"]))
2232 self.failUnlessEqual(r["healthy"], True, where)
2233 needs_rebalancing = bool( len(self.clients) < 10 )
2235 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2236 self.failUnlessEqual(r["count-shares-good"], 10, where)
2237 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2238 self.failUnlessEqual(r["count-shares-expected"], 10, where)
2240 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2241 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2242 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
# JSON uses base32 string nodeids, not binary ones
2244 self.failUnlessEqual(sorted(r["servers-responding"]),
2245 sorted([idlib.nodeid_b2a(c.nodeid)
2246 for c in self.clients]), where)
2247 self.failUnless("sharemap" in r, where)
2248 all_serverids = set()
2249 for (shareid, serverids_s) in r["sharemap"].items():
2250 all_serverids.update(serverids_s)
2251 self.failUnlessEqual(sorted(all_serverids),
2252 sorted([idlib.nodeid_b2a(c.nodeid)
2253 for c in self.clients]), where)
2254 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2255 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2256 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2258 def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2259 self.failUnlessEqual(data["storage-index"],
2260 base32.b2a(n.get_storage_index()), where)
2261 self.failUnlessEqual(data["repair-attempted"], False, where)
2262 self.json_check_is_healthy(data["pre-repair-results"],
2263 n, where, incomplete)
2264 self.json_check_is_healthy(data["post-repair-results"],
2265 n, where, incomplete)
2267 def json_full_deepcheck_is_healthy(self, data, n, where):
2268 self.failUnlessEqual(data["root-storage-index"],
2269 base32.b2a(n.get_storage_index()), where)
2270 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2271 self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2272 self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2273 self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2274 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2275 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2276 self.json_check_stats_good(data["stats"], where)
# Assert a deep-check-and-repair JSON report rooted at n: three objects,
# all healthy both before and after, no corruption, no repairs attempted.
# NOTE(review): lines 2282, 2286, 2290 and 2294 are missing from this
# listing -- from the grouping they look like blank separators, but
# confirm against the full file.
2278 def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2279 self.failUnlessEqual(data["root-storage-index"],
2280 base32.b2a(n.get_storage_index()), where)
2281 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2283 self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2284 self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2285 self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2287 self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2288 self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2289 self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2291 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2292 self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2293 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2295 self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2296 self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2297 self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2300 def json_check_lit(self, data, n, where):
2301 self.failUnlessEqual(data["storage-index"], "", where)
2302 self.failUnlessEqual(data["results"]["healthy"], True, where)
2304 def json_check_stats_good(self, data, where):
2305 self.check_stats_good(data)
# Drive the full webapi surface over the healthy tree: deep-stats, t=check
# (plain / verify / repair / verify+repair) per node, the four deep-check
# variants from the root, and t=info on every node.
# NOTE(review): the listing omits several blank/continuation lines
# (2309-2310, 2315-2316, 2327-2328, 2338, 2345, 2362, 2379, 2386, 2390,
# 2395, 2403-2404 -- the last presumably "return d") -- confirm.
2307 def do_test_web_good(self, ignored):
2308 d = defer.succeed(None)
# deep-stats via the slow (ophandle-polling) web interface
2311 d.addCallback(lambda ign:
2312 self.slow_web(self.root,
2313 t="start-deep-stats", output="json"))
2314 d.addCallback(self.json_check_stats_good, "deep-stats")
# plain t=check on each node; LIT files use the lighter json_check_lit
2317 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2318 d.addCallback(self.json_check_is_healthy, self.root, "root")
2319 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2320 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2321 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2322 d.addCallback(self.json_check_is_healthy, self.large, "large")
2323 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2324 d.addCallback(self.json_check_lit, self.small, "small")
2325 d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
2326 d.addCallback(self.json_check_lit, self.small2, "small2")
# t=check with verify=true
2329 d.addCallback(lambda ign:
2330 self.web_json(self.root, t="check", verify="true"))
2331 d.addCallback(self.json_check_is_healthy, self.root, "root+v")
2332 d.addCallback(lambda ign:
2333 self.web_json(self.mutable, t="check", verify="true"))
2334 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v")
2335 d.addCallback(lambda ign:
2336 self.web_json(self.large, t="check", verify="true"))
2337 d.addCallback(self.json_check_is_healthy, self.large, "large+v",
2339 d.addCallback(lambda ign:
2340 self.web_json(self.small, t="check", verify="true"))
2341 d.addCallback(self.json_check_lit, self.small, "small+v")
2342 d.addCallback(lambda ign:
2343 self.web_json(self.small2, t="check", verify="true"))
2344 d.addCallback(self.json_check_lit, self.small2, "small2+v")
2346 # check and repair, no verify
2347 d.addCallback(lambda ign:
2348 self.web_json(self.root, t="check", repair="true"))
2349 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r")
2350 d.addCallback(lambda ign:
2351 self.web_json(self.mutable, t="check", repair="true"))
2352 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r")
2353 d.addCallback(lambda ign:
2354 self.web_json(self.large, t="check", repair="true"))
2355 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r")
2356 d.addCallback(lambda ign:
2357 self.web_json(self.small, t="check", repair="true"))
2358 d.addCallback(self.json_check_lit, self.small, "small+r")
2359 d.addCallback(lambda ign:
2360 self.web_json(self.small2, t="check", repair="true"))
2361 d.addCallback(self.json_check_lit, self.small2, "small2+r")
2363 # check+verify+repair
2364 d.addCallback(lambda ign:
2365 self.web_json(self.root, t="check", repair="true", verify="true"))
2366 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr")
2367 d.addCallback(lambda ign:
2368 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2369 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr")
2370 d.addCallback(lambda ign:
2371 self.web_json(self.large, t="check", repair="true", verify="true"))
2372 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True)
2373 d.addCallback(lambda ign:
2374 self.web_json(self.small, t="check", repair="true", verify="true"))
2375 d.addCallback(self.json_check_lit, self.small, "small+vr")
2376 d.addCallback(lambda ign:
2377 self.web_json(self.small2, t="check", repair="true", verify="true"))
2378 d.addCallback(self.json_check_lit, self.small2, "small2+vr")
2380 # now run a deep-check, with various verify= and repair= flags
2381 d.addCallback(lambda ign:
2382 self.slow_web(self.root, t="start-deep-check", output="json"))
2383 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d")
2384 d.addCallback(lambda ign:
2385 self.slow_web(self.root, t="start-deep-check", verify="true",
2387 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv")
2388 d.addCallback(lambda ign:
2389 self.slow_web(self.root, t="start-deep-check", repair="true",
2391 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr")
2392 d.addCallback(lambda ign:
2393 self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2394 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr")
2396 # now look at t=info
2397 d.addCallback(lambda ign: self.web(self.root, t="info"))
2398 # TODO: examine the output
2399 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2400 d.addCallback(lambda ign: self.web(self.large, t="info"))
2401 d.addCallback(lambda ign: self.web(self.small, t="info"))
2402 d.addCallback(lambda ign: self.web(self.small2, t="info"))
# Run a (blocking) CLI command in a worker thread; the returned Deferred
# fires with the (stdout, stderr) text.
# NOTE(review): this listing is missing lines 2407, 2412 (presumably
# "def _done(res):") and 2415 ("return d") -- confirm against the file.
2406 def _run_cli(self, argv, stdin=""):
2408 stdout, stderr = StringIO(), StringIO()
2409 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
2410 stdin=StringIO(stdin),
2411 stdout=stdout, stderr=stderr)
2413 return stdout.getvalue(), stderr.getvalue()
2414 d.addCallback(_done)
# Drive the 'tahoe manifest' and 'tahoe stats' CLI commands over the tree
# and validate their text, storage-index, raw-JSON and stats outputs.
# NOTE(review): the listing omits lines 2421, 2426-2433 (the loop that
# builds the 'caps' dict from the manifest lines), 2441, 2480, 2497-2498
# and 2512-2513 (presumably "return d") -- confirm against the full file.
2417 def do_test_cli_good(self, ignored):
2418 basedir = self.getdir("client0")
2419 d = self._run_cli(["manifest",
2420 "--node-directory", basedir,
# manifest output: one "cap path" line per object in the tree
2422 def _check((out,err)):
2423 self.failUnlessEqual(err, "")
2424 lines = [l for l in out.split("\n") if l]
2425 self.failUnlessEqual(len(lines), 5)
2429 cap, path = l.split(None, 1)
2434 self.failUnless(self.root.get_uri() in caps)
2435 self.failUnlessEqual(caps[self.root.get_uri()], "")
2436 self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
2437 self.failUnlessEqual(caps[self.large.get_uri()], "large")
2438 self.failUnlessEqual(caps[self.small.get_uri()], "small")
2439 self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
2440 d.addCallback(_check)
# --storage-index mode: only the 3 share-bearing objects appear
# (LIT files have no storage index)
2442 d.addCallback(lambda res:
2443 self._run_cli(["manifest",
2444 "--node-directory", basedir,
2445 "--storage-index", self.root_uri]))
2446 def _check2((out,err)):
2447 self.failUnlessEqual(err, "")
2448 lines = [l for l in out.split("\n") if l]
2449 self.failUnlessEqual(len(lines), 3)
2450 self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
2451 self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
2452 self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
2453 d.addCallback(_check2)
# --raw mode: a single JSON document
2455 d.addCallback(lambda res:
2456 self._run_cli(["manifest",
2457 "--node-directory", basedir,
2458 "--raw", self.root_uri]))
2459 def _check2r((out,err)):
2460 self.failUnlessEqual(err, "")
2461 data = simplejson.loads(out)
2462 sis = data["storage-index"]
2463 self.failUnlessEqual(len(sis), 3)
2464 self.failUnless(base32.b2a(self.root.get_storage_index()) in sis)
2465 self.failUnless(base32.b2a(self.mutable.get_storage_index()) in sis)
2466 self.failUnless(base32.b2a(self.large.get_storage_index()) in sis)
2467 self.failUnlessEqual(data["stats"]["count-files"], 4)
2468 self.failUnlessEqual(data["origin"],
2469 base32.b2a(self.root.get_storage_index()))
2470 verifycaps = data["verifycaps"]
2471 self.failUnlessEqual(len(verifycaps), 3)
2472 self.failUnless(self.root.get_verify_cap().to_string() in verifycaps)
2473 self.failUnless(self.mutable.get_verify_cap().to_string() in verifycaps)
2474 self.failUnless(self.large.get_verify_cap().to_string() in verifycaps)
2475 d.addCallback(_check2r)
# 'tahoe stats' human-readable output
2477 d.addCallback(lambda res:
2478 self._run_cli(["stats",
2479 "--node-directory", basedir,
2481 def _check3((out,err)):
2482 lines = [l.strip() for l in out.split("\n") if l]
2483 self.failUnless("count-immutable-files: 1" in lines)
2484 self.failUnless("count-mutable-files: 1" in lines)
2485 self.failUnless("count-literal-files: 2" in lines)
2486 self.failUnless("count-files: 4" in lines)
2487 self.failUnless("count-directories: 1" in lines)
2488 self.failUnless("size-immutable-files: 13000 (13.00 kB, 12.70 kiB)" in lines, lines)
2489 self.failUnless("size-literal-files: 48" in lines)
2490 self.failUnless(" 11-31 : 2 (31 B, 31 B)".strip() in lines)
2491 self.failUnless("10001-31622 : 1 (31.62 kB, 30.88 kiB)".strip() in lines)
2492 d.addCallback(_check3)
# 'tahoe stats' JSON output (histogram entries become lists, not tuples)
2494 d.addCallback(lambda res:
2495 self._run_cli(["stats",
2496 "--node-directory", basedir,
2499 def _check4((out,err)):
2500 data = simplejson.loads(out)
2501 self.failUnlessEqual(data["count-immutable-files"], 1)
2502 self.failUnlessEqual(data["count-immutable-files"], 1)
2503 self.failUnlessEqual(data["count-mutable-files"], 1)
2504 self.failUnlessEqual(data["count-literal-files"], 2)
2505 self.failUnlessEqual(data["count-files"], 4)
2506 self.failUnlessEqual(data["count-directories"], 1)
2507 self.failUnlessEqual(data["size-immutable-files"], 13000)
2508 self.failUnlessEqual(data["size-literal-files"], 48)
2509 self.failUnless([11,31,2] in data["size-files-histogram"])
2510 self.failUnless([10001,31622,1] in data["size-files-histogram"])
2511 d.addCallback(_check4)
2516 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
# Body of (presumably) test_bad: build a deliberately damaged tree, then
# run the local checker, deep-check, and webapi passes over it.
# NOTE(review): the "def test_bad(self):" header (line 2518) and the
# trailing "return d" are missing from this listing -- confirm.
2519 self.basedir = self.mktemp()
2520 d = self.set_up_nodes()
2521 d.addCallback(self.set_up_damaged_tree)
2522 d.addCallback(self.do_check)
2523 d.addCallback(self.do_deepcheck)
2524 d.addCallback(self.do_test_web_bad)
2525 d.addErrback(self.explain_web_error)
2526 d.addErrback(self.explain_error)
# Build a root dirnode containing eight children, each a mutable or large
# immutable file mangled in a specific way (good / missing-shares /
# corrupt-shares / unrecoverable) via create_mangled().
# NOTE(review): the listing omits lines 2532-2535 and 2539/2543-2545 (the
# rest of this layout comment), 2549 (presumably "self.root = n") and
# 2560-2562 (presumably "self.nodes = {}" setup and "return d") -- confirm.
2531 def set_up_damaged_tree(self, ignored):
2536 # mutable-missing-shares
2537 # mutable-corrupt-shares
2538 # mutable-unrecoverable
2540 # large-missing-shares
2541 # large-corrupt-shares
2542 # large-unrecoverable
2546 c0 = self.clients[0]
2547 d = c0.create_empty_dirnode()
2548 def _created_root(n):
2550 self.root_uri = n.get_uri()
2551 d.addCallback(_created_root)
# one child per (nodetype, mangletype) combination; the name encodes
# exactly how create_mangled() will damage it
2552 d.addCallback(self.create_mangled, "mutable-good")
2553 d.addCallback(self.create_mangled, "mutable-missing-shares")
2554 d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2555 d.addCallback(self.create_mangled, "mutable-unrecoverable")
2556 d.addCallback(self.create_mangled, "large-good")
2557 d.addCallback(self.create_mangled, "large-missing-shares")
2558 d.addCallback(self.create_mangled, "large-corrupt-shares")
2559 d.addCallback(self.create_mangled, "large-unrecoverable")
2564 def create_mangled(self, ignored, name):
2565 nodetype, mangletype = name.split("-", 1)
2566 if nodetype == "mutable":
2567 d = self.clients[0].create_mutable_file("mutable file contents")
2568 d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2569 elif nodetype == "large":
2570 large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2571 d = self.root.add_file(unicode(name), large)
2572 elif nodetype == "small":
2573 small = upload.Data("Small enough for a LIT", None)
2574 d = self.root.add_file(unicode(name), small)
2576 def _stash_node(node):
2577 self.nodes[name] = node
2579 d.addCallback(_stash_node)
2581 if mangletype == "good":
2583 elif mangletype == "missing-shares":
2584 d.addCallback(self._delete_some_shares)
2585 elif mangletype == "corrupt-shares":
2586 d.addCallback(self._corrupt_some_shares)
2588 assert mangletype == "unrecoverable"
2589 d.addCallback(self._delete_most_shares)
2593 def _run_cli(self, argv):
2594 stdout, stderr = StringIO(), StringIO()
2595 # this can only do synchronous operations
2596 assert argv[0] == "debug"
2597 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2598 return stdout.getvalue()
2600 def _find_shares(self, node):
2601 si = node.get_storage_index()
2602 out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2603 [c.basedir for c in self.clients])
2604 files = out.split("\n")
2605 return [f for f in files if f]
2607 def _delete_some_shares(self, node):
2608 shares = self._find_shares(node)
2609 os.unlink(shares[0])
2610 os.unlink(shares[1])
2612 def _corrupt_some_shares(self, node):
2613 shares = self._find_shares(node)
2614 self._run_cli(["debug", "corrupt-share", shares[0]])
2615 self._run_cli(["debug", "corrupt-share", shares[1]])
2617 def _delete_most_shares(self, node):
2618 shares = self._find_shares(node)
2619 for share in shares[1:]:
2623 def check_is_healthy(self, cr, where):
2625 self.failUnless(ICheckResults.providedBy(cr), (cr, type(cr), where))
2626 self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where))
2627 self.failUnless(cr.is_recoverable(), where)
2629 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2630 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2632 except Exception, le:
2633 le.args = tuple(le.args + (where,))
2636 def check_is_missing_shares(self, cr, where):
2637 self.failUnless(ICheckResults.providedBy(cr), where)
2638 self.failIf(cr.is_healthy(), where)
2639 self.failUnless(cr.is_recoverable(), where)
2641 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2642 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2645 def check_has_corrupt_shares(self, cr, where):
2646 # by "corrupt-shares" we mean the file is still recoverable
2647 self.failUnless(ICheckResults.providedBy(cr), where)
2649 self.failIf(cr.is_healthy(), (where, cr))
2650 self.failUnless(cr.is_recoverable(), where)
2652 self.failUnless(d["count-shares-good"] < 10, where)
2653 self.failUnless(d["count-corrupt-shares"], where)
2654 self.failUnless(d["list-corrupt-shares"], where)
2657 def check_is_unrecoverable(self, cr, where):
2658 self.failUnless(ICheckResults.providedBy(cr), where)
2660 self.failIf(cr.is_healthy(), where)
2661 self.failIf(cr.is_recoverable(), where)
2662 self.failUnless(d["count-shares-good"] < d["count-shares-needed"], (d["count-shares-good"], d["count-shares-needed"], where))
2663 self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2664 self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2667 def do_check(self, ignored):
2668 d = defer.succeed(None)
2670 # check the individual items, without verification. This will not
2671 # detect corrupt shares.
2672 def _check(which, checker):
2673 d = self.nodes[which].check(Monitor())
2674 d.addCallback(checker, which + "--check")
2677 d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2678 d.addCallback(lambda ign: _check("mutable-missing-shares",
2679 self.check_is_missing_shares))
2680 d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2681 self.check_is_healthy))
2682 d.addCallback(lambda ign: _check("mutable-unrecoverable",
2683 self.check_is_unrecoverable))
2684 d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2685 d.addCallback(lambda ign: _check("large-missing-shares",
2686 self.check_is_missing_shares))
2687 d.addCallback(lambda ign: _check("large-corrupt-shares",
2688 self.check_is_healthy))
2689 d.addCallback(lambda ign: _check("large-unrecoverable",
2690 self.check_is_unrecoverable))
2692 # and again with verify=True, which *does* detect corrupt shares.
2693 def _checkv(which, checker):
2694 d = self.nodes[which].check(Monitor(), verify=True)
2695 d.addCallback(checker, which + "--check-and-verify")
2698 d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2699 d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2700 self.check_is_missing_shares))
2701 d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2702 self.check_has_corrupt_shares))
2703 d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2704 self.check_is_unrecoverable))
2705 d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2706 d.addCallback(lambda ign: _checkv("large-missing-shares", self.check_is_missing_shares))
2707 d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.check_has_corrupt_shares))
2708 d.addCallback(lambda ign: _checkv("large-unrecoverable",
2709 self.check_is_unrecoverable))
2713 def do_deepcheck(self, ignored):
2714 d = defer.succeed(None)
2716 # now deep-check the root, with various verify= and repair= options
2717 d.addCallback(lambda ign:
2718 self.root.start_deep_check().when_done())
2720 self.failUnless(IDeepCheckResults.providedBy(cr))
2721 c = cr.get_counters()
2722 self.failUnlessEqual(c["count-objects-checked"], 9)
2723 self.failUnlessEqual(c["count-objects-healthy"], 5)
2724 self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2725 self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2726 d.addCallback(_check1)
2728 d.addCallback(lambda ign:
2729 self.root.start_deep_check(verify=True).when_done())
2731 self.failUnless(IDeepCheckResults.providedBy(cr))
2732 c = cr.get_counters()
2733 self.failUnlessEqual(c["count-objects-checked"], 9)
2734 self.failUnlessEqual(c["count-objects-healthy"], 3)
2735 self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2736 self.failUnlessEqual(c["count-objects-healthy"], 3) # root, mutable good, large good
2737 self.failUnlessEqual(c["count-objects-unrecoverable"], 2) # mutable unrecoverable, large unrecoverable
2738 d.addCallback(_check2)
2742 def json_is_healthy(self, data, where):
2744 self.failUnless(r["healthy"], where)
2745 self.failUnless(r["recoverable"], where)
2746 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2747 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2749 def json_is_missing_shares(self, data, where):
2751 self.failIf(r["healthy"], where)
2752 self.failUnless(r["recoverable"], where)
2753 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2754 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2756 def json_has_corrupt_shares(self, data, where):
2757 # by "corrupt-shares" we mean the file is still recoverable
2759 self.failIf(r["healthy"], where)
2760 self.failUnless(r["recoverable"], where)
2761 self.failUnless(r["count-shares-good"] < 10, where)
2762 self.failUnless(r["count-corrupt-shares"], where)
2763 self.failUnless(r["list-corrupt-shares"], where)
2765 def json_is_unrecoverable(self, data, where):
2767 self.failIf(r["healthy"], where)
2768 self.failIf(r["recoverable"], where)
2769 self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2771 self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2772 self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
    def do_test_web_bad(self, ignored):
        # Hit the webapi 't=check' (then 't=check&verify=true') endpoint for
        # each damaged node and feed the JSON results to the json_* matchers.
        # NOTE(review): this method appears to continue past the end of the
        # visible listing; the trailing 'return d' is not shown here.
        d = defer.succeed(None)

        # check, no verify: per the mapping below, corrupt-shares nodes are
        # still expected to look healthy at this stage.
        # NOTE(review): _check does not return its Deferred here, so these
        # sub-checks are not chained into 'd' — presumably an elided
        # 'return d'; confirm against the full file.
        def _check(which, checker):
            d = self.web_json(self.nodes[which], t="check")
            d.addCallback(checker, which + "--webcheck")

        d.addCallback(lambda ign: _check("mutable-good",
                                         self.json_is_healthy))
        d.addCallback(lambda ign: _check("mutable-missing-shares",
                                         self.json_is_missing_shares))
        d.addCallback(lambda ign: _check("mutable-corrupt-shares",
                                         self.json_is_healthy))
        d.addCallback(lambda ign: _check("mutable-unrecoverable",
                                         self.json_is_unrecoverable))
        d.addCallback(lambda ign: _check("large-good",
                                         self.json_is_healthy))
        d.addCallback(lambda ign: _check("large-missing-shares",
                                         self.json_is_missing_shares))
        d.addCallback(lambda ign: _check("large-corrupt-shares",
                                         self.json_is_healthy))
        d.addCallback(lambda ign: _check("large-unrecoverable",
                                         self.json_is_unrecoverable))

        # check with verify=true: now the corrupt-shares nodes are expected
        # to be reported as having corrupt shares.
        # NOTE(review): same apparent missing 'return d' as _check above.
        def _checkv(which, checker):
            d = self.web_json(self.nodes[which], t="check", verify="true")
            d.addCallback(checker, which + "--webcheck-and-verify")

        d.addCallback(lambda ign: _checkv("mutable-good",
                                          self.json_is_healthy))
        d.addCallback(lambda ign: _checkv("mutable-missing-shares",
                                          self.json_is_missing_shares))
        d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
                                          self.json_has_corrupt_shares))
        d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
                                          self.json_is_unrecoverable))
        d.addCallback(lambda ign: _checkv("large-good",
                                          self.json_is_healthy))
        d.addCallback(lambda ign: _checkv("large-missing-shares", self.json_is_missing_shares))
        d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.json_has_corrupt_shares))
        d.addCallback(lambda ign: _checkv("large-unrecoverable",
                                          self.json_is_unrecoverable))