1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, WebErrorMixin
30 This is some data to publish to the virtual drive, which needs to be large
31 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """An upload.Data that counts bytes consumed via read() and can fire a
    Deferred once a configured byte threshold is crossed -- used by the
    resumable-upload test to interrupt an upload partway through.
    """
    # NOTE(review): listing gap (orig line 35) -- presumably the
    # `bytes_read = 0` counter initializer; read() below relies on it.
    interrupt_after = None    # byte threshold; None disables interruption
    interrupt_after_d = None  # Deferred fired (with self) when the threshold is crossed

    def read(self, length):
        # Count consumed bytes and fire interrupt_after_d exactly once the
        # first time the running total exceeds interrupt_after (the
        # threshold is cleared so the callback cannot fire twice).
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class GrabEverythingConsumer:
    """A consumer that accumulates everything written to it; the test later
    compares its collected contents against the uploaded plaintext.

    NOTE(review): sampled listing -- the IConsumer declaration/initializer
    and the bodies of write() and unregisterProducer() are missing from
    this view (callers read a `.contents` attribute).
    """
    def registerProducer(self, producer, streaming):
        # only a streaming (push) producer is expected here
        assert IPushProducer.providedBy(producer)
    def write(self, data):
        # NOTE(review): listing gap (orig 58) -- body missing; presumably
        # appends `data` to self.contents -- confirm against upstream.
    def unregisterProducer(self):
        # NOTE(review): listing gap (orig 61) -- body missing.
63 class SystemTest(SystemTestMixin, unittest.TestCase):
    def test_connections(self):
        """Start the grid plus one extra node and verify that every client
        sees numclients+1 peers (both raw and permuted lists).

        NOTE(review): sampled listing -- the `d.addCallback(_check)` and
        `return d` plumbing lines (orig 77-78, 84) are missing from view.
        """
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # addBoth below: stop the extra node on success or failure
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
    test_connections.timeout = 300
86 # test_connections is subsumed by test_upload_and_download, and takes
87 # quite a while to run on a slow machine (because of all the TLS
88 # connections that must be established). If we ever rework the introducer
89 # code to such an extent that we're not sure if it works anymore, we can
90 # reinstate this test until it does.
93 def test_upload_and_download_random_key(self):
94 self.basedir = "system/SystemTest/test_upload_and_download_random_key"
95 return self._test_upload_and_download(convergence=None)
96 test_upload_and_download_random_key.timeout = 4800
98 def test_upload_and_download_convergent(self):
99 self.basedir = "system/SystemTest/test_upload_and_download_convergent"
100 return self._test_upload_and_download(convergence="some convergence string")
101 test_upload_and_download_convergent.timeout = 4800
    def _test_upload_and_download(self, convergence):
        """Upload 4000 bytes of data (with the given convergence secret, or
        a random key when convergence is None), then exercise every download
        path (to-data, to-filename, to-filehandle, to-consumer, bad URI) and
        the helper-assisted, duplicate, and resumable upload paths.

        NOTE(review): sampled listing -- many plumbing lines (nested `def`
        headers, `return` statements, continuation lines) are missing from
        this view; gaps are marked inline below.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        # NOTE(review): listing gap (orig 115-117) -- the `def _do_upload(res):`
        # wrapper (and probably a self.uploader stash) is missing here.
            u = self.clients[0].getServiceNamed("uploader")
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actualy
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            # NOTE(review): listing gap (orig 128-129) -- the actual
            # `u.upload(up)` call / return is missing.
        d.addCallback(_do_upload)
        def _upload_done(results):
            # NOTE(review): listing gap (orig 132, 134) -- the lines binding
            # `uri` from `results` and stashing self.uri are missing.
            log.msg("upload finished: uri is %s" % (uri,))
            dl = self.clients[1].getServiceNamed("downloader")
            # NOTE(review): listing gap (orig 136) -- self.downloader stash missing.
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
            # NOTE(review): listing gap (orig 162) -- the target_filename
            # argument / closing paren of this call is missing.
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        # download straight into an IConsumer via the adapter
        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(download.NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            # sizelimit 0 means this node stores nothing itself
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                # NOTE(review): listing gap (orig 222) -- `uri` binding missing.
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            # NOTE(review): listing gap (orig 225) -- `def _check(newdata):` missing.
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            # NOTE(review): listing gap (orig 228) -- `return d` missing.
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                # NOTE(review): listing gap (orig 236) -- `uri` binding missing.
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            # NOTE(review): listing gap (orig 239) -- `def _check(newdata):` missing.
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            # NOTE(review): listing gap (orig 244) -- `return d` missing.
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restartingit.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            # NOTE(review): listing gap (orig 274) -- `def _interrupted(f):`
            # errback header missing; the lines below belong to it.
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
                # make sure we actually interrupted it before finishing the
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                # NOTE(review): listing gap (orig 281-282) -- closing
                # `len(DATA)))` of this assertion missing.

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # NOTE(review): listing gap (orig 327) -- `uri` binding missing.

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around .
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                # NOTE(review): listing gap (orig 344) -- `else:` branch
                # header missing; the assertion below is the random-key case.
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            # NOTE(review): listing gap (orig 354-355) -- `def _check(newdata):` missing.
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            # NOTE(review): listing gap (orig 366) -- `return d` missing.
        d.addCallback(_upload_resumable)
        # NOTE(review): listing gap (orig 368-370) -- final `return d` missing.
    def _find_shares(self, basedir):
        """Walk basedir and collect (client_num, storage_index, filename,
        shnum) tuples for every share file found under any client's
        storage/shares/ tree; the test fails if none are found.

        NOTE(review): sampled listing -- the accumulator init (orig 372),
        the `continue` branch (orig 375-377), the append and emptiness
        check (orig 390-391), and the final `return` are not visible here.
        """
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
            # NOTE(review): listing gap (orig 375-377) -- skip-branch body missing.
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                # last char of "clientN" dir name -- assumes numclients <= 10
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
        # NOTE(review): listing gap (orig 390-391) -- the append and the
        # guard that makes this failure conditional are missing.
        self.fail("unable to find any share files in %s" % basedir)
    def _corrupt_mutable_share(self, filename, which):
        """Deliberately corrupt one field of an SDMF mutable share on disk.

        `which` selects the field to damage (seqnum, R/root_hash, IV,
        segsize, pubkey, signature, share_hash_chain, block_hash_tree,
        share_data, encprivkey); the share is unpacked, mutated, re-packed
        and written back in place so retrieval-time checks will trip.
        """
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
        # NOTE(review): listing gap (orig 406-407) -- the seqnum mutation and
        # the `elif which == "R":` header are missing; root_hash below is the R case.
            root_hash = self.flip_bit(root_hash)
        # NOTE(review): listing gap (orig 409) -- `elif which == "IV":` header missing.
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
        # NOTE(review): listing gap (orig 428) -- remaining pack_prefix args missing.
        final_share = mutable_layout.pack_share(prefix,
        # NOTE(review): listing gap (orig 430-435) -- remaining pack_share args missing.
        msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """Exercise SDMF mutable files end-to-end: create, dump-share debug
        output, download via cached and fresh nodes, overwrite, tolerate
        corruption of 7 of 10 shares (3-of-10 encoding), empty files,
        dirnodes, and the key-generator pool accounting.

        NOTE(review): sampled listing -- several plumbing lines (nested
        `def` headers, `try:`/`elif` lines, `return` statements) are missing
        from this view; gaps are marked inline below.
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            # NOTE(review): listing gap (orig 448) -- client binding
            # (presumably `c = self.clients[0]`) missing.
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            # NOTE(review): listing gap (orig 451) -- `def _done(res):` header missing.
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            # NOTE(review): listing gap (orig 456) -- `return d1` missing.
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            # NOTE(review): listing gap (orig 465) -- format-arg continuation missing.
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
            # NOTE(review): listing gap (orig 470) -- the filename argument missing.
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            # NOTE(review): listing gap (orig 474) -- `try:` opening the
            # assertion block below is missing.
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                # NOTE(review): listing gap (orig 483) -- likely a failUnless(m) guard.
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                # NOTE(review): listing gap (orig 487) -- likely a failUnless(m) guard.
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless(" secrets are for nodeid: %s\n" % peerid
                # NOTE(review): listing gap (orig 491) -- `in output)` continuation missing.
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless(" seqnum: 1\n" in output)
                self.failUnless(" required_shares: 3\n" in output)
                self.failUnless(" total_shares: 10\n" in output)
                self.failUnless(" segsize: 27\n" in output, (output, filename))
                self.failUnless(" datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                self.failUnless(" share_hash_chain: " in output)
                self.failUnless(" block_hash_tree: 1 nodes\n" in output)
                expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print "dump-share output was:"
                # NOTE(review): listing gap (orig 509-510) -- dump + re-raise missing.
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.
        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            # NOTE(review): listing gap (orig 541) -- `return d1` missing.
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            # NOTE(review): listing gap (orig 550) -- `return d1` missing.
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            # NOTE(review): listing gap (orig 564) -- `return d1` missing.
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            # NOTE(review): listing gap (orig 580) -- `in shares ])` continuation missing.
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).

                # read: this will trigger "pubkey doesn't match
                # NOTE(review): listing gap (orig 589) -- `if shnum == 0:` missing.
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                # NOTE(review): listing gap (orig 592) -- elif header missing.
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                # NOTE(review): listing gap (orig 595) -- elif header missing.
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                # NOTE(review): listing gap (orig 598) -- elif header missing.
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                # NOTE(review): listing gap (orig 601) -- elif header missing.
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                # NOTE(review): listing gap (orig 603) -- elif header missing.
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                # NOTE(review): listing gap (orig 605) -- elif header missing.
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            # NOTE(review): listing gap (orig 635) -- `return d1` missing.
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # NOTE(review): listing gap (orig 641) -- `d1 = dnode.list()` (or
            # similar) binding is missing here.
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            # NOTE(review): listing gap (orig 651) -- `return d1` missing.
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            # poll until client3 has connected to the key generator
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # keypool length must equal the configured pool size plus the
            # (negative) number of keys consumed so far
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)
        # NOTE(review): listing gap (orig 670-671) -- `return d` missing.

    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
