1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, WebErrorMixin, \
28 MemoryConsumer, download_to_data
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts how many bytes have been read, and
    can fire a Deferred once more than interrupt_after bytes have been
    consumed, letting tests interrupt an upload mid-stream."""
    # fix: read() does 'self.bytes_read += length', so the counter must
    # start at 0 or the first read() raises AttributeError
    bytes_read = 0
    interrupt_after = None    # byte threshold; None disables interruption
    interrupt_after_d = None  # Deferred fired (once) when threshold crossed

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # disarm before firing so the callback runs exactly once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class GrabEverythingConsumer:
    """IConsumer that accumulates every byte written to it in .contents
    (read back by the test at the end of the download)."""
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # we only accept push producers
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        # fix: body was missing; .contents is compared against DATA later
        self.contents += data

    def unregisterProducer(self):
        pass
class SystemTest(SystemTestMixin, unittest.TestCase):
    """End-to-end grid tests: bring up several client nodes and exercise
    upload/download, mutable files, the vdrive, web/CLI interfaces."""

    def test_connections(self):
        """Start the grid plus one extra node, then verify every client
        sees all numclients+1 storage peers."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        # fix: _check was defined but never attached to the chain
        d.addCallback(_check)
        def _shutdown_extra_node(res):
            # use addBoth so the extra node is stopped even on failure
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
        # fix: the Deferred must be returned so trial waits for it
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
94 def test_upload_and_download_random_key(self):
95 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96 return self._test_upload_and_download(convergence=None)
97 test_upload_and_download_random_key.timeout = 4800
99 def test_upload_and_download_convergent(self):
100 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101 return self._test_upload_and_download(convergence="some convergence string")
102 test_upload_and_download_convergent.timeout = 4800
    def _test_upload_and_download(self, convergence):
        # Core immutable-file round trip: upload DATA (with or without
        # convergent encryption), then exercise every download API, bad-URI
        # handling, helper-assisted uploads, and resumption after an
        # interrupted helper upload.
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # every client should see all numclients storage servers
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

            u = self.clients[0].getServiceNamed("uploader")

            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            log.msg("upload finished: uri is %s" % (uri,))
            # downloads are done through a different client than the uploader
            dl = self.clients[1].getServiceNamed("downloader")
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        # same content, but fetched via download_to_filename
        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        # and via download_to_filehandle
        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        # and via the IConsumer interface
        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

            # exercise the filenode read() API: whole file, a slice, a tail
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
        d.addCallback(_test_read)

        def _test_bad_read(res):
            # a URI with a flipped key bit should fail to decode any shares
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download
            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 bad_n.read, MemoryConsumer(), offset=2)
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            # zero sizelimit: the extra node never stores shares itself
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            # a second helper upload of identical (convergent) data must not
            # actually transfer the ciphertext again
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        # duplicate-detection only makes sense with convergent encryption
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
                # make sure we actually interrupted it before finishing the
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)
404 def _find_shares(self, basedir):
406 for (dirpath, dirnames, filenames) in os.walk(basedir):
407 if "storage" not in dirpath:
411 pieces = dirpath.split(os.sep)
412 if pieces[-4] == "storage" and pieces[-3] == "shares":
413 # we're sitting in .../storage/shares/$START/$SINDEX , and there
414 # are sharefiles here
415 assert pieces[-5].startswith("client")
416 client_num = int(pieces[-5][-1])
417 storage_index_s = pieces[-1]
418 storage_index = storage.si_a2b(storage_index_s)
419 for sharename in filenames:
420 shnum = int(sharename)
421 filename = os.path.join(dirpath, sharename)
422 data = (client_num, storage_index, filename, shnum)
425 self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        # Damage one field of an on-disk mutable (SDMF) share, selected by
        # `which`, then re-pack and write the share back in place. Used to
        # verify that retrieval survives/flags corrupted shares.
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        # mutate exactly one field, according to `which`
        if which == "seqnum":
            root_hash = self.flip_bit(root_hash)
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack the (possibly corrupted) fields and overwrite the share
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        final_share = mutable_layout.pack_share(prefix,
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        # End-to-end exercise of SDMF mutable files: create, retrieve from
        # several clients, overwrite, survive share corruption, create empty
        # files and dirnodes, and verify key-generator pool accounting.
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here." # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)

            # run the CLI share-dumper against that share and sanity-check
            # its output
            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless(" secrets are for nodeid: %s\n" % peerid
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless(" seqnum: 1\n" in output)
                self.failUnless(" required_shares: 3\n" in output)
                self.failUnless(" total_shares: 10\n" in output)
                self.failUnless(" segsize: 27\n" in output, (output, filename))
                self.failUnless(" datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                self.failUnless(" share_hash_chain: " in output)
                self.failUnless(" block_hash_tree: 1 nodes\n" in output)
                expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                # dump the output to help diagnose the assertion failure
                print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data on the node, then read it back
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                    # read: this will trigger "pubkey doesn't match
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        # a fresh node must still retrieve NEWERDATA despite the corruption
        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
        d.addCallback(_created_dirnode)

        # wait for clients[3] to connect to the key generator
        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # each key handed out shrinks the keypool by one relative to
            # its configured pool_size
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
709 def flip_bit(self, good):
710 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a URI string that refers to a file nobody holds shares
        for: same as `gooduri` but with one bit of the key flipped (which
        also changes the storage index)."""
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.
    def test_vdrive(self):
        # Build a small directory tree, publish files into it, then verify
        # it through the node API, the introducer web page, the web UI, the
        # control interface, the CLI, and the checker.
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._grab_stats)
    test_vdrive.timeout = 1100
    def _test_introweb(self, res):
        # Fetch the introducer's status page (HTML and ?t=json forms) and
        # verify the version string plus announcement/subscription summaries.
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
            self.failUnless("allmydata: %s" % str(allmydata.__version__)
            self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
            self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the page to help diagnose the assertion failure
                print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["subscription_summary"],
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                # dump the page to help diagnose the assertion failure
                print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
    def _do_publish1(self, res):
        # Create the root dirnode R and publish R/subdir1/mydata567; stash
        # the root URI in self._root_directory_uri, the subdir node in
        # self._subdir1_node, and the file's URI in self.uri.
        ut = upload.Data(self.data, convergence=None)
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
        d.addCallback(_made_subdir1)
828 def _do_publish2(self, res):
829 ut = upload.Data(self.data, convergence=None)
830 d = self._subdir1_node.create_empty_directory(u"subdir2")
831 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
834 def log(self, res, *args, **kwargs):
835 # print "MSG: %s RES: %s" % (msg, args)
836 log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        # Create a new private directory P containing P/personal/sekrit data
        # plus s2-rw and s2-ro links to R/subdir1/subdir2 (read-write and
        # read-only caps respectively); passes the private node downstream.
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
        d.addCallback(_got_new_dir)
    def _check_publish1(self, res):
        # Verify R/subdir1/mydata567 contents from client 1, using the
        # iterative get() API one path component at a time.
        # this one uses the iterative API
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
    def _check_publish2(self, res):
        # Verify the same file via the path-based get_child_at_path API, and
        # check that creating a node from the file's URI yields an
        # equivalent filenode.
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
    def _check_publish_private(self, resnode):
        """Verify the private tree: read 'sekrit data', confirm s2-rw is
        mutable and s2-ro is read-only, exercise the read-only failure modes,
        then move children around and sanity-check manifest and deep-stats."""
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash the personal dirnode for later tests (_test_checker etc.)
            self._personal_node = personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # NOTE(review): the enclosing "def get_path(path):" appears truncated.
        return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # a read-only cap to a mutable directory: mutable but readonly
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            # every mutating operation on the ro dirnode must raise
            # NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            # reads are still allowed through a read-only cap
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            # shuffle 'sekrit data' around and back to exercise move_child_to
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                # exact values pinned by the fixed test data above
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                # directory sizes vary, so only lower bounds are checked
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
        d.addCallback(_got_home)
    def shouldFail(self, res, expected_failure, which, substring=None):
        """errback-style checker: pass if `res` is a Failure wrapping
        `expected_failure` (optionally containing `substring`); otherwise
        flunk the test, naming the operation `which`."""
        if isinstance(res, Failure):
            # re-raises if the failure is not of the expected type
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Call `callable(*args, **kwargs)` (sync or Deferred) and require it
        to fail with `expected_failure`, optionally containing `substring`."""
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        # NOTE(review): the enclosing "def _done(res):" appears truncated.
        if isinstance(res, Failure):
            res.trap(expected_failure)
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
1029 def PUT(self, urlpath, data):
1030 url = self.webish_url + urlpath
1031 return getPage(url, method="PUT", postdata=data)
1033 def GET(self, urlpath, followRedirect=False):
1034 url = self.webish_url + urlpath
1035 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        """HTTP POST a hand-built multipart/form-data body to `urlpath`.

        `fields` become form fields; a tuple value means (filename, content)
        for a file-upload field. use_helper selects the helper node's webapi.
        """
        url = self.helper_webish_url + urlpath
        url = self.webish_url + urlpath
        # fixed multipart boundary for the hand-rolled form body
        sepbase = "boogabooga"
        sep = "--" + sepbase
        # tell the server the form is UTF-8 encoded
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('UTF-8')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                # tuple => file-upload field with an explicit filename
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        # multipart bodies use CRLF line endings
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)
    def _test_web(self, res):
        """Exercise the webapi end-to-end: welcome pages, directory listings,
        downloads by path and by URI, PUT/POST uploads, the various status
        pages (upload/download/mapupdate/publish/retrieve), helper status,
        and the statistics pages."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        def _got_welcome(page):
            # welcome page must show all storage servers connected
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # NOTE(review): the "def _got_up(res):" / "def _got_down(res):"
        # headers appear truncated in this copy.
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # back-date the spurious files so they look stale (3 days old)
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
    def _test_runner(self, res):
        """Exercise the CLI debug tools (dump-share, find-shares,
        catalog-shares) against the shares written by the earlier upload."""
        # exercise some of the diagnostic tools in runner.py
        # walk the storage dirs looking for a CHK sharefile to inspect
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
            self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # 10 shares total, spread across the client storage dirs
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # 30 share descriptions, 10 of which belong to our CHK file
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        """Connect to client 0's control.furl over foolscap and hand the
        remote reference to _test_control2."""
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
    def _test_control2(self, rref, filename):
        """Round-trip `filename` through the control interface: upload it,
        download it back, compare, then run the speed/memory probes."""
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
        # NOTE(review): the enclosing "def _check(res):" appears truncated.
            self.failUnlessEqual(res, downfile)
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        # get_memory_usage is only implemented on linux
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Drive the 'tahoe' CLI end-to-end: aliases, mkdir/ls, put/get
        (file, stdin, mutable), rm, mv, ln, cp (including recursive and
        tahoe-to-tahoe copies)."""
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        client0_basedir = self.getdir("client0")
        "--node-directory", client0_basedir,
        TESTDATA = "I will not write the same thing over and over.\n" * 100

        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.

        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)

        def run(ignored, verb, *args, **kwargs):
            # helper: run "tahoe <verb> <args>" against client0's node dir
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            self.failUnless(uri.from_string_dirnode(out.strip()))
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

        # NOTE(review): the loop header creating local files (files/datas
        # lists) appears truncated in this copy.
            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            # stash the write-cap for the --uri / --readonly-uri checks below
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
                if "tahoe-file-stdin" in l:
                    # -r--: immutable file
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
                    self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
                    rw_uri = self._mutable_file3_uri
                    u = uri.from_string_mutable_filenode(rw_uri)
                    ro_uri = u.get_readonly().to_string()
                    self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
            # NOTE(review): the "def _cmp(name):" header appears truncated.
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
##         def _ls_missing(res):
##             argv = ["ls"] + nodeargs + ["bogus"]
##             return self._run_cli(argv)
##         d.addCallback(_ls_missing)
##         def _check_ls_missing((out,err)):
##             self.failUnlessEqual(err, "")
##         d.addCallback(_check_ls_missing)
1710 def _run_cli(self, argv, stdin=""):
1712 stdout, stderr = StringIO(), StringIO()
1713 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1714 stdin=StringIO(stdin),
1715 stdout=stdout, stderr=stderr)
1717 return stdout.getvalue(), stderr.getvalue()
1718 d.addCallback(_done)
    def _test_checker(self, res):
        """Run check() (with and without verify=True) on a dirnode, a CHK
        FileNode, and a LiteralFileNode; LIT files return None (nothing to
        check)."""
        # large enough to be a real CHK file, not a LIT
        ut = upload.Data("too big to be literal" * 200, convergence=None)
        d = self._personal_node.add_file(u"big file", ut)

        d.addCallback(lambda res: self._personal_node.check(Monitor()))
        def _check_dirnode_results(r):
            self.failUnless(r.is_healthy())
        d.addCallback(_check_dirnode_results)
        d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
        d.addCallback(_check_dirnode_results)

        d.addCallback(lambda res: self._personal_node.get(u"big file"))
        def _got_chk_filenode(n):
            self.failUnless(isinstance(n, filenode.FileNode))
            d = n.check(Monitor())
            def _check_filenode_results(r):
                self.failUnless(r.is_healthy())
            d.addCallback(_check_filenode_results)
            d.addCallback(lambda res: n.check(Monitor(), verify=True))
            d.addCallback(_check_filenode_results)
        d.addCallback(_got_chk_filenode)

        d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
        def _got_lit_filenode(n):
            self.failUnless(isinstance(n, filenode.LiteralFileNode))
            d = n.check(Monitor())
            def _check_lit_filenode_results(r):
                # LIT files have no shares, so check() yields None
                self.failUnlessEqual(r, None)
            d.addCallback(_check_lit_filenode_results)
            d.addCallback(lambda res: n.check(Monitor(), verify=True))
            d.addCallback(_check_lit_filenode_results)
        d.addCallback(_got_lit_filenode)
