1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from allmydata import uri, storage, offloaded
10 from allmydata.immutable import download, upload, filenode
11 from allmydata.util import idlib, mathutil
12 from allmydata.util import log, base32
13 from allmydata.scripts import runner
14 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
15 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
16 IDeepCheckAndRepairResults
17 from allmydata.mutable.common import NotMutableError
18 from allmydata.mutable import layout as mutable_layout
19 from foolscap import DeadReferenceError
20 from twisted.python.failure import Failure
21 from twisted.web.client import getPage
22 from twisted.web.error import Error
24 from allmydata.test.common import SystemTestMixin
27 This is some data to publish to the virtual drive, which needs to be large
28 enough to not fit inside a LIT uri.
31 class CountingDataUploadable(upload.Data):
33 interrupt_after = None
34 interrupt_after_d = None
def read(self, length):
    """Count bytes handed out; fire interrupt_after_d once past the threshold.

    Delegates the actual data production to upload.Data.read. The deferred
    fires at most once: the threshold is cleared before the callback runs.
    """
    self.bytes_read += length
    threshold = self.interrupt_after
    if threshold is not None and self.bytes_read > threshold:
        # disarm first so a re-entrant read cannot fire the deferred twice
        self.interrupt_after = None
        self.interrupt_after_d.callback(self)
    return upload.Data.read(self, length)
45 class SystemTest(SystemTestMixin, unittest.TestCase):
47 def test_connections(self):
48 self.basedir = "system/SystemTest/test_connections"
49 d = self.set_up_nodes()
50 self.extra_node = None
51 d.addCallback(lambda res: self.add_extra_node(self.numclients))
52 def _check(extra_node):
53 self.extra_node = extra_node
54 for c in self.clients:
55 all_peerids = list(c.get_all_peerids())
56 self.failUnlessEqual(len(all_peerids), self.numclients+1)
57 permuted_peers = list(c.get_permuted_peers("storage", "a"))
58 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
61 def _shutdown_extra_node(res):
63 return self.extra_node.stopService()
65 d.addBoth(_shutdown_extra_node)
67 test_connections.timeout = 300
68 # test_connections is subsumed by test_upload_and_download, and takes
69 # quite a while to run on a slow machine (because of all the TLS
70 # connections that must be established). If we ever rework the introducer
71 # code to such an extent that we're not sure if it works anymore, we can
72 # reinstate this test until it does.
75 def test_upload_and_download_random_key(self):
76 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
77 return self._test_upload_and_download(convergence=None)
78 test_upload_and_download_random_key.timeout = 4800
80 def test_upload_and_download_convergent(self):
81 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
82 return self._test_upload_and_download(convergence="some convergence string")
83 test_upload_and_download_convergent.timeout = 4800
85 def _test_upload_and_download(self, convergence):
86 # we use 4000 bytes of data, which will result in about 400k written
87 # to disk among all our simulated nodes
88 DATA = "Some data to upload\n" * 200
89 d = self.set_up_nodes()
def _check_connections(res):
    # every client should see the full grid: self.numclients peers in
    # both the raw peer list and the permuted-for-storage ordering
    expected = self.numclients
    for client in self.clients:
        self.failUnlessEqual(len(list(client.get_all_peerids())), expected)
        permuted = client.get_permuted_peers("storage", "a")
        self.failUnlessEqual(len(list(permuted)), expected)
96 d.addCallback(_check_connections)
100 u = self.clients[0].getServiceNamed("uploader")
102 # we crank the max segsize down to 1024b for the duration of this
103 # test, so we can exercise multiple segments. It is important
104 # that this is not a multiple of the segment size, so that the
105 # tail segment is not the same length as the others. This actually
106 # gets rounded up to 1025 to be a multiple of the number of
107 # required shares (since we use 25 out of 100 FEC).
108 up = upload.Data(DATA, convergence=convergence)
109 up.max_segment_size = 1024
112 d.addCallback(_do_upload)
113 def _upload_done(results):
115 log.msg("upload finished: uri is %s" % (uri,))
117 dl = self.clients[1].getServiceNamed("downloader")
119 d.addCallback(_upload_done)
121 def _upload_again(res):
122 # Upload again. If using convergent encryption then this ought to be
123 # short-circuited, however with the way we currently generate URIs
124 # (i.e. because they include the roothash), we have to do all of the
125 # encoding work, and only get to save on the upload part.
126 log.msg("UPLOADING AGAIN")
127 up = upload.Data(DATA, convergence=convergence)
128 up.max_segment_size = 1024
129 d1 = self.uploader.upload(up)
130 d.addCallback(_upload_again)
132 def _download_to_data(res):
133 log.msg("DOWNLOADING")
134 return self.downloader.download_to_data(self.uri)
135 d.addCallback(_download_to_data)
136 def _download_to_data_done(data):
137 log.msg("download finished")
138 self.failUnlessEqual(data, DATA)
139 d.addCallback(_download_to_data_done)
141 target_filename = os.path.join(self.basedir, "download.target")
142 def _download_to_filename(res):
143 return self.downloader.download_to_filename(self.uri,
145 d.addCallback(_download_to_filename)
146 def _download_to_filename_done(res):
147 newdata = open(target_filename, "rb").read()
148 self.failUnlessEqual(newdata, DATA)
149 d.addCallback(_download_to_filename_done)
151 target_filename2 = os.path.join(self.basedir, "download.target2")
152 def _download_to_filehandle(res):
153 fh = open(target_filename2, "wb")
154 return self.downloader.download_to_filehandle(self.uri, fh)
155 d.addCallback(_download_to_filehandle)
156 def _download_to_filehandle_done(fh):
158 newdata = open(target_filename2, "rb").read()
159 self.failUnlessEqual(newdata, DATA)
160 d.addCallback(_download_to_filehandle_done)
162 def _download_nonexistent_uri(res):
163 baduri = self.mangle_uri(self.uri)
164 log.msg("about to download non-existent URI", level=log.UNUSUAL,
165 facility="tahoe.tests")
166 d1 = self.downloader.download_to_data(baduri)
167 def _baduri_should_fail(res):
168 log.msg("finished downloading non-existend URI",
169 level=log.UNUSUAL, facility="tahoe.tests")
170 self.failUnless(isinstance(res, Failure))
171 self.failUnless(res.check(download.NotEnoughSharesError),
172 "expected NotEnoughSharesError, got %s" % res)
173 # TODO: files that have zero peers should get a special kind
174 # of NotEnoughSharesError, which can be used to suggest that
175 # the URI might be wrong or that they've never uploaded the
176 # file in the first place.
177 d1.addBoth(_baduri_should_fail)
179 d.addCallback(_download_nonexistent_uri)
181 # add a new node, which doesn't accept shares, and only uses the
183 d.addCallback(lambda res: self.add_extra_node(self.numclients,
185 add_to_sparent=True))
186 def _added(extra_node):
187 self.extra_node = extra_node
188 extra_node.getServiceNamed("storage").sizelimit = 0
189 d.addCallback(_added)
191 HELPER_DATA = "Data that needs help to upload" * 1000
192 def _upload_with_helper(res):
193 u = upload.Data(HELPER_DATA, convergence=convergence)
194 d = self.extra_node.upload(u)
195 def _uploaded(results):
197 return self.downloader.download_to_data(uri)
198 d.addCallback(_uploaded)
200 self.failUnlessEqual(newdata, HELPER_DATA)
201 d.addCallback(_check)
203 d.addCallback(_upload_with_helper)
205 def _upload_duplicate_with_helper(res):
206 u = upload.Data(HELPER_DATA, convergence=convergence)
207 u.debug_stash_RemoteEncryptedUploadable = True
208 d = self.extra_node.upload(u)
209 def _uploaded(results):
211 return self.downloader.download_to_data(uri)
212 d.addCallback(_uploaded)
214 self.failUnlessEqual(newdata, HELPER_DATA)
215 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
216 "uploadable started uploading, should have been avoided")
217 d.addCallback(_check)
219 if convergence is not None:
220 d.addCallback(_upload_duplicate_with_helper)
222 def _upload_resumable(res):
223 DATA = "Data that needs help to upload and gets interrupted" * 1000
224 u1 = CountingDataUploadable(DATA, convergence=convergence)
225 u2 = CountingDataUploadable(DATA, convergence=convergence)
227 # we interrupt the connection after about 5kB by shutting down
228 # the helper, then restarting it.
229 u1.interrupt_after = 5000
230 u1.interrupt_after_d = defer.Deferred()
231 u1.interrupt_after_d.addCallback(lambda res:
232 self.bounce_client(0))
234 # sneak into the helper and reduce its chunk size, so that our
235 # debug_interrupt will sever the connection on about the fifth
236 # chunk fetched. This makes sure that we've started to write the
237 # new shares before we abandon them, which exercises the
238 # abort/delete-partial-share code. TODO: find a cleaner way to do
239 # this. I know that this will affect later uses of the helper in
240 # this same test run, but I'm not currently worried about it.
241 offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
243 d = self.extra_node.upload(u1)
245 def _should_not_finish(res):
246 self.fail("interrupted upload should have failed, not finished"
247 " with result %s" % (res,))
249 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
251 # make sure we actually interrupted it before finishing the
253 self.failUnless(u1.bytes_read < len(DATA),
254 "read %d out of %d total" % (u1.bytes_read,
257 log.msg("waiting for reconnect", level=log.NOISY,
258 facility="tahoe.test.test_system")
259 # now, we need to give the nodes a chance to notice that this
260 # connection has gone away. When this happens, the storage
261 # servers will be told to abort their uploads, removing the
262 # partial shares. Unfortunately this involves TCP messages
263 # going through the loopback interface, and we can't easily
264 # predict how long that will take. If it were all local, we
265 # could use fireEventually() to stall. Since we don't have
266 # the right introduction hooks, the best we can do is use a
267 # fixed delay. TODO: this is fragile.
268 u1.interrupt_after_d.addCallback(self.stall, 2.0)
269 return u1.interrupt_after_d
270 d.addCallbacks(_should_not_finish, _interrupted)
272 def _disconnected(res):
273 # check to make sure the storage servers aren't still hanging
274 # on to the partial share: their incoming/ directories should
276 log.msg("disconnected", level=log.NOISY,
277 facility="tahoe.test.test_system")
278 for i in range(self.numclients):
279 incdir = os.path.join(self.getdir("client%d" % i),
280 "storage", "shares", "incoming")
281 self.failIf(os.path.exists(incdir) and os.listdir(incdir))
282 d.addCallback(_disconnected)
284 # then we need to give the reconnector a chance to
285 # reestablish the connection to the helper.
286 d.addCallback(lambda res:
287 log.msg("wait_for_connections", level=log.NOISY,
288 facility="tahoe.test.test_system"))
289 d.addCallback(lambda res: self.wait_for_connections())
292 d.addCallback(lambda res:
293 log.msg("uploading again", level=log.NOISY,
294 facility="tahoe.test.test_system"))
295 d.addCallback(lambda res: self.extra_node.upload(u2))
297 def _uploaded(results):
299 log.msg("Second upload complete", level=log.NOISY,
300 facility="tahoe.test.test_system")
302 # this is really bytes received rather than sent, but it's
303 # convenient and basically measures the same thing
304 bytes_sent = results.ciphertext_fetched
306 # We currently don't support resumption of upload if the data is
307 # encrypted with a random key. (Because that would require us
308 # to store the key locally and re-use it on the next upload of
309 # this file, which isn't a bad thing to do, but we currently
311 if convergence is not None:
312 # Make sure we did not have to read the whole file the
313 # second time around.
314 self.failUnless(bytes_sent < len(DATA),
315 "resumption didn't save us any work:"
316 " read %d bytes out of %d total" %
317 (bytes_sent, len(DATA)))
319 # Make sure we did have to read the whole file the second
320 # time around -- because the one that we partially uploaded
321 # earlier was encrypted with a different random key.
322 self.failIf(bytes_sent < len(DATA),
323 "resumption saved us some work even though we were using random keys:"
324 " read %d bytes out of %d total" %
325 (bytes_sent, len(DATA)))
326 return self.downloader.download_to_data(uri)
327 d.addCallback(_uploaded)
330 self.failUnlessEqual(newdata, DATA)
331 # If using convergent encryption, then also check that the
332 # helper has removed the temp file from its directories.
333 if convergence is not None:
334 basedir = os.path.join(self.getdir("client0"), "helper")
335 files = os.listdir(os.path.join(basedir, "CHK_encoding"))
336 self.failUnlessEqual(files, [])
337 files = os.listdir(os.path.join(basedir, "CHK_incoming"))
338 self.failUnlessEqual(files, [])
339 d.addCallback(_check)
341 d.addCallback(_upload_resumable)
345 def _find_shares(self, basedir):
347 for (dirpath, dirnames, filenames) in os.walk(basedir):
348 if "storage" not in dirpath:
352 pieces = dirpath.split(os.sep)
353 if pieces[-4] == "storage" and pieces[-3] == "shares":
354 # we're sitting in .../storage/shares/$START/$SINDEX , and there
355 # are sharefiles here
356 assert pieces[-5].startswith("client")
357 client_num = int(pieces[-5][-1])
358 storage_index_s = pieces[-1]
359 storage_index = storage.si_a2b(storage_index_s)
360 for sharename in filenames:
361 shnum = int(sharename)
362 filename = os.path.join(dirpath, sharename)
363 data = (client_num, storage_index, filename, shnum)
366 self.fail("unable to find any share files in %s" % basedir)
369 def _corrupt_mutable_share(self, filename, which):
370 msf = storage.MutableShareFile(filename)
371 datav = msf.readv([ (0, 1000000) ])
372 final_share = datav[0]
373 assert len(final_share) < 1000000 # ought to be truncated
374 pieces = mutable_layout.unpack_share(final_share)
375 (seqnum, root_hash, IV, k, N, segsize, datalen,
376 verification_key, signature, share_hash_chain, block_hash_tree,
377 share_data, enc_privkey) = pieces
379 if which == "seqnum":
382 root_hash = self.flip_bit(root_hash)
384 IV = self.flip_bit(IV)
385 elif which == "segsize":
386 segsize = segsize + 15
387 elif which == "pubkey":
388 verification_key = self.flip_bit(verification_key)
389 elif which == "signature":
390 signature = self.flip_bit(signature)
391 elif which == "share_hash_chain":
392 nodenum = share_hash_chain.keys()[0]
393 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
394 elif which == "block_hash_tree":
395 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
396 elif which == "share_data":
397 share_data = self.flip_bit(share_data)
398 elif which == "encprivkey":
399 enc_privkey = self.flip_bit(enc_privkey)
401 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
403 final_share = mutable_layout.pack_share(prefix,
410 msf.writev( [(0, final_share)], None)
413 def test_mutable(self):
414 self.basedir = "system/SystemTest/test_mutable"
415 DATA = "initial contents go here." # 25 bytes % 3 != 0
416 NEWDATA = "new contents yay"
417 NEWERDATA = "this is getting old"
419 d = self.set_up_nodes(use_key_generator=True)
421 def _create_mutable(res):
423 log.msg("starting create_mutable_file")
424 d1 = c.create_mutable_file(DATA)
426 log.msg("DONE: %s" % (res,))
427 self._mutable_node_1 = res
429 d1.addCallback(_done)
431 d.addCallback(_create_mutable)
433 def _test_debug(res):
434 # find a share. It is important to run this while there is only
435 # one slot in the grid.
436 shares = self._find_shares(self.basedir)
437 (client_num, storage_index, filename, shnum) = shares[0]
438 log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
440 log.msg(" for clients[%d]" % client_num)
442 out,err = StringIO(), StringIO()
443 rc = runner.runner(["debug", "dump-share", "--offsets",
445 stdout=out, stderr=err)
446 output = out.getvalue()
447 self.failUnlessEqual(rc, 0)
449 self.failUnless("Mutable slot found:\n" in output)
450 self.failUnless("share_type: SDMF\n" in output)
451 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
452 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
453 self.failUnless(" num_extra_leases: 0\n" in output)
454 # the pubkey size can vary by a byte, so the container might
455 # be a bit larger on some runs.
456 m = re.search(r'^ container_size: (\d+)$', output, re.M)
458 container_size = int(m.group(1))
459 self.failUnless(2037 <= container_size <= 2049, container_size)
460 m = re.search(r'^ data_length: (\d+)$', output, re.M)
462 data_length = int(m.group(1))
463 self.failUnless(2037 <= data_length <= 2049, data_length)
464 self.failUnless(" secrets are for nodeid: %s\n" % peerid
466 self.failUnless(" SDMF contents:\n" in output)
467 self.failUnless(" seqnum: 1\n" in output)
468 self.failUnless(" required_shares: 3\n" in output)
469 self.failUnless(" total_shares: 10\n" in output)
470 self.failUnless(" segsize: 27\n" in output, (output, filename))
471 self.failUnless(" datalen: 25\n" in output)
472 # the exact share_hash_chain nodes depends upon the sharenum,
473 # and is more of a hassle to compute than I want to deal with
475 self.failUnless(" share_hash_chain: " in output)
476 self.failUnless(" block_hash_tree: 1 nodes\n" in output)
477 expected = (" verify-cap: URI:SSK-Verifier:%s:" %
478 base32.b2a(storage_index))
479 self.failUnless(expected in output)
480 except unittest.FailTest:
482 print "dump-share output was:"
485 d.addCallback(_test_debug)
489 # first, let's see if we can use the existing node to retrieve the
490 # contents. This allows it to use the cached pubkey and maybe the
491 # latest-known sharemap.
493 d.addCallback(lambda res: self._mutable_node_1.download_best_version())
def _check_download_1(res):
    # the original node must hand back exactly what we stored
    self.failUnlessEqual(res, DATA)
    # Retrieve through a freshly-constructed node on the same client that
    # uploaded the data; constructing a node from the same URI twice must
    # yield the identical (cached) object.
    cap = self._mutable_node_1.get_uri()
    log.msg("starting retrieve1")
    node_a = self.clients[0].create_node_from_uri(cap)
    node_b = self.clients[0].create_node_from_uri(cap)
    self.failUnlessIdentical(node_a, node_b)
    return node_a.download_best_version()
505 d.addCallback(_check_download_1)
507 def _check_download_2(res):
508 self.failUnlessEqual(res, DATA)
509 # same thing, but with a different client
510 uri = self._mutable_node_1.get_uri()
511 newnode = self.clients[1].create_node_from_uri(uri)
512 log.msg("starting retrieve2")
513 d1 = newnode.download_best_version()
514 d1.addCallback(lambda res: (res, newnode))
516 d.addCallback(_check_download_2)
518 def _check_download_3((res, newnode)):
519 self.failUnlessEqual(res, DATA)
521 log.msg("starting replace1")
522 d1 = newnode.overwrite(NEWDATA)
523 d1.addCallback(lambda res: newnode.download_best_version())
525 d.addCallback(_check_download_3)
527 def _check_download_4(res):
528 self.failUnlessEqual(res, NEWDATA)
529 # now create an even newer node and replace the data on it. This
530 # new node has never been used for download before.
531 uri = self._mutable_node_1.get_uri()
532 newnode1 = self.clients[2].create_node_from_uri(uri)
533 newnode2 = self.clients[3].create_node_from_uri(uri)
534 self._newnode3 = self.clients[3].create_node_from_uri(uri)
535 log.msg("starting replace2")
536 d1 = newnode1.overwrite(NEWERDATA)
537 d1.addCallback(lambda res: newnode2.download_best_version())
539 d.addCallback(_check_download_4)
def _check_download_5(res):
    # replace2 has propagated: readers must now see the newest contents
    log.msg("finished replace2")
    expected = NEWERDATA
    self.failUnlessEqual(res, expected)
544 d.addCallback(_check_download_5)
546 def _corrupt_shares(res):
547 # run around and flip bits in all but k of the shares, to test
549 shares = self._find_shares(self.basedir)
550 ## sort by share number
551 #shares.sort( lambda a,b: cmp(a[3], b[3]) )
552 where = dict([ (shnum, filename)
553 for (client_num, storage_index, filename, shnum)
555 assert len(where) == 10 # this test is designed for 3-of-10
556 for shnum, filename in where.items():
557 # shares 7,8,9 are left alone. read will check
558 # (share_hash_chain, block_hash_tree, share_data). New
559 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
560 # segsize, signature).
562 # read: this will trigger "pubkey doesn't match
564 self._corrupt_mutable_share(filename, "pubkey")
565 self._corrupt_mutable_share(filename, "encprivkey")
567 # triggers "signature is invalid"
568 self._corrupt_mutable_share(filename, "seqnum")
570 # triggers "signature is invalid"
571 self._corrupt_mutable_share(filename, "R")
573 # triggers "signature is invalid"
574 self._corrupt_mutable_share(filename, "segsize")
576 self._corrupt_mutable_share(filename, "share_hash_chain")
578 self._corrupt_mutable_share(filename, "block_hash_tree")
580 self._corrupt_mutable_share(filename, "share_data")
581 # other things to correct: IV, signature
582 # 7,8,9 are left alone
584 # note that initial_query_count=5 means that we'll hit the
585 # first 5 servers in effectively random order (based upon
586 # response time), so we won't necessarily ever get a "pubkey
587 # doesn't match fingerprint" error (if we hit shnum>=1 before
588 # shnum=0, we pull the pubkey from there). To get repeatable
589 # specific failures, we need to set initial_query_count=1,
590 # but of course that will change the sequencing behavior of
591 # the retrieval process. TODO: find a reasonable way to make
592 # this a parameter, probably when we expand this test to test
593 # for one failure mode at a time.
595 # when we retrieve this, we should get three signature
596 # failures (where we've mangled seqnum, R, and segsize). The
598 d.addCallback(_corrupt_shares)
600 d.addCallback(lambda res: self._newnode3.download_best_version())
601 d.addCallback(_check_download_5)
603 def _check_empty_file(res):
604 # make sure we can create empty files, this usually screws up the
606 d1 = self.clients[2].create_mutable_file("")
607 d1.addCallback(lambda newnode: newnode.download_best_version())
608 d1.addCallback(lambda res: self.failUnlessEqual("", res))
610 d.addCallback(_check_empty_file)
612 d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
613 def _created_dirnode(dnode):
614 log.msg("_created_dirnode(%s)" % (dnode,))
616 d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
617 d1.addCallback(lambda res: dnode.has_child(u"edgar"))
618 d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
619 d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
620 d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
621 d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
622 d1.addCallback(lambda res: dnode.build_manifest())
623 d1.addCallback(lambda manifest:
624 self.failUnlessEqual(len(manifest), 1))
626 d.addCallback(_created_dirnode)
def wait_for_c3_kg_conn():
    # poll predicate: true once client 3 has a key-generator connection
    kg = self.clients[3]._key_generator
    return kg is not None
630 d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
def check_kg_poolsize(junk, size_delta):
    # the keypool should sit at its configured size, offset by however
    # many keys the test has consumed so far (size_delta is <= 0)
    kg = self.key_generator_svc.key_generator
    self.failUnlessEqual(len(kg.keypool), kg.pool_size + size_delta)
636 d.addCallback(check_kg_poolsize, 0)
637 d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
638 d.addCallback(check_kg_poolsize, -1)
639 d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
640 d.addCallback(check_kg_poolsize, -2)
641 # use_helper induces use of clients[3], which is the using-key_gen client
642 d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
643 d.addCallback(check_kg_poolsize, -3)
646 # The default 120 second timeout went off when running it under valgrind
647 # on my old Windows laptop, so I'm bumping up the timeout.
648 test_mutable.timeout = 240
def flip_bit(self, good):
    """Return a copy of `good` with the low bit of its final character inverted.

    Used to corrupt share/URI data deterministically; applying it twice
    restores the original string.
    """
    head, tail = good[:-1], good[-1]
    return head + chr(ord(tail) ^ 0x01)
653 def mangle_uri(self, gooduri):
654 # change the key, which changes the storage index, which means we'll
655 # be asking about the wrong file, so nobody will have any shares
656 u = IFileURI(gooduri)
657 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
658 uri_extension_hash=u.uri_extension_hash,
659 needed_shares=u.needed_shares,
660 total_shares=u.total_shares,
662 return u2.to_string()
664 # TODO: add a test which mangles the uri_extension_hash instead, and
665 # should fail due to not being able to get a valid uri_extension block.
666 # Also a test which sneakily mangles the uri_extension block to change
667 # some of the validation data, so it will fail in the post-download phase
668 # when the file's crypttext integrity check fails. Do the same thing for
669 # the key, which should cause the download to fail the post-download
670 # plaintext_hash check.
672 def test_vdrive(self):
673 self.basedir = "system/SystemTest/test_vdrive"
674 self.data = LARGE_DATA
675 d = self.set_up_nodes(use_stats_gatherer=True)
676 d.addCallback(self._test_introweb)
677 d.addCallback(self.log, "starting publish")
678 d.addCallback(self._do_publish1)
679 d.addCallback(self._test_runner)
680 d.addCallback(self._do_publish2)
681 # at this point, we have the following filesystem (where "R" denotes
682 # self._root_directory_uri):
685 # R/subdir1/mydata567
687 # R/subdir1/subdir2/mydata992
689 d.addCallback(lambda res: self.bounce_client(0))
690 d.addCallback(self.log, "bounced client0")
692 d.addCallback(self._check_publish1)
693 d.addCallback(self.log, "did _check_publish1")
694 d.addCallback(self._check_publish2)
695 d.addCallback(self.log, "did _check_publish2")
696 d.addCallback(self._do_publish_private)
697 d.addCallback(self.log, "did _do_publish_private")
698 # now we also have (where "P" denotes a new dir):
699 # P/personal/sekrit data
700 # P/s2-rw -> /subdir1/subdir2/
701 # P/s2-ro -> /subdir1/subdir2/ (read-only)
702 d.addCallback(self._check_publish_private)
703 d.addCallback(self.log, "did _check_publish_private")
704 d.addCallback(self._test_web)
705 d.addCallback(self._test_control)
706 d.addCallback(self._test_cli)
707 # P now has four top-level children:
708 # P/personal/sekrit data
711 # P/test_put/ (empty)
712 d.addCallback(self._test_checker)
713 d.addCallback(self._grab_stats)
715 test_vdrive.timeout = 1100
717 def _test_introweb(self, res):
718 d = getPage(self.introweb_url, method="GET", followRedirect=True)
721 self.failUnless("allmydata: %s" % str(allmydata.__version__)
723 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
724 self.failUnless("Subscription Summary: storage: 5" in res)
725 except unittest.FailTest:
727 print "GET %s output was:" % self.introweb_url
730 d.addCallback(_check)
731 d.addCallback(lambda res:
732 getPage(self.introweb_url + "?t=json",
733 method="GET", followRedirect=True))
734 def _check_json(res):
735 data = simplejson.loads(res)
737 self.failUnlessEqual(data["subscription_summary"],
739 self.failUnlessEqual(data["announcement_summary"],
740 {"storage": 5, "stub_client": 5})
741 except unittest.FailTest:
743 print "GET %s?t=json output was:" % self.introweb_url
746 d.addCallback(_check_json)
749 def _do_publish1(self, res):
750 ut = upload.Data(self.data, convergence=None)
752 d = c0.create_empty_dirnode()
def _made_root(new_dirnode):
    # stash the root directory's cap, then re-materialize the node from it
    root_uri = new_dirnode.get_uri()
    self._root_directory_uri = root_uri
    return c0.create_node_from_uri(root_uri)
756 d.addCallback(_made_root)
757 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
758 def _made_subdir1(subdir1_node):
759 self._subdir1_node = subdir1_node
760 d1 = subdir1_node.add_file(u"mydata567", ut)
761 d1.addCallback(self.log, "publish finished")
762 def _stash_uri(filenode):
763 self.uri = filenode.get_uri()
764 d1.addCallback(_stash_uri)
766 d.addCallback(_made_subdir1)
769 def _do_publish2(self, res):
770 ut = upload.Data(self.data, convergence=None)
771 d = self._subdir1_node.create_empty_directory(u"subdir2")
772 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
775 def log(self, res, *args, **kwargs):
776 # print "MSG: %s RES: %s" % (msg, args)
777 log.msg(*args, **kwargs)
780 def _do_publish_private(self, res):
781 self.smalldata = "sssh, very secret stuff"
782 ut = upload.Data(self.smalldata, convergence=None)
783 d = self.clients[0].create_empty_dirnode()
784 d.addCallback(self.log, "GOT private directory")
785 def _got_new_dir(privnode):
786 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
787 d1 = privnode.create_empty_directory(u"personal")
788 d1.addCallback(self.log, "made P/personal")
789 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
790 d1.addCallback(self.log, "made P/personal/sekrit data")
791 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
793 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
794 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
796 d1.addCallback(_got_s2)
797 d1.addCallback(lambda res: privnode)
799 d.addCallback(_got_new_dir)
802 def _check_publish1(self, res):
803 # this one uses the iterative API
805 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
806 d.addCallback(self.log, "check_publish1 got /")
807 d.addCallback(lambda root: root.get(u"subdir1"))
808 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
809 d.addCallback(lambda filenode: filenode.download_to_data())
810 d.addCallback(self.log, "get finished")
812 self.failUnlessEqual(data, self.data)
813 d.addCallback(_get_done)
816 def _check_publish2(self, res):
817 # this one uses the path-based API
818 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
819 d = rootnode.get_child_at_path(u"subdir1")
820 d.addCallback(lambda dirnode:
821 self.failUnless(IDirectoryNode.providedBy(dirnode)))
822 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
823 d.addCallback(lambda filenode: filenode.download_to_data())
824 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
826 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
827 def _got_filenode(filenode):
828 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
829 assert fnode == filenode
830 d.addCallback(_got_filenode)
    def _check_publish_private(self, resnode):
        """Verify the private (rootcap) directory published earlier.

        Exercises the path-based dirnode API: fetches children by path,
        checks that the read-only view of s2 rejects every mutating
        operation with NotMutableError, then moves files around and
        validates build_manifest() and deep_stats() results.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember the 'personal' subdirectory node for later steps
            self._personal_node = personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
            # NOTE(review): the enclosing 'def get_path(path):' line appears to
            # be missing from this copy of the file — confirm against upstream.
            return self._private_node.get_child_at_path(path)
        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only directory must be mutable-but-readonly, and every
            # write operation on it must fail with NotMutableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            # reads are still allowed on the read-only directory
            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            # a missing child raises KeyError, not NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # shuffle 'sekrit data' between the root and 'personal', then
            # confirm the manifest and the deep-stats counters
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            # P/personal/sekrit data
            # P/s2-rw (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            d1.addCallback(lambda res: home.deep_stats())
            def _check_stats(stats):
                # exact counts for the fixed layout built by the earlier steps;
                # size-directories/largest-directory vary, so only bounded below
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
        d.addCallback(_got_home)
    def shouldFail(self, res, expected_failure, which, substring=None):
        """addBoth-style checker: *res* must be a Failure wrapping
        *expected_failure* (optionally containing *substring* in its string
        form); a plain (non-Failure) result means the expected exception was
        never raised, so the test fails. *which* labels the failure message.
        """
        if isinstance(res, Failure):
            # trap() re-raises if the Failure is of an unexpected type
            res.trap(expected_failure)
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        """Invoke *callable(*args, **kwargs)* (sync or async) and assert it
        fails with *expected_failure*, optionally checking that *substring*
        appears in the failure's string form. *which* labels the message."""
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
            if isinstance(res, Failure):
                res.trap(expected_failure)
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
969 def PUT(self, urlpath, data):
970 url = self.webish_url + urlpath
971 return getPage(url, method="PUT", postdata=data)
973 def GET(self, urlpath, followRedirect=False):
974 url = self.webish_url + urlpath
975 return getPage(url, method="GET", followRedirect=followRedirect)
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        """POST a hand-built multipart/form-data body to *urlpath*.

        Each keyword in *fields* becomes one form field; a tuple value is
        treated as (filename, content) for a file-upload field. When
        *use_helper* is true the request goes to the helper-using node's
        webapi instead of client[0]'s. Returns getPage's Deferred.
        """
            url = self.helper_webish_url + urlpath
            url = self.webish_url + urlpath
        # fixed MIME boundary; fine for test data that never contains it
        sepbase = "boogabooga"
        form.append('Content-Disposition: form-data; name="_charset"')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                # file-upload field: (filename, contents)
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append(str(value))
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)
    def _test_web(self, res):
        """Drive the webapi end-to-end: welcome pages, directory listings,
        downloads by path and by URI (including a mangled URI), PUT/GET/POST
        uploads, the status and helper-status pages (HTML and JSON), and the
        statistics pages."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        def _got_welcome(page):
            # welcome page must report all storage servers connected
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            # back-date the file so it registers as "old" in the size_old stats
            then = time.time() - 86400*3
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # counts/sizes match the two spurious files written above
            # (10 bytes incoming, 15 bytes encoding)
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        # url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' CLI tools (dump-share, find-shares,
        catalog-shares) against the shares written by the earlier upload."""
        # exercise some of the diagnostic tools in runner.py

        # walk the storage directories looking for a CHK sharefile to dump
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
            self.fail("unable to find any uri_extension files in %s"
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # 10 total shares were placed across the client storage dirs
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        # one description line per share; 10 of them describe our CHK file
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
    def _test_control(self, res):
        """Connect to client[0]'s control port (via its control.furl) and
        hand the remote reference to _test_control2."""
        # exercise the remote-control-the-client foolscap interfaces in
        # allmydata.control (mostly used for performance tests)
        c0 = self.clients[0]
        control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
        control_furl = open(control_furl_file, "r").read().strip()
        # it doesn't really matter which Tub we use to connect to the client,
        # so let's just use our IntroducerNode's
        d = self.introducer.tub.getReference(control_furl)
        d.addCallback(self._test_control2, control_furl_file)
    def _test_control2(self, rref, filename):
        """Use the control-port remote reference to round-trip *filename*
        (upload it, download it again, compare bytes) and poke the
        speed-test / memory / response-time probes."""
        d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
        downfile = os.path.join(self.basedir, "control.downfile")
        d.addCallback(lambda uri:
                      rref.callRemote("download_from_uri_to_file",
            # downloaded copy must be byte-identical to the uploaded original
            self.failUnlessEqual(res, downfile)
            data = open(downfile, "r").read()
            expected_data = open(filename, "r").read()
            self.failUnlessEqual(data, expected_data)
        d.addCallback(_check)
        d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
        # get_memory_usage reads /proc, so only meaningful on linux
        if sys.platform == "linux2":
            d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
        d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
    def _test_cli(self, res):
        """Run the full CLI surface against the live grid: legacy
        root_dir.cap alias handling, add-alias/list-aliases, mkdir, ls
        (plain/-l/--uri/--readonly-uri), put (file/stdin/--mutable), get,
        rm, mv, ln, and cp in every direction including recursive copies.
        """
        # run various CLI commands (in a thread, since they use blocking
        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        client0_basedir = self.getdir("client0")

                    "--node-directory", client0_basedir,

        TESTDATA = "I will not write the same thing over and over.\n" * 100

        d = defer.succeed(None)

        # for compatibility with earlier versions, private/root_dir.cap is
        # supposed to be treated as an alias named "tahoe:". Start by making
        # sure that works, before we add other aliases.

        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
        f = open(root_file, "w")
        f.write(private_uri)

        def run(ignored, verb, *args, **kwargs):
            # helper: run one CLI command against client0's node directory
            stdin = kwargs.get("stdin", "")
            newargs = [verb] + nodeargs + list(args)
            return self._run_cli(newargs, stdin=stdin)

        def _check_ls((out,err), expected_children, unexpected_children=[]):
            self.failUnlessEqual(err, "")
            for s in expected_children:
                self.failUnless(s in out, (s,out))
            for s in unexpected_children:
                self.failIf(s in out, (s,out))

        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")

        # this should reference private_uri
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])

        d.addCallback(run, "list-aliases")
        def _check_aliases_1((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
        d.addCallback(_check_aliases_1)

        # now that that's out of the way, remove root_dir.cap and work with
        d.addCallback(lambda res: os.unlink(root_file))
        d.addCallback(run, "list-aliases")
        def _check_aliases_2((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnlessEqual(out, "")
        d.addCallback(_check_aliases_2)

        d.addCallback(run, "mkdir")
        def _got_dir( (out,err) ):
            # mkdir prints the new directory's writecap on stdout
            self.failUnless(uri.from_string_dirnode(out.strip()))
        d.addCallback(_got_dir)
        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))

        d.addCallback(run, "list-aliases")
        def _check_aliases_3((out,err)):
            self.failUnlessEqual(err, "")
            self.failUnless("tahoe: " in out)
        d.addCallback(_check_aliases_3)

        def _check_empty_dir((out,err)):
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "")
        d.addCallback(run, "ls")
        d.addCallback(_check_empty_dir)

        def _check_missing_dir((out,err)):
            # TODO: check that rc==2
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "No such file or directory\n")
        d.addCallback(run, "ls", "bogus")
        d.addCallback(_check_missing_dir)

            # create local fixture files "file0".."fileN" with known contents
            fn = os.path.join(self.basedir, "file%d" % i)
            data = "data to be uploaded: file%d\n" % i
            open(fn,"wb").write(data)

        def _check_stdout_against((out,err), filenum=None, data=None):
            self.failUnlessEqual(err, "")
            if filenum is not None:
                self.failUnlessEqual(out, datas[filenum])
            if data is not None:
                self.failUnlessEqual(out, data)

        # test all both forms of put: from a file, and from stdin
        d.addCallback(run, "put", files[0], "tahoe-file0")
        def _put_out((out,err)):
            # small file -> LIT uri; put reports 201 Created on stderr
            self.failUnless("URI:LIT:" in out, out)
            self.failUnless("201 Created" in err, err)
            return run(None, "get", uri0)
        d.addCallback(_put_out)
        d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))

        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
        # tahoe put bar tahoe:FOO
        d.addCallback(run, "put", files[2], "tahoe:file2")
        d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
        def _check_put_mutable((out,err)):
            # remember file3's write-cap for the --uri/--readonly-uri checks
            self._mutable_file3_uri = out.strip()
        d.addCallback(_check_put_mutable)
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 3)

        STDIN_DATA = "This is the file to upload from stdin."
        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
        # tahoe put tahoe:FOO
        d.addCallback(run, "put", "-", "tahoe:from-stdin",
                      stdin="Other file from stdin.")

        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
                                  "tahoe-file-stdin", "from-stdin"])
        d.addCallback(run, "ls", "subdir")
        d.addCallback(_check_ls, ["tahoe-file1"])

        d.addCallback(run, "mkdir", "subdir2")
        d.addCallback(run, "ls")
        # TODO: extract the URI, set an alias with it
        d.addCallback(_check_ls, ["subdir2"])

        # tahoe get: (to stdin and to a file)
        d.addCallback(run, "get", "tahoe-file0")
        d.addCallback(_check_stdout_against, 0)
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
        d.addCallback(_check_stdout_against, 1)
        outfile0 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "file2", outfile0)
        def _check_outfile0((out,err)):
            data = open(outfile0,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file2\n")
        d.addCallback(_check_outfile0)
        # NOTE(review): this reuses the "outfile0" path name for outfile1 —
        # looks like a copy/paste typo ("outfile1" intended); harmless since
        # the file is rewritten before being read, but worth confirming.
        outfile1 = os.path.join(self.basedir, "outfile0")
        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
        def _check_outfile1((out,err)):
            data = open(outfile1,"rb").read()
            self.failUnlessEqual(data, "data to be uploaded: file1\n")
        d.addCallback(_check_outfile1)

        d.addCallback(run, "rm", "tahoe-file0")
        d.addCallback(run, "rm", "tahoe:file2")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])

        d.addCallback(run, "ls", "-l")
        def _check_ls_l((out,err)):
            lines = out.split("\n")
                if "tahoe-file-stdin" in l:
                    # immutable entries show -r--, mutable entries -rw-
                    self.failUnless(l.startswith("-r-- "), l)
                    self.failUnless(" %d " % len(STDIN_DATA) in l)
                    self.failUnless(l.startswith("-rw- "), l) # mutable
        d.addCallback(_check_ls_l)

        d.addCallback(run, "ls", "--uri")
        def _check_ls_uri((out,err)):
            lines = out.split("\n")
                    self.failUnless(self._mutable_file3_uri in l)
        d.addCallback(_check_ls_uri)

        d.addCallback(run, "ls", "--readonly-uri")
        def _check_ls_rouri((out,err)):
            lines = out.split("\n")
                    # --readonly-uri must show the diminished read-cap,
                    # derived here from the stored write-cap
                    rw_uri = self._mutable_file3_uri
                    u = uri.from_string_mutable_filenode(rw_uri)
                    ro_uri = u.get_readonly().to_string()
                    self.failUnless(ro_uri in l)
        d.addCallback(_check_ls_rouri)

        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])

        d.addCallback(run, "ln", "tahoe-moved", "newlink")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])

        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy"])
        d.addCallback(run, "get", "tahoe:file3-copy")
        d.addCallback(_check_stdout_against, 3)

        # copy from disk into tahoe
        d.addCallback(run, "cp", files[4], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 4)

        # copy from tahoe into disk
        target_filename = os.path.join(self.basedir, "file-out")
        d.addCallback(run, "cp", "tahoe:file4", target_filename)
        def _check_cp_out((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out)

        # copy from disk to disk (silly case)
        target2_filename = os.path.join(self.basedir, "file-out-copy")
        d.addCallback(run, "cp", target_filename, target2_filename)
        def _check_cp_out2((out,err)):
            self.failUnless(os.path.exists(target2_filename))
            got = open(target2_filename,"rb").read()
            self.failUnlessEqual(got, datas[4])
        d.addCallback(_check_cp_out2)

        # copy from tahoe into disk, overwriting an existing file
        d.addCallback(run, "cp", "tahoe:file3", target_filename)
        def _check_cp_out3((out,err)):
            self.failUnless(os.path.exists(target_filename))
            got = open(target_filename,"rb").read()
            self.failUnlessEqual(got, datas[3])
        d.addCallback(_check_cp_out3)

        # copy from disk into tahoe, overwriting an existing immutable file
        d.addCallback(run, "cp", files[5], "tahoe:file4")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file4")
        d.addCallback(_check_stdout_against, 5)

        # copy from disk into tahoe, overwriting an existing mutable file
        d.addCallback(run, "cp", files[5], "tahoe:file3")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
        d.addCallback(run, "get", "tahoe:file3")
        d.addCallback(_check_stdout_against, 5)

        # recursive copy: setup
        dn = os.path.join(self.basedir, "dir1")
        open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
        open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
        open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
        sdn2 = os.path.join(dn, "subdir2")
        open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
        open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")

        # from disk into tahoe
        d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1"])
        d.addCallback(run, "ls", "dir1")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and back out again
        dn_copy = os.path.join(self.basedir, "dir1-copy")
        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
        def _check_cp_r_out((out,err)):
                # compare each copied file byte-for-byte with the original
                old = open(os.path.join(dn, name), "rb").read()
                newfn = os.path.join(dn_copy, name)
                self.failUnless(os.path.exists(newfn))
                new = open(newfn, "rb").read()
                self.failUnlessEqual(old, new)
            _cmp(os.path.join("subdir2", "rfile4"))
            _cmp(os.path.join("subdir2", "rfile5"))
        d.addCallback(_check_cp_r_out)

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)

        # and tahoe-to-tahoe
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
        d.addCallback(run, "ls")
        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
        d.addCallback(run, "ls", "dir1-copy")
        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
                      ["rfile4", "rfile5"])
        d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
        d.addCallback(_check_ls, ["rfile4", "rfile5"],
                      ["rfile1", "rfile2", "rfile3"])
        d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
        d.addCallback(_check_stdout_against, data="rfile4")

        # and copy it a second time, which ought to overwrite the same files
        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
        ## def _ls_missing(res):
        ##     argv = ["ls"] + nodeargs + ["bogus"]
        ##     return self._run_cli(argv)
        ## d.addCallback(_ls_missing)
        ## def _check_ls_missing((out,err)):
        ##     self.failUnlessEqual(err, "")
        ## d.addCallback(_check_ls_missing)