676 def flip_bit(self, good):
677 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
    def mangle_uri(self, gooduri):
        """Return a CHK URI derived from `gooduri` with one bit of its key
        flipped, yielding a storage index that nobody holds shares for."""
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
        # NOTE(review): listing gap (orig 687) -- final ctor argument
        # (presumably size=u.size) and closing paren are missing.
        return u2.to_string()
690 # TODO: add a test which mangles the uri_extension_hash instead, and
691 # should fail due to not being able to get a valid uri_extension block.
692 # Also a test which sneakily mangles the uri_extension block to change
693 # some of the validation data, so it will fail in the post-download phase
694 # when the file's crypttext integrity check fails. Do the same thing for
695 # the key, which should cause the download to fail the post-download
696 # plaintext_hash check.
    def test_vdrive(self):
        """End-to-end virtual-drive exercise: publish a small public tree,
        bounce client0 and re-verify it, publish a private tree, then run
        the introweb/web/control/CLI/checker/stats sub-tests over it."""
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R/subdir1/mydata567
        # R/subdir1/subdir2/mydata992
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")
        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        # P/personal/sekrit data
        # P/s2-rw -> /subdir1/subdir2/
        # P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/test_put/ (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._grab_stats)
        # NOTE(review): listing gap (orig 740) -- `return d` missing.
    test_vdrive.timeout = 1100
    def _test_introweb(self, res):
        """Fetch the introducer's status page (HTML, then ?t=json) and check
        version plus announcement/subscription summaries for 5 clients.

        NOTE(review): sampled listing -- the `def _check(res):` header, both
        `try:` lines, diagnostic dumps, and `return d` are missing here.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        # NOTE(review): listing gap (orig 745-746) -- `def _check(res):` and
        # a `try:` wrapper around the assertions below are missing.
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                # NOTE(review): listing gap (orig 748) -- `in res)` continuation missing.
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print "GET %s output was:" % self.introweb_url
                # NOTE(review): listing gap (orig 754-755) -- dump + re-raise missing.
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            # NOTE(review): listing gap (orig 762) -- `try:` wrapper missing.
                self.failUnlessEqual(data["subscription_summary"],
                # NOTE(review): listing gap (orig 764) -- expected-dict continuation missing.
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
                # NOTE(review): listing gap (orig 770-771) -- dump + re-raise missing.
        d.addCallback(_check_json)
        # NOTE(review): listing gap (orig 773-774) -- `return d` missing.
    def _do_publish1(self, res):
        """Create the root dirnode (stashing its URI), make R/subdir1, and
        publish mydata567 into it (stashing the file's URI in self.uri)."""
        ut = upload.Data(self.data, convergence=None)
        # NOTE(review): listing gap (orig 777) -- `c0 = self.clients[0]` missing.
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
            # NOTE(review): listing gap (orig 791) -- `return d1` missing.
        d.addCallback(_made_subdir1)
        # NOTE(review): listing gap (orig 793-794) -- `return d` missing.
    def _do_publish2(self, res):
        """Create R/subdir1/subdir2 and publish mydata992 into it."""
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        # NOTE(review): listing gap (orig 799-800) -- `return d` missing.
801 def log(self, res, *args, **kwargs):
802 # print "MSG: %s RES: %s" % (msg, args)
803 log.msg(*args, **kwargs)
    def _do_publish_private(self, res):
        """Create a second (private) tree P containing P/personal/sekrit
        data plus read-write (s2-rw) and read-only (s2-ro) links to the
        public subdir2; the private root node is passed down the chain."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            # NOTE(review): listing gap (orig 818) -- `def _got_s2(s2node):` header missing.
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                # NOTE(review): listing gap (orig 821) -- `return d2` missing.
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            # NOTE(review): listing gap (orig 824) -- `return d1` missing.
        d.addCallback(_got_new_dir)
        # NOTE(review): listing gap (orig 826-827) -- `return d` missing.
    def _check_publish1(self, res):
        """Read back R/subdir1/mydata567 via the iterative get-by-name API
        (from client1) and compare against the published data."""
        # this one uses the iterative API
        # NOTE(review): listing gap (orig 830) -- `c1 = self.clients[1]` missing.
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        # NOTE(review): listing gap (orig 837) -- `def _get_done(data):` header missing.
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        # NOTE(review): listing gap (orig 840-841) -- `return d` missing.
    def _check_publish2(self, res):
        """Read back the same file via the path-based API from client1, and
        check node construction from its URI yields an equal node."""
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            # building a node from the same URI must compare equal
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        # NOTE(review): listing gap (orig 857-858) -- `return d` missing.
# Exercise the private rootcap tree: stashes the private node and the
# "personal" subdirectory, then (a) verifies read-only directory semantics
# on "s2-ro" (every mutating call -- create_empty_directory, add_file,
# delete, set_uri, move_child_to -- must raise NotMutableError via
# shouldFail2), and (b) moves "sekrit data" around with move_child_to,
# then checks build_manifest() and start_deep_stats() results.
# NOTE(review): numbered listing with dropped lines, e.g. original 871
# ("def get_path(path):" -- its body survives at 872), 953 (the "}" closing
# the `expected` dict), 957 (the "%" argument tuple), and the trailing
# "return d1"/"return d" lines. Code left byte-identical.
859 def _check_publish_private(self, resnode):
860 # this one uses the path-based API
861 self._private_node = resnode
863 d = self._private_node.get_child_at_path(u"personal")
864 def _got_personal(personal):
865 self._personal_node = personal
867 d.addCallback(_got_personal)
869 d.addCallback(lambda dirnode:
870 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
# NOTE(review): the enclosing "def get_path(path):" line was dropped here.
872 return self._private_node.get_child_at_path(path)
874 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
875 d.addCallback(lambda filenode: filenode.download_to_data())
876 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
877 d.addCallback(lambda res: get_path(u"s2-rw"))
878 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
879 d.addCallback(lambda res: get_path(u"s2-ro"))
# s2-ro is a read-only view of a mutable directory: mutable but readonly.
880 def _got_s2ro(dirnode):
881 self.failUnless(dirnode.is_mutable(), dirnode)
882 self.failUnless(dirnode.is_readonly(), dirnode)
883 d1 = defer.succeed(None)
884 d1.addCallback(lambda res: dirnode.list())
885 d1.addCallback(self.log, "dirnode.list")
887 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
889 d1.addCallback(self.log, "doing add_file(ro)")
890 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
891 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
893 d1.addCallback(self.log, "doing get(ro)")
894 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
895 d1.addCallback(lambda filenode:
896 self.failUnless(IFileNode.providedBy(filenode)))
898 d1.addCallback(self.log, "doing delete(ro)")
899 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
901 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
903 d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))
905 personal = self._personal_node
906 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
908 d1.addCallback(self.log, "doing move_child_to(ro)2")
909 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
911 d1.addCallback(self.log, "finished with _got_s2ro")
913 d.addCallback(_got_s2ro)
# Move "sekrit data" personal -> home -> back, then check manifest/stats.
914 def _got_home(dummy):
915 home = self._private_node
916 personal = self._personal_node
917 d1 = defer.succeed(None)
918 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
919 d1.addCallback(lambda res:
920 personal.move_child_to(u"sekrit data",home,u"sekrit"))
922 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
923 d1.addCallback(lambda res:
924 home.move_child_to(u"sekrit", home, u"sekrit data"))
926 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
927 d1.addCallback(lambda res:
928 home.move_child_to(u"sekrit data", personal))
930 d1.addCallback(lambda res: home.build_manifest().when_done())
931 d1.addCallback(self.log, "manifest")
935 # P/personal/sekrit data
936 # P/s2-rw (same as P/s2-ro)
937 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
938 d1.addCallback(lambda manifest:
939 self.failUnlessEqual(len(manifest), 5))
940 d1.addCallback(lambda res: home.start_deep_stats().when_done())
# NOTE(review): the "}" closing `expected` (original line 953) was dropped
# by extraction; code below is otherwise byte-identical to the listing.
941 def _check_stats(stats):
942 expected = {"count-immutable-files": 1,
943 "count-mutable-files": 0,
944 "count-literal-files": 1,
946 "count-directories": 3,
947 "size-immutable-files": 112,
948 "size-literal-files": 23,
949 #"size-directories": 616, # varies
950 #"largest-directory": 616,
951 "largest-directory-children": 3,
952 "largest-immutable-file": 112,
954 for k,v in expected.iteritems():
955 self.failUnlessEqual(stats[k], v,
956 "stats[%s] was %s, not %s" %
958 self.failUnless(stats["size-directories"] > 1300,
959 stats["size-directories"])
960 self.failUnless(stats["largest-directory"] > 800,
961 stats["largest-directory"])
962 self.failUnlessEqual(stats["size-files-histogram"],
963 [ (11, 31, 1), (101, 316, 1) ])
964 d1.addCallback(_check_stats)
966 d.addCallback(_got_home)
# Errback-style assertion helper, meant for d.addBoth(self.shouldFail, ...):
# when `res` is a Failure it must wrap `expected_failure` (trap() re-raises
# otherwise) and, if `substring` is given, str(res) must contain it; a
# non-Failure `res` fails the test, labelled by `which`.
# NOTE(review): extraction dropped original lines 972 and 976 -- presumably
# the "if substring:" guard and the "else:" before self.fail. Code left
# byte-identical; confirm against the pristine source.
969 def shouldFail(self, res, expected_failure, which, substring=None):
970 if isinstance(res, Failure):
971 res.trap(expected_failure)
973 self.failUnless(substring in str(res),
974 "substring '%s' not in '%s'"
975 % (substring, str(res)))
977 self.fail("%s was supposed to raise %s, not get '%s'" %
978 (which, expected_failure, res))
# Call-style variant of shouldFail: invokes `callable(*args, **kwargs)` via
# maybeDeferred and asserts the resulting Deferred fails with
# `expected_failure` (optionally containing `substring` in its str()).
# NOTE(review): extraction dropped original lines 983, 986, 990, 993-994 --
# presumably "def _check(res):", "if substring:", "else:", and the trailing
# "d.addBoth(_check); return d". Code left byte-identical.
980 def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
981 assert substring is None or isinstance(substring, str)
982 d = defer.maybeDeferred(callable, *args, **kwargs)
984 if isinstance(res, Failure):
985 res.trap(expected_failure)
987 self.failUnless(substring in str(res),
988 "substring '%s' not in '%s'"
989 % (substring, str(res)))
991 self.fail("%s was supposed to raise %s, not get '%s'" %
992 (which, expected_failure, res))
def PUT(self, urlpath, data):
    """HTTP PUT `data` to `urlpath` under this node's webapi root.

    Returns the Deferred from twisted.web.client.getPage, which fires
    with the response body.
    """
    # Compose the absolute URL and hand the request straight to getPage.
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    """HTTP GET `urlpath` under this node's webapi root.

    `followRedirect` is forwarded verbatim to getPage. Returns a Deferred
    that fires with the response body.
    """
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
# Build and send a multipart/form-data POST to `urlpath`, targeting either
# the helper node's webapi (use_helper=True) or the regular node's. Each
# keyword in `fields` becomes a form field; a tuple value is treated as
# (filename, content) for file-upload fields. A "_charset"=UTF-8 field is
# always included. Returns the Deferred from getPage.
# NOTE(review): extraction dropped original lines 1005/1007 (presumably the
# "if use_helper:"/"else:" around the two url assignments), 1011-1012 and
# the other "form.append(sep)" separator lines, and 1030 (the second
# headers-dict entry / closing brace). Code left byte-identical.
1004 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1006 url = self.helper_webish_url + urlpath
1008 url = self.webish_url + urlpath
1009 sepbase = "boogabooga"
1010 sep = "--" + sepbase
1013 form.append('Content-Disposition: form-data; name="_charset"')
1015 form.append('UTF-8')
1017 for name, value in fields.iteritems():
1018 if isinstance(value, tuple):
1019 filename, value = value
1020 form.append('Content-Disposition: form-data; name="%s"; '
1021 'filename="%s"' % (name, filename.encode("utf-8")))
1023 form.append('Content-Disposition: form-data; name="%s"' % name)
1025 form.append(str(value))
1028 body = "\r\n".join(form) + "\r\n"
1029 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1031 return getPage(url, method="POST", postdata=body,
1032 headers=headers, followRedirect=followRedirect)
# End-to-end exercise of the webapi: welcome pages (plain node and
# helper-using node), directory listings, file download by path / by URI /
# by ?uri= query, error handling for a mangled URI, PUT uploads (small,
# multi-segment 1.5MB, and in-place replace), unlinked POST uploads (with
# and without the helper), the status / helper_status / statistics pages in
# both HTML and t=json forms.
# NOTE(review): numbered listing with dropped lines, e.g. original 1037
# (presumably "d = getPage(base)" that seeds the chain), 1043/1047 (closing
# parens of failUnless argument lists), 1097-1098 (shouldFail substring args
# + "return d1"), 1150/1153 ("def _got_up(res):"/"def _got_down(res):"),
# 1174/1176/1181 ("f.close()" and "now = time.time()"), and the trailing
# "return d". Code left byte-identical; confirm against pristine source.
1034 def _test_web(self, res):
1035 base = self.webish_url
1036 public = "uri/" + self._root_directory_uri
1038 def _got_welcome(page):
1039 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1040 self.failUnless(expected in page,
1041 "I didn't see the right 'connected storage servers'"
1042 " message in: %s" % page
1044 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1045 self.failUnless(expected in page,
1046 "I didn't see the right 'My nodeid' message "
1048 self.failUnless("Helper: 0 active uploads" in page)
1049 d.addCallback(_got_welcome)
1050 d.addCallback(self.log, "done with _got_welcome")
1052 # get the welcome page from the node that uses the helper too
1053 d.addCallback(lambda res: getPage(self.helper_webish_url))
1054 def _got_welcome_helper(page):
1055 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1057 self.failUnless("Not running helper" in page)
1058 d.addCallback(_got_welcome_helper)
1060 d.addCallback(lambda res: getPage(base + public))
1061 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1062 def _got_subdir1(page):
1063 # there ought to be an href for our file
1064 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1065 self.failUnless(">mydata567</a>" in page)
1066 d.addCallback(_got_subdir1)
1067 d.addCallback(self.log, "done with _got_subdir1")
1068 d.addCallback(lambda res:
1069 getPage(base + public + "/subdir1/mydata567"))
1070 def _got_data(page):
1071 self.failUnlessEqual(page, self.data)
1072 d.addCallback(_got_data)
1074 # download from a URI embedded in a URL
1075 d.addCallback(self.log, "_get_from_uri")
1076 def _get_from_uri(res):
1077 return getPage(base + "uri/%s?filename=%s"
1078 % (self.uri, "mydata567"))
1079 d.addCallback(_get_from_uri)
1080 def _got_from_uri(page):
1081 self.failUnlessEqual(page, self.data)
1082 d.addCallback(_got_from_uri)
1084 # download from a URI embedded in a URL, second form
1085 d.addCallback(self.log, "_get_from_uri2")
1086 def _get_from_uri2(res):
1087 return getPage(base + "uri?uri=%s" % (self.uri,))
1088 d.addCallback(_get_from_uri2)
1089 d.addCallback(_got_from_uri)
1091 # download from a bogus URI, make sure we get a reasonable error
1092 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1093 def _get_from_bogus_uri(res):
1094 d1 = getPage(base + "uri/%s?filename=%s"
1095 % (self.mangle_uri(self.uri), "mydata567"))
1096 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1099 d.addCallback(_get_from_bogus_uri)
1100 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1102 # upload a file with PUT
1103 d.addCallback(self.log, "about to try PUT")
1104 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1105 "new.txt contents"))
1106 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1107 d.addCallback(self.failUnlessEqual, "new.txt contents")
1108 # and again with something large enough to use multiple segments,
1109 # and hopefully trigger pauseProducing too
1110 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1111 "big" * 500000)) # 1.5MB
1112 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1113 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1115 # can we replace files in place?
1116 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1118 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1119 d.addCallback(self.failUnlessEqual, "NEWER contents")
1121 # test unlinked POST
1122 d.addCallback(lambda res: self.POST("uri", t="upload",
1123 file=("new.txt", "data" * 10000)))
1124 # and again using the helper, which exercises different upload-status
1126 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1127 file=("foo.txt", "data2" * 10000)))
1129 # check that the status page exists
1130 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1131 def _got_status(res):
1132 # find an interesting upload and download to look at. LIT files
1133 # are not interesting.
1134 for ds in self.clients[0].list_all_download_statuses():
1135 if ds.get_size() > 200:
1136 self._down_status = ds.get_counter()
1137 for us in self.clients[0].list_all_upload_statuses():
1138 if us.get_size() > 200:
1139 self._up_status = us.get_counter()
1140 rs = self.clients[0].list_all_retrieve_statuses()[0]
1141 self._retrieve_status = rs.get_counter()
1142 ps = self.clients[0].list_all_publish_statuses()[0]
1143 self._publish_status = ps.get_counter()
1144 us = self.clients[0].list_all_mapupdate_statuses()[0]
1145 self._update_status = us.get_counter()
1147 # and that there are some upload- and download- status pages
1148 return self.GET("status/up-%d" % self._up_status)
1149 d.addCallback(_got_status)
# NOTE(review): "def _got_up(res):" / "def _got_down(res):" headers were
# dropped by extraction; their bodies survive at 1151 and 1154.
1151 return self.GET("status/down-%d" % self._down_status)
1152 d.addCallback(_got_up)
1154 return self.GET("status/mapupdate-%d" % self._update_status)
1155 d.addCallback(_got_down)
1156 def _got_update(res):
1157 return self.GET("status/publish-%d" % self._publish_status)
1158 d.addCallback(_got_update)
1159 def _got_publish(res):
1160 return self.GET("status/retrieve-%d" % self._retrieve_status)
1161 d.addCallback(_got_publish)
1163 # check that the helper status page exists
1164 d.addCallback(lambda res:
1165 self.GET("helper_status", followRedirect=True))
1166 def _got_helper_status(res):
1167 self.failUnless("Bytes Fetched:" in res)
1168 # touch a couple of files in the helper's working directory to
1169 # exercise more code paths
1170 workdir = os.path.join(self.getdir("client0"), "helper")
1171 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1172 f = open(incfile, "wb")
1173 f.write("small file")
1175 then = time.time() - 86400*3
1177 os.utime(incfile, (now, then))
1178 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1179 f = open(encfile, "wb")
1180 f.write("less small file")
1182 os.utime(encfile, (now, then))
1183 d.addCallback(_got_helper_status)
1184 # and that the json form exists
1185 d.addCallback(lambda res:
1186 self.GET("helper_status?t=json", followRedirect=True))
1187 def _got_helper_status_json(res):
1188 data = simplejson.loads(res)
1189 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1191 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1192 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1193 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1195 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1196 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1197 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1199 d.addCallback(_got_helper_status_json)
1201 # and check that client[3] (which uses a helper but does not run one
1202 # itself) doesn't explode when you ask for its status
1203 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1204 def _got_non_helper_status(res):
1205 self.failUnless("Upload and Download Status" in res)
1206 d.addCallback(_got_non_helper_status)
1208 # or for helper status with t=json
1209 d.addCallback(lambda res:
1210 getPage(self.helper_webish_url + "helper_status?t=json"))
1211 def _got_non_helper_status_json(res):
1212 data = simplejson.loads(res)
1213 self.failUnlessEqual(data, {})
1214 d.addCallback(_got_non_helper_status_json)
1216 # see if the statistics page exists
1217 d.addCallback(lambda res: self.GET("statistics"))
1218 def _got_stats(res):
1219 self.failUnless("Node Statistics" in res)
1220 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1221 d.addCallback(_got_stats)
1222 d.addCallback(lambda res: self.GET("statistics?t=json"))
1223 def _got_stats_json(res):
1224 data = simplejson.loads(res)
1225 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1226 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1227 d.addCallback(_got_stats_json)
1229 # TODO: mangle the second segment of a file, to test errors that
1230 # occur after we've already sent some good data, which uses a
1231 # different error path.
1233 # TODO: download a URI with a form
1234 # TODO: create a directory by using a form
1235 # TODO: upload by using a form on the directory page
1236 # url = base + "somedir/subdir1/freeform_post!!upload"
1237 # TODO: delete a file by using a button on the directory page
# Exercise the CLI debug tools: walk the storage directories to locate one
# CHK share file (magic \x00\x00\x00\x01), run "debug dump-share --offsets"
# on it and assert expected fields in its output, then run "debug
# find-shares" (expects 10 shares across all nodes) and "debug
# catalog-shares" (expects 30 descriptions, 10 matching this storage index).
# NOTE(review): numbered listing with dropped lines, e.g. original 1243-1244
# ("filename = None" seed?), 1247-1249 ("continue" / "if not filenames:
# continue"), 1258-1259 ("break"s), 1261 ("% (self.basedir,))"), 1266 (the
# filename argument of dump-share), 1298/1308 ("out.seek(0)"), and 1311 (the
# "matching = [line" head of the list comprehension closed at 1312-1313).
# Code left byte-identical; confirm against pristine source before editing.
1241 def _test_runner(self, res):
1242 # exercise some of the diagnostic tools in runner.py
1245 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1246 if "storage" not in dirpath:
1250 pieces = dirpath.split(os.sep)
1251 if pieces[-4] == "storage" and pieces[-3] == "shares":
1252 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1253 # are sharefiles here
1254 filename = os.path.join(dirpath, filenames[0])
1255 # peek at the magic to see if it is a chk share
1256 magic = open(filename, "rb").read(4)
1257 if magic == '\x00\x00\x00\x01':
1260 self.fail("unable to find any uri_extension files in %s"
1262 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1264 out,err = StringIO(), StringIO()
1265 rc = runner.runner(["debug", "dump-share", "--offsets",
1267 stdout=out, stderr=err)
1268 output = out.getvalue()
1269 self.failUnlessEqual(rc, 0)
1271 # we only upload a single file, so we can assert some things about
1272 # its size and shares.
1273 self.failUnless(("share filename: %s" % filename) in output)
1274 self.failUnless("size: %d\n" % len(self.data) in output)
1275 self.failUnless("num_segments: 1\n" in output)
1276 # segment_size is always a multiple of needed_shares
1277 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1278 self.failUnless("total_shares: 10\n" in output)
1279 # keys which are supposed to be present
1280 for key in ("size", "num_segments", "segment_size",
1281 "needed_shares", "total_shares",
1282 "codec_name", "codec_params", "tail_codec_params",
1283 #"plaintext_hash", "plaintext_root_hash",
1284 "crypttext_hash", "crypttext_root_hash",
1285 "share_root_hash", "UEB_hash"):
1286 self.failUnless("%s: " % key in output, key)
1287 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1289 # now use its storage index to find the other shares using the
1290 # 'find-shares' tool
1291 sharedir, shnum = os.path.split(filename)
1292 storagedir, storage_index_s = os.path.split(sharedir)
1293 out,err = StringIO(), StringIO()
1294 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1295 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1296 rc = runner.runner(cmd, stdout=out, stderr=err)
1297 self.failUnlessEqual(rc, 0)
1299 sharefiles = [sfn.strip() for sfn in out.readlines()]
1300 self.failUnlessEqual(len(sharefiles), 10)
1302 # also exercise the 'catalog-shares' tool
1303 out,err = StringIO(), StringIO()
1304 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1305 cmd = ["debug", "catalog-shares"] + nodedirs
1306 rc = runner.runner(cmd, stdout=out, stderr=err)
1307 self.failUnlessEqual(rc, 0)
1309 descriptions = [sfn.strip() for sfn in out.readlines()]
1310 self.failUnlessEqual(len(descriptions), 30)
1312 for line in descriptions
1313 if line.startswith("CHK %s " % storage_index_s)]
1314 self.failUnlessEqual(len(matching), 10)
# Connect to client 0's control port: read private/control.furl from its
# basedir, obtain a remote reference via the introducer's Tub, then hand
# off to _test_control2 with that reference and the furl file path.
# NOTE(review): the trailing "return d" (original line 1326) was dropped by
# extraction. Code left byte-identical.
1316 def _test_control(self, res):
1317 # exercise the remote-control-the-client foolscap interfaces in
1318 # allmydata.control (mostly used for performance tests)
1319 c0 = self.clients[0]
1320 control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1321 control_furl = open(control_furl_file, "r").read().strip()
1322 # it doesn't really matter which Tub we use to connect to the client,
1323 # so let's just use our IntroducerNode's
1324 d = self.introducer.tub.getReference(control_furl)
1325 d.addCallback(self._test_control2, control_furl_file)
# Drive the control-port remote API: upload `filename` (the control.furl
# file) to get a URI, download it back to control.downfile, compare the
# bytes, then call speed_test, get_memory_usage (Linux only), and
# measure_peer_response_time.
# NOTE(review): extraction dropped original lines 1332-1333 (the remaining
# download_from_uri_to_file arguments plus "def _check(res):" -- the check
# body survives at 1334-1337) and 1343-1344 (presumably the trailing
# "return d"). Code left byte-identical.
1327 def _test_control2(self, rref, filename):
1328 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1329 downfile = os.path.join(self.basedir, "control.downfile")
1330 d.addCallback(lambda uri:
1331 rref.callRemote("download_from_uri_to_file",
1334 self.failUnlessEqual(res, downfile)
1335 data = open(downfile, "r").read()
1336 expected_data = open(filename, "r").read()
1337 self.failUnlessEqual(data, expected_data)
1338 d.addCallback(_check)
1339 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1340 if sys.platform == "linux2":
1341 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1342 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
# Exercise the command-line interface end to end against client 0's node
# directory: the legacy private/root_dir.cap "tahoe:" alias, list-aliases /
# add-alias, ls (plain, -l, --uri, --readonly-uri), put (from file, from
# stdin, --mutable), get (to stdout and to files), rm, mv, ln, and cp in
# every direction (disk<->tahoe, tahoe<->tahoe, recursive -r, overwrites).
# All commands run through self._run_cli via the `run` helper, which
# prepends `nodeargs`.
# NOTE(review): numbered listing with dropped lines, e.g. original 1353/1355
# (the "nodeargs = [" list head and its closing "]"), 1367 ("f.close()"),
# 1431-1438 (the fixture loop "files = []; datas = []; for i in range(10):"
# feeding files[]/datas[]), 1454 ("uri0 = out.strip()"), 1514/1526/1533-1534
# ("for l in lines:" heads and "if self._mutable_file3_uri in l:" style
# guards), 1631/1637-1639 ("def _cmp(name):" head and the rfile1-3 _cmp
# calls), and the trailing "return d". Code left byte-identical; confirm
# against the pristine source before changing any logic.
1345 def _test_cli(self, res):
1346 # run various CLI commands (in a thread, since they use blocking
1349 private_uri = self._private_node.get_uri()
1350 some_uri = self._root_directory_uri
1351 client0_basedir = self.getdir("client0")
1354 "--node-directory", client0_basedir,
1356 TESTDATA = "I will not write the same thing over and over.\n" * 100
1358 d = defer.succeed(None)
1360 # for compatibility with earlier versions, private/root_dir.cap is
1361 # supposed to be treated as an alias named "tahoe:". Start by making
1362 # sure that works, before we add other aliases.
1364 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1365 f = open(root_file, "w")
1366 f.write(private_uri)
1369 def run(ignored, verb, *args, **kwargs):
1370 stdin = kwargs.get("stdin", "")
1371 newargs = [verb] + nodeargs + list(args)
1372 return self._run_cli(newargs, stdin=stdin)
1374 def _check_ls((out,err), expected_children, unexpected_children=[]):
1375 self.failUnlessEqual(err, "")
1376 for s in expected_children:
1377 self.failUnless(s in out, (s,out))
1378 for s in unexpected_children:
1379 self.failIf(s in out, (s,out))
1381 def _check_ls_root((out,err)):
1382 self.failUnless("personal" in out)
1383 self.failUnless("s2-ro" in out)
1384 self.failUnless("s2-rw" in out)
1385 self.failUnlessEqual(err, "")
1387 # this should reference private_uri
1388 d.addCallback(run, "ls")
1389 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1391 d.addCallback(run, "list-aliases")
1392 def _check_aliases_1((out,err)):
1393 self.failUnlessEqual(err, "")
1394 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1395 d.addCallback(_check_aliases_1)
1397 # now that that's out of the way, remove root_dir.cap and work with
1399 d.addCallback(lambda res: os.unlink(root_file))
1400 d.addCallback(run, "list-aliases")
1401 def _check_aliases_2((out,err)):
1402 self.failUnlessEqual(err, "")
1403 self.failUnlessEqual(out, "")
1404 d.addCallback(_check_aliases_2)
1406 d.addCallback(run, "mkdir")
1407 def _got_dir( (out,err) ):
1408 self.failUnless(uri.from_string_dirnode(out.strip()))
1410 d.addCallback(_got_dir)
1411 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1413 d.addCallback(run, "list-aliases")
1414 def _check_aliases_3((out,err)):
1415 self.failUnlessEqual(err, "")
1416 self.failUnless("tahoe: " in out)
1417 d.addCallback(_check_aliases_3)
1419 def _check_empty_dir((out,err)):
1420 self.failUnlessEqual(out, "")
1421 self.failUnlessEqual(err, "")
1422 d.addCallback(run, "ls")
1423 d.addCallback(_check_empty_dir)
1425 def _check_missing_dir((out,err)):
1426 # TODO: check that rc==2
1427 self.failUnlessEqual(out, "")
1428 self.failUnlessEqual(err, "No such file or directory\n")
1429 d.addCallback(run, "ls", "bogus")
1430 d.addCallback(_check_missing_dir)
# NOTE(review): the loop header and files[]/datas[] setup (original
# 1431-1434, 1436, 1438) were dropped here; only the loop body survives.
1435 fn = os.path.join(self.basedir, "file%d" % i)
1437 data = "data to be uploaded: file%d\n" % i
1439 open(fn,"wb").write(data)
1441 def _check_stdout_against((out,err), filenum=None, data=None):
1442 self.failUnlessEqual(err, "")
1443 if filenum is not None:
1444 self.failUnlessEqual(out, datas[filenum])
1445 if data is not None:
1446 self.failUnlessEqual(out, data)
1448 # test all both forms of put: from a file, and from stdin
1450 d.addCallback(run, "put", files[0], "tahoe-file0")
1451 def _put_out((out,err)):
1452 self.failUnless("URI:LIT:" in out, out)
1453 self.failUnless("201 Created" in err, err)
1455 return run(None, "get", uri0)
1456 d.addCallback(_put_out)
1457 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1459 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1460 # tahoe put bar tahoe:FOO
1461 d.addCallback(run, "put", files[2], "tahoe:file2")
1462 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1463 def _check_put_mutable((out,err)):
1464 self._mutable_file3_uri = out.strip()
1465 d.addCallback(_check_put_mutable)
1466 d.addCallback(run, "get", "tahoe:file3")
1467 d.addCallback(_check_stdout_against, 3)
1470 STDIN_DATA = "This is the file to upload from stdin."
1471 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1472 # tahoe put tahoe:FOO
1473 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1474 stdin="Other file from stdin.")
1476 d.addCallback(run, "ls")
1477 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1478 "tahoe-file-stdin", "from-stdin"])
1479 d.addCallback(run, "ls", "subdir")
1480 d.addCallback(_check_ls, ["tahoe-file1"])
1483 d.addCallback(run, "mkdir", "subdir2")
1484 d.addCallback(run, "ls")
1485 # TODO: extract the URI, set an alias with it
1486 d.addCallback(_check_ls, ["subdir2"])
1488 # tahoe get: (to stdin and to a file)
1489 d.addCallback(run, "get", "tahoe-file0")
1490 d.addCallback(_check_stdout_against, 0)
1491 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1492 d.addCallback(_check_stdout_against, 1)
1493 outfile0 = os.path.join(self.basedir, "outfile0")
1494 d.addCallback(run, "get", "file2", outfile0)
1495 def _check_outfile0((out,err)):
1496 data = open(outfile0,"rb").read()
1497 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1498 d.addCallback(_check_outfile0)
1499 outfile1 = os.path.join(self.basedir, "outfile0")
1500 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1501 def _check_outfile1((out,err)):
1502 data = open(outfile1,"rb").read()
1503 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1504 d.addCallback(_check_outfile1)
1506 d.addCallback(run, "rm", "tahoe-file0")
1507 d.addCallback(run, "rm", "tahoe:file2")
1508 d.addCallback(run, "ls")
1509 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1511 d.addCallback(run, "ls", "-l")
1512 def _check_ls_l((out,err)):
1513 lines = out.split("\n")
1515 if "tahoe-file-stdin" in l:
1516 self.failUnless(l.startswith("-r-- "), l)
1517 self.failUnless(" %d " % len(STDIN_DATA) in l)
1519 self.failUnless(l.startswith("-rw- "), l) # mutable
1520 d.addCallback(_check_ls_l)
1522 d.addCallback(run, "ls", "--uri")
1523 def _check_ls_uri((out,err)):
1524 lines = out.split("\n")
1527 self.failUnless(self._mutable_file3_uri in l)
1528 d.addCallback(_check_ls_uri)
1530 d.addCallback(run, "ls", "--readonly-uri")
1531 def _check_ls_rouri((out,err)):
1532 lines = out.split("\n")
1535 rw_uri = self._mutable_file3_uri
1536 u = uri.from_string_mutable_filenode(rw_uri)
1537 ro_uri = u.get_readonly().to_string()
1538 self.failUnless(ro_uri in l)
1539 d.addCallback(_check_ls_rouri)
1542 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1543 d.addCallback(run, "ls")
1544 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1546 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1547 d.addCallback(run, "ls")
1548 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1550 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1551 d.addCallback(run, "ls")
1552 d.addCallback(_check_ls, ["file3", "file3-copy"])
1553 d.addCallback(run, "get", "tahoe:file3-copy")
1554 d.addCallback(_check_stdout_against, 3)
1556 # copy from disk into tahoe
1557 d.addCallback(run, "cp", files[4], "tahoe:file4")
1558 d.addCallback(run, "ls")
1559 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1560 d.addCallback(run, "get", "tahoe:file4")
1561 d.addCallback(_check_stdout_against, 4)
1563 # copy from tahoe into disk
1564 target_filename = os.path.join(self.basedir, "file-out")
1565 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1566 def _check_cp_out((out,err)):
1567 self.failUnless(os.path.exists(target_filename))
1568 got = open(target_filename,"rb").read()
1569 self.failUnlessEqual(got, datas[4])
1570 d.addCallback(_check_cp_out)
1572 # copy from disk to disk (silly case)
1573 target2_filename = os.path.join(self.basedir, "file-out-copy")
1574 d.addCallback(run, "cp", target_filename, target2_filename)
1575 def _check_cp_out2((out,err)):
1576 self.failUnless(os.path.exists(target2_filename))
1577 got = open(target2_filename,"rb").read()
1578 self.failUnlessEqual(got, datas[4])
1579 d.addCallback(_check_cp_out2)
1581 # copy from tahoe into disk, overwriting an existing file
1582 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1583 def _check_cp_out3((out,err)):
1584 self.failUnless(os.path.exists(target_filename))
1585 got = open(target_filename,"rb").read()
1586 self.failUnlessEqual(got, datas[3])
1587 d.addCallback(_check_cp_out3)
1589 # copy from disk into tahoe, overwriting an existing immutable file
1590 d.addCallback(run, "cp", files[5], "tahoe:file4")
1591 d.addCallback(run, "ls")
1592 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1593 d.addCallback(run, "get", "tahoe:file4")
1594 d.addCallback(_check_stdout_against, 5)
1596 # copy from disk into tahoe, overwriting an existing mutable file
1597 d.addCallback(run, "cp", files[5], "tahoe:file3")
1598 d.addCallback(run, "ls")
1599 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1600 d.addCallback(run, "get", "tahoe:file3")
1601 d.addCallback(_check_stdout_against, 5)
1603 # recursive copy: setup
1604 dn = os.path.join(self.basedir, "dir1")
1606 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1607 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1608 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1609 sdn2 = os.path.join(dn, "subdir2")
1611 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1612 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1614 # from disk into tahoe
1615 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1616 d.addCallback(run, "ls")
1617 d.addCallback(_check_ls, ["dir1"])
1618 d.addCallback(run, "ls", "dir1")
1619 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1620 ["rfile4", "rfile5"])
1621 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1622 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1623 ["rfile1", "rfile2", "rfile3"])
1624 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1625 d.addCallback(_check_stdout_against, data="rfile4")
1627 # and back out again
1628 dn_copy = os.path.join(self.basedir, "dir1-copy")
1629 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1630 def _check_cp_r_out((out,err)):
1632 old = open(os.path.join(dn, name), "rb").read()
1633 newfn = os.path.join(dn_copy, name)
1634 self.failUnless(os.path.exists(newfn))
1635 new = open(newfn, "rb").read()
1636 self.failUnlessEqual(old, new)
1640 _cmp(os.path.join("subdir2", "rfile4"))
1641 _cmp(os.path.join("subdir2", "rfile5"))
1642 d.addCallback(_check_cp_r_out)
1644 # and copy it a second time, which ought to overwrite the same files
1645 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1647 # and tahoe-to-tahoe
1648 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1649 d.addCallback(run, "ls")
1650 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1651 d.addCallback(run, "ls", "dir1-copy")
1652 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1653 ["rfile4", "rfile5"])
1654 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1655 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1656 ["rfile1", "rfile2", "rfile3"])
1657 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1658 d.addCallback(_check_stdout_against, data="rfile4")
1660 # and copy it a second time, which ought to overwrite the same files
1661 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1663 # tahoe_ls doesn't currently handle the error correctly: it tries to
1664 # JSON-parse a traceback.
1665 ## def _ls_missing(res):
1666 ## argv = ["ls"] + nodeargs + ["bogus"]
1667 ## return self._run_cli(argv)
1668 ## d.addCallback(_ls_missing)
1669 ## def _check_ls_missing((out,err)):
1672 ## self.failUnlessEqual(err, "")
1673 ## d.addCallback(_check_ls_missing)
# Run a CLI command in a worker thread (CLI code blocks) via deferToThread,
# feeding `stdin` and capturing stdout/stderr; the Deferred fires with the
# (stdout, stderr) string pair.
# NOTE(review): extraction dropped the "def _done(res):" header (original
# ~1683) whose body survives at 1684, and the trailing "return d". Code
# left byte-identical.
1677 def _run_cli(self, argv, stdin=""):
1679 stdout, stderr = StringIO(), StringIO()
1680 d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1681 stdin=StringIO(stdin),
1682 stdout=stdout, stderr=stderr)
1684 return stdout.getvalue(), stderr.getvalue()
1685 d.addCallback(_done)
# Exercise the checker API on three node types: the personal dirnode (plain
# check and verify=True must both report healthy), a CHK FileNode ("big
# file", healthy both ways), and a LiteralFileNode ("sekrit data", whose
# check() result is None -- nothing to check for LIT).
# NOTE(review): extraction dropped blank/`return d` lines (original 1698,
# 1708, 1720, 1722 per the numbering gaps). Code left byte-identical.
1688 def _test_checker(self, res):
1689 ut = upload.Data("too big to be literal" * 200, convergence=None)
1690 d = self._personal_node.add_file(u"big file", ut)
1692 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1693 def _check_dirnode_results(r):
1694 self.failUnless(r.is_healthy())
1695 d.addCallback(_check_dirnode_results)
1696 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1697 d.addCallback(_check_dirnode_results)
1699 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1700 def _got_chk_filenode(n):
1701 self.failUnless(isinstance(n, filenode.FileNode))
1702 d = n.check(Monitor())
1703 def _check_filenode_results(r):
1704 self.failUnless(r.is_healthy())
1705 d.addCallback(_check_filenode_results)
1706 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1707 d.addCallback(_check_filenode_results)
1709 d.addCallback(_got_chk_filenode)
1711 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1712 def _got_lit_filenode(n):
1713 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1714 d = n.check(Monitor())
# LIT files carry their data in the URI itself, so check() returns None.
1715 def _check_lit_filenode_results(r):
1716 self.failUnlessEqual(r, None)
1717 d.addCallback(_check_lit_filenode_results)
1718 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1719 d.addCallback(_check_lit_filenode_results)
1721 d.addCallback(_got_lit_filenode)
1725 class MutableChecker(SystemTestMixin, unittest.TestCase, WebErrorMixin):
def _run_cli(self, argv):
    """Run a CLI command synchronously and return its captured stdout.

    stderr is captured too but discarded; run_by_human=False makes the
    runner return instead of exiting the process.
    """
    out, err = StringIO(), StringIO()
    runner.runner(argv, run_by_human=False, stdout=out, stderr=err)
    return out.getvalue()
# Create a small mutable file on client 0, then POST to the webapi's
# t=check&verify=true endpoint and assert the verifier reports it healthy
# (10 recoverable seq1 shares, no "Not Healthy!"/"Unhealthy"/"Corrupt
# Shares" text in the HTML).
# NOTE(review): extraction dropped original lines 1737-1738 (presumably
# "def _created(node): self.node = node" -- line 1739 reads self.node) and
# 1742 ("def _do_check(res):" whose body survives at 1743-1746), plus the
# trailing "return d". Code left byte-identical.
1732 def test_good(self):
1733 self.basedir = self.mktemp()
1734 d = self.set_up_nodes()
1735 CONTENTS = "a little bit of data"
1736 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1739 si = self.node.get_storage_index()
1740 d.addCallback(_created)
1741 # now make sure the webapi verifier sees no problems
1743 url = (self.webish_url +
1744 "uri/%s" % urllib.quote(self.node.get_uri()) +
1745 "?t=check&verify=true")
1746 return getPage(url, method="POST")
1747 d.addCallback(_do_check)
1748 def _got_results(out):
1749 self.failUnless("<span>Healthy!</span>" in out, out)
1750 self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1751 self.failIf("Not Healthy!" in out, out)
1752 self.failIf("Unhealthy" in out, out)
1753 self.failIf("Corrupt Shares" in out, out)
1754 d.addCallback(_got_results)
1755 d.addErrback(self.explain_web_error)
1758 def test_corrupt(self):
1759 self.basedir = self.mktemp()
1760 d = self.set_up_nodes()
1761 CONTENTS = "a little bit of data"
1762 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1765 si = self.node.get_storage_index()
1766 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1767 self.clients[1].basedir])
1768 files = out.split("\n")
1769 # corrupt one of them, using the CLI debug command
1771 shnum = os.path.basename(f)
1772 nodeid = self.clients[1].nodeid
1773 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1774 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1775 out = self._run_cli(["debug", "corrupt-share", files[0]])
1776 d.addCallback(_created)
1777 # now make sure the webapi verifier notices it
1779 url = (self.webish_url +
1780 "uri/%s" % urllib.quote(self.node.get_uri()) +
1781 "?t=check&verify=true")
1782 return getPage(url, method="POST")
1783 d.addCallback(_do_check)
1784 def _got_results(out):
1785 self.failUnless("Not Healthy!" in out, out)
1786 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1787 self.failUnless("Corrupt Shares:" in out, out)
1788 d.addCallback(_got_results)
1790 # now make sure the webapi repairer can fix it
1791 def _do_repair(res):
1792 url = (self.webish_url +
1793 "uri/%s" % urllib.quote(self.node.get_uri()) +
1794 "?t=check&verify=true&repair=true")
1795 return getPage(url, method="POST")
1796 d.addCallback(_do_repair)
1797 def _got_repair_results(out):
1798 self.failUnless("<div>Repair successful</div>" in out, out)
1799 d.addCallback(_got_repair_results)
1800 d.addCallback(_do_check)
1801 def _got_postrepair_results(out):
1802 self.failIf("Not Healthy!" in out, out)
1803 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1804 d.addCallback(_got_postrepair_results)
1805 d.addErrback(self.explain_web_error)
1809 def test_delete_share(self):
1810 self.basedir = self.mktemp()
1811 d = self.set_up_nodes()
1812 CONTENTS = "a little bit of data"
1813 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1816 si = self.node.get_storage_index()
1817 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1818 self.clients[1].basedir])
1819 files = out.split("\n")
1820 # corrupt one of them, using the CLI debug command
1822 shnum = os.path.basename(f)
1823 nodeid = self.clients[1].nodeid
1824 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1825 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1827 d.addCallback(_created)
1828 # now make sure the webapi checker notices it
1830 url = (self.webish_url +
1831 "uri/%s" % urllib.quote(self.node.get_uri()) +
1832 "?t=check&verify=false")
1833 return getPage(url, method="POST")
1834 d.addCallback(_do_check)
1835 def _got_results(out):
1836 self.failUnless("Not Healthy!" in out, out)
1837 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1838 self.failIf("Corrupt Shares" in out, out)
1839 d.addCallback(_got_results)
1841 # now make sure the webapi repairer can fix it
1842 def _do_repair(res):
1843 url = (self.webish_url +
1844 "uri/%s" % urllib.quote(self.node.get_uri()) +
1845 "?t=check&verify=false&repair=true")
1846 return getPage(url, method="POST")
1847 d.addCallback(_do_repair)
1848 def _got_repair_results(out):
1849 self.failUnless("Repair successful" in out)
1850 d.addCallback(_got_repair_results)
1851 d.addCallback(_do_check)
1852 def _got_postrepair_results(out):
1853 self.failIf("Not Healthy!" in out, out)
1854 self.failUnless("Recoverable Versions: 10*seq" in out)
1855 d.addCallback(_got_postrepair_results)
1856 d.addErrback(self.explain_web_error)
1860 class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
1861 # construct a small directory tree (with one dir, one immutable file, one
1862 # mutable file, one LIT file, and a loop), and then check/examine it in
1865 def set_up_tree(self, ignored):
1867 c0 = self.clients[0]
1868 d = c0.create_empty_dirnode()
1869 def _created_root(n):
1871 self.root_uri = n.get_uri()
1872 d.addCallback(_created_root)
1873 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1874 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1875 def _created_mutable(n):
1877 self.mutable_uri = n.get_uri()
1878 d.addCallback(_created_mutable)
1880 large = upload.Data("Lots of data\n" * 1000, None)
1881 d.addCallback(lambda ign: self.root.add_file(u"large", large))
1882 def _created_large(n):
1884 self.large_uri = n.get_uri()
1885 d.addCallback(_created_large)
1887 small = upload.Data("Small enough for a LIT", None)
1888 d.addCallback(lambda ign: self.root.add_file(u"small", small))
1889 def _created_small(n):
1891 self.small_uri = n.get_uri()
1892 d.addCallback(_created_small)
1894 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
1897 def check_is_healthy(self, cr, n, where, incomplete=False):
1898 self.failUnless(ICheckerResults.providedBy(cr), where)
1899 self.failUnless(cr.is_healthy(), where)
1900 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
1902 self.failUnlessEqual(cr.get_storage_index_string(),
1903 base32.b2a(n.get_storage_index()), where)
1904 needs_rebalancing = bool( len(self.clients) < 10 )
1906 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
1908 self.failUnlessEqual(d["count-shares-good"], 10, where)
1909 self.failUnlessEqual(d["count-shares-needed"], 3, where)
1910 self.failUnlessEqual(d["count-shares-expected"], 10, where)
1912 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
1913 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
1914 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
1916 self.failUnlessEqual(sorted(d["servers-responding"]),
1917 sorted([c.nodeid for c in self.clients]),
1919 self.failUnless("sharemap" in d, where)
1920 all_serverids = set()
1921 for (shareid, serverids) in d["sharemap"].items():
1922 all_serverids.update(serverids)
1923 self.failUnlessEqual(sorted(all_serverids),
1924 sorted([c.nodeid for c in self.clients]),
1927 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
1928 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
1929 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
    def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
        """Assert that check-and-repair results 'cr' for node 'n' show a
        healthy object both before and after, and that no repair was
        attempted (the object was already healthy)."""
        self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
        self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
        # re-use the detailed per-field checks on both sets of results
        self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
        self.failUnless(cr.get_post_repair_results().is_healthy(), where)
        self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
        self.failIf(cr.get_repair_attempted(), where)
1940 def deep_check_is_healthy(self, cr, num_healthy, where):
1941 self.failUnless(IDeepCheckResults.providedBy(cr))
1942 self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
1945 def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
1946 self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
1947 c = cr.get_counters()
1948 self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
1950 self.failUnlessEqual(c["count-objects-healthy-post-repair"],
1952 self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
1954 def test_good(self):
1955 self.basedir = self.mktemp()
1956 d = self.set_up_nodes()
1957 d.addCallback(self.set_up_tree)
1958 d.addCallback(self.do_stats)
1959 d.addCallback(self.do_test_good)
1960 d.addCallback(self.do_test_web)
1961 d.addErrback(self.explain_web_error)
1964 def do_stats(self, ignored):
1965 d = defer.succeed(None)
1966 d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
1967 d.addCallback(self.check_stats)
1970 def check_stats(self, s):
1971 self.failUnlessEqual(s["count-directories"], 1)
1972 self.failUnlessEqual(s["count-files"], 3)
1973 self.failUnlessEqual(s["count-immutable-files"], 1)
1974 self.failUnlessEqual(s["count-literal-files"], 1)
1975 self.failUnlessEqual(s["count-mutable-files"], 1)
1976 # don't check directories: their size will vary
1977 # s["largest-directory"]
1978 # s["size-directories"]
1979 self.failUnlessEqual(s["largest-directory-children"], 4)
1980 self.failUnlessEqual(s["largest-immutable-file"], 13000)
1981 # to re-use this function for both the local
1982 # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
1983 # coerce the result into a list of tuples. dirnode.start_deep_stats()
1984 # returns a list of tuples, but JSON only knows about lists., so
1985 # t=start-deep-stats returns a list of lists.
1986 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
1987 self.failUnlessEqual(histogram, [(11, 31, 1),
1990 self.failUnlessEqual(s["size-immutable-files"], 13000)
1991 self.failUnlessEqual(s["size-literal-files"], 22)
1993 def do_test_good(self, ignored):
1994 d = defer.succeed(None)
1995 # check the individual items
1996 d.addCallback(lambda ign: self.root.check(Monitor()))
1997 d.addCallback(self.check_is_healthy, self.root, "root")
1998 d.addCallback(lambda ign: self.mutable.check(Monitor()))
1999 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2000 d.addCallback(lambda ign: self.large.check(Monitor()))
2001 d.addCallback(self.check_is_healthy, self.large, "large")
2002 d.addCallback(lambda ign: self.small.check(Monitor()))
2003 d.addCallback(self.failUnlessEqual, None, "small")
2005 # and again with verify=True
2006 d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2007 d.addCallback(self.check_is_healthy, self.root, "root")
2008 d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2009 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2010 d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2011 d.addCallback(self.check_is_healthy, self.large, "large",
2013 d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2014 d.addCallback(self.failUnlessEqual, None, "small")
2016 # and check_and_repair(), which should be a nop
2017 d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2018 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2019 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2020 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2021 d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2022 d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2023 d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2024 d.addCallback(self.failUnlessEqual, None, "small")
2026 # check_and_repair(verify=True)
2027 d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2028 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2029 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2030 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2031 d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2032 d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2034 d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2035 d.addCallback(self.failUnlessEqual, None, "small")
2038 # now deep-check the root, with various verify= and repair= options
2039 d.addCallback(lambda ign:
2040 self.root.start_deep_check().when_done())
2041 d.addCallback(self.deep_check_is_healthy, 3, "root")
2042 d.addCallback(lambda ign:
2043 self.root.start_deep_check(verify=True).when_done())
2044 d.addCallback(self.deep_check_is_healthy, 3, "root")
2045 d.addCallback(lambda ign:
2046 self.root.start_deep_check_and_repair().when_done())
2047 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2048 d.addCallback(lambda ign:
2049 self.root.start_deep_check_and_repair(verify=True).when_done())
2050 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2052 # and finally, start a deep-check, but then cancel it.
2053 d.addCallback(lambda ign: self.root.start_deep_check())
2054 def _checking(monitor):
2056 d = monitor.when_done()
2057 # this should fire as soon as the next dirnode.list finishes.
2058 # TODO: add a counter to measure how many list() calls are made,
2059 # assert that no more than one gets to run before the cancel()
2061 def _finished_normally(res):
2062 self.fail("this was supposed to fail, not finish normally")
2064 f.trap(OperationCancelledError)
2065 d.addCallbacks(_finished_normally, _cancelled)
2067 d.addCallback(_checking)
2071 def web_json(self, n, **kwargs):
2072 kwargs["output"] = "json"
2073 d = self.web(n, "POST", **kwargs)
2074 d.addCallback(self.decode_json)
2077 def decode_json(self, (s,url)):
2079 data = simplejson.loads(s)
2081 self.fail("%s: not JSON: '%s'" % (url, s))
2084 def web(self, n, method="GET", **kwargs):
2085 # returns (data, url)
2086 url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2087 + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2088 d = getPage(url, method=method)
2089 d.addCallback(lambda data: (data,url))
2092 def wait_for_operation(self, ignored, ophandle):
2093 url = self.webish_url + "operations/" + ophandle
2094 url += "?t=status&output=JSON"
2098 data = simplejson.loads(res)
2100 self.fail("%s: not JSON: '%s'" % (url, res))
2101 if not data["finished"]:
2102 d = self.stall(delay=1.0)
2103 d.addCallback(self.wait_for_operation, ophandle)
2109 def get_operation_results(self, ignored, ophandle, output=None):
2110 url = self.webish_url + "operations/" + ophandle
2113 url += "&output=" + output
2116 if output and output.lower() == "json":
2118 return simplejson.loads(res)
2120 self.fail("%s: not JSON: '%s'" % (url, res))
2125 def slow_web(self, n, output=None, **kwargs):
2127 handle = base32.b2a(os.urandom(4))
2128 d = self.web(n, "POST", ophandle=handle, **kwargs)
2129 d.addCallback(self.wait_for_operation, handle)
2130 d.addCallback(self.get_operation_results, handle, output=output)
2133 def json_check_is_healthy(self, data, n, where, incomplete=False):
2135 self.failUnlessEqual(data["storage-index"],
2136 base32.b2a(n.get_storage_index()), where)
2138 self.failUnlessEqual(r["healthy"], True, where)
2139 needs_rebalancing = bool( len(self.clients) < 10 )
2141 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2142 self.failUnlessEqual(r["count-shares-good"], 10, where)
2143 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2144 self.failUnlessEqual(r["count-shares-expected"], 10, where)
2146 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2147 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2148 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2150 self.failUnlessEqual(sorted(r["servers-responding"]),
2151 sorted([idlib.nodeid_b2a(c.nodeid)
2152 for c in self.clients]), where)
2153 self.failUnless("sharemap" in r, where)
2154 all_serverids = set()
2155 for (shareid, serverids_s) in r["sharemap"].items():
2156 all_serverids.update(serverids_s)
2157 self.failUnlessEqual(sorted(all_serverids),
2158 sorted([idlib.nodeid_b2a(c.nodeid)
2159 for c in self.clients]), where)
2160 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2161 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2162 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
    def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
        """Assert webapi check-and-repair JSON 'data' for node 'n' shows a
        healthy object before and after, with no repair attempted."""
        self.failUnlessEqual(data["storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["repair-attempted"], False, where)
        # both halves must satisfy the full per-field health checks
        self.json_check_is_healthy(data["pre-repair-results"],
                                   n, where, incomplete)
        self.json_check_is_healthy(data["post-repair-results"],
                                   n, where, incomplete)
    def json_full_deepcheck_is_healthy(self, data, n, where):
        """Assert webapi deep-check JSON 'data' (rooted at node 'n') shows
        all 3 checked objects healthy, no corruption, and sane stats."""
        self.failUnlessEqual(data["root-storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["count-objects-checked"], 3, where)
        self.failUnlessEqual(data["count-objects-healthy"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(data["list-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-unhealthy-files"], [], where)
        self.json_check_stats(data["stats"], where)
    def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
        """Assert webapi deep-check-and-repair JSON 'data' (rooted at node
        'n') shows all 3 objects healthy pre- and post-repair, with zero
        corruption and zero repairs attempted."""
        self.failUnlessEqual(data["root-storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["count-objects-checked"], 3, where)

        # pre-repair state
        self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)

        # post-repair state (must be unchanged: nothing needed repair)
        self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)

        self.failUnlessEqual(data["list-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-unhealthy-files"], [], where)

        self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
        self.failUnlessEqual(data["count-repairs-successful"], 0, where)
        self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
    def json_check_lit(self, data, n, where):
        """LIT files have no shares: their check JSON reports an empty
        storage index and is trivially healthy."""
        self.failUnlessEqual(data["storage-index"], "", where)
        self.failUnlessEqual(data["results"]["healthy"], True, where)
    def json_check_stats(self, data, where):
        # Delegate to the shared stats checker; 'where' is accepted for a
        # signature parallel to the other json_* helpers but is unused here.
        self.check_stats(data)
2213 def do_test_web(self, ignored):
2214 d = defer.succeed(None)
2217 d.addCallback(lambda ign:
2218 self.slow_web(self.root,
2219 t="start-deep-stats", output="json"))
2220 d.addCallback(self.json_check_stats, "deep-stats")
2223 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2224 d.addCallback(self.json_check_is_healthy, self.root, "root")
2225 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2226 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2227 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2228 d.addCallback(self.json_check_is_healthy, self.large, "large")
2229 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2230 d.addCallback(self.json_check_lit, self.small, "small")
2233 d.addCallback(lambda ign:
2234 self.web_json(self.root, t="check", verify="true"))
2235 d.addCallback(self.json_check_is_healthy, self.root, "root")
2236 d.addCallback(lambda ign:
2237 self.web_json(self.mutable, t="check", verify="true"))
2238 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2239 d.addCallback(lambda ign:
2240 self.web_json(self.large, t="check", verify="true"))
2241 d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2242 d.addCallback(lambda ign:
2243 self.web_json(self.small, t="check", verify="true"))
2244 d.addCallback(self.json_check_lit, self.small, "small")
2246 # check and repair, no verify
2247 d.addCallback(lambda ign:
2248 self.web_json(self.root, t="check", repair="true"))
2249 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2250 d.addCallback(lambda ign:
2251 self.web_json(self.mutable, t="check", repair="true"))
2252 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2253 d.addCallback(lambda ign:
2254 self.web_json(self.large, t="check", repair="true"))
2255 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2256 d.addCallback(lambda ign:
2257 self.web_json(self.small, t="check", repair="true"))
2258 d.addCallback(self.json_check_lit, self.small, "small")
2260 # check+verify+repair
2261 d.addCallback(lambda ign:
2262 self.web_json(self.root, t="check", repair="true", verify="true"))
2263 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2264 d.addCallback(lambda ign:
2265 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2266 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2267 d.addCallback(lambda ign:
2268 self.web_json(self.large, t="check", repair="true", verify="true"))
2269 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2270 d.addCallback(lambda ign:
2271 self.web_json(self.small, t="check", repair="true", verify="true"))
2272 d.addCallback(self.json_check_lit, self.small, "small")
2274 # now run a deep-check, with various verify= and repair= flags
2275 d.addCallback(lambda ign:
2276 self.slow_web(self.root, t="start-deep-check", output="json"))
2277 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2278 d.addCallback(lambda ign:
2279 self.slow_web(self.root, t="start-deep-check", verify="true",
2281 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2282 d.addCallback(lambda ign:
2283 self.slow_web(self.root, t="start-deep-check", repair="true",
2285 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2286 d.addCallback(lambda ign:
2287 self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2288 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2290 # now look at t=info
2291 d.addCallback(lambda ign: self.web(self.root, t="info"))
2292 # TODO: examine the output
2293 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2294 d.addCallback(lambda ign: self.web(self.large, t="info"))
2295 d.addCallback(lambda ign: self.web(self.small, t="info"))