1758 class MutableChecker(SystemTestMixin, unittest.TestCase, WebErrorMixin):
def _run_cli(self, argv):
    """Run a CLI command synchronously and return whatever it wrote to stdout.

    stderr is captured as well but discarded; callers only get stdout text.
    """
    captured_out = StringIO()
    captured_err = StringIO()
    runner.runner(argv, run_by_human=False,
                  stdout=captured_out, stderr=captured_err)
    return captured_out.getvalue()
1765 def test_good(self):
# Create a healthy mutable file and confirm that the webapi's
# t=check&verify=true handler reports it as Healthy with all 10 shares.
# NOTE(review): sampled excerpt — the `def _created(...)` header that stores
# self.node, the `def _do_check(res):` header, and the trailing `return d`
# are not visible here.
1766 self.basedir = self.mktemp()
1767 d = self.set_up_nodes()
1768 CONTENTS = "a little bit of data"
1769 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1772 si = self.node.get_storage_index()
1773 d.addCallback(_created)
1774 # now make sure the webapi verifier sees no problems
1776 url = (self.webish_url +
1777 "uri/%s" % urllib.quote(self.node.get_uri()) +
1778 "?t=check&verify=true")
1779 return getPage(url, method="POST")
1780 d.addCallback(_do_check)
# the check page is HTML; assert on the markers it renders
1781 def _got_results(out):
1782 self.failUnless("<span>Healthy!</span>" in out, out)
1783 self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1784 self.failIf("Not Healthy!" in out, out)
1785 self.failIf("Unhealthy" in out, out)
1786 self.failIf("Corrupt Shares" in out, out)
1787 d.addCallback(_got_results)
1788 d.addErrback(self.explain_web_error)
1791 def test_corrupt(self):
# Corrupt one share of a mutable file via the `debug corrupt-share` CLI,
# confirm the webapi verifier flags it (Not Healthy, 9-of-10 shares, corrupt
# share listed), then confirm t=check&repair=true restores full health.
# NOTE(review): sampled excerpt — the `def _created(...)` header, the
# `f = files[0]` assignment feeding os.path.basename below, the
# `def _do_check(res):` header, and the trailing `return d` are not visible.
1792 self.basedir = self.mktemp()
1793 d = self.set_up_nodes()
1794 CONTENTS = "a little bit of data"
1795 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
# locate the on-disk share files held by client 1
1798 si = self.node.get_storage_index()
1799 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1800 self.clients[1].basedir])
1801 files = out.split("\n")
1802 # corrupt one of them, using the CLI debug command
1804 shnum = os.path.basename(f)
1805 nodeid = self.clients[1].nodeid
1806 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1807 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1808 out = self._run_cli(["debug", "corrupt-share", files[0]])
1809 d.addCallback(_created)
1810 # now make sure the webapi verifier notices it
1812 url = (self.webish_url +
1813 "uri/%s" % urllib.quote(self.node.get_uri()) +
1814 "?t=check&verify=true")
1815 return getPage(url, method="POST")
1816 d.addCallback(_do_check)
1817 def _got_results(out):
1818 self.failUnless("Not Healthy!" in out, out)
1819 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1820 self.failUnless("Corrupt Shares:" in out, out)
1821 d.addCallback(_got_results)
1823 # now make sure the webapi repairer can fix it
1824 def _do_repair(res):
1825 url = (self.webish_url +
1826 "uri/%s" % urllib.quote(self.node.get_uri()) +
1827 "?t=check&verify=true&repair=true")
1828 return getPage(url, method="POST")
1829 d.addCallback(_do_repair)
1830 def _got_repair_results(out):
1831 self.failUnless("<div>Repair successful</div>" in out, out)
1832 d.addCallback(_got_repair_results)
# re-check after repair: must be healthy with 10 shares again
1833 d.addCallback(_do_check)
1834 def _got_postrepair_results(out):
1835 self.failIf("Not Healthy!" in out, out)
1836 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1837 d.addCallback(_got_postrepair_results)
1838 d.addErrback(self.explain_web_error)
1842 def test_delete_share(self):
# Delete one share of a mutable file outright, confirm the (non-verifying)
# webapi checker notices the missing share without reporting corruption,
# then confirm t=check&repair=true restores full health.
# NOTE(review): sampled excerpt — the `def _created(...)` header, the
# `f = files[0]` assignment, the line that actually removes the share file
# (presumably an os.unlink — confirm), the `def _do_check(res):` header, and
# the trailing `return d` are not visible here.
1843 self.basedir = self.mktemp()
1844 d = self.set_up_nodes()
1845 CONTENTS = "a little bit of data"
1846 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1849 si = self.node.get_storage_index()
1850 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1851 self.clients[1].basedir])
1852 files = out.split("\n")
1853 # corrupt one of them, using the CLI debug command
1855 shnum = os.path.basename(f)
1856 nodeid = self.clients[1].nodeid
1857 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1858 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1860 d.addCallback(_created)
1861 # now make sure the webapi checker notices it
# verify=false: a plain check is enough to spot a missing share
1863 url = (self.webish_url +
1864 "uri/%s" % urllib.quote(self.node.get_uri()) +
1865 "?t=check&verify=false")
1866 return getPage(url, method="POST")
1867 d.addCallback(_do_check)
1868 def _got_results(out):
1869 self.failUnless("Not Healthy!" in out, out)
1870 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
# deleted (not corrupted) share: no corruption should be reported
1871 self.failIf("Corrupt Shares" in out, out)
1872 d.addCallback(_got_results)
1874 # now make sure the webapi repairer can fix it
1875 def _do_repair(res):
1876 url = (self.webish_url +
1877 "uri/%s" % urllib.quote(self.node.get_uri()) +
1878 "?t=check&verify=false&repair=true")
1879 return getPage(url, method="POST")
1880 d.addCallback(_do_repair)
1881 def _got_repair_results(out):
1882 self.failUnless("Repair successful" in out)
1883 d.addCallback(_got_repair_results)
1884 d.addCallback(_do_check)
1885 def _got_postrepair_results(out):
1886 self.failIf("Not Healthy!" in out, out)
1887 self.failUnless("Recoverable Versions: 10*seq" in out)
1888 d.addCallback(_got_postrepair_results)
1889 d.addErrback(self.explain_web_error)
1893 class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
1894 # construct a small directory tree (with one dir, one immutable file, one
1895 # mutable file, one LIT file, and a loop), and then check/examine it in various ways
1898 def set_up_tree(self, ignored):
# Build the fixture tree on client 0: a root dirnode containing a mutable
# file ("mutable"), a CHK file ("large", 13000 bytes), a LIT file ("small",
# 22 bytes), and a "loop" entry pointing back at the root.  Each _created_*
# callback records the node and its URI on self for later assertions.
# NOTE(review): sampled excerpt — the `self.root = n` / `self.mutable = n` /
# `self.large = n` / `self.small = n` assignment lines inside the callbacks
# and the trailing `return d` are not visible here.
1900 c0 = self.clients[0]
1901 d = c0.create_empty_dirnode()
1902 def _created_root(n):
1904 self.root_uri = n.get_uri()
1905 d.addCallback(_created_root)
1906 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1907 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1908 def _created_mutable(n):
1910 self.mutable_uri = n.get_uri()
1911 d.addCallback(_created_mutable)
1913 large = upload.Data("Lots of data\n" * 1000, None)
1914 d.addCallback(lambda ign: self.root.add_file(u"large", large))
1915 def _created_large(n):
1917 self.large_uri = n.get_uri()
1918 d.addCallback(_created_large)
# short enough to be stored inline as a LIT URI
1920 small = upload.Data("Small enough for a LIT", None)
1921 d.addCallback(lambda ign: self.root.add_file(u"small", small))
1922 def _created_small(n):
1924 self.small_uri = n.get_uri()
1925 d.addCallback(_created_small)
# deliberate cycle, to exercise deep-traversal loop handling
1927 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
1930 def check_is_healthy(self, cr, n, where, incomplete=False):
# Assert that checker results `cr` for node `n` describe a fully healthy
# object: correct storage index, all 10 shares present across all clients,
# no corruption, single recoverable version.  `where` tags failure messages;
# `incomplete` appears intended to relax some checks (guard lines elided).
# NOTE(review): sampled excerpt — the `d = cr.get_data()` line that the
# dictionary lookups below rely on, and several continuation/guard lines,
# are not visible here.
1931 self.failUnless(ICheckerResults.providedBy(cr), where)
1932 self.failUnless(cr.is_healthy(), where)
1933 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
1935 self.failUnlessEqual(cr.get_storage_index_string(),
1936 base32.b2a(n.get_storage_index()), where)
# with fewer than 10 clients, some server must hold >1 share
1937 needs_rebalancing = bool( len(self.clients) < 10 )
1939 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
1941 self.failUnlessEqual(d["count-shares-good"], 10, where)
1942 self.failUnlessEqual(d["count-shares-needed"], 3, where)
1943 self.failUnlessEqual(d["count-shares-expected"], 10, where)
1945 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
1946 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
1947 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
1949 self.failUnlessEqual(sorted(d["servers-responding"]),
1950 sorted([c.nodeid for c in self.clients]),
# every client should appear somewhere in the sharemap
1952 self.failUnless("sharemap" in d, where)
1953 all_serverids = set()
1954 for (shareid, serverids) in d["sharemap"].items():
1955 all_serverids.update(serverids)
1956 self.failUnlessEqual(sorted(all_serverids),
1957 sorted([c.nodeid for c in self.clients]),
1960 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
1961 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
1962 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
    """Assert a check-and-repair run found node `n` healthy both before and
    after, and that no repair was actually attempted.

    `where` is a human-readable tag included in failure messages;
    `incomplete` is forwarded to check_is_healthy().
    """
    self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
    for results in (cr.get_pre_repair_results(),
                    cr.get_post_repair_results()):
        self.failUnless(results.is_healthy(), where)
        self.check_is_healthy(results, n, where, incomplete)
    # a healthy object should never have triggered a repair
    self.failIf(cr.get_repair_attempted(), where)
1973 def deep_check_is_healthy(self, cr, num_healthy, where):
# Assert that deep-check results report exactly `num_healthy` healthy
# objects.  NOTE(review): sampled excerpt — the continuation line carrying
# the expected value (`num_healthy, where)`) is not visible here.
1974 self.failUnless(IDeepCheckResults.providedBy(cr))
1975 self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
1978 def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
# Assert that deep-check-and-repair results report `num_healthy` healthy
# objects both pre- and post-repair, with zero repairs attempted.
# NOTE(review): sampled excerpt — the continuation lines carrying the
# expected values for the two healthy-object counters are not visible here.
1979 self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
1980 c = cr.get_counters()
1981 self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
1983 self.failUnlessEqual(c["count-objects-healthy-post-repair"],
1985 self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
1987 def test_good(self):
# End-to-end driver: build the fixture tree, then run the stats, local-API,
# and webapi check suites over it.  NOTE(review): sampled excerpt — the
# trailing `return d` expected by trial is not visible here.
1988 self.basedir = self.mktemp()
1989 d = self.set_up_nodes()
1990 d.addCallback(self.set_up_tree)
1991 d.addCallback(self.do_stats)
1992 d.addCallback(self.do_test_good)
1993 d.addCallback(self.do_test_web)
1994 d.addErrback(self.explain_web_error)
1997 def do_stats(self, ignored):
# Run a deep-stats traversal from the root and validate the totals.
# NOTE(review): sampled excerpt — the trailing `return d` is not visible.
1998 d = defer.succeed(None)
1999 d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2000 d.addCallback(self.check_stats)
2003 def check_stats(self, s):
# Validate a deep-stats dictionary against the known fixture tree from
# set_up_tree: 1 directory, 3 files (1 immutable CHK of 13000 bytes,
# 1 LIT of 22 bytes, 1 mutable).
# NOTE(review): sampled excerpt — the remaining histogram tuples on the
# continuation lines of the failUnlessEqual below are not visible here.
2004 self.failUnlessEqual(s["count-directories"], 1)
2005 self.failUnlessEqual(s["count-files"], 3)
2006 self.failUnlessEqual(s["count-immutable-files"], 1)
2007 self.failUnlessEqual(s["count-literal-files"], 1)
2008 self.failUnlessEqual(s["count-mutable-files"], 1)
2009 # don't check directories: their size will vary
2010 # s["largest-directory"]
2011 # s["size-directories"]
2012 self.failUnlessEqual(s["largest-directory-children"], 4)
2013 self.failUnlessEqual(s["largest-immutable-file"], 13000)
2014 # to re-use this function for both the local
2015 # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2016 # coerce the result into a list of tuples. dirnode.start_deep_stats()
2017 # returns a list of tuples, but JSON only knows about lists, so
2018 # t=start-deep-stats returns a list of lists.
2019 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2020 self.failUnlessEqual(histogram, [(11, 31, 1),
2023 self.failUnlessEqual(s["size-immutable-files"], 13000)
2024 self.failUnlessEqual(s["size-literal-files"], 22)
2026 def do_test_good(self, ignored):
# Exercise the local (non-web) checker API over the fixture tree: plain
# check, check(verify=True), check_and_repair() both ways, the four
# deep-check variants, and finally a deep-check that is cancelled mid-run.
# The LIT node ("small") always yields None from check*().
# NOTE(review): sampled excerpt — continuation lines (`incomplete=True)` for
# the "large" verify calls), the `monitor.cancel()` call, the
# `def _cancelled(f):` header, and the `return d` lines are not visible.
2027 d = defer.succeed(None)
2028 # check the individual items
2029 d.addCallback(lambda ign: self.root.check(Monitor()))
2030 d.addCallback(self.check_is_healthy, self.root, "root")
2031 d.addCallback(lambda ign: self.mutable.check(Monitor()))
2032 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2033 d.addCallback(lambda ign: self.large.check(Monitor()))
2034 d.addCallback(self.check_is_healthy, self.large, "large")
2035 d.addCallback(lambda ign: self.small.check(Monitor()))
2036 d.addCallback(self.failUnlessEqual, None, "small")
2038 # and again with verify=True
2039 d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2040 d.addCallback(self.check_is_healthy, self.root, "root")
2041 d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2042 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2043 d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2044 d.addCallback(self.check_is_healthy, self.large, "large",
2046 d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2047 d.addCallback(self.failUnlessEqual, None, "small")
2049 # and check_and_repair(), which should be a nop
2050 d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2051 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2052 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2053 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2054 d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2055 d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2056 d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2057 d.addCallback(self.failUnlessEqual, None, "small")
2059 # check_and_repair(verify=True)
2060 d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2061 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2062 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2063 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2064 d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2065 d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2067 d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2068 d.addCallback(self.failUnlessEqual, None, "small")
2071 # now deep-check the root, with various verify= and repair= options
# 3 = root dir + mutable + large; the LIT and loop entries don't count
2072 d.addCallback(lambda ign:
2073 self.root.start_deep_check().when_done())
2074 d.addCallback(self.deep_check_is_healthy, 3, "root")
2075 d.addCallback(lambda ign:
2076 self.root.start_deep_check(verify=True).when_done())
2077 d.addCallback(self.deep_check_is_healthy, 3, "root")
2078 d.addCallback(lambda ign:
2079 self.root.start_deep_check_and_repair().when_done())
2080 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2081 d.addCallback(lambda ign:
2082 self.root.start_deep_check_and_repair(verify=True).when_done())
2083 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2085 # and finally, start a deep-check, but then cancel it.
2086 d.addCallback(lambda ign: self.root.start_deep_check())
2087 def _checking(monitor):
2089 d = monitor.when_done()
2090 # this should fire as soon as the next dirnode.list finishes.
2091 # TODO: add a counter to measure how many list() calls are made,
2092 # assert that no more than one gets to run before the cancel()
2094 def _finished_normally(res):
2095 self.fail("this was supposed to fail, not finish normally")
2097 f.trap(OperationCancelledError)
2098 d.addCallbacks(_finished_normally, _cancelled)
2100 d.addCallback(_checking)
2104 def web_json(self, n, **kwargs):
# POST to node n's webapi URL with output=json forced, and decode the JSON
# response body.  NOTE(review): sampled excerpt — the trailing `return d`
# is not visible here.
2105 kwargs["output"] = "json"
2106 d = self.web(n, "POST", **kwargs)
2107 d.addCallback(self.decode_json)
# Python 2 tuple-parameter syntax: receives the (body, url) pair produced
# by web() below.  Parses the body as JSON; on failure, fails the test with
# the offending URL and body.
# NOTE(review): sampled excerpt — the try/except wrapper around the
# simplejson.loads call and the `return data` line are not visible here.
2110 def decode_json(self, (s,url)):
2112 data = simplejson.loads(s)
2114 self.fail("%s: not JSON: '%s'" % (url, s))
2117 def web(self, n, method="GET", **kwargs):
2118 # returns (data, url)
# Build a webapi URL for node n with kwargs as query parameters and fetch
# it; the Deferred fires with the (response body, url) pair so failure
# messages can cite the URL.  NOTE(review): sampled excerpt — the trailing
# `return d` is not visible here.
2119 url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2120 + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2121 d = getPage(url, method=method)
2122 d.addCallback(lambda data: (data,url))
2125 def wait_for_operation(self, ignored, ophandle):
# Poll the /operations/<handle> status page until the long-running webapi
# operation reports finished, stalling 1 second between polls (recursing
# into itself via addCallback).
# NOTE(review): sampled excerpt — the getPage call, the `def _got(res):`
# header, the try/except around the JSON parse, and the return statements
# are not visible here.
2126 url = self.webish_url + "operations/" + ophandle
2127 url += "?t=status&output=JSON"
2131 data = simplejson.loads(res)
2133 self.fail("%s: not JSON: '%s'" % (url, res))
2134 if not data["finished"]:
2135 d = self.stall(delay=1.0)
2136 d.addCallback(self.wait_for_operation, ophandle)
2142 def get_operation_results(self, ignored, ophandle, output=None):
# Fetch the final results page for a finished webapi operation.  When
# output is "json" the body is parsed (test-failing on bad JSON);
# otherwise the raw body is apparently returned as-is.
# NOTE(review): sampled excerpt — the base query string, the getPage call,
# the `def _got(res):` header, the try/except, and the return/addCallback
# plumbing are not visible here.
2143 url = self.webish_url + "operations/" + ophandle
2146 url += "&output=" + output
2149 if output and output.lower() == "json":
2151 return simplejson.loads(res)
2153 self.fail("%s: not JSON: '%s'" % (url, res))
2158 def slow_web(self, n, output=None, **kwargs):
# Start a long-running webapi operation on node n under a random ophandle,
# poll until it finishes, then fetch its results (optionally as JSON).
# NOTE(review): sampled excerpt — the trailing `return d` is not visible.
2160 handle = base32.b2a(os.urandom(4))
2161 d = self.web(n, "POST", ophandle=handle, **kwargs)
2162 d.addCallback(self.wait_for_operation, handle)
2163 d.addCallback(self.get_operation_results, handle, output=output)
2166 def json_check_is_healthy(self, data, n, where, incomplete=False):
# JSON counterpart of check_is_healthy(): assert the webapi check response
# for node n describes a fully healthy object (10 shares over all clients,
# no corruption, one recoverable version).  Server ids arrive base32-encoded
# in JSON, hence the idlib.nodeid_b2a comparisons.
# NOTE(review): sampled excerpt — the `r = data["results"]` assignment the
# lookups below depend on, and the incomplete-guard lines, are not visible.
2168 self.failUnlessEqual(data["storage-index"],
2169 base32.b2a(n.get_storage_index()), where)
2171 self.failUnlessEqual(r["healthy"], True, where)
# with fewer than 10 clients, some server must hold more than one share
2172 needs_rebalancing = bool( len(self.clients) < 10 )
2174 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2175 self.failUnlessEqual(r["count-shares-good"], 10, where)
2176 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2177 self.failUnlessEqual(r["count-shares-expected"], 10, where)
2179 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2180 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2181 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2183 self.failUnlessEqual(sorted(r["servers-responding"]),
2184 sorted([idlib.nodeid_b2a(c.nodeid)
2185 for c in self.clients]), where)
# every client should appear somewhere in the sharemap
2186 self.failUnless("sharemap" in r, where)
2187 all_serverids = set()
2188 for (shareid, serverids_s) in r["sharemap"].items():
2189 all_serverids.update(serverids_s)
2190 self.failUnlessEqual(sorted(all_serverids),
2191 sorted([idlib.nodeid_b2a(c.nodeid)
2192 for c in self.clients]), where)
2193 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2194 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2195 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
    """Assert a JSON check+repair response for node `n` is healthy: correct
    storage index, no repair attempted, and healthy pre/post-repair
    sub-results (each validated via json_check_is_healthy)."""
    expected_si = base32.b2a(n.get_storage_index())
    self.failUnlessEqual(data["storage-index"], expected_si, where)
    self.failUnlessEqual(data["repair-attempted"], False, where)
    for subkey in ("pre-repair-results", "post-repair-results"):
        self.json_check_is_healthy(data[subkey], n, where, incomplete)
def json_full_deepcheck_is_healthy(self, data, n, where):
    """Assert a JSON deep-check summary rooted at node `n` reports exactly
    3 healthy objects and no corruption, then validate the embedded
    deep-stats block."""
    self.failUnlessEqual(data["root-storage-index"],
                         base32.b2a(n.get_storage_index()), where)
    expectations = [("count-objects-checked", 3),
                    ("count-objects-healthy", 3),
                    ("count-objects-unhealthy", 0),
                    ("count-corrupt-shares", 0),
                    ("list-corrupt-shares", []),
                    ("list-unhealthy-files", [])]
    for key, expected in expectations:
        self.failUnlessEqual(data[key], expected, where)
    self.json_check_stats(data["stats"], where)
2217 def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
# Assert a JSON deep-check-and-repair summary is fully healthy: 3 objects
# checked, all healthy pre- and post-repair, no corrupt shares at any
# stage, and zero repairs attempted/successful/unsuccessful.
# NOTE(review): the single-line gaps in this sampled excerpt (between the
# check groups) are presumably blank lines — confirm against the full file.
2218 self.failUnlessEqual(data["root-storage-index"],
2219 base32.b2a(n.get_storage_index()), where)
2220 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2222 self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2223 self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2224 self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2226 self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2227 self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2228 self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2230 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2231 self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2232 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2234 self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2235 self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2236 self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
def json_check_lit(self, data, n, where):
    """A LIT file carries its data inside the URI, so its JSON check response
    has an empty storage index and trivially healthy results.

    `n` is accepted for signature parity with the other json_check_* helpers
    but is not used here.
    """
    self.failUnlessEqual(data["storage-index"], "", where)
    results = data["results"]
    self.failUnlessEqual(results["healthy"], True, where)
2243 def json_check_stats(self, data, where):
# Delegate to check_stats(); `where` is accepted for signature parity with
# the other json_check_* helpers but is not used here.
2244 self.check_stats(data)
2246 def do_test_web(self, ignored):
2247 d = defer.succeed(None)
2250 d.addCallback(lambda ign:
2251 self.slow_web(self.root,
2252 t="start-deep-stats", output="json"))
2253 d.addCallback(self.json_check_stats, "deep-stats")
2256 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2257 d.addCallback(self.json_check_is_healthy, self.root, "root")
2258 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2259 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2260 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2261 d.addCallback(self.json_check_is_healthy, self.large, "large")
2262 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2263 d.addCallback(self.json_check_lit, self.small, "small")
2266 d.addCallback(lambda ign:
2267 self.web_json(self.root, t="check", verify="true"))
2268 d.addCallback(self.json_check_is_healthy, self.root, "root")
2269 d.addCallback(lambda ign:
2270 self.web_json(self.mutable, t="check", verify="true"))
2271 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2272 d.addCallback(lambda ign:
2273 self.web_json(self.large, t="check", verify="true"))
2274 d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2275 d.addCallback(lambda ign:
2276 self.web_json(self.small, t="check", verify="true"))
2277 d.addCallback(self.json_check_lit, self.small, "small")
2279 # check and repair, no verify
2280 d.addCallback(lambda ign:
2281 self.web_json(self.root, t="check", repair="true"))
2282 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2283 d.addCallback(lambda ign:
2284 self.web_json(self.mutable, t="check", repair="true"))
2285 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2286 d.addCallback(lambda ign:
2287 self.web_json(self.large, t="check", repair="true"))
2288 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2289 d.addCallback(lambda ign:
2290 self.web_json(self.small, t="check", repair="true"))
2291 d.addCallback(self.json_check_lit, self.small, "small")
2293 # check+verify+repair
2294 d.addCallback(lambda ign:
2295 self.web_json(self.root, t="check", repair="true", verify="true"))
2296 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2297 d.addCallback(lambda ign:
2298 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2299 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2300 d.addCallback(lambda ign:
2301 self.web_json(self.large, t="check", repair="true", verify="true"))
2302 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2303 d.addCallback(lambda ign:
2304 self.web_json(self.small, t="check", repair="true", verify="true"))
2305 d.addCallback(self.json_check_lit, self.small, "small")
2307 # now run a deep-check, with various verify= and repair= flags
2308 d.addCallback(lambda ign:
2309 self.slow_web(self.root, t="start-deep-check", output="json"))
2310 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2311 d.addCallback(lambda ign:
2312 self.slow_web(self.root, t="start-deep-check", verify="true",
2314 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2315 d.addCallback(lambda ign:
2316 self.slow_web(self.root, t="start-deep-check", repair="true",
2318 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2319 d.addCallback(lambda ign:
2320 self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2321 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2323 # now look at t=info
2324 d.addCallback(lambda ign: self.web(self.root, t="info"))
2325 # TODO: examine the output
2326 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2327 d.addCallback(lambda ign: self.web(self.large, t="info"))
2328 d.addCallback(lambda ign: self.web(self.small, t="info"))