1650 def _run_cli(self, argv, stdin=""):
1652 stdout, stderr = StringIO(), StringIO()
1653 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1654 stdin=StringIO(stdin),
1655 stdout=stdout, stderr=stderr)
1657 return stdout.getvalue(), stderr.getvalue()
1658 d.addCallback(_done)
    def _test_checker(self, res):
        """Run check()/check(verify=True) on the 'personal' dirnode, a CHK
        file node, and a LIT file node, asserting healthy results (LIT
        checks report None, since there is nothing stored to check)."""
        # large enough to be a real CHK upload rather than a LIT uri
        ut = upload.Data("too big to be literal" * 200, convergence=None)
        d = self._personal_node.add_file(u"big file", ut)

        d.addCallback(lambda res: self._personal_node.check())
        def _check_dirnode_results(r):
            self.failUnless(r.is_healthy())
        d.addCallback(_check_dirnode_results)
        d.addCallback(lambda res: self._personal_node.check(verify=True))
        d.addCallback(_check_dirnode_results)

        d.addCallback(lambda res: self._personal_node.get(u"big file"))
        def _got_chk_filenode(n):
            self.failUnless(isinstance(n, filenode.FileNode))
            def _check_filenode_results(r):
                self.failUnless(r.is_healthy())
            d.addCallback(_check_filenode_results)
            d.addCallback(lambda res: n.check(verify=True))
            d.addCallback(_check_filenode_results)
        d.addCallback(_got_chk_filenode)

        d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
        def _got_lit_filenode(n):
            self.failUnless(isinstance(n, filenode.LiteralFileNode))
            # LIT files carry their data in the URI, so check() returns None
            def _check_lit_filenode_results(r):
                self.failUnlessEqual(r, None)
            d.addCallback(_check_lit_filenode_results)
            d.addCallback(lambda res: n.check(verify=True))
            d.addCallback(_check_lit_filenode_results)
        d.addCallback(_got_lit_filenode)
