1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17 ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18 IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
27 from allmydata.test.common import SystemTestMixin, WebErrorMixin, \
28 MemoryConsumer, download_to_data
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts how many bytes have been read from
    it, and can fire a Deferred once that count passes a threshold (used to
    interrupt an upload part-way through)."""
    bytes_read = 0              # running count of bytes handed out by read();
                                # restored: it is incremented below and read by
                                # the resumable-upload test, but was never
                                # initialized in this excerpt
    interrupt_after = None      # byte threshold; None disables interruption
    interrupt_after_d = None    # Deferred fired (once) when threshold passed

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                # clear the threshold first so we only fire once
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
class GrabEverythingConsumer:
    """Minimal IConsumer that accumulates every byte written to it in
    self.contents (the download-to-consumer test compares .contents against
    the uploaded DATA).

    NOTE(review): the method bodies were elided from this excerpt; they have
    been restored to the minimal behavior the surrounding tests rely upon --
    confirm against upstream.
    """
    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # we expect to be driven by a push producer
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents += data

    def unregisterProducer(self):
        pass
64 class SystemTest(SystemTestMixin, unittest.TestCase):
    def test_connections(self):
        """Start a grid, add one extra node, and check that every client
        sees numclients+1 peers.

        NOTE(review): lines appear to be elided from this excerpt: _check is
        defined but never added to the Deferred chain, and the final
        'return d' (which trial needs in order to wait on the chain) is
        missing -- confirm against upstream.
        """
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                # each client should see the whole grid plus the extra node
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
        def _shutdown_extra_node(res):
            # addBoth: stop the extra node on success or failure
            return self.extra_node.stopService()
        d.addBoth(_shutdown_extra_node)
    test_connections.timeout = 300
87 # test_connections is subsumed by test_upload_and_download, and takes
88 # quite a while to run on a slow machine (because of all the TLS
89 # connections that must be established). If we ever rework the introducer
90 # code to such an extent that we're not sure if it works anymore, we can
91 # reinstate this test until it does.
    def test_upload_and_download_random_key(self):
        """Upload/download round-trip using a randomly-generated encryption
        key (convergence=None)."""
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800
    def test_upload_and_download_convergent(self):
        """Upload/download round-trip using convergent encryption (a fixed
        convergence string), which enables duplicate-upload short-circuiting
        and upload resumption."""
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800
    def _test_upload_and_download(self, convergence):
        """Upload DATA from client[0], download it back via every download
        API (to-data / to-filename / to-filehandle / IConsumer / filenode
        read), verify failure on a mangled URI, then exercise helper-based,
        duplicate, and resumable uploads.

        NOTE(review): this excerpt has elided lines throughout -- the
        'def _do_upload', 'def _test_read', 'def _interrupted' and two
        'def _check' wrappers referenced below are missing, as are several
        'return' statements and string continuations. Confirm against
        upstream before relying on the exact control flow.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # sanity check: every client sees the full grid
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        # NOTE(review): the enclosing 'def _do_upload(res):' appears elided here
        u = self.clients[0].getServiceNamed("uploader")
        # we crank the max segsize down to 1024b for the duration of this
        # test, so we can exercise multiple segments. It is important
        # that this is not a multiple of the segment size, so that the
        # tail segment is not the same length as the others. This actualy
        # gets rounded up to 1025 to be a multiple of the number of
        # required shares (since we use 25 out of 100 FEC).
        up = upload.Data(DATA, convergence=convergence)
        up.max_segment_size = 1024
        d.addCallback(_do_upload)
        def _upload_done(results):
            log.msg("upload finished: uri is %s" % (uri,))
            dl = self.clients[1].getServiceNamed("downloader")
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            # NOTE(review): the closing 'target_filename)' argument line
            # appears elided here
            return self.downloader.download_to_filename(self.uri,
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        # NOTE(review): the enclosing 'def _test_read(res):' appears elided
        # here; it is added to the chain below
        n = self.clients[1].create_node_from_uri(self.uri)
        d = download_to_data(n)
        def _read_done(data):
            self.failUnlessEqual(data, DATA)
        d.addCallback(_read_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=1, size=4))
        def _read_portion_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
        d.addCallback(_read_portion_done)
        d.addCallback(lambda ign:
                      n.read(MemoryConsumer(), offset=2, size=None))
        def _read_tail_done(mc):
            self.failUnlessEqual("".join(mc.chunks), DATA[2:])
        d.addCallback(_read_tail_done)
        d.addCallback(_test_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            # NOTE(review): the enclosing 'def _check(newdata):' appears
            # elided here
            self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            # NOTE(review): the enclosing 'def _check(newdata):' appears
            # elided here
            self.failUnlessEqual(newdata, HELPER_DATA)
            self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                        "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
        if convergence is not None:
            # duplicate-upload short-circuiting only applies with convergence
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restartingit.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            # NOTE(review): the enclosing 'def _interrupted(f):' errback
            # appears elided here
            f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
            # make sure we actually interrupted it before finishing the
            self.failUnless(u1.bytes_read < len(DATA),
                            "read %d out of %d total" % (u1.bytes_read,

            log.msg("waiting for reconnect", level=log.NOISY,
                    facility="tahoe.test.test_system")
            # now, we need to give the nodes a chance to notice that this
            # connection has gone away. When this happens, the storage
            # servers will be told to abort their uploads, removing the
            # partial shares. Unfortunately this involves TCP messages
            # going through the loopback interface, and we can't easily
            # predict how long that will take. If it were all local, we
            # could use fireEventually() to stall. Since we don't have
            # the right introduction hooks, the best we can do is use a
            # fixed delay. TODO: this is fragile.
            u1.interrupt_after_d.addCallback(self.stall, 2.0)
            return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key. (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around .
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                    # NOTE(review): an 'else:' branch appears elided here --
                    # the failIf below should apply to the random-key case
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            # NOTE(review): the enclosing 'def _check(newdata):' appears
            # elided here
            self.failUnlessEqual(newdata, DATA)
            # If using convergent encryption, then also check that the
            # helper has removed the temp file from its directories.
            if convergence is not None:
                basedir = os.path.join(self.getdir("client0"), "helper")
                files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                self.failUnlessEqual(files, [])
                files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                self.failUnlessEqual(files, [])
            d.addCallback(_check)
        d.addCallback(_upload_resumable)
391 def _find_shares(self, basedir):
393 for (dirpath, dirnames, filenames) in os.walk(basedir):
394 if "storage" not in dirpath:
398 pieces = dirpath.split(os.sep)
399 if pieces[-4] == "storage" and pieces[-3] == "shares":
400 # we're sitting in .../storage/shares/$START/$SINDEX , and there
401 # are sharefiles here
402 assert pieces[-5].startswith("client")
403 client_num = int(pieces[-5][-1])
404 storage_index_s = pieces[-1]
405 storage_index = storage.si_a2b(storage_index_s)
406 for sharename in filenames:
407 shnum = int(sharename)
408 filename = os.path.join(dirpath, sharename)
409 data = (client_num, storage_index, filename, shnum)
412 self.fail("unable to find any share files in %s" % basedir)
415 def _corrupt_mutable_share(self, filename, which):
416 msf = storage.MutableShareFile(filename)
417 datav = msf.readv([ (0, 1000000) ])
418 final_share = datav[0]
419 assert len(final_share) < 1000000 # ought to be truncated
420 pieces = mutable_layout.unpack_share(final_share)
421 (seqnum, root_hash, IV, k, N, segsize, datalen,
422 verification_key, signature, share_hash_chain, block_hash_tree,
423 share_data, enc_privkey) = pieces
425 if which == "seqnum":
428 root_hash = self.flip_bit(root_hash)
430 IV = self.flip_bit(IV)
431 elif which == "segsize":
432 segsize = segsize + 15
433 elif which == "pubkey":
434 verification_key = self.flip_bit(verification_key)
435 elif which == "signature":
436 signature = self.flip_bit(signature)
437 elif which == "share_hash_chain":
438 nodenum = share_hash_chain.keys()[0]
439 share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
440 elif which == "block_hash_tree":
441 block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
442 elif which == "share_data":
443 share_data = self.flip_bit(share_data)
444 elif which == "encprivkey":
445 enc_privkey = self.flip_bit(enc_privkey)
447 prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
449 final_share = mutable_layout.pack_share(prefix,
456 msf.writev( [(0, final_share)], None)
    def test_mutable(self):
        """Exercise SDMF mutable files end-to-end: create, dump-share debug
        output, download through several clients, overwrite, bit-flip share
        corruption, empty files, directory nodes, and the key-generator
        keypool.

        NOTE(review): this excerpt has lines elided throughout (several
        inner 'def'/'try:' wrappers, string continuations, and 'return'
        statements, including the method's final 'return d') -- confirm
        against upstream before relying on exact control flow.
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here." # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            # NOTE(review): 'c = self.clients[0]' and the 'def _done(res):'
            # wrapper appear elided here
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            log.msg("DONE: %s" % (res,))
            self._mutable_node_1 = res
            d1.addCallback(_done)
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            # NOTE(review): the '% filename' continuation appears elided
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            # NOTE(review): the 'filename],' argument line appears elided
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            # NOTE(review): the 'try:' matching the except below is elided
            self.failUnless("Mutable slot found:\n" in output)
            self.failUnless("share_type: SDMF\n" in output)
            peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
            self.failUnless(" WE for nodeid: %s\n" % peerid in output)
            self.failUnless(" num_extra_leases: 0\n" in output)
            # the pubkey size can vary by a byte, so the container might
            # be a bit larger on some runs.
            m = re.search(r'^ container_size: (\d+)$', output, re.M)
            container_size = int(m.group(1))
            self.failUnless(2037 <= container_size <= 2049, container_size)
            m = re.search(r'^ data_length: (\d+)$', output, re.M)
            data_length = int(m.group(1))
            self.failUnless(2037 <= data_length <= 2049, data_length)
            self.failUnless(" secrets are for nodeid: %s\n" % peerid
            self.failUnless(" SDMF contents:\n" in output)
            self.failUnless(" seqnum: 1\n" in output)
            self.failUnless(" required_shares: 3\n" in output)
            self.failUnless(" total_shares: 10\n" in output)
            self.failUnless(" segsize: 27\n" in output, (output, filename))
            self.failUnless(" datalen: 25\n" in output)
            # the exact share_hash_chain nodes depends upon the sharenum,
            # and is more of a hassle to compute than I want to deal with
            self.failUnless(" share_hash_chain: " in output)
            self.failUnless(" block_hash_tree: 1 nodes\n" in output)
            expected = (" verify-cap: URI:SSK-Verifier:%s:" %
                        base32.b2a(storage_index))
            self.failUnless(expected in output)
            except unittest.FailTest:
                print "dump-share output was:"
        d.addCallback(_test_debug)

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)
        def _check_download_2(res):
            # NOTE(review): 'return d1' appears elided at the end of this fn
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
        d.addCallback(_check_download_2)
        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
        d.addCallback(_check_download_3)
        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
        d.addCallback(_check_download_4)
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            # NOTE(review): the 'in shares ])' continuation appears elided
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                # NOTE(review): the per-shnum 'if shnum == N:' dispatch
                # appears elided around the calls below
                # read: this will trigger "pubkey doesn't match
                self._corrupt_mutable_share(filename, "pubkey")
                self._corrupt_mutable_share(filename, "encprivkey")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "seqnum")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "R")
                # triggers "signature is invalid"
                self._corrupt_mutable_share(filename, "segsize")
                self._corrupt_mutable_share(filename, "share_hash_chain")
                self._corrupt_mutable_share(filename, "block_hash_tree")
                self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

            # note that initial_query_count=5 means that we'll hit the
            # first 5 servers in effectively random order (based upon
            # response time), so we won't necessarily ever get a "pubkey
            # doesn't match fingerprint" error (if we hit shnum>=1 before
            # shnum=0, we pull the pubkey from there). To get repeatable
            # specific failures, we need to set initial_query_count=1,
            # but of course that will change the sequencing behavior of
            # the retrieval process. TODO: find a reasonable way to make
            # this a parameter, probably when we expand this test to test
            # for one failure mode at a time.

            # when we retrieve this, we should get three signature
            # failures (where we've mangled seqnum, R, and segsize). The
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)
        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            # NOTE(review): 'd1 = dnode.list()' and 'return d1' appear elided
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # keypool shrinks by one for each key handed out until refilled
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
696 def flip_bit(self, good):
697 return good[:-1] + chr(ord(good[-1]) ^ 0x01)
699 def mangle_uri(self, gooduri):
700 # change the key, which changes the storage index, which means we'll
701 # be asking about the wrong file, so nobody will have any shares
702 u = IFileURI(gooduri)
703 u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
704 uri_extension_hash=u.uri_extension_hash,
705 needed_shares=u.needed_shares,
706 total_shares=u.total_shares,
708 return u2.to_string()
710 # TODO: add a test which mangles the uri_extension_hash instead, and
711 # should fail due to not being able to get a valid uri_extension block.
712 # Also a test which sneakily mangles the uri_extension block to change
713 # some of the validation data, so it will fail in the post-download phase
714 # when the file's crypttext integrity check fails. Do the same thing for
715 # the key, which should cause the download to fail the post-download
716 # plaintext_hash check.
718 def test_vdrive(self):
719 self.basedir = "system/SystemTest/test_vdrive"
720 self.data = LARGE_DATA
721 d = self.set_up_nodes(use_stats_gatherer=True)
722 d.addCallback(self._test_introweb)
723 d.addCallback(self.log, "starting publish")
724 d.addCallback(self._do_publish1)
725 d.addCallback(self._test_runner)
726 d.addCallback(self._do_publish2)
727 # at this point, we have the following filesystem (where "R" denotes
728 # self._root_directory_uri):
731 # R/subdir1/mydata567
733 # R/subdir1/subdir2/mydata992
735 d.addCallback(lambda res: self.bounce_client(0))
736 d.addCallback(self.log, "bounced client0")
738 d.addCallback(self._check_publish1)
739 d.addCallback(self.log, "did _check_publish1")
740 d.addCallback(self._check_publish2)
741 d.addCallback(self.log, "did _check_publish2")
742 d.addCallback(self._do_publish_private)
743 d.addCallback(self.log, "did _do_publish_private")
744 # now we also have (where "P" denotes a new dir):
745 # P/personal/sekrit data
746 # P/s2-rw -> /subdir1/subdir2/
747 # P/s2-ro -> /subdir1/subdir2/ (read-only)
748 d.addCallback(self._check_publish_private)
749 d.addCallback(self.log, "did _check_publish_private")
750 d.addCallback(self._test_web)
751 d.addCallback(self._test_control)
752 d.addCallback(self._test_cli)
753 # P now has four top-level children:
754 # P/personal/sekrit data
757 # P/test_put/ (empty)
758 d.addCallback(self._test_checker)
759 d.addCallback(self._grab_stats)
761 test_vdrive.timeout = 1100
    def _test_introweb(self, res):
        """Fetch the introducer's status page (HTML and ?t=json) and check
        the announcement/subscription summaries.

        NOTE(review): several lines appear elided from this excerpt (the
        'def _check(res):' and 'try:' wrappers, two string continuations,
        the failure-path prints, and the final 'return d') -- confirm
        against upstream.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        self.failUnless("allmydata: %s" % str(allmydata.__version__)
        self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
        self.failUnless("Subscription Summary: storage: 5" in res)
        except unittest.FailTest:
            print "GET %s output was:" % self.introweb_url
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            # NOTE(review): the expected-dict continuation is elided here
            self.failUnlessEqual(data["subscription_summary"],
            self.failUnlessEqual(data["announcement_summary"],
                                 {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print "GET %s?t=json output was:" % self.introweb_url
        d.addCallback(_check_json)
795 def _do_publish1(self, res):
796 ut = upload.Data(self.data, convergence=None)
798 d = c0.create_empty_dirnode()
799 def _made_root(new_dirnode):
800 self._root_directory_uri = new_dirnode.get_uri()
801 return c0.create_node_from_uri(self._root_directory_uri)
802 d.addCallback(_made_root)
803 d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
804 def _made_subdir1(subdir1_node):
805 self._subdir1_node = subdir1_node
806 d1 = subdir1_node.add_file(u"mydata567", ut)
807 d1.addCallback(self.log, "publish finished")
808 def _stash_uri(filenode):
809 self.uri = filenode.get_uri()
810 d1.addCallback(_stash_uri)
812 d.addCallback(_made_subdir1)
815 def _do_publish2(self, res):
816 ut = upload.Data(self.data, convergence=None)
817 d = self._subdir1_node.create_empty_directory(u"subdir2")
818 d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
821 def log(self, res, *args, **kwargs):
822 # print "MSG: %s RES: %s" % (msg, args)
823 log.msg(*args, **kwargs)
826 def _do_publish_private(self, res):
827 self.smalldata = "sssh, very secret stuff"
828 ut = upload.Data(self.smalldata, convergence=None)
829 d = self.clients[0].create_empty_dirnode()
830 d.addCallback(self.log, "GOT private directory")
831 def _got_new_dir(privnode):
832 rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
833 d1 = privnode.create_empty_directory(u"personal")
834 d1.addCallback(self.log, "made P/personal")
835 d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
836 d1.addCallback(self.log, "made P/personal/sekrit data")
837 d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
839 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
840 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
842 d1.addCallback(_got_s2)
843 d1.addCallback(lambda res: privnode)
845 d.addCallback(_got_new_dir)
848 def _check_publish1(self, res):
849 # this one uses the iterative API
851 d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
852 d.addCallback(self.log, "check_publish1 got /")
853 d.addCallback(lambda root: root.get(u"subdir1"))
854 d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
855 d.addCallback(lambda filenode: filenode.download_to_data())
856 d.addCallback(self.log, "get finished")
858 self.failUnlessEqual(data, self.data)
859 d.addCallback(_get_done)
862 def _check_publish2(self, res):
863 # this one uses the path-based API
864 rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
865 d = rootnode.get_child_at_path(u"subdir1")
866 d.addCallback(lambda dirnode:
867 self.failUnless(IDirectoryNode.providedBy(dirnode)))
868 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
869 d.addCallback(lambda filenode: filenode.download_to_data())
870 d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
872 d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
873 def _got_filenode(filenode):
874 fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
875 assert fnode == filenode
876 d.addCallback(_got_filenode)
879 def _check_publish_private(self, resnode):
# Verify the private ("P/") directory tree: download P/personal/sekrit
# data, confirm s2-rw is mutable and s2-ro is a read-only view, assert
# that every mutating operation on the read-only view raises
# NotMutableError, exercise move_child_to in both directions, and
# finally sanity-check the deep-manifest and deep-stats numbers.
880 # this one uses the path-based API
881 self._private_node = resnode
883 d = self._private_node.get_child_at_path(u"personal")
884 def _got_personal(personal):
# remember the personal/ dirnode for later moves and checks
885 self._personal_node = personal
887 d.addCallback(_got_personal)
889 d.addCallback(lambda dirnode:
890 self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
# NOTE(review): the 'def get_path(path):' header is missing from this
# listing -- the helper resolves a path relative to the private root.
892 return self._private_node.get_child_at_path(path)
894 d.addCallback(lambda res: get_path(u"personal/sekrit data"))
895 d.addCallback(lambda filenode: filenode.download_to_data())
896 d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
897 d.addCallback(lambda res: get_path(u"s2-rw"))
898 d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
899 d.addCallback(lambda res: get_path(u"s2-ro"))
900 def _got_s2ro(dirnode):
# s2-ro is a read-only cap to a mutable directory: mutable but readonly
901 self.failUnless(dirnode.is_mutable(), dirnode)
902 self.failUnless(dirnode.is_readonly(), dirnode)
903 d1 = defer.succeed(None)
904 d1.addCallback(lambda res: dirnode.list())
905 d1.addCallback(self.log, "dirnode.list")
# every mutating call on the read-only view must fail NotMutableError
907 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
909 d1.addCallback(self.log, "doing add_file(ro)")
910 ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
911 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
913 d1.addCallback(self.log, "doing get(ro)")
# plain reads are still allowed through the read-only cap
914 d1.addCallback(lambda res: dirnode.get(u"mydata992"))
915 d1.addCallback(lambda filenode:
916 self.failUnless(IFileNode.providedBy(filenode)))
918 d1.addCallback(self.log, "doing delete(ro)")
919 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
921 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
923 d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
925 personal = self._personal_node
926 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
928 d1.addCallback(self.log, "doing move_child_to(ro)2")
929 d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
931 d1.addCallback(self.log, "finished with _got_s2ro")
# NOTE(review): 'return d1' appears to be missing here in this listing.
933 d.addCallback(_got_s2ro)
934 def _got_home(dummy):
# exercise move_child_to: personal -> home, rename within home, and
# back into personal, then verify manifest/stat bookkeeping
935 home = self._private_node
936 personal = self._personal_node
937 d1 = defer.succeed(None)
938 d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
939 d1.addCallback(lambda res:
940 personal.move_child_to(u"sekrit data",home,u"sekrit"))
942 d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
943 d1.addCallback(lambda res:
944 home.move_child_to(u"sekrit", home, u"sekrit data"))
946 d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
947 d1.addCallback(lambda res:
948 home.move_child_to(u"sekrit data", personal))
950 d1.addCallback(lambda res: home.build_manifest().when_done())
951 d1.addCallback(self.log, "manifest")
# expected manifest contents (part of the comment block is missing
# from this listing):
955 # P/personal/sekrit data
956 # P/s2-rw (same as P/s2-ro)
957 # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
958 d1.addCallback(lambda manifest:
959 self.failUnlessEqual(len(manifest), 5))
960 d1.addCallback(lambda res: home.start_deep_stats().when_done())
961 def _check_stats(stats):
# exact-valued deep-stats entries; sizes that vary between runs are
# range-checked below instead
962 expected = {"count-immutable-files": 1,
963 "count-mutable-files": 0,
964 "count-literal-files": 1,
966 "count-directories": 3,
967 "size-immutable-files": 112,
968 "size-literal-files": 23,
969 #"size-directories": 616, # varies
970 #"largest-directory": 616,
971 "largest-directory-children": 3,
972 "largest-immutable-file": 112,
974 for k,v in expected.iteritems():
975 self.failUnlessEqual(stats[k], v,
976 "stats[%s] was %s, not %s" %
978 self.failUnless(stats["size-directories"] > 1300,
979 stats["size-directories"])
980 self.failUnless(stats["largest-directory"] > 800,
981 stats["largest-directory"])
# histogram buckets: (min, max, count) -- one literal, one CHK file
982 self.failUnlessEqual(stats["size-files-histogram"],
983 [ (11, 31, 1), (101, 316, 1) ])
984 d1.addCallback(_check_stats)
# NOTE(review): 'return d1', the final 'd.addCallback' and 'return d'
# appear truncated in this listing; confirm against the original file.
986 d.addCallback(_got_home)
def shouldFail(self, res, expected_failure, which, substring=None):
    # Errback-style failure checker, intended for use as
    # d.addBoth(self.shouldFail, ExpectedError, "what we did", "substr").
    #
    # If 'res' is a Failure, require that it wraps 'expected_failure'
    # (anything else re-raises via trap) and, when 'substring' is given,
    # that it occurs in the failure's string form. If 'res' is a plain
    # (successful) result, the expected exception never happened, so fail
    # the test, labelling the message with 'which'.
    #
    # Fixes vs. the listed code: the substring check is now guarded (the
    # listing applied 'substring in str(res)' even when substring is None,
    # which raises TypeError), and the self.fail() call is restored to the
    # success branch instead of running unconditionally.
    if isinstance(res, Failure):
        res.trap(expected_failure)
        if substring:
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
    else:
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
    # Invoke callable(*args, **kwargs) via maybeDeferred and assert that it
    # fails with 'expected_failure'. 'which' labels the operation in any
    # test-failure message; 'substring', when not None, must appear in the
    # failure's string form. Returns a Deferred that fires successfully
    # once the check has run, so it can be chained into a callback series.
    #
    # Fixes vs. the listed code: the '_done' wrapper, the substring guard,
    # the success-branch 'else:', the addBoth() hookup and the 'return d'
    # were missing, leaving the checks as unreachable stray statements.
    assert substring is None or isinstance(substring, str)
    d = defer.maybeDeferred(callable, *args, **kwargs)
    def _done(res):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
    d.addBoth(_done)
    return d
def PUT(self, urlpath, data):
    # Issue an HTTP PUT of 'data' to this node's webapi at 'urlpath'.
    # Returns a Deferred that fires with the response body.
    return getPage(self.webish_url + urlpath, method="PUT", postdata=data)
def GET(self, urlpath, followRedirect=False):
    # Issue an HTTP GET against this node's webapi at 'urlpath'.
    # Returns a Deferred that fires with the page body.
    target = self.webish_url + urlpath
    return getPage(target, method="GET", followRedirect=followRedirect)
1024 def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
# Issue a multipart/form-data POST to the webapi. Each keyword in
# 'fields' becomes a form field; a tuple value is treated as
# (filename, contents) for a file-upload field. 'use_helper' selects the
# helper-using node's webapi instead of the main one. Returns a Deferred
# firing with the response body.
# NOTE(review): several scaffolding lines ('if use_helper:'/'else:',
# 'form = []', the separator appends, the trailing "--" terminator) are
# missing from this listing; confirm against the original file.
1026 url = self.helper_webish_url + urlpath
1028 url = self.webish_url + urlpath
# fixed, arbitrary multipart boundary string
1029 sepbase = "boogabooga"
1030 sep = "--" + sepbase
# declare the form charset so field values are interpreted as UTF-8
1033 form.append('Content-Disposition: form-data; name="_charset"')
1035 form.append('UTF-8')
1037 for name, value in fields.iteritems():
1038 if isinstance(value, tuple):
# tuple value means a file-upload field: (filename, contents)
1039 filename, value = value
1040 form.append('Content-Disposition: form-data; name="%s"; '
1041 'filename="%s"' % (name, filename.encode("utf-8")))
1043 form.append('Content-Disposition: form-data; name="%s"' % name)
1045 form.append(str(value))
# multipart bodies use CRLF line endings
1048 body = "\r\n".join(form) + "\r\n"
1049 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1051 return getPage(url, method="POST", postdata=body,
1052 headers=headers, followRedirect=followRedirect)
1054 def _test_web(self, res):
# Exercise the webapi end-to-end: welcome pages (with and without a
# helper), directory listings, file downloads by path and by URI, error
# handling for a corrupted URI, PUT/GET round trips, unlinked POST
# uploads, the various status pages, helper status (HTML and JSON), and
# the statistics pages.
# NOTE(review): lines are missing throughout this listing (e.g. the
# initial 'd = getPage(base)', several 'def _got_up/_got_down' headers,
# closing parens, and the final 'return d'); confirm against the
# original file before relying on the exact structure shown here.
1055 base = self.webish_url
1056 public = "uri/" + self._root_directory_uri
1058 def _got_welcome(page):
# the welcome page must report all storage servers connected
1059 expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1060 self.failUnless(expected in page,
1061 "I didn't see the right 'connected storage servers'"
1062 " message in: %s" % page
1064 expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1065 self.failUnless(expected in page,
1066 "I didn't see the right 'My nodeid' message "
1068 self.failUnless("Helper: 0 active uploads" in page)
1069 d.addCallback(_got_welcome)
1070 d.addCallback(self.log, "done with _got_welcome")
1072 # get the welcome page from the node that uses the helper too
1073 d.addCallback(lambda res: getPage(self.helper_webish_url))
1074 def _got_welcome_helper(page):
1075 self.failUnless("Connected to helper?: <span>yes</span>" in page,
1077 self.failUnless("Not running helper" in page)
1078 d.addCallback(_got_welcome_helper)
1080 d.addCallback(lambda res: getPage(base + public))
1081 d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1082 def _got_subdir1(page):
1083 # there ought to be an href for our file
1084 self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1085 self.failUnless(">mydata567</a>" in page)
1086 d.addCallback(_got_subdir1)
1087 d.addCallback(self.log, "done with _got_subdir1")
1088 d.addCallback(lambda res:
1089 getPage(base + public + "/subdir1/mydata567"))
1090 def _got_data(page):
# downloading by path must yield exactly the uploaded bytes
1091 self.failUnlessEqual(page, self.data)
1092 d.addCallback(_got_data)
1094 # download from a URI embedded in a URL
1095 d.addCallback(self.log, "_get_from_uri")
1096 def _get_from_uri(res):
1097 return getPage(base + "uri/%s?filename=%s"
1098 % (self.uri, "mydata567"))
1099 d.addCallback(_get_from_uri)
1100 def _got_from_uri(page):
1101 self.failUnlessEqual(page, self.data)
1102 d.addCallback(_got_from_uri)
1104 # download from a URI embedded in a URL, second form
1105 d.addCallback(self.log, "_get_from_uri2")
1106 def _get_from_uri2(res):
1107 return getPage(base + "uri?uri=%s" % (self.uri,))
1108 d.addCallback(_get_from_uri2)
1109 d.addCallback(_got_from_uri)
1111 # download from a bogus URI, make sure we get a reasonable error
1112 d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1113 def _get_from_bogus_uri(res):
# mangle_uri corrupts the cap; download must fail with a web Error
1114 d1 = getPage(base + "uri/%s?filename=%s"
1115 % (self.mangle_uri(self.uri), "mydata567"))
1116 d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1119 d.addCallback(_get_from_bogus_uri)
1120 d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1122 # upload a file with PUT
1123 d.addCallback(self.log, "about to try PUT")
1124 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1125 "new.txt contents"))
1126 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1127 d.addCallback(self.failUnlessEqual, "new.txt contents")
1128 # and again with something large enough to use multiple segments,
1129 # and hopefully trigger pauseProducing too
1130 d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1131 "big" * 500000)) # 1.5MB
1132 d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1133 d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1135 # can we replace files in place?
1136 d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1138 d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1139 d.addCallback(self.failUnlessEqual, "NEWER contents")
1141 # test unlinked POST
1142 d.addCallback(lambda res: self.POST("uri", t="upload",
1143 file=("new.txt", "data" * 10000)))
1144 # and again using the helper, which exercises different upload-status
1146 d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1147 file=("foo.txt", "data2" * 10000)))
1149 # check that the status page exists
1150 d.addCallback(lambda res: self.GET("status", followRedirect=True))
1151 def _got_status(res):
1152 # find an interesting upload and download to look at. LIT files
1153 # are not interesting.
1154 for ds in self.clients[0].list_all_download_statuses():
1155 if ds.get_size() > 200:
1156 self._down_status = ds.get_counter()
1157 for us in self.clients[0].list_all_upload_statuses():
1158 if us.get_size() > 200:
1159 self._up_status = us.get_counter()
# mutable-file operations each leave exactly one status record here
1160 rs = self.clients[0].list_all_retrieve_statuses()[0]
1161 self._retrieve_status = rs.get_counter()
1162 ps = self.clients[0].list_all_publish_statuses()[0]
1163 self._publish_status = ps.get_counter()
1164 us = self.clients[0].list_all_mapupdate_statuses()[0]
1165 self._update_status = us.get_counter()
1167 # and that there are some upload- and download- status pages
1168 return self.GET("status/up-%d" % self._up_status)
1169 d.addCallback(_got_status)
# NOTE(review): 'def _got_up(res):' and 'def _got_down(res):' headers
# are missing from this listing for the two callbacks below.
1171 return self.GET("status/down-%d" % self._down_status)
1172 d.addCallback(_got_up)
1174 return self.GET("status/mapupdate-%d" % self._update_status)
1175 d.addCallback(_got_down)
1176 def _got_update(res):
1177 return self.GET("status/publish-%d" % self._publish_status)
1178 d.addCallback(_got_update)
1179 def _got_publish(res):
1180 return self.GET("status/retrieve-%d" % self._retrieve_status)
1181 d.addCallback(_got_publish)
1183 # check that the helper status page exists
1184 d.addCallback(lambda res:
1185 self.GET("helper_status", followRedirect=True))
1186 def _got_helper_status(res):
1187 self.failUnless("Bytes Fetched:" in res)
1188 # touch a couple of files in the helper's working directory to
1189 # exercise more code paths
1190 workdir = os.path.join(self.getdir("client0"), "helper")
1191 incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1192 f = open(incfile, "wb")
1193 f.write("small file")
# backdate the file so it counts as "old" in the JSON stats below
1195 then = time.time() - 86400*3
1197 os.utime(incfile, (now, then))
1198 encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1199 f = open(encfile, "wb")
1200 f.write("less small file")
1202 os.utime(encfile, (now, then))
1203 d.addCallback(_got_helper_status)
1204 # and that the json form exists
1205 d.addCallback(lambda res:
1206 self.GET("helper_status?t=json", followRedirect=True))
1207 def _got_helper_status_json(res):
1208 data = simplejson.loads(res)
1209 self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
# the sizes match the two spurious files written above (10 and 15 bytes)
1211 self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1212 self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1213 self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1215 self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1216 self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1217 self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1219 d.addCallback(_got_helper_status_json)
1221 # and check that client[3] (which uses a helper but does not run one
1222 # itself) doesn't explode when you ask for its status
1223 d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1224 def _got_non_helper_status(res):
1225 self.failUnless("Upload and Download Status" in res)
1226 d.addCallback(_got_non_helper_status)
1228 # or for helper status with t=json
1229 d.addCallback(lambda res:
1230 getPage(self.helper_webish_url + "helper_status?t=json"))
1231 def _got_non_helper_status_json(res):
# a node that merely uses a helper reports an empty helper-status dict
1232 data = simplejson.loads(res)
1233 self.failUnlessEqual(data, {})
1234 d.addCallback(_got_non_helper_status_json)
1236 # see if the statistics page exists
1237 d.addCallback(lambda res: self.GET("statistics"))
1238 def _got_stats(res):
1239 self.failUnless("Node Statistics" in res)
1240 self.failUnless(" 'downloader.files_downloaded': 5," in res, res)
1241 d.addCallback(_got_stats)
1242 d.addCallback(lambda res: self.GET("statistics?t=json"))
1243 def _got_stats_json(res):
1244 data = simplejson.loads(res)
1245 self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1246 self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1247 d.addCallback(_got_stats_json)
1249 # TODO: mangle the second segment of a file, to test errors that
1250 # occur after we've already sent some good data, which uses a
1251 # different error path.
1253 # TODO: download a URI with a form
1254 # TODO: create a directory by using a form
1255 # TODO: upload by using a form on the directory page
1256 # url = base + "somedir/subdir1/freeform_post!!upload"
1257 # TODO: delete a file by using a button on the directory page
1261 def _test_runner(self, res):
# Exercise the CLI diagnostic tools ("debug dump-share", "debug
# find-shares", "debug catalog-shares"): locate a CHK share file on disk
# under one of the storage directories, dump it and verify the reported
# metadata, then confirm all 10 shares can be found and catalogued
# across all client nodes.
# NOTE(review): several lines are missing from this listing (the
# 'continue'/'break' statements in the search loop, the dump-share
# argument list's closing, the head of the 'matching = [...' list
# comprehension near the end); confirm against the original file.
1262 # exercise some of the diagnostic tools in runner.py
1265 for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1266 if "storage" not in dirpath:
1270 pieces = dirpath.split(os.sep)
1271 if pieces[-4] == "storage" and pieces[-3] == "shares":
1272 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1273 # are sharefiles here
1274 filename = os.path.join(dirpath, filenames[0])
1275 # peek at the magic to see if it is a chk share
1276 magic = open(filename, "rb").read(4)
1277 if magic == '\x00\x00\x00\x01':
1280 self.fail("unable to find any uri_extension files in %s"
1282 log.msg("test_system.SystemTest._test_runner using %s" % filename)
1284 out,err = StringIO(), StringIO()
1285 rc = runner.runner(["debug", "dump-share", "--offsets",
1287 stdout=out, stderr=err)
1288 output = out.getvalue()
1289 self.failUnlessEqual(rc, 0)
1291 # we only upload a single file, so we can assert some things about
1292 # its size and shares.
1293 self.failUnless(("share filename: %s" % filename) in output)
1294 self.failUnless("size: %d\n" % len(self.data) in output)
1295 self.failUnless("num_segments: 1\n" in output)
1296 # segment_size is always a multiple of needed_shares
1297 self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1298 self.failUnless("total_shares: 10\n" in output)
1299 # keys which are supposed to be present
1300 for key in ("size", "num_segments", "segment_size",
1301 "needed_shares", "total_shares",
1302 "codec_name", "codec_params", "tail_codec_params",
1303 #"plaintext_hash", "plaintext_root_hash",
1304 "crypttext_hash", "crypttext_root_hash",
1305 "share_root_hash", "UEB_hash"):
1306 self.failUnless("%s: " % key in output, key)
1307 self.failUnless(" verify-cap: URI:CHK-Verifier:" in output)
1309 # now use its storage index to find the other shares using the
1310 # 'find-shares' tool
1311 sharedir, shnum = os.path.split(filename)
1312 storagedir, storage_index_s = os.path.split(sharedir)
1313 out,err = StringIO(), StringIO()
1314 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1315 cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1316 rc = runner.runner(cmd, stdout=out, stderr=err)
1317 self.failUnlessEqual(rc, 0)
# one line of output per located share; 10 shares were created
1319 sharefiles = [sfn.strip() for sfn in out.readlines()]
1320 self.failUnlessEqual(len(sharefiles), 10)
1322 # also exercise the 'catalog-shares' tool
1323 out,err = StringIO(), StringIO()
1324 nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1325 cmd = ["debug", "catalog-shares"] + nodedirs
1326 rc = runner.runner(cmd, stdout=out, stderr=err)
1327 self.failUnlessEqual(rc, 0)
# 30 share descriptions total across all nodes; exactly 10 belong to
# the CHK file found above
1329 descriptions = [sfn.strip() for sfn in out.readlines()]
1330 self.failUnlessEqual(len(descriptions), 30)
1332 for line in descriptions
1333 if line.startswith("CHK %s " % storage_index_s)]
1334 self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
    # exercise the remote-control-the-client foolscap interfaces in
    # allmydata.control (mostly used for performance tests)
    c0 = self.clients[0]
    control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
    # read the client's control FURL; close the handle promptly instead of
    # leaking it until GC (the listing used a bare open(...).read())
    f = open(control_furl_file, "r")
    try:
        control_furl = f.read().strip()
    finally:
        f.close()
    # it doesn't really matter which Tub we use to connect to the client,
    # so let's just use our IntroducerNode's
    d = self.introducer.tub.getReference(control_furl)
    d.addCallback(self._test_control2, control_furl_file)
    # return the Deferred so the surrounding test chain waits for (and
    # sees errors from) _test_control2; the listing was missing this
    return d
1347 def _test_control2(self, rref, filename):
# Through the control-port remote reference 'rref': upload 'filename'
# (the control.furl file itself) to get a URI, download that URI back to
# a local file, compare the bytes, then poke the speed-test and (on
# Linux) memory/latency measurement remotes.
# NOTE(review): lines are missing from this listing (the download
# callRemote's 'uri, downfile))' arguments, the 'def _check(res):'
# header, and the trailing 'return d'); confirm against the original.
1348 d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1349 downfile = os.path.join(self.basedir, "control.downfile")
1350 d.addCallback(lambda uri:
1351 rref.callRemote("download_from_uri_to_file",
# the download remote returns the path it wrote to
1354 self.failUnlessEqual(res, downfile)
1355 data = open(downfile, "r").read()
1356 expected_data = open(filename, "r").read()
1357 self.failUnlessEqual(data, expected_data)
1358 d.addCallback(_check)
1359 d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
# get_memory_usage reads /proc, so only exercise it on Linux
1360 if sys.platform == "linux2":
1361 d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1362 d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1365 def _test_cli(self, res):
# Drive the "tahoe" CLI against client0: aliases (including the legacy
# private/root_dir.cap file), mkdir/ls, put/get (file, stdin, mutable),
# rm, mv, ln, cp (tahoe-to-tahoe, disk-to-tahoe, tahoe-to-disk,
# recursive), verifying stdout/stderr at each step. Commands run in a
# thread via self._run_cli since the CLI does blocking I/O.
# NOTE(review): many lines are missing from this listing (nodeargs
# assembly, f.close() calls, 'files = []'/'datas = []' setup, several
# loop headers, and the final 'return d'); confirm against the original
# file before relying on the exact structure shown here.
1366 # run various CLI commands (in a thread, since they use blocking
1369 private_uri = self._private_node.get_uri()
1370 some_uri = self._root_directory_uri
1371 client0_basedir = self.getdir("client0")
1374 "--node-directory", client0_basedir,
1376 TESTDATA = "I will not write the same thing over and over.\n" * 100
1378 d = defer.succeed(None)
1380 # for compatibility with earlier versions, private/root_dir.cap is
1381 # supposed to be treated as an alias named "tahoe:". Start by making
1382 # sure that works, before we add other aliases.
1384 root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1385 f = open(root_file, "w")
1386 f.write(private_uri)
1389 def run(ignored, verb, *args, **kwargs):
# helper: run one CLI verb with the standard node args prepended
1390 stdin = kwargs.get("stdin", "")
1391 newargs = [verb] + nodeargs + list(args)
1392 return self._run_cli(newargs, stdin=stdin)
1394 def _check_ls((out,err), expected_children, unexpected_children=[]):
# assert 'tahoe ls' output mentions/omits the given child names
1395 self.failUnlessEqual(err, "")
1396 for s in expected_children:
1397 self.failUnless(s in out, (s,out))
1398 for s in unexpected_children:
1399 self.failIf(s in out, (s,out))
1401 def _check_ls_root((out,err)):
1402 self.failUnless("personal" in out)
1403 self.failUnless("s2-ro" in out)
1404 self.failUnless("s2-rw" in out)
1405 self.failUnlessEqual(err, "")
1407 # this should reference private_uri
1408 d.addCallback(run, "ls")
1409 d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1411 d.addCallback(run, "list-aliases")
1412 def _check_aliases_1((out,err)):
1413 self.failUnlessEqual(err, "")
1414 self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1415 d.addCallback(_check_aliases_1)
1417 # now that that's out of the way, remove root_dir.cap and work with
1419 d.addCallback(lambda res: os.unlink(root_file))
1420 d.addCallback(run, "list-aliases")
1421 def _check_aliases_2((out,err)):
# with root_dir.cap gone there are no aliases at all
1422 self.failUnlessEqual(err, "")
1423 self.failUnlessEqual(out, "")
1424 d.addCallback(_check_aliases_2)
1426 d.addCallback(run, "mkdir")
1427 def _got_dir( (out,err) ):
1428 self.failUnless(uri.from_string_dirnode(out.strip()))
1430 d.addCallback(_got_dir)
1431 d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1433 d.addCallback(run, "list-aliases")
1434 def _check_aliases_3((out,err)):
1435 self.failUnlessEqual(err, "")
1436 self.failUnless("tahoe: " in out)
1437 d.addCallback(_check_aliases_3)
1439 def _check_empty_dir((out,err)):
1440 self.failUnlessEqual(out, "")
1441 self.failUnlessEqual(err, "")
1442 d.addCallback(run, "ls")
1443 d.addCallback(_check_empty_dir)
1445 def _check_missing_dir((out,err)):
1446 # TODO: check that rc==2
1447 self.failUnlessEqual(out, "")
1448 self.failUnlessEqual(err, "No such file or directory\n")
1449 d.addCallback(run, "ls", "bogus")
1450 d.addCallback(_check_missing_dir)
# create a handful of local files with distinct contents to upload
1455 fn = os.path.join(self.basedir, "file%d" % i)
1457 data = "data to be uploaded: file%d\n" % i
1459 open(fn,"wb").write(data)
1461 def _check_stdout_against((out,err), filenum=None, data=None):
# compare 'tahoe get' stdout against either datas[filenum] or 'data'
1462 self.failUnlessEqual(err, "")
1463 if filenum is not None:
1464 self.failUnlessEqual(out, datas[filenum])
1465 if data is not None:
1466 self.failUnlessEqual(out, data)
1468 # test all both forms of put: from a file, and from stdin
1470 d.addCallback(run, "put", files[0], "tahoe-file0")
1471 def _put_out((out,err)):
# the small test file fits in a LIT cap
1472 self.failUnless("URI:LIT:" in out, out)
1473 self.failUnless("201 Created" in err, err)
1475 return run(None, "get", uri0)
1476 d.addCallback(_put_out)
1477 d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1479 d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1480 # tahoe put bar tahoe:FOO
1481 d.addCallback(run, "put", files[2], "tahoe:file2")
1482 d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1483 def _check_put_mutable((out,err)):
# remember the mutable cap for the --uri / --readonly-uri checks below
1484 self._mutable_file3_uri = out.strip()
1485 d.addCallback(_check_put_mutable)
1486 d.addCallback(run, "get", "tahoe:file3")
1487 d.addCallback(_check_stdout_against, 3)
1490 STDIN_DATA = "This is the file to upload from stdin."
1491 d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1492 # tahoe put tahoe:FOO
1493 d.addCallback(run, "put", "-", "tahoe:from-stdin",
1494 stdin="Other file from stdin.")
1496 d.addCallback(run, "ls")
1497 d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1498 "tahoe-file-stdin", "from-stdin"])
1499 d.addCallback(run, "ls", "subdir")
1500 d.addCallback(_check_ls, ["tahoe-file1"])
1503 d.addCallback(run, "mkdir", "subdir2")
1504 d.addCallback(run, "ls")
1505 # TODO: extract the URI, set an alias with it
1506 d.addCallback(_check_ls, ["subdir2"])
1508 # tahoe get: (to stdin and to a file)
1509 d.addCallback(run, "get", "tahoe-file0")
1510 d.addCallback(_check_stdout_against, 0)
1511 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1512 d.addCallback(_check_stdout_against, 1)
1513 outfile0 = os.path.join(self.basedir, "outfile0")
1514 d.addCallback(run, "get", "file2", outfile0)
1515 def _check_outfile0((out,err)):
1516 data = open(outfile0,"rb").read()
1517 self.failUnlessEqual(data, "data to be uploaded: file2\n")
1518 d.addCallback(_check_outfile0)
1519 outfile1 = os.path.join(self.basedir, "outfile0")
1520 d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1521 def _check_outfile1((out,err)):
1522 data = open(outfile1,"rb").read()
1523 self.failUnlessEqual(data, "data to be uploaded: file1\n")
1524 d.addCallback(_check_outfile1)
1526 d.addCallback(run, "rm", "tahoe-file0")
1527 d.addCallback(run, "rm", "tahoe:file2")
1528 d.addCallback(run, "ls")
1529 d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1531 d.addCallback(run, "ls", "-l")
1532 def _check_ls_l((out,err)):
# immutable entries show '-r-- ', mutable ones '-rw- '
1533 lines = out.split("\n")
1535 if "tahoe-file-stdin" in l:
1536 self.failUnless(l.startswith("-r-- "), l)
1537 self.failUnless(" %d " % len(STDIN_DATA) in l)
1539 self.failUnless(l.startswith("-rw- "), l) # mutable
1540 d.addCallback(_check_ls_l)
1542 d.addCallback(run, "ls", "--uri")
1543 def _check_ls_uri((out,err)):
1544 lines = out.split("\n")
1547 self.failUnless(self._mutable_file3_uri in l)
1548 d.addCallback(_check_ls_uri)
1550 d.addCallback(run, "ls", "--readonly-uri")
1551 def _check_ls_rouri((out,err)):
1552 lines = out.split("\n")
# the read-only form of the mutable cap must be shown, not the rw form
1555 rw_uri = self._mutable_file3_uri
1556 u = uri.from_string_mutable_filenode(rw_uri)
1557 ro_uri = u.get_readonly().to_string()
1558 self.failUnless(ro_uri in l)
1559 d.addCallback(_check_ls_rouri)
1562 d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1563 d.addCallback(run, "ls")
1564 d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1566 d.addCallback(run, "ln", "tahoe-moved", "newlink")
1567 d.addCallback(run, "ls")
1568 d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1570 d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1571 d.addCallback(run, "ls")
1572 d.addCallback(_check_ls, ["file3", "file3-copy"])
1573 d.addCallback(run, "get", "tahoe:file3-copy")
1574 d.addCallback(_check_stdout_against, 3)
1576 # copy from disk into tahoe
1577 d.addCallback(run, "cp", files[4], "tahoe:file4")
1578 d.addCallback(run, "ls")
1579 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1580 d.addCallback(run, "get", "tahoe:file4")
1581 d.addCallback(_check_stdout_against, 4)
1583 # copy from tahoe into disk
1584 target_filename = os.path.join(self.basedir, "file-out")
1585 d.addCallback(run, "cp", "tahoe:file4", target_filename)
1586 def _check_cp_out((out,err)):
1587 self.failUnless(os.path.exists(target_filename))
1588 got = open(target_filename,"rb").read()
1589 self.failUnlessEqual(got, datas[4])
1590 d.addCallback(_check_cp_out)
1592 # copy from disk to disk (silly case)
1593 target2_filename = os.path.join(self.basedir, "file-out-copy")
1594 d.addCallback(run, "cp", target_filename, target2_filename)
1595 def _check_cp_out2((out,err)):
1596 self.failUnless(os.path.exists(target2_filename))
1597 got = open(target2_filename,"rb").read()
1598 self.failUnlessEqual(got, datas[4])
1599 d.addCallback(_check_cp_out2)
1601 # copy from tahoe into disk, overwriting an existing file
1602 d.addCallback(run, "cp", "tahoe:file3", target_filename)
1603 def _check_cp_out3((out,err)):
1604 self.failUnless(os.path.exists(target_filename))
1605 got = open(target_filename,"rb").read()
1606 self.failUnlessEqual(got, datas[3])
1607 d.addCallback(_check_cp_out3)
1609 # copy from disk into tahoe, overwriting an existing immutable file
1610 d.addCallback(run, "cp", files[5], "tahoe:file4")
1611 d.addCallback(run, "ls")
1612 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1613 d.addCallback(run, "get", "tahoe:file4")
1614 d.addCallback(_check_stdout_against, 5)
1616 # copy from disk into tahoe, overwriting an existing mutable file
1617 d.addCallback(run, "cp", files[5], "tahoe:file3")
1618 d.addCallback(run, "ls")
1619 d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1620 d.addCallback(run, "get", "tahoe:file3")
1621 d.addCallback(_check_stdout_against, 5)
1623 # recursive copy: setup
1624 dn = os.path.join(self.basedir, "dir1")
1626 open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1627 open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1628 open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1629 sdn2 = os.path.join(dn, "subdir2")
1631 open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1632 open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1634 # from disk into tahoe
1635 d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1636 d.addCallback(run, "ls")
1637 d.addCallback(_check_ls, ["dir1"])
1638 d.addCallback(run, "ls", "dir1")
1639 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1640 ["rfile4", "rfile5"])
1641 d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1642 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1643 ["rfile1", "rfile2", "rfile3"])
1644 d.addCallback(run, "get", "dir1/subdir2/rfile4")
1645 d.addCallback(_check_stdout_against, data="rfile4")
1647 # and back out again
1648 dn_copy = os.path.join(self.basedir, "dir1-copy")
1649 d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1650 def _check_cp_r_out((out,err)):
# NOTE(review): the 'def _cmp(name):' header and the rfile1-3
# comparisons are missing from this listing.
1652 old = open(os.path.join(dn, name), "rb").read()
1653 newfn = os.path.join(dn_copy, name)
1654 self.failUnless(os.path.exists(newfn))
1655 new = open(newfn, "rb").read()
1656 self.failUnlessEqual(old, new)
1660 _cmp(os.path.join("subdir2", "rfile4"))
1661 _cmp(os.path.join("subdir2", "rfile5"))
1662 d.addCallback(_check_cp_r_out)
1664 # and copy it a second time, which ought to overwrite the same files
1665 d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1667 # and tahoe-to-tahoe
1668 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1669 d.addCallback(run, "ls")
1670 d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1671 d.addCallback(run, "ls", "dir1-copy")
1672 d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1673 ["rfile4", "rfile5"])
1674 d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1675 d.addCallback(_check_ls, ["rfile4", "rfile5"],
1676 ["rfile1", "rfile2", "rfile3"])
1677 d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1678 d.addCallback(_check_stdout_against, data="rfile4")
1680 # and copy it a second time, which ought to overwrite the same files
1681 d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1683 # tahoe_ls doesn't currently handle the error correctly: it tries to
1684 # JSON-parse a traceback.
1685 ## def _ls_missing(res):
1686 ## argv = ["ls"] + nodeargs + ["bogus"]
1687 ## return self._run_cli(argv)
1688 ## d.addCallback(_ls_missing)
1689 ## def _check_ls_missing((out,err)):
1692 ## self.failUnlessEqual(err, "")
1693 ## d.addCallback(_check_ls_missing)
def _run_cli(self, argv, stdin=""):
    # Run a CLI command in a worker thread (the CLI performs blocking I/O)
    # and return a Deferred that fires with the (stdout, stderr) text it
    # produced.
    #
    # Fixes vs. the listed code: the 'def _done(res):' header and the
    # trailing 'return d' were missing, leaving '_done' undefined and the
    # getvalue() pair as a stray statement.
    stdout, stderr = StringIO(), StringIO()
    d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                              stdin=StringIO(stdin),
                              stdout=stdout, stderr=stderr)
    def _done(res):
        # discard the runner's return code; hand back the captured output
        return stdout.getvalue(), stderr.getvalue()
    d.addCallback(_done)
    return d
1708 def _test_checker(self, res):
# Run the file-checker over three node flavours: a mutable directory
# (healthy, with and without verify=True), a CHK immutable file large
# enough not to be a LIT, and a small LIT file (whose checker returns
# None since there is nothing to check).
# NOTE(review): the inner 'return d' statements and the final 'return d'
# appear to be missing from this listing; confirm against the original.
1709 ut = upload.Data("too big to be literal" * 200, convergence=None)
1710 d = self._personal_node.add_file(u"big file", ut)
1712 d.addCallback(lambda res: self._personal_node.check(Monitor()))
1713 def _check_dirnode_results(r):
1714 self.failUnless(r.is_healthy())
1715 d.addCallback(_check_dirnode_results)
1716 d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1717 d.addCallback(_check_dirnode_results)
1719 d.addCallback(lambda res: self._personal_node.get(u"big file"))
1720 def _got_chk_filenode(n):
# "big file" is 4200 bytes, so it must be a full CHK FileNode
1721 self.failUnless(isinstance(n, filenode.FileNode))
1722 d = n.check(Monitor())
1723 def _check_filenode_results(r):
1724 self.failUnless(r.is_healthy())
1725 d.addCallback(_check_filenode_results)
1726 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1727 d.addCallback(_check_filenode_results)
1729 d.addCallback(_got_chk_filenode)
1731 d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1732 def _got_lit_filenode(n):
# the small file was stored as a LIT: checking it yields None
1733 self.failUnless(isinstance(n, filenode.LiteralFileNode))
1734 d = n.check(Monitor())
1735 def _check_lit_filenode_results(r):
1736 self.failUnlessEqual(r, None)
1737 d.addCallback(_check_lit_filenode_results)
1738 d.addCallback(lambda res: n.check(Monitor(), verify=True))
1739 d.addCallback(_check_lit_filenode_results)
1741 d.addCallback(_got_lit_filenode)
# Tests that the webapi checker/verifier/repairer handles mutable files:
# a healthy file, a deliberately corrupted share, and a deleted share.
1745 class MutableChecker(SystemTestMixin, unittest.TestCase, WebErrorMixin):

# Run a CLI command synchronously, returning its captured stdout.
1747 def _run_cli(self, argv):
1748 stdout, stderr = StringIO(), StringIO()
1749 runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1750 return stdout.getvalue()

# Create a mutable file, then confirm the webapi verifier reports it healthy.
1752 def test_good(self):
1753 self.basedir = self.mktemp()
1754 d = self.set_up_nodes()
1755 CONTENTS = "a little bit of data"
1756 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
# NOTE(review): listing lines 1757-1758 are missing here -- presumably
# "def _created(node):" and "self.node = node", which the next lines need.
1759 si = self.node.get_storage_index()
1760 d.addCallback(_created)
1761 # now make sure the webapi verifier sees no problems
# NOTE(review): listing line 1762 is missing -- presumably "def _do_check(res):".
1763 url = (self.webish_url +
1764 "uri/%s" % urllib.quote(self.node.get_uri()) +
1765 "?t=check&verify=true")
1766 return getPage(url, method="POST")
1767 d.addCallback(_do_check)
1768 def _got_results(out):
# The check page must claim health and must not mention any damage.
1769 self.failUnless("<span>Healthy!</span>" in out, out)
1770 self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1771 self.failIf("Not Healthy!" in out, out)
1772 self.failIf("Unhealthy" in out, out)
1773 self.failIf("Corrupt Shares" in out, out)
1774 d.addCallback(_got_results)
1775 d.addErrback(self.explain_web_error)
# NOTE(review): a trailing "return d" (listing lines 1776-1777) appears to
# have been dropped; trial needs the Deferred to wait for completion.

# Corrupt one share on disk, then confirm the webapi verifier notices it
# and the repairer restores full health.
1778 def test_corrupt(self):
1779 self.basedir = self.mktemp()
1780 d = self.set_up_nodes()
1781 CONTENTS = "a little bit of data"
1782 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
# NOTE(review): listing lines 1783-1784 missing -- presumably
# "def _created(node):" / "self.node = node".
1785 si = self.node.get_storage_index()
1786 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1787 self.clients[1].basedir])
1788 files = out.split("\n")
1789 # corrupt one of them, using the CLI debug command
# NOTE(review): listing line 1790 missing -- presumably "f = files[0]",
# which the next line relies on.
1791 shnum = os.path.basename(f)
1792 nodeid = self.clients[1].nodeid
1793 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1794 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1795 out = self._run_cli(["debug", "corrupt-share", files[0]])
1796 d.addCallback(_created)
1797 # now make sure the webapi verifier notices it
# NOTE(review): listing line 1798 missing -- presumably "def _do_check(res):".
1799 url = (self.webish_url +
1800 "uri/%s" % urllib.quote(self.node.get_uri()) +
1801 "?t=check&verify=true")
1802 return getPage(url, method="POST")
1803 d.addCallback(_do_check)
1804 def _got_results(out):
1805 self.failUnless("Not Healthy!" in out, out)
1806 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1807 self.failUnless("Corrupt Shares:" in out, out)
1808 d.addCallback(_got_results)
1810 # now make sure the webapi repairer can fix it
1811 def _do_repair(res):
1812 url = (self.webish_url +
1813 "uri/%s" % urllib.quote(self.node.get_uri()) +
1814 "?t=check&verify=true&repair=true")
1815 return getPage(url, method="POST")
1816 d.addCallback(_do_repair)
1817 def _got_repair_results(out):
1818 self.failUnless("<div>Repair successful</div>" in out, out)
1819 d.addCallback(_got_repair_results)
# Re-run the checker after repair: it must now be fully healthy again.
1820 d.addCallback(_do_check)
1821 def _got_postrepair_results(out):
1822 self.failIf("Not Healthy!" in out, out)
1823 self.failUnless("Recoverable Versions: 10*seq" in out, out)
1824 d.addCallback(_got_postrepair_results)
1825 d.addErrback(self.explain_web_error)
# NOTE(review): trailing "return d" (listing lines 1826-1828) appears dropped.

# Delete one share outright, then confirm the (non-verifying) checker
# notices the missing share and the repairer regenerates it.
1829 def test_delete_share(self):
1830 self.basedir = self.mktemp()
1831 d = self.set_up_nodes()
1832 CONTENTS = "a little bit of data"
1833 d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
# NOTE(review): listing lines 1834-1835 missing -- presumably
# "def _created(node):" / "self.node = node".
1836 si = self.node.get_storage_index()
1837 out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1838 self.clients[1].basedir])
1839 files = out.split("\n")
1840 # corrupt one of them, using the CLI debug command
# NOTE(review): listing line 1841 missing -- presumably "f = files[0]".
1842 shnum = os.path.basename(f)
1843 nodeid = self.clients[1].nodeid
1844 nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1845 self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
# NOTE(review): listing line 1846 missing -- presumably the actual share
# deletion (e.g. removing the share file); verify against upstream source.
1847 d.addCallback(_created)
1848 # now make sure the webapi checker notices it
# NOTE(review): listing line 1849 missing -- presumably "def _do_check(res):".
1850 url = (self.webish_url +
1851 "uri/%s" % urllib.quote(self.node.get_uri()) +
1852 "?t=check&verify=false")
1853 return getPage(url, method="POST")
1854 d.addCallback(_do_check)
1855 def _got_results(out):
1856 self.failUnless("Not Healthy!" in out, out)
1857 self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
# A deleted share is merely missing, not corrupt, so no corruption report.
1858 self.failIf("Corrupt Shares" in out, out)
1859 d.addCallback(_got_results)
1861 # now make sure the webapi repairer can fix it
1862 def _do_repair(res):
1863 url = (self.webish_url +
1864 "uri/%s" % urllib.quote(self.node.get_uri()) +
1865 "?t=check&verify=false&repair=true")
1866 return getPage(url, method="POST")
1867 d.addCallback(_do_repair)
1868 def _got_repair_results(out):
1869 self.failUnless("Repair successful" in out)
1870 d.addCallback(_got_repair_results)
1871 d.addCallback(_do_check)
1872 def _got_postrepair_results(out):
1873 self.failIf("Not Healthy!" in out, out)
1874 self.failUnless("Recoverable Versions: 10*seq" in out)
1875 d.addCallback(_got_postrepair_results)
1876 d.addErrback(self.explain_web_error)
# NOTE(review): trailing "return d" (listing lines 1877-1879) appears dropped.
1880 class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
1881 # construct a small directory tree (with one dir, one immutable file, one
1882 # mutable file, one LIT file, and a loop), and then check/examine it in
# Build the small test tree: a root dir containing a mutable file, a large
# immutable (CHK) file, a small LIT file, and a loop edge back to the root.
1885 def set_up_tree(self, ignored):
# NOTE(review): listing line 1886 is missing (probably a blank or comment).
1887 c0 = self.clients[0]
1888 d = c0.create_empty_dirnode()
1889 def _created_root(n):
# NOTE(review): listing line 1890 missing -- presumably "self.root = n",
# which later callbacks (self.root.set_node, self.root.add_file) depend on.
1891 self.root_uri = n.get_uri()
1892 d.addCallback(_created_root)
1893 d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1894 d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1895 def _created_mutable(n):
# NOTE(review): listing line 1896 missing -- presumably "self.mutable = n".
1897 self.mutable_uri = n.get_uri()
1898 d.addCallback(_created_mutable)
# Large enough to force a real CHK upload rather than a LIT uri.
1900 large = upload.Data("Lots of data\n" * 1000, None)
1901 d.addCallback(lambda ign: self.root.add_file(u"large", large))
1902 def _created_large(n):
# NOTE(review): listing line 1903 missing -- presumably "self.large = n".
1904 self.large_uri = n.get_uri()
1905 d.addCallback(_created_large)
1907 small = upload.Data("Small enough for a LIT", None)
1908 d.addCallback(lambda ign: self.root.add_file(u"small", small))
1909 def _created_small(n):
# NOTE(review): listing line 1910 missing -- presumably "self.small = n".
1911 self.small_uri = n.get_uri()
1912 d.addCallback(_created_small)
# The loop edge lets deep-traversal code prove it handles cycles.
1914 d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
# NOTE(review): a trailing "return d" appears to have been dropped.
# Assert that a checker result `cr` for node `n` reports full health:
# 10 good shares, 3-of-10 encoding, all servers responding, no corruption.
# `incomplete=True` relaxes counters a verify-mode check cannot fill in.
1917 def check_is_healthy(self, cr, n, where, incomplete=False):
1918 self.failUnless(ICheckerResults.providedBy(cr), where)
1919 self.failUnless(cr.is_healthy(), where)
1920 self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
# NOTE(review): listing line 1921 missing -- the "where)" continuation of
# the call above.
1922 self.failUnlessEqual(cr.get_storage_index_string(),
1923 base32.b2a(n.get_storage_index()), where)
# With fewer than 10 clients, some server must hold >1 share, so the file
# is expected to need rebalancing.
1924 needs_rebalancing = bool( len(self.clients) < 10 )
# NOTE(review): listing line 1925 missing -- presumably an
# "if not incomplete:" guard around the next assertion.
1926 self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
# NOTE(review): listing line 1927 missing -- presumably "d = cr.get_data()";
# the dict `d` used below is otherwise undefined.
1928 self.failUnlessEqual(d["count-shares-good"], 10, where)
1929 self.failUnlessEqual(d["count-shares-needed"], 3, where)
1930 self.failUnlessEqual(d["count-shares-expected"], 10, where)
# NOTE(review): listing line 1931 missing -- presumably "if not incomplete:".
1932 self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
1933 self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
1934 self.failUnlessEqual(d["list-corrupt-shares"], [], where)
# NOTE(review): listing line 1935 missing -- presumably "if not incomplete:".
1936 self.failUnlessEqual(sorted(d["servers-responding"]),
1937 sorted([c.nodeid for c in self.clients]),
# NOTE(review): listing line 1938 missing -- the "where)" continuation.
1939 self.failUnless("sharemap" in d, where)
# Every client should appear somewhere in the sharemap.
1940 all_serverids = set()
1941 for (shareid, serverids) in d["sharemap"].items():
1942 all_serverids.update(serverids)
1943 self.failUnlessEqual(sorted(all_serverids),
1944 sorted([c.nodeid for c in self.clients]),
# NOTE(review): listing lines 1945-1946 missing -- "where)" continuation.
1947 self.failUnlessEqual(d["count-wrong-shares"], 0, where)
1948 self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
1949 self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
    """Assert a check-and-repair result is healthy in both phases.

    The pre-repair and post-repair results must each be healthy (per
    check_is_healthy), and no repair may actually have been attempted.
    """
    self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
    pre = cr.get_pre_repair_results()
    self.failUnless(pre.is_healthy(), where)
    self.check_is_healthy(pre, n, where, incomplete)
    post = cr.get_post_repair_results()
    self.failUnless(post.is_healthy(), where)
    self.check_is_healthy(post, n, where, incomplete)
    # a healthy file should never trigger an actual repair
    self.failIf(cr.get_repair_attempted(), where)
def deep_check_is_healthy(self, cr, num_healthy, where):
    """Assert a deep-check run found exactly `num_healthy` healthy objects.

    `cr` must provide IDeepCheckResults; `where` labels any failure.
    """
    self.failUnless(IDeepCheckResults.providedBy(cr))
    # NOTE(review): the listing truncated this assertion's continuation
    # line; completed with (num_healthy, where) to match the parallel
    # deep_check_and_repair_is_healthy checker below.
    self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
                         num_healthy, where)
def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
    """Assert a deep check+repair found all objects healthy and repaired none.

    `cr` must provide IDeepCheckAndRepairResults; both the pre- and
    post-repair healthy counts must equal `num_healthy`.
    """
    self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
    c = cr.get_counters()
    # NOTE(review): the listing truncated the continuation lines of the two
    # assertions below; completed with (num_healthy, where) to match the
    # surrounding checkers.
    self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
                         num_healthy, where)
    self.failUnlessEqual(c["count-objects-healthy-post-repair"],
                         num_healthy, where)
    # healthy objects must not trigger any repair attempts
    self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
def test_good(self):
    """End-to-end: build the tree, then run stats/check/webapi checks on it."""
    self.basedir = self.mktemp()
    d = self.set_up_nodes()
    d.addCallback(self.set_up_tree)
    d.addCallback(self.do_stats)
    d.addCallback(self.do_test_good)
    d.addCallback(self.do_test_web)
    d.addErrback(self.explain_web_error)
    # trial needs the Deferred back to wait for the async chain; the
    # listing appears to have dropped this line
    return d
def do_stats(self, ignored):
    """Run a deep-stats traversal from the root and validate the numbers."""
    d = defer.succeed(None)
    d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
    d.addCallback(self.check_stats)
    # hand the Deferred back so callers can chain on completion; the
    # listing appears to have dropped this line
    return d
# Validate the deep-stats dictionary for the 4-node tree built by
# set_up_tree (1 dir, 1 CHK file, 1 LIT file, 1 mutable file).
1990 def check_stats(self, s):
1991 self.failUnlessEqual(s["count-directories"], 1)
1992 self.failUnlessEqual(s["count-files"], 3)
1993 self.failUnlessEqual(s["count-immutable-files"], 1)
1994 self.failUnlessEqual(s["count-literal-files"], 1)
1995 self.failUnlessEqual(s["count-mutable-files"], 1)
1996 # don't check directories: their size will vary
1997 # s["largest-directory"]
1998 # s["size-directories"]
1999 self.failUnlessEqual(s["largest-directory-children"], 4)
2000 self.failUnlessEqual(s["largest-immutable-file"], 13000)
2001 # to re-use this function for both the local
2002 # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2003 # coerce the result into a list of tuples. dirnode.start_deep_stats()
2004 # returns a list of tuples, but JSON only knows about lists., so
2005 # t=start-deep-stats returns a list of lists.
2006 histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2007 self.failUnlessEqual(histogram, [(11, 31, 1),
# NOTE(review): listing lines 2008-2009 are missing -- the remaining
# histogram bucket(s) and the closing "])" of this assertion.
2010 self.failUnlessEqual(s["size-immutable-files"], 13000)
2011 self.failUnlessEqual(s["size-literal-files"], 22)
# Exercise check()/check_and_repair() on each node of the healthy tree,
# individually and via deep-check traversals, then cancel a deep-check.
2013 def do_test_good(self, ignored):
2014 d = defer.succeed(None)
2015 # check the individual items
2016 d.addCallback(lambda ign: self.root.check(Monitor()))
2017 d.addCallback(self.check_is_healthy, self.root, "root")
2018 d.addCallback(lambda ign: self.mutable.check(Monitor()))
2019 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2020 d.addCallback(lambda ign: self.large.check(Monitor()))
2021 d.addCallback(self.check_is_healthy, self.large, "large")
# LIT files have no shares to check, so check() returns None.
2022 d.addCallback(lambda ign: self.small.check(Monitor()))
2023 d.addCallback(self.failUnlessEqual, None, "small")
2025 # and again with verify=True
2026 d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2027 d.addCallback(self.check_is_healthy, self.root, "root")
2028 d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2029 d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2030 d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2031 d.addCallback(self.check_is_healthy, self.large, "large",
# NOTE(review): listing line 2032 missing -- presumably the
# "incomplete=True)" continuation of the call above.
2033 d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2034 d.addCallback(self.failUnlessEqual, None, "small")
2036 # and check_and_repair(), which should be a nop
2037 d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2038 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2039 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2040 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2041 d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2042 d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2043 d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2044 d.addCallback(self.failUnlessEqual, None, "small")
2046 # check_and_repair(verify=True)
2047 d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2048 d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2049 d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2050 d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2051 d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2052 d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
# NOTE(review): listing line 2053 missing -- presumably "incomplete=True)".
2054 d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2055 d.addCallback(self.failUnlessEqual, None, "small")
2058 # now deep-check the root, with various verify= and repair= options
2059 d.addCallback(lambda ign:
2060 self.root.start_deep_check().when_done())
2061 d.addCallback(self.deep_check_is_healthy, 3, "root")
2062 d.addCallback(lambda ign:
2063 self.root.start_deep_check(verify=True).when_done())
2064 d.addCallback(self.deep_check_is_healthy, 3, "root")
2065 d.addCallback(lambda ign:
2066 self.root.start_deep_check_and_repair().when_done())
2067 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2068 d.addCallback(lambda ign:
2069 self.root.start_deep_check_and_repair(verify=True).when_done())
2070 d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2072 # and finally, start a deep-check, but then cancel it.
2073 d.addCallback(lambda ign: self.root.start_deep_check())
2074 def _checking(monitor):
# NOTE(review): listing line 2075 missing -- presumably "monitor.cancel()",
# without which the cancellation being tested never happens.
2076 d = monitor.when_done()
2077 # this should fire as soon as the next dirnode.list finishes.
2078 # TODO: add a counter to measure how many list() calls are made,
2079 # assert that no more than one gets to run before the cancel()
2081 def _finished_normally(res):
2082 self.fail("this was supposed to fail, not finish normally")
# NOTE(review): listing line 2083 missing -- presumably "def _cancelled(f):",
# the errback whose body follows.
2084 f.trap(OperationCancelledError)
2085 d.addCallbacks(_finished_normally, _cancelled)
# NOTE(review): listing line 2086 missing -- presumably "return d" from
# _checking.
2087 d.addCallback(_checking)
# NOTE(review): listing lines 2088-2090 missing -- presumably the method's
# trailing "return d".
def web_json(self, n, **kwargs):
    """POST to node n's webapi URL with output=json.

    Returns a Deferred that fires with the decoded JSON response.
    """
    kwargs["output"] = "json"
    d = self.web(n, "POST", **kwargs)
    d.addCallback(self.decode_json)
    # the Deferred was built but never handed back (the listing dropped
    # this line); callers chain on it, so return it
    return d
2097 def decode_json(self, (s,url)):
2099 data = simplejson.loads(s)
2101 self.fail("%s: not JSON: '%s'" % (url, s))
def web(self, n, method="GET", **kwargs):
    """Hit node n's webapi URL with the given method and query args.

    Returns a Deferred firing with (data, url) so failures can report
    which URL was fetched.
    """
    url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
           + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
    d = getPage(url, method=method)
    d.addCallback(lambda data: (data,url))
    # the listing dropped this line; callers chain on the Deferred
    return d
# Poll the /operations/<ophandle> status page until the long-running
# webapi operation reports finished, stalling 1s between polls.
2112 def wait_for_operation(self, ignored, ophandle):
2113 url = self.webish_url + "operations/" + ophandle
2114 url += "?t=status&output=JSON"
# NOTE(review): listing lines 2115-2117 missing -- presumably the page
# fetch (e.g. "d = getPage(url)"), an inner "def _got(res):", and a "try:".
2118 data = simplejson.loads(res)
# NOTE(review): listing line 2119 missing -- presumably "except ValueError:".
2120 self.fail("%s: not JSON: '%s'" % (url, res))
2121 if not data["finished"]:
# Not done yet: wait a moment and poll again, chaining the new Deferred.
2122 d = self.stall(delay=1.0)
2123 d.addCallback(self.wait_for_operation, ophandle)
# NOTE(review): listing lines 2124-2128 missing -- presumably the returns
# ("return d" / "return data"), "d.addCallback(_got)" and a final "return d".
# Fetch the final status page for a completed operation; when output="json"
# the response body is parsed before being handed back.
2129 def get_operation_results(self, ignored, ophandle, output=None):
2130 url = self.webish_url + "operations/" + ophandle
# NOTE(review): listing lines 2131-2132 missing -- presumably the base
# query string and an "if output:" guard before the next line.
2133 url += "&output=" + output
# NOTE(review): listing lines 2134-2135 missing -- presumably the page
# fetch (e.g. "d = getPage(url)") and an inner "def _got(res):".
2136 if output and output.lower() == "json":
# NOTE(review): listing line 2137 missing -- presumably "try:".
2138 return simplejson.loads(res)
# NOTE(review): listing line 2139 missing -- presumably "except ValueError:".
2140 self.fail("%s: not JSON: '%s'" % (url, res))
# NOTE(review): listing lines 2141-2144 missing -- presumably "return res",
# "d.addCallback(_got)", and a final "return d".
def slow_web(self, n, output=None, **kwargs):
    """POST a slow webapi operation using a fresh ophandle, poll to completion.

    Returns a Deferred firing with the operation's final results (parsed
    JSON when output="json").
    """
    # random handle so concurrent operations cannot collide
    handle = base32.b2a(os.urandom(4))
    d = self.web(n, "POST", ophandle=handle, **kwargs)
    d.addCallback(self.wait_for_operation, handle)
    d.addCallback(self.get_operation_results, handle, output=output)
    # the listing dropped this line; callers chain on the Deferred
    return d
# Assert that JSON check results for node `n` describe a fully healthy
# file: 10 good shares across all servers, no corruption, one recoverable
# version. `incomplete=True` is for verify-mode results missing counters.
2153 def json_check_is_healthy(self, data, n, where, incomplete=False):
# NOTE(review): listing line 2154 is missing here (content unknown).
2155 self.failUnlessEqual(data["storage-index"],
2156 base32.b2a(n.get_storage_index()), where)
# NOTE(review): listing line 2157 missing -- presumably an assignment like
# r = data["results"]; `r` is otherwise undefined below.
2158 self.failUnlessEqual(r["healthy"], True, where)
# With fewer than 10 clients some server must hold >1 share.
2159 needs_rebalancing = bool( len(self.clients) < 10 )
# NOTE(review): listing line 2160 missing -- presumably "if not incomplete:".
2161 self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2162 self.failUnlessEqual(r["count-shares-good"], 10, where)
2163 self.failUnlessEqual(r["count-shares-needed"], 3, where)
2164 self.failUnlessEqual(r["count-shares-expected"], 10, where)
# NOTE(review): listing line 2165 missing -- presumably "if not incomplete:".
2166 self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2167 self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2168 self.failUnlessEqual(r["list-corrupt-shares"], [], where)
# NOTE(review): listing line 2169 missing -- presumably "if not incomplete:".
2170 self.failUnlessEqual(sorted(r["servers-responding"]),
2171 sorted([idlib.nodeid_b2a(c.nodeid)
2172 for c in self.clients]), where)
2173 self.failUnless("sharemap" in r, where)
# Collect every server mentioned in the sharemap: all clients must appear.
2174 all_serverids = set()
2175 for (shareid, serverids_s) in r["sharemap"].items():
2176 all_serverids.update(serverids_s)
2177 self.failUnlessEqual(sorted(all_serverids),
2178 sorted([idlib.nodeid_b2a(c.nodeid)
2179 for c in self.clients]), where)
2180 self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2181 self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2182 self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
    """Assert JSON check+repair output shows the right storage index, no
    repair attempted, and healthy results both before and after the
    (no-op) repair."""
    expected_si = base32.b2a(n.get_storage_index())
    self.failUnlessEqual(data["storage-index"], expected_si, where)
    self.failUnlessEqual(data["repair-attempted"], False, where)
    # both phases must independently pass the full health check
    for phase in ("pre-repair-results", "post-repair-results"):
        self.json_check_is_healthy(data[phase], n, where, incomplete)
def json_full_deepcheck_is_healthy(self, data, n, where):
    """Assert a JSON deep-check summary: 3 checked objects, all healthy,
    no corrupt shares or unhealthy files, plus sane deep-stats."""
    self.failUnlessEqual(data["root-storage-index"],
                         base32.b2a(n.get_storage_index()), where)
    # counter/list fields and their expected values for a healthy tree
    expectations = [
        ("count-objects-checked", 3),
        ("count-objects-healthy", 3),
        ("count-objects-unhealthy", 0),
        ("count-corrupt-shares", 0),
        ("list-corrupt-shares", []),
        ("list-unhealthy-files", []),
    ]
    for key, expected in expectations:
        self.failUnlessEqual(data[key], expected, where)
    self.json_check_stats(data["stats"], where)
def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
    """Assert JSON deep-check+repair output: every object healthy both
    before and after repair, no corruption anywhere, and zero repairs
    attempted (a healthy tree needs none)."""
    self.failUnlessEqual(data["root-storage-index"],
                         base32.b2a(n.get_storage_index()), where)
    # counters/lists and their expected values, in reporting order
    checks = [
        ("count-objects-checked", 3),
        ("count-objects-healthy-pre-repair", 3),
        ("count-objects-unhealthy-pre-repair", 0),
        ("count-corrupt-shares-pre-repair", 0),
        ("count-objects-healthy-post-repair", 3),
        ("count-objects-unhealthy-post-repair", 0),
        ("count-corrupt-shares-post-repair", 0),
        ("list-corrupt-shares", []),
        ("list-remaining-corrupt-shares", []),
        ("list-unhealthy-files", []),
        ("count-repairs-attempted", 0),
        ("count-repairs-successful", 0),
        ("count-repairs-unsuccessful", 0),
    ]
    for key, expected in checks:
        self.failUnlessEqual(data[key], expected, where)
def json_check_lit(self, data, n, where):
    """LIT files embed their data in the URI: they have an empty storage
    index and their check results are trivially healthy."""
    self.failUnlessEqual(data["storage-index"], "", where)
    results = data["results"]
    self.failUnlessEqual(results["healthy"], True, where)
def json_check_stats(self, data, where):
    """Validate webapi deep-stats output by delegating to the shared
    check_stats checker (the `where` label is unused by it)."""
    self.check_stats(data)
2233 def do_test_web(self, ignored):
2234 d = defer.succeed(None)
2237 d.addCallback(lambda ign:
2238 self.slow_web(self.root,
2239 t="start-deep-stats", output="json"))
2240 d.addCallback(self.json_check_stats, "deep-stats")
2243 d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2244 d.addCallback(self.json_check_is_healthy, self.root, "root")
2245 d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2246 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2247 d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2248 d.addCallback(self.json_check_is_healthy, self.large, "large")
2249 d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2250 d.addCallback(self.json_check_lit, self.small, "small")
2253 d.addCallback(lambda ign:
2254 self.web_json(self.root, t="check", verify="true"))
2255 d.addCallback(self.json_check_is_healthy, self.root, "root")
2256 d.addCallback(lambda ign:
2257 self.web_json(self.mutable, t="check", verify="true"))
2258 d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2259 d.addCallback(lambda ign:
2260 self.web_json(self.large, t="check", verify="true"))
2261 d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2262 d.addCallback(lambda ign:
2263 self.web_json(self.small, t="check", verify="true"))
2264 d.addCallback(self.json_check_lit, self.small, "small")
2266 # check and repair, no verify
2267 d.addCallback(lambda ign:
2268 self.web_json(self.root, t="check", repair="true"))
2269 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2270 d.addCallback(lambda ign:
2271 self.web_json(self.mutable, t="check", repair="true"))
2272 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2273 d.addCallback(lambda ign:
2274 self.web_json(self.large, t="check", repair="true"))
2275 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2276 d.addCallback(lambda ign:
2277 self.web_json(self.small, t="check", repair="true"))
2278 d.addCallback(self.json_check_lit, self.small, "small")
2280 # check+verify+repair
2281 d.addCallback(lambda ign:
2282 self.web_json(self.root, t="check", repair="true", verify="true"))
2283 d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2284 d.addCallback(lambda ign:
2285 self.web_json(self.mutable, t="check", repair="true", verify="true"))
2286 d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2287 d.addCallback(lambda ign:
2288 self.web_json(self.large, t="check", repair="true", verify="true"))
2289 d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2290 d.addCallback(lambda ign:
2291 self.web_json(self.small, t="check", repair="true", verify="true"))
2292 d.addCallback(self.json_check_lit, self.small, "small")
2294 # now run a deep-check, with various verify= and repair= flags
2295 d.addCallback(lambda ign:
2296 self.slow_web(self.root, t="start-deep-check", output="json"))
2297 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2298 d.addCallback(lambda ign:
2299 self.slow_web(self.root, t="start-deep-check", verify="true",
2301 d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2302 d.addCallback(lambda ign:
2303 self.slow_web(self.root, t="start-deep-check", repair="true",
2305 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2306 d.addCallback(lambda ign:
2307 self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2308 d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2310 # now look at t=info
2311 d.addCallback(lambda ign: self.web(self.root, t="info"))
2312 # TODO: examine the output
2313 d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2314 d.addCallback(lambda ign: self.web(self.large, t="info"))
2315 d.addCallback(lambda ign: self.web(self.small, t="info"))