1698 class MutableChecker(SystemTestMixin, unittest.TestCase):
1700 def _run_cli(self, argv):
1701 stdout, stderr = StringIO(), StringIO()
1702 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1703 return stdout.getvalue()
    def test_good(self):
        """Create a healthy mutable file and confirm the webapi verifier
        (?t=check&verify=true) reports it healthy with all 10 shares."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
            si = self.node.get_storage_index()
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            # healthy: all 10 shares of seq1 present, no corruption reported
            self.failUnless("<div>Healthy!</div>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
    def test_corrupt(self):
        """Corrupt one share of a mutable file with the CLI debug tool,
        confirm the webapi verifier flags it, then confirm the webapi
        repairer (?repair=true) restores full health."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
            si = self.node.get_storage_index()
            # locate this file's shares on client1's storage
            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                                 self.clients[1].basedir])
            files = out.split("\n")
            # corrupt one of them, using the CLI debug command
            shnum = os.path.basename(f)
            nodeid = self.clients[1].nodeid
            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
            # remember which share we damaged so results can be checked
            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
            out = self._run_cli(["debug", "corrupt-share", files[0]])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            # 9 of 10 shares remain good (encoding is 3-of-10)
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            # after repair, all 10 shares are recoverable again
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
    def test_delete_share(self):
        """Delete one share, confirm the (non-verifying) webapi checker
        notices the shortfall, then confirm repair restores full health.

        NOTE(review): this excerpt is missing some interior lines (the
        `def _created(n):` / `def _do_check(res):` headers, the assignment
        of `f` — presumably files[0] — and the line that actually deletes
        the share).
        """
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
            si = self.node.get_storage_index()
            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                                 self.clients[1].basedir])
            files = out.split("\n")
            # corrupt one of them, using the CLI debug command
            shnum = os.path.basename(f)
            nodeid = self.clients[1].nodeid
            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
        d.addCallback(_created)
        # now make sure the webapi checker notices it
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            # a deleted (not corrupted) share: unhealthy, but no corruption
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        # re-check: all 10 shares should exist again
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
1830 class DeepCheckWeb(SystemTestMixin, unittest.TestCase):
1831 # construct a small directory tree (with one dir, one immutable file, one
1832 # mutable file, one LIT file, and a loop), and then check/examine it in
    def set_up_tree(self, ignored):
        """Build the test tree: a root directory holding a mutable file, a
        large immutable (CHK) file, a LIT-sized small file, and a "loop"
        edge pointing back at the root itself.

        NOTE(review): this excerpt is missing a few interior lines — the
        `self.root = n` / `self.mutable = n` style assignments inside the
        `_created_*` callbacks, and the trailing `return d`.
        """
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _created_root(n):
            self.root_uri = n.get_uri()
        d.addCallback(_created_root)
        d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
        d.addCallback(lambda n: self.root.set_node(u"mutable", n))
        def _created_mutable(n):
            self.mutable_uri = n.get_uri()
        d.addCallback(_created_mutable)
        # 13000 bytes: too big for a LIT uri, so this becomes a CHK file
        large = upload.Data("Lots of data\n" * 1000, None)
        d.addCallback(lambda ign: self.root.add_file(u"large", large))
        def _created_large(n):
            self.large_uri = n.get_uri()
        d.addCallback(_created_large)
        small = upload.Data("Small enough for a LIT", None)
        d.addCallback(lambda ign: self.root.add_file(u"small", small))
        def _created_small(n):
            self.small_uri = n.get_uri()
        d.addCallback(_created_small)
        # directory cycle, to exercise deep-traversal loop handling
        d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
    def check_is_healthy(self, cr, n, where, incomplete=False):
        """Assert that checker results `cr` for node `n` describe a fully
        healthy file: all 10 shares present, nothing corrupt, one
        recoverable version. `where` tags failure messages.

        NOTE(review): `d` below is presumably cr.get_data() — its
        assignment is on a line missing from this excerpt.
        """
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failUnless(cr.is_healthy(), where)
        self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
        self.failUnlessEqual(cr.get_storage_index_string(),
                             base32.b2a(n.get_storage_index()), where)
        # rebalancing is expected whenever the grid has fewer than 10 servers
        needs_rebalancing = bool( len(self.clients) < 10 )
        self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
        self.failUnlessEqual(d["count-shares-good"], 10, where)
        self.failUnlessEqual(d["count-shares-needed"], 3, where)
        self.failUnlessEqual(d["count-shares-expected"], 10, where)
        self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(d["list-corrupt-shares"], [], where)
        # every client in the grid should have responded
        self.failUnlessEqual(sorted(d["servers-responding"]),
                             sorted([c.nodeid for c in self.clients]),
        self.failUnless("sharemap" in d, where)
        # the union of all sharemap serverids should cover every client
        all_serverids = set()
        for (shareid, serverids) in d["sharemap"].items():
            all_serverids.update(serverids)
        self.failUnlessEqual(sorted(all_serverids),
                             sorted([c.nodeid for c in self.clients]),
        self.failUnlessEqual(d["count-wrong-shares"], 0, where)
        self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
1902 def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
1903 self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
1904 self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
1905 self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
1906 self.failUnless(cr.get_post_repair_results().is_healthy(), where)
1907 self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
1908 self.failIf(cr.get_repair_attempted(), where)
    def deep_check_is_healthy(self, cr, num_healthy, where):
        """Assert deep-check results report `num_healthy` healthy objects.
        NOTE(review): the continuation of the final call is missing from
        this excerpt (presumably `num_healthy, where)`)."""
        self.failUnless(IDeepCheckResults.providedBy(cr))
        self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
    def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
        """Assert deep-check-and-repair counters: all objects healthy both
        pre- and post-repair, zero repairs attempted.
        NOTE(review): the continuations of the two truncated calls are
        missing from this excerpt (presumably `num_healthy, where)`)."""
        self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
        c = cr.get_counters()
        self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
        self.failUnlessEqual(c["count-objects-healthy-post-repair"],
        self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
    def test_good(self):
        """Build the standard tree, then run the stats, local-check, and
        webapi exercises over it in sequence.
        NOTE(review): the trailing `return d` is missing from this excerpt."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        d.addCallback(self.set_up_tree)
        d.addCallback(self.do_stats)
        d.addCallback(self.do_test_good)
        d.addCallback(self.do_test_web)
    def do_stats(self, ignored):
        """Run deep_stats() on the root and validate the aggregate numbers.
        NOTE(review): the trailing `return d` is missing from this excerpt."""
        d = defer.succeed(None)
        d.addCallback(lambda ign: self.root.deep_stats())
        d.addCallback(self.check_stats)
    def check_stats(self, s):
        """Validate deep-stats output for the tree built by set_up_tree:
        one directory plus three files (immutable, literal, mutable).
        NOTE(review): the remaining histogram tuples in the truncated
        failUnlessEqual call are missing from this excerpt."""
        self.failUnlessEqual(s["count-directories"], 1)
        self.failUnlessEqual(s["count-files"], 3)
        self.failUnlessEqual(s["count-immutable-files"], 1)
        self.failUnlessEqual(s["count-literal-files"], 1)
        self.failUnlessEqual(s["count-mutable-files"], 1)
        # don't check directories: their size will vary
        # s["largest-directory"]
        # s["size-directories"]
        # root holds 4 children: mutable, large, small, loop
        self.failUnlessEqual(s["largest-directory-children"], 4)
        # "Lots of data\n" * 1000 == 13000 bytes
        self.failUnlessEqual(s["largest-immutable-file"], 13000)
        # to re-use this function for both the local dirnode.deep_stats() and
        # the webapi t=deep-stats, we coerce the result into a list of
        # tuples. dirnode.deep_stats() returns a list of tuples, but JSON
        # only knows about lists, so t=deep-stats returns a list of lists.
        histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
        self.failUnlessEqual(histogram, [(11, 31, 1),
        self.failUnlessEqual(s["size-immutable-files"], 13000)
        self.failUnlessEqual(s["size-literal-files"], 22)
    def do_test_good(self, ignored):
        """Exercise check()/check_and_repair()/deep_check() on every node in
        the tree and assert everything reports healthy. LIT files have no
        shares to check, so their check() yields None.
        NOTE(review): several continuation lines (e.g. `incomplete=True)`)
        and the trailing `return d` are missing from this excerpt."""
        d = defer.succeed(None)
        # check the individual items
        d.addCallback(lambda ign: self.root.check())
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check())
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check())
        d.addCallback(self.check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check())
        d.addCallback(self.failUnlessEqual, None, "small")
        # and again with verify=True
        d.addCallback(lambda ign: self.root.check(verify=True))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(verify=True))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(verify=True))
        d.addCallback(self.check_is_healthy, self.large, "large",
        d.addCallback(lambda ign: self.small.check(verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")
        # and check_and_repair(), which should be a nop
        d.addCallback(lambda ign: self.root.check_and_repair())
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair())
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair())
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check_and_repair())
        d.addCallback(self.failUnlessEqual, None, "small")
        # check_and_repair(verify=True)
        d.addCallback(lambda ign: self.root.check_and_repair(verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
        d.addCallback(lambda ign: self.small.check_and_repair(verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")
        # now deep-check the root, with various verify= and repair= options
        # (3 checkable objects: root dir, mutable file, large file)
        d.addCallback(lambda ign: self.root.deep_check())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.root.deep_check(verify=True))
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.root.deep_check_and_repair())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.root.deep_check_and_repair(verify=True))
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
    def web_json(self, n, **kwargs):
        """POST to node `n`'s webapi URL with output=json; the Deferred
        fires with the decoded JSON structure.
        NOTE(review): the trailing `return d` is missing from this excerpt."""
        kwargs["output"] = "json"
        d = self.web(n, "POST", **kwargs)
        d.addCallback(self.decode_json)
    def decode_json(self, (s,url)):
        """Parse response body `s` as JSON; on parse failure, fail the test
        with a message naming `url`.
        NOTE(review): the try/except lines wrapping the parse, and the
        final return, are missing from this excerpt."""
            data = simplejson.loads(s)
            self.fail("%s: not JSON: '%s'" % (url, s))
    def web(self, n, method="GET", **kwargs):
        """Fetch node `n`'s webapi URL (kwargs become query parameters).
        The Deferred fires with (data, url) so failure messages can name
        the URL. NOTE(review): the trailing `return d` is missing from
        this excerpt."""
        # returns (data, url)
        url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
               + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
        d = getPage(url, method=method)
        d.addCallback(lambda data: (data,url))
    def json_check_is_healthy(self, data, n, where, incomplete=False):
        """Assert the JSON form of checker results describes full health,
        mirroring check_is_healthy() but on the webapi's JSON encoding.
        NOTE(review): `r` below is presumably data["results"] — its
        assignment is on a line missing from this excerpt."""
        self.failUnlessEqual(data["storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(r["healthy"], True, where)
        # rebalancing is expected whenever the grid has fewer than 10 servers
        needs_rebalancing = bool( len(self.clients) < 10 )
        self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
        self.failUnlessEqual(r["count-shares-good"], 10, where)
        self.failUnlessEqual(r["count-shares-needed"], 3, where)
        self.failUnlessEqual(r["count-shares-expected"], 10, where)
        self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(r["list-corrupt-shares"], [], where)
        # JSON uses printable (base32) serverids, not binary nodeids
        self.failUnlessEqual(sorted(r["servers-responding"]),
                             sorted([idlib.nodeid_b2a(c.nodeid)
                                     for c in self.clients]), where)
        self.failUnless("sharemap" in r, where)
        # union of all sharemap serverids should cover every client
        all_serverids = set()
        for (shareid, serverids_s) in r["sharemap"].items():
            all_serverids.update(serverids_s)
        self.failUnlessEqual(sorted(all_serverids),
                             sorted([idlib.nodeid_b2a(c.nodeid)
                                     for c in self.clients]), where)
        self.failUnlessEqual(r["count-wrong-shares"], 0, where)
        self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2070 def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2071 self.failUnlessEqual(data["storage-index"],
2072 base32.b2a(n.get_storage_index()), where)
2073 self.failUnlessEqual(data["repair-attempted"], False, where)
2074 self.json_check_is_healthy(data["pre-repair-results"],
2075 n, where, incomplete)
2076 self.json_check_is_healthy(data["post-repair-results"],
2077 n, where, incomplete)
2079 def json_full_deepcheck_is_healthy(self, data, n, where):
2080 self.failUnlessEqual(data["root-storage-index"],
2081 base32.b2a(n.get_storage_index()), where)
2082 self.failUnlessEqual(data["count-objects-checked"], 3, where)
2083 self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2084 self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2085 self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2086 self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2087 self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2088 self.json_check_stats(data["stats"], where)
    def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
        """Assert the JSON deep-check-and-repair output: all three objects
        healthy both pre- and post-repair, and zero repairs attempted."""
        self.failUnlessEqual(data["root-storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["count-objects-checked"], 3, where)
        # pre-repair state: already healthy
        self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
        # post-repair state: still healthy
        self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
        # no corruption anywhere
        self.failUnlessEqual(data["list-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-unhealthy-files"], [], where)
        # nothing needed repairing, so no repairs should have run
        self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
        self.failUnlessEqual(data["count-repairs-successful"], 0, where)
        self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2112 def json_check_lit(self, data, n, where):
2113 self.failUnlessEqual(data["storage-index"], "", where)
2114 self.failUnlessEqual(data["results"]["healthy"], True, where)
2116 def json_check_stats(self, data, where):
2117 self.check_stats(data)
2119 def do_test_web(self, ignored):
2120 d = defer.succeed(None)
2123 d.addCallback(lambda ign: self.web(self.root, t="deep-stats"))
2124 d.addCallback(self.decode_json)
2125 d.addCallback(self.json_check_stats, "deep-stats")
2128 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2129 d.addCallback(self.json_check_is_healthy, self.root, "root")
2130 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2131 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2132 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2133 d.addCallback(self.json_check_is_healthy, self.large, "large")
2134 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2135 d.addCallback(self.json_check_lit, self.small, "small")
2138 d.addCallback(lambda ign:
2139 self.web_json(self.root, t="check", verify="true"))
2140 d.addCallback(self.json_check_is_healthy, self.root, "root")
2141 d.addCallback(lambda ign:
2142 self.web_json(self.mutable, t="check", verify="true"))
2143 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2144 d.addCallback(lambda ign:
2145 self.web_json(self.large, t="check", verify="true"))
2146 d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2147 d.addCallback(lambda ign:
2148 self.web_json(self.small, t="check", verify="true"))
2149 d.addCallback(self.json_check_lit, self.small, "small")
2151 # check and repair, no verify
2152 d.addCallback(lambda ign:
2153 self.web_json(self.root, t="check", repair="true"))
2154 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2155 d.addCallback(lambda ign:
2156 self.web_json(self.mutable, t="check", repair="true"))
2157 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2158 d.addCallback(lambda ign:
2159 self.web_json(self.large, t="check", repair="true"))
2160 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2161 d.addCallback(lambda ign:
2162 self.web_json(self.small, t="check", repair="true"))
2163 d.addCallback(self.json_check_lit, self.small, "small")
2165 # check+verify+repair
2166 d.addCallback(lambda ign:
2167 self.web_json(self.root, t="check", repair="true", verify="true"))
2168 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2169 d.addCallback(lambda ign:
2170 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2171 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2172 d.addCallback(lambda ign:
2173 self.web_json(self.large, t="check", repair="true", verify="true"))
2174 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2175 d.addCallback(lambda ign:
2176 self.web_json(self.small, t="check", repair="true", verify="true"))
2177 d.addCallback(self.json_check_lit, self.small, "small")
2179 # now run a deep-check, with various verify= and repair= flags
2180 d.addCallback(lambda ign:
2181 self.web_json(self.root, t="deep-check"))
2182 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2183 d.addCallback(lambda ign:
2184 self.web_json(self.root, t="deep-check", verify="true"))
2185 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2186 d.addCallback(lambda ign:
2187 self.web_json(self.root, t="deep-check", repair="true"))
2188 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2189 d.addCallback(lambda ign:
2190 self.web_json(self.root, t="deep-check", verify="true", repair="true"))
2191 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2193 # now look at t=info
2194 d.addCallback(lambda ign: self.web(self.root, t="info"))
2195 # TODO: examine the output
2196 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2197 d.addCallback(lambda ign: self.web(self.large, t="info"))
2198 d.addCallback(lambda ign: self.web(self.small, t="info"))