# src/allmydata/test/test_system.py (tahoe-lafs)
# NOTE: gitweb page header removed during extraction; the commit shown on that
# page was: "cli: add tests for 'tahoe stats --verbose'".
1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18      IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28      MemoryConsumer, download_to_data
29
30 LARGE_DATA = """
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
33 """
34
35 class CountingDataUploadable(upload.Data):
36     bytes_read = 0
37     interrupt_after = None
38     interrupt_after_d = None
39
40     def read(self, length):
41         self.bytes_read += length
42         if self.interrupt_after is not None:
43             if self.bytes_read > self.interrupt_after:
44                 self.interrupt_after = None
45                 self.interrupt_after_d.callback(self)
46         return upload.Data.read(self, length)
47
48 class GrabEverythingConsumer:
49     implements(IConsumer)
50
51     def __init__(self):
52         self.contents = ""
53
54     def registerProducer(self, producer, streaming):
55         assert streaming
56         assert IPushProducer.providedBy(producer)
57
58     def write(self, data):
59         self.contents += data
60
61     def unregisterProducer(self):
62         pass
63
64 class SystemTest(SystemTestMixin, unittest.TestCase):
65
66     def test_connections(self):
67         self.basedir = "system/SystemTest/test_connections"
68         d = self.set_up_nodes()
69         self.extra_node = None
70         d.addCallback(lambda res: self.add_extra_node(self.numclients))
71         def _check(extra_node):
72             self.extra_node = extra_node
73             for c in self.clients:
74                 all_peerids = list(c.get_all_peerids())
75                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
76                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
77                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
78
79         d.addCallback(_check)
80         def _shutdown_extra_node(res):
81             if self.extra_node:
82                 return self.extra_node.stopService()
83             return res
84         d.addBoth(_shutdown_extra_node)
85         return d
86     test_connections.timeout = 300
87     # test_connections is subsumed by test_upload_and_download, and takes
88     # quite a while to run on a slow machine (because of all the TLS
89     # connections that must be established). If we ever rework the introducer
90     # code to such an extent that we're not sure if it works anymore, we can
91     # reinstate this test until it does.
92     del test_connections
93
94     def test_upload_and_download_random_key(self):
95         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96         return self._test_upload_and_download(convergence=None)
97     test_upload_and_download_random_key.timeout = 4800
98
99     def test_upload_and_download_convergent(self):
100         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101         return self._test_upload_and_download(convergence="some convergence string")
102     test_upload_and_download_convergent.timeout = 4800
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = list(c.get_all_peerids())
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114                 self.failUnlessEqual(len(permuted_peers), self.numclients)
115         d.addCallback(_check_connections)
116
117         def _do_upload(res):
118             log.msg("UPLOADING")
119             u = self.clients[0].getServiceNamed("uploader")
120             self.uploader = u
121             # we crank the max segsize down to 1024b for the duration of this
122             # test, so we can exercise multiple segments. It is important
123             # that this is not a multiple of the segment size, so that the
124             # tail segment is not the same length as the others. This actualy
125             # gets rounded up to 1025 to be a multiple of the number of
126             # required shares (since we use 25 out of 100 FEC).
127             up = upload.Data(DATA, convergence=convergence)
128             up.max_segment_size = 1024
129             d1 = u.upload(up)
130             return d1
131         d.addCallback(_do_upload)
132         def _upload_done(results):
133             uri = results.uri
134             log.msg("upload finished: uri is %s" % (uri,))
135             self.uri = uri
136             dl = self.clients[1].getServiceNamed("downloader")
137             self.downloader = dl
138         d.addCallback(_upload_done)
139
140         def _upload_again(res):
141             # Upload again. If using convergent encryption then this ought to be
142             # short-circuited, however with the way we currently generate URIs
143             # (i.e. because they include the roothash), we have to do all of the
144             # encoding work, and only get to save on the upload part.
145             log.msg("UPLOADING AGAIN")
146             up = upload.Data(DATA, convergence=convergence)
147             up.max_segment_size = 1024
148             d1 = self.uploader.upload(up)
149         d.addCallback(_upload_again)
150
151         def _download_to_data(res):
152             log.msg("DOWNLOADING")
153             return self.downloader.download_to_data(self.uri)
154         d.addCallback(_download_to_data)
155         def _download_to_data_done(data):
156             log.msg("download finished")
157             self.failUnlessEqual(data, DATA)
158         d.addCallback(_download_to_data_done)
159
160         target_filename = os.path.join(self.basedir, "download.target")
161         def _download_to_filename(res):
162             return self.downloader.download_to_filename(self.uri,
163                                                         target_filename)
164         d.addCallback(_download_to_filename)
165         def _download_to_filename_done(res):
166             newdata = open(target_filename, "rb").read()
167             self.failUnlessEqual(newdata, DATA)
168         d.addCallback(_download_to_filename_done)
169
170         target_filename2 = os.path.join(self.basedir, "download.target2")
171         def _download_to_filehandle(res):
172             fh = open(target_filename2, "wb")
173             return self.downloader.download_to_filehandle(self.uri, fh)
174         d.addCallback(_download_to_filehandle)
175         def _download_to_filehandle_done(fh):
176             fh.close()
177             newdata = open(target_filename2, "rb").read()
178             self.failUnlessEqual(newdata, DATA)
179         d.addCallback(_download_to_filehandle_done)
180
181         consumer = GrabEverythingConsumer()
182         ct = download.ConsumerAdapter(consumer)
183         d.addCallback(lambda res:
184                       self.downloader.download(self.uri, ct))
185         def _download_to_consumer_done(ign):
186             self.failUnlessEqual(consumer.contents, DATA)
187         d.addCallback(_download_to_consumer_done)
188
189         def _test_read(res):
190             n = self.clients[1].create_node_from_uri(self.uri)
191             d = download_to_data(n)
192             def _read_done(data):
193                 self.failUnlessEqual(data, DATA)
194             d.addCallback(_read_done)
195             d.addCallback(lambda ign:
196                           n.read(MemoryConsumer(), offset=1, size=4))
197             def _read_portion_done(mc):
198                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
199             d.addCallback(_read_portion_done)
200             d.addCallback(lambda ign:
201                           n.read(MemoryConsumer(), offset=2, size=None))
202             def _read_tail_done(mc):
203                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
204             d.addCallback(_read_tail_done)
205             d.addCallback(lambda ign:
206                           n.read(MemoryConsumer(), size=len(DATA)+1000))
207             def _read_too_much(mc):
208                 self.failUnlessEqual("".join(mc.chunks), DATA)
209             d.addCallback(_read_too_much)
210
211             return d
212         d.addCallback(_test_read)
213
214         def _test_bad_read(res):
215             bad_u = uri.from_string_filenode(self.uri)
216             bad_u.key = self.flip_bit(bad_u.key)
217             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
218             # this should cause an error during download
219
220             d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
221                                  None,
222                                  bad_n.read, MemoryConsumer(), offset=2)
223             return d
224         d.addCallback(_test_bad_read)
225
226         def _download_nonexistent_uri(res):
227             baduri = self.mangle_uri(self.uri)
228             log.msg("about to download non-existent URI", level=log.UNUSUAL,
229                     facility="tahoe.tests")
230             d1 = self.downloader.download_to_data(baduri)
231             def _baduri_should_fail(res):
232                 log.msg("finished downloading non-existend URI",
233                         level=log.UNUSUAL, facility="tahoe.tests")
234                 self.failUnless(isinstance(res, Failure))
235                 self.failUnless(res.check(NotEnoughSharesError),
236                                 "expected NotEnoughSharesError, got %s" % res)
237                 # TODO: files that have zero peers should get a special kind
238                 # of NotEnoughSharesError, which can be used to suggest that
239                 # the URI might be wrong or that they've never uploaded the
240                 # file in the first place.
241             d1.addBoth(_baduri_should_fail)
242             return d1
243         d.addCallback(_download_nonexistent_uri)
244
245         # add a new node, which doesn't accept shares, and only uses the
246         # helper for upload.
247         d.addCallback(lambda res: self.add_extra_node(self.numclients,
248                                                       self.helper_furl,
249                                                       add_to_sparent=True))
250         def _added(extra_node):
251             self.extra_node = extra_node
252             extra_node.getServiceNamed("storage").sizelimit = 0
253         d.addCallback(_added)
254
255         HELPER_DATA = "Data that needs help to upload" * 1000
256         def _upload_with_helper(res):
257             u = upload.Data(HELPER_DATA, convergence=convergence)
258             d = self.extra_node.upload(u)
259             def _uploaded(results):
260                 uri = results.uri
261                 return self.downloader.download_to_data(uri)
262             d.addCallback(_uploaded)
263             def _check(newdata):
264                 self.failUnlessEqual(newdata, HELPER_DATA)
265             d.addCallback(_check)
266             return d
267         d.addCallback(_upload_with_helper)
268
269         def _upload_duplicate_with_helper(res):
270             u = upload.Data(HELPER_DATA, convergence=convergence)
271             u.debug_stash_RemoteEncryptedUploadable = True
272             d = self.extra_node.upload(u)
273             def _uploaded(results):
274                 uri = results.uri
275                 return self.downloader.download_to_data(uri)
276             d.addCallback(_uploaded)
277             def _check(newdata):
278                 self.failUnlessEqual(newdata, HELPER_DATA)
279                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
280                             "uploadable started uploading, should have been avoided")
281             d.addCallback(_check)
282             return d
283         if convergence is not None:
284             d.addCallback(_upload_duplicate_with_helper)
285
286         def _upload_resumable(res):
287             DATA = "Data that needs help to upload and gets interrupted" * 1000
288             u1 = CountingDataUploadable(DATA, convergence=convergence)
289             u2 = CountingDataUploadable(DATA, convergence=convergence)
290
291             # we interrupt the connection after about 5kB by shutting down
292             # the helper, then restartingit.
293             u1.interrupt_after = 5000
294             u1.interrupt_after_d = defer.Deferred()
295             u1.interrupt_after_d.addCallback(lambda res:
296                                              self.bounce_client(0))
297
298             # sneak into the helper and reduce its chunk size, so that our
299             # debug_interrupt will sever the connection on about the fifth
300             # chunk fetched. This makes sure that we've started to write the
301             # new shares before we abandon them, which exercises the
302             # abort/delete-partial-share code. TODO: find a cleaner way to do
303             # this. I know that this will affect later uses of the helper in
304             # this same test run, but I'm not currently worried about it.
305             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
306
307             d = self.extra_node.upload(u1)
308
309             def _should_not_finish(res):
310                 self.fail("interrupted upload should have failed, not finished"
311                           " with result %s" % (res,))
312             def _interrupted(f):
313                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
314
315                 # make sure we actually interrupted it before finishing the
316                 # file
317                 self.failUnless(u1.bytes_read < len(DATA),
318                                 "read %d out of %d total" % (u1.bytes_read,
319                                                              len(DATA)))
320
321                 log.msg("waiting for reconnect", level=log.NOISY,
322                         facility="tahoe.test.test_system")
323                 # now, we need to give the nodes a chance to notice that this
324                 # connection has gone away. When this happens, the storage
325                 # servers will be told to abort their uploads, removing the
326                 # partial shares. Unfortunately this involves TCP messages
327                 # going through the loopback interface, and we can't easily
328                 # predict how long that will take. If it were all local, we
329                 # could use fireEventually() to stall. Since we don't have
330                 # the right introduction hooks, the best we can do is use a
331                 # fixed delay. TODO: this is fragile.
332                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
333                 return u1.interrupt_after_d
334             d.addCallbacks(_should_not_finish, _interrupted)
335
336             def _disconnected(res):
337                 # check to make sure the storage servers aren't still hanging
338                 # on to the partial share: their incoming/ directories should
339                 # now be empty.
340                 log.msg("disconnected", level=log.NOISY,
341                         facility="tahoe.test.test_system")
342                 for i in range(self.numclients):
343                     incdir = os.path.join(self.getdir("client%d" % i),
344                                           "storage", "shares", "incoming")
345                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
346             d.addCallback(_disconnected)
347
348             # then we need to give the reconnector a chance to
349             # reestablish the connection to the helper.
350             d.addCallback(lambda res:
351                           log.msg("wait_for_connections", level=log.NOISY,
352                                   facility="tahoe.test.test_system"))
353             d.addCallback(lambda res: self.wait_for_connections())
354
355
356             d.addCallback(lambda res:
357                           log.msg("uploading again", level=log.NOISY,
358                                   facility="tahoe.test.test_system"))
359             d.addCallback(lambda res: self.extra_node.upload(u2))
360
361             def _uploaded(results):
362                 uri = results.uri
363                 log.msg("Second upload complete", level=log.NOISY,
364                         facility="tahoe.test.test_system")
365
366                 # this is really bytes received rather than sent, but it's
367                 # convenient and basically measures the same thing
368                 bytes_sent = results.ciphertext_fetched
369
370                 # We currently don't support resumption of upload if the data is
371                 # encrypted with a random key.  (Because that would require us
372                 # to store the key locally and re-use it on the next upload of
373                 # this file, which isn't a bad thing to do, but we currently
374                 # don't do it.)
375                 if convergence is not None:
376                     # Make sure we did not have to read the whole file the
377                     # second time around .
378                     self.failUnless(bytes_sent < len(DATA),
379                                 "resumption didn't save us any work:"
380                                 " read %d bytes out of %d total" %
381                                 (bytes_sent, len(DATA)))
382                 else:
383                     # Make sure we did have to read the whole file the second
384                     # time around -- because the one that we partially uploaded
385                     # earlier was encrypted with a different random key.
386                     self.failIf(bytes_sent < len(DATA),
387                                 "resumption saved us some work even though we were using random keys:"
388                                 " read %d bytes out of %d total" %
389                                 (bytes_sent, len(DATA)))
390                 return self.downloader.download_to_data(uri)
391             d.addCallback(_uploaded)
392
393             def _check(newdata):
394                 self.failUnlessEqual(newdata, DATA)
395                 # If using convergent encryption, then also check that the
396                 # helper has removed the temp file from its directories.
397                 if convergence is not None:
398                     basedir = os.path.join(self.getdir("client0"), "helper")
399                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
400                     self.failUnlessEqual(files, [])
401                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
402                     self.failUnlessEqual(files, [])
403             d.addCallback(_check)
404             return d
405         d.addCallback(_upload_resumable)
406
407         return d
408
409     def _find_shares(self, basedir):
410         shares = []
411         for (dirpath, dirnames, filenames) in os.walk(basedir):
412             if "storage" not in dirpath:
413                 continue
414             if not filenames:
415                 continue
416             pieces = dirpath.split(os.sep)
417             if pieces[-4] == "storage" and pieces[-3] == "shares":
418                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
419                 # are sharefiles here
420                 assert pieces[-5].startswith("client")
421                 client_num = int(pieces[-5][-1])
422                 storage_index_s = pieces[-1]
423                 storage_index = storage.si_a2b(storage_index_s)
424                 for sharename in filenames:
425                     shnum = int(sharename)
426                     filename = os.path.join(dirpath, sharename)
427                     data = (client_num, storage_index, filename, shnum)
428                     shares.append(data)
429         if not shares:
430             self.fail("unable to find any share files in %s" % basedir)
431         return shares
432
433     def _corrupt_mutable_share(self, filename, which):
434         msf = storage.MutableShareFile(filename)
435         datav = msf.readv([ (0, 1000000) ])
436         final_share = datav[0]
437         assert len(final_share) < 1000000 # ought to be truncated
438         pieces = mutable_layout.unpack_share(final_share)
439         (seqnum, root_hash, IV, k, N, segsize, datalen,
440          verification_key, signature, share_hash_chain, block_hash_tree,
441          share_data, enc_privkey) = pieces
442
443         if which == "seqnum":
444             seqnum = seqnum + 15
445         elif which == "R":
446             root_hash = self.flip_bit(root_hash)
447         elif which == "IV":
448             IV = self.flip_bit(IV)
449         elif which == "segsize":
450             segsize = segsize + 15
451         elif which == "pubkey":
452             verification_key = self.flip_bit(verification_key)
453         elif which == "signature":
454             signature = self.flip_bit(signature)
455         elif which == "share_hash_chain":
456             nodenum = share_hash_chain.keys()[0]
457             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
458         elif which == "block_hash_tree":
459             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
460         elif which == "share_data":
461             share_data = self.flip_bit(share_data)
462         elif which == "encprivkey":
463             enc_privkey = self.flip_bit(enc_privkey)
464
465         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
466                                             segsize, datalen)
467         final_share = mutable_layout.pack_share(prefix,
468                                                 verification_key,
469                                                 signature,
470                                                 share_hash_chain,
471                                                 block_hash_tree,
472                                                 share_data,
473                                                 enc_privkey)
474         msf.writev( [(0, final_share)], None)
475
476
477     def test_mutable(self):
478         self.basedir = "system/SystemTest/test_mutable"
479         DATA = "initial contents go here."  # 25 bytes % 3 != 0
480         NEWDATA = "new contents yay"
481         NEWERDATA = "this is getting old"
482
483         d = self.set_up_nodes(use_key_generator=True)
484
485         def _create_mutable(res):
486             c = self.clients[0]
487             log.msg("starting create_mutable_file")
488             d1 = c.create_mutable_file(DATA)
489             def _done(res):
490                 log.msg("DONE: %s" % (res,))
491                 self._mutable_node_1 = res
492                 uri = res.get_uri()
493             d1.addCallback(_done)
494             return d1
495         d.addCallback(_create_mutable)
496
497         def _test_debug(res):
498             # find a share. It is important to run this while there is only
499             # one slot in the grid.
500             shares = self._find_shares(self.basedir)
501             (client_num, storage_index, filename, shnum) = shares[0]
502             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
503                     % filename)
504             log.msg(" for clients[%d]" % client_num)
505
506             out,err = StringIO(), StringIO()
507             rc = runner.runner(["debug", "dump-share", "--offsets",
508                                 filename],
509                                stdout=out, stderr=err)
510             output = out.getvalue()
511             self.failUnlessEqual(rc, 0)
512             try:
513                 self.failUnless("Mutable slot found:\n" in output)
514                 self.failUnless("share_type: SDMF\n" in output)
515                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
516                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
517                 self.failUnless(" num_extra_leases: 0\n" in output)
518                 # the pubkey size can vary by a byte, so the container might
519                 # be a bit larger on some runs.
520                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
521                 self.failUnless(m)
522                 container_size = int(m.group(1))
523                 self.failUnless(2037 <= container_size <= 2049, container_size)
524                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
525                 self.failUnless(m)
526                 data_length = int(m.group(1))
527                 self.failUnless(2037 <= data_length <= 2049, data_length)
528                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
529                                 in output)
530                 self.failUnless(" SDMF contents:\n" in output)
531                 self.failUnless("  seqnum: 1\n" in output)
532                 self.failUnless("  required_shares: 3\n" in output)
533                 self.failUnless("  total_shares: 10\n" in output)
534                 self.failUnless("  segsize: 27\n" in output, (output, filename))
535                 self.failUnless("  datalen: 25\n" in output)
536                 # the exact share_hash_chain nodes depends upon the sharenum,
537                 # and is more of a hassle to compute than I want to deal with
538                 # now
539                 self.failUnless("  share_hash_chain: " in output)
540                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
541                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
542                             base32.b2a(storage_index))
543                 self.failUnless(expected in output)
544             except unittest.FailTest:
545                 print
546                 print "dump-share output was:"
547                 print output
548                 raise
549         d.addCallback(_test_debug)
550
551         # test retrieval
552
553         # first, let's see if we can use the existing node to retrieve the
554         # contents. This allows it to use the cached pubkey and maybe the
555         # latest-known sharemap.
556
557         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
558         def _check_download_1(res):
559             self.failUnlessEqual(res, DATA)
560             # now we see if we can retrieve the data from a new node,
561             # constructed using the URI of the original one. We do this test
562             # on the same client that uploaded the data.
563             uri = self._mutable_node_1.get_uri()
564             log.msg("starting retrieve1")
565             newnode = self.clients[0].create_node_from_uri(uri)
566             newnode_2 = self.clients[0].create_node_from_uri(uri)
567             self.failUnlessIdentical(newnode, newnode_2)
568             return newnode.download_best_version()
569         d.addCallback(_check_download_1)
570
571         def _check_download_2(res):
572             self.failUnlessEqual(res, DATA)
573             # same thing, but with a different client
574             uri = self._mutable_node_1.get_uri()
575             newnode = self.clients[1].create_node_from_uri(uri)
576             log.msg("starting retrieve2")
577             d1 = newnode.download_best_version()
578             d1.addCallback(lambda res: (res, newnode))
579             return d1
580         d.addCallback(_check_download_2)
581
582         def _check_download_3((res, newnode)):
583             self.failUnlessEqual(res, DATA)
584             # replace the data
585             log.msg("starting replace1")
586             d1 = newnode.overwrite(NEWDATA)
587             d1.addCallback(lambda res: newnode.download_best_version())
588             return d1
589         d.addCallback(_check_download_3)
590
591         def _check_download_4(res):
592             self.failUnlessEqual(res, NEWDATA)
593             # now create an even newer node and replace the data on it. This
594             # new node has never been used for download before.
595             uri = self._mutable_node_1.get_uri()
596             newnode1 = self.clients[2].create_node_from_uri(uri)
597             newnode2 = self.clients[3].create_node_from_uri(uri)
598             self._newnode3 = self.clients[3].create_node_from_uri(uri)
599             log.msg("starting replace2")
600             d1 = newnode1.overwrite(NEWERDATA)
601             d1.addCallback(lambda res: newnode2.download_best_version())
602             return d1
603         d.addCallback(_check_download_4)
604
605         def _check_download_5(res):
606             log.msg("finished replace2")
607             self.failUnlessEqual(res, NEWERDATA)
608         d.addCallback(_check_download_5)
609
610         def _corrupt_shares(res):
611             # run around and flip bits in all but k of the shares, to test
612             # the hash checks
613             shares = self._find_shares(self.basedir)
614             ## sort by share number
615             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
616             where = dict([ (shnum, filename)
617                            for (client_num, storage_index, filename, shnum)
618                            in shares ])
619             assert len(where) == 10 # this test is designed for 3-of-10
620             for shnum, filename in where.items():
621                 # shares 7,8,9 are left alone. read will check
622                 # (share_hash_chain, block_hash_tree, share_data). New
623                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
624                 # segsize, signature).
625                 if shnum == 0:
626                     # read: this will trigger "pubkey doesn't match
627                     # fingerprint".
628                     self._corrupt_mutable_share(filename, "pubkey")
629                     self._corrupt_mutable_share(filename, "encprivkey")
630                 elif shnum == 1:
631                     # triggers "signature is invalid"
632                     self._corrupt_mutable_share(filename, "seqnum")
633                 elif shnum == 2:
634                     # triggers "signature is invalid"
635                     self._corrupt_mutable_share(filename, "R")
636                 elif shnum == 3:
637                     # triggers "signature is invalid"
638                     self._corrupt_mutable_share(filename, "segsize")
639                 elif shnum == 4:
640                     self._corrupt_mutable_share(filename, "share_hash_chain")
641                 elif shnum == 5:
642                     self._corrupt_mutable_share(filename, "block_hash_tree")
643                 elif shnum == 6:
644                     self._corrupt_mutable_share(filename, "share_data")
645                 # other things to correct: IV, signature
646                 # 7,8,9 are left alone
647
648                 # note that initial_query_count=5 means that we'll hit the
649                 # first 5 servers in effectively random order (based upon
650                 # response time), so we won't necessarily ever get a "pubkey
651                 # doesn't match fingerprint" error (if we hit shnum>=1 before
652                 # shnum=0, we pull the pubkey from there). To get repeatable
653                 # specific failures, we need to set initial_query_count=1,
654                 # but of course that will change the sequencing behavior of
655                 # the retrieval process. TODO: find a reasonable way to make
656                 # this a parameter, probably when we expand this test to test
657                 # for one failure mode at a time.
658
659                 # when we retrieve this, we should get three signature
660                 # failures (where we've mangled seqnum, R, and segsize). The
661                 # pubkey mangling
662         d.addCallback(_corrupt_shares)
663
664         d.addCallback(lambda res: self._newnode3.download_best_version())
665         d.addCallback(_check_download_5)
666
667         def _check_empty_file(res):
668             # make sure we can create empty files, this usually screws up the
669             # segsize math
670             d1 = self.clients[2].create_mutable_file("")
671             d1.addCallback(lambda newnode: newnode.download_best_version())
672             d1.addCallback(lambda res: self.failUnlessEqual("", res))
673             return d1
674         d.addCallback(_check_empty_file)
675
676         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
677         def _created_dirnode(dnode):
678             log.msg("_created_dirnode(%s)" % (dnode,))
679             d1 = dnode.list()
680             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
681             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
682             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
683             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
684             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
685             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
686             d1.addCallback(lambda res: dnode.build_manifest().when_done())
687             d1.addCallback(lambda manifest:
688                            self.failUnlessEqual(len(manifest), 1))
689             return d1
690         d.addCallback(_created_dirnode)
691
692         def wait_for_c3_kg_conn():
693             return self.clients[3]._key_generator is not None
694         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
695
696         def check_kg_poolsize(junk, size_delta):
697             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
698                                  self.key_generator_svc.key_generator.pool_size + size_delta)
699
700         d.addCallback(check_kg_poolsize, 0)
701         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
702         d.addCallback(check_kg_poolsize, -1)
703         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
704         d.addCallback(check_kg_poolsize, -2)
705         # use_helper induces use of clients[3], which is the using-key_gen client
706         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
707         d.addCallback(check_kg_poolsize, -3)
708
709         return d
    # Trial's default 120-second per-test timeout went off when this test ran
    # under valgrind on a slow Windows laptop, so bump the timeout for this
    # test method specifically.
    test_mutable.timeout = 240
713
714     def flip_bit(self, good):
715         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
716
717     def mangle_uri(self, gooduri):
718         # change the key, which changes the storage index, which means we'll
719         # be asking about the wrong file, so nobody will have any shares
720         u = IFileURI(gooduri)
721         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
722                             uri_extension_hash=u.uri_extension_hash,
723                             needed_shares=u.needed_shares,
724                             total_shares=u.total_shares,
725                             size=u.size)
726         return u2.to_string()
727
728     # TODO: add a test which mangles the uri_extension_hash instead, and
729     # should fail due to not being able to get a valid uri_extension block.
730     # Also a test which sneakily mangles the uri_extension block to change
731     # some of the validation data, so it will fail in the post-download phase
732     # when the file's crypttext integrity check fails. Do the same thing for
733     # the key, which should cause the download to fail the post-download
734     # plaintext_hash check.
735
    def test_vdrive(self):
        """End-to-end filesystem test.

        Publishes a directory tree, bounces a client node, re-verifies the
        tree through two different APIs, then exercises the webapi, control
        port, CLI, checker, and stats gatherer. The helper methods it chains
        through (_do_publish*, _check_publish*, _test_*) are defined below;
        the intermediate directory layouts are sketched in the inline
        comments.
        """
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        # the stats gatherer is needed later by _grab_stats
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client 0 to prove the data survives a node bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._grab_stats)
        return d
    test_vdrive.timeout = 1100
780
    def _test_introweb(self, res):
        """Check the introducer's web page, both HTML and ?t=json forms.

        Verifies the advertised version and that all 5 storage servers have
        announced and subscribed. On failure, dumps the fetched page to
        stdout to ease debugging.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # show the full page before re-raising, so the trial log
                # reveals what the introducer actually served
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
812
813     def _do_publish1(self, res):
814         ut = upload.Data(self.data, convergence=None)
815         c0 = self.clients[0]
816         d = c0.create_empty_dirnode()
817         def _made_root(new_dirnode):
818             self._root_directory_uri = new_dirnode.get_uri()
819             return c0.create_node_from_uri(self._root_directory_uri)
820         d.addCallback(_made_root)
821         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
822         def _made_subdir1(subdir1_node):
823             self._subdir1_node = subdir1_node
824             d1 = subdir1_node.add_file(u"mydata567", ut)
825             d1.addCallback(self.log, "publish finished")
826             def _stash_uri(filenode):
827                 self.uri = filenode.get_uri()
828             d1.addCallback(_stash_uri)
829             return d1
830         d.addCallback(_made_subdir1)
831         return d
832
833     def _do_publish2(self, res):
834         ut = upload.Data(self.data, convergence=None)
835         d = self._subdir1_node.create_empty_directory(u"subdir2")
836         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
837         return d
838
    def log(self, res, *args, **kwargs):
        """Emit a log message and pass 'res' through unchanged.

        The pass-through return value lets this be dropped into a Deferred
        callback chain without disturbing the value flowing through it.
        """
        log.msg(*args, **kwargs)
        return res
843
844     def _do_publish_private(self, res):
845         self.smalldata = "sssh, very secret stuff"
846         ut = upload.Data(self.smalldata, convergence=None)
847         d = self.clients[0].create_empty_dirnode()
848         d.addCallback(self.log, "GOT private directory")
849         def _got_new_dir(privnode):
850             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
851             d1 = privnode.create_empty_directory(u"personal")
852             d1.addCallback(self.log, "made P/personal")
853             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
854             d1.addCallback(self.log, "made P/personal/sekrit data")
855             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
856             def _got_s2(s2node):
857                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
858                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
859                 return d2
860             d1.addCallback(_got_s2)
861             d1.addCallback(lambda res: privnode)
862             return d1
863         d.addCallback(_got_new_dir)
864         return d
865
866     def _check_publish1(self, res):
867         # this one uses the iterative API
868         c1 = self.clients[1]
869         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
870         d.addCallback(self.log, "check_publish1 got /")
871         d.addCallback(lambda root: root.get(u"subdir1"))
872         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
873         d.addCallback(lambda filenode: filenode.download_to_data())
874         d.addCallback(self.log, "get finished")
875         def _get_done(data):
876             self.failUnlessEqual(data, self.data)
877         d.addCallback(_get_done)
878         return d
879
880     def _check_publish2(self, res):
881         # this one uses the path-based API
882         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
883         d = rootnode.get_child_at_path(u"subdir1")
884         d.addCallback(lambda dirnode:
885                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
886         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
887         d.addCallback(lambda filenode: filenode.download_to_data())
888         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
889
890         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
891         def _got_filenode(filenode):
892             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
893             assert fnode == filenode
894         d.addCallback(_got_filenode)
895         return d
896
    def _check_publish_private(self, resnode):
        """Verify the private tree P built by _do_publish_private.

        Uses the path-based API. Confirms the file contents, that s2-rw is
        mutable and s2-ro is read-only (every mutating operation on s2-ro
        must raise NotMutableError), then exercises move_child_to and checks
        the manifest and deep-stats numbers for the final 5-item tree.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash for the read-only move tests below
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # every mutating operation on the read-only dirnode must fail
            # with NotMutableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move "sekrit data" around and back, then verify the manifest
            # and deep-stats of the resulting tree
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with encoding details, so only
                # lower bounds are asserted here
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
1006
1007     def shouldFail(self, res, expected_failure, which, substring=None):
1008         if isinstance(res, Failure):
1009             res.trap(expected_failure)
1010             if substring:
1011                 self.failUnless(substring in str(res),
1012                                 "substring '%s' not in '%s'"
1013                                 % (substring, str(res)))
1014         else:
1015             self.fail("%s was supposed to raise %s, not get '%s'" %
1016                       (which, expected_failure, res))
1017
1018     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1019         assert substring is None or isinstance(substring, str)
1020         d = defer.maybeDeferred(callable, *args, **kwargs)
1021         def done(res):
1022             if isinstance(res, Failure):
1023                 res.trap(expected_failure)
1024                 if substring:
1025                     self.failUnless(substring in str(res),
1026                                     "substring '%s' not in '%s'"
1027                                     % (substring, str(res)))
1028             else:
1029                 self.fail("%s was supposed to raise %s, not get '%s'" %
1030                           (which, expected_failure, res))
1031         d.addBoth(done)
1032         return d
1033
1034     def PUT(self, urlpath, data):
1035         url = self.webish_url + urlpath
1036         return getPage(url, method="PUT", postdata=data)
1037
1038     def GET(self, urlpath, followRedirect=False):
1039         url = self.webish_url + urlpath
1040         return getPage(url, method="GET", followRedirect=followRedirect)
1041
1042     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1043         if use_helper:
1044             url = self.helper_webish_url + urlpath
1045         else:
1046             url = self.webish_url + urlpath
1047         sepbase = "boogabooga"
1048         sep = "--" + sepbase
1049         form = []
1050         form.append(sep)
1051         form.append('Content-Disposition: form-data; name="_charset"')
1052         form.append('')
1053         form.append('UTF-8')
1054         form.append(sep)
1055         for name, value in fields.iteritems():
1056             if isinstance(value, tuple):
1057                 filename, value = value
1058                 form.append('Content-Disposition: form-data; name="%s"; '
1059                             'filename="%s"' % (name, filename.encode("utf-8")))
1060             else:
1061                 form.append('Content-Disposition: form-data; name="%s"' % name)
1062             form.append('')
1063             form.append(str(value))
1064             form.append(sep)
1065         form[-1] += "--"
1066         body = "\r\n".join(form) + "\r\n"
1067         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1068                    }
1069         return getPage(url, method="POST", postdata=body,
1070                        headers=headers, followRedirect=followRedirect)
1071
    def _test_web(self, res):
        """Exercise the webapi end-to-end.

        Checks the welcome pages (direct and via-helper nodes), directory
        listings, GET of published files (by path, by URI, and by a mangled
        bogus URI which must yield a 410), PUT uploads (small, multi-segment,
        and in-place replace), unlinked POST uploads (direct and via the
        helper), the various status pages, helper status (HTML and JSON),
        and the node statistics page (HTML and JSON).
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            # mangle_uri flips a key bit, so no server holds shares for it
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            # backdate the mtime so the helper sees an "old" incoming file
            then = time.time() - 86400*3
            now = time.time()
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1278
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' diagnostic CLI tools (dump-share,
        find-shares, catalog-shares) from runner.py against the shares
        written to disk by the earlier upload steps.
        """
        # exercise some of the diagnostic tools in runner.py

        # find a share
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                # NOTE(review): assumes immutable (CHK) share files begin
                # with a 4-byte version field of 1 -- confirm against
                # allmydata.storage share format
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    break
        else:
            # for/else: only reached when the walk completed without 'break',
            # i.e. no CHK sharefile was found anywhere under basedir
            self.fail("unable to find any uri_extension files in %s"
                      % self.basedir)
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        # 'tahoe debug dump-share --offsets FILE' should describe the share
        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            filename],
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        # the file was uploaded with 10 total shares
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        # NOTE(review): 30 = total share lines across all node dirs --
        # presumably three uploads of 10 shares each; verify against the
        # earlier test steps
        self.failUnlessEqual(len(descriptions), 30)
        matching = [line
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1353
1354     def _test_control(self, res):
1355         # exercise the remote-control-the-client foolscap interfaces in
1356         # allmydata.control (mostly used for performance tests)
1357         c0 = self.clients[0]
1358         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1359         control_furl = open(control_furl_file, "r").read().strip()
1360         # it doesn't really matter which Tub we use to connect to the client,
1361         # so let's just use our IntroducerNode's
1362         d = self.introducer.tub.getReference(control_furl)
1363         d.addCallback(self._test_control2, control_furl_file)
1364         return d
1365     def _test_control2(self, rref, filename):
1366         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1367         downfile = os.path.join(self.basedir, "control.downfile")
1368         d.addCallback(lambda uri:
1369                       rref.callRemote("download_from_uri_to_file",
1370                                       uri, downfile))
1371         def _check(res):
1372             self.failUnlessEqual(res, downfile)
1373             data = open(downfile, "r").read()
1374             expected_data = open(filename, "r").read()
1375             self.failUnlessEqual(data, expected_data)
1376         d.addCallback(_check)
1377         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1378         if sys.platform == "linux2":
1379             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1380         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1381         return d
1382
1383     def _test_cli(self, res):
1384         # run various CLI commands (in a thread, since they use blocking
1385         # network calls)
1386
1387         private_uri = self._private_node.get_uri()
1388         some_uri = self._root_directory_uri
1389         client0_basedir = self.getdir("client0")
1390
1391         nodeargs = [
1392             "--node-directory", client0_basedir,
1393             ]
1394         TESTDATA = "I will not write the same thing over and over.\n" * 100
1395
1396         d = defer.succeed(None)
1397
1398         # for compatibility with earlier versions, private/root_dir.cap is
1399         # supposed to be treated as an alias named "tahoe:". Start by making
1400         # sure that works, before we add other aliases.
1401
1402         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1403         f = open(root_file, "w")
1404         f.write(private_uri)
1405         f.close()
1406
1407         def run(ignored, verb, *args, **kwargs):
1408             stdin = kwargs.get("stdin", "")
1409             newargs = [verb] + nodeargs + list(args)
1410             return self._run_cli(newargs, stdin=stdin)
1411
1412         def _check_ls((out,err), expected_children, unexpected_children=[]):
1413             self.failUnlessEqual(err, "")
1414             for s in expected_children:
1415                 self.failUnless(s in out, (s,out))
1416             for s in unexpected_children:
1417                 self.failIf(s in out, (s,out))
1418
1419         def _check_ls_root((out,err)):
1420             self.failUnless("personal" in out)
1421             self.failUnless("s2-ro" in out)
1422             self.failUnless("s2-rw" in out)
1423             self.failUnlessEqual(err, "")
1424
1425         # this should reference private_uri
1426         d.addCallback(run, "ls")
1427         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1428
1429         d.addCallback(run, "list-aliases")
1430         def _check_aliases_1((out,err)):
1431             self.failUnlessEqual(err, "")
1432             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1433         d.addCallback(_check_aliases_1)
1434
1435         # now that that's out of the way, remove root_dir.cap and work with
1436         # new files
1437         d.addCallback(lambda res: os.unlink(root_file))
1438         d.addCallback(run, "list-aliases")
1439         def _check_aliases_2((out,err)):
1440             self.failUnlessEqual(err, "")
1441             self.failUnlessEqual(out, "")
1442         d.addCallback(_check_aliases_2)
1443
1444         d.addCallback(run, "mkdir")
1445         def _got_dir( (out,err) ):
1446             self.failUnless(uri.from_string_dirnode(out.strip()))
1447             return out.strip()
1448         d.addCallback(_got_dir)
1449         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1450
1451         d.addCallback(run, "list-aliases")
1452         def _check_aliases_3((out,err)):
1453             self.failUnlessEqual(err, "")
1454             self.failUnless("tahoe: " in out)
1455         d.addCallback(_check_aliases_3)
1456
1457         def _check_empty_dir((out,err)):
1458             self.failUnlessEqual(out, "")
1459             self.failUnlessEqual(err, "")
1460         d.addCallback(run, "ls")
1461         d.addCallback(_check_empty_dir)
1462
1463         def _check_missing_dir((out,err)):
1464             # TODO: check that rc==2
1465             self.failUnlessEqual(out, "")
1466             self.failUnlessEqual(err, "No such file or directory\n")
1467         d.addCallback(run, "ls", "bogus")
1468         d.addCallback(_check_missing_dir)
1469
1470         files = []
1471         datas = []
1472         for i in range(10):
1473             fn = os.path.join(self.basedir, "file%d" % i)
1474             files.append(fn)
1475             data = "data to be uploaded: file%d\n" % i
1476             datas.append(data)
1477             open(fn,"wb").write(data)
1478
1479         def _check_stdout_against((out,err), filenum=None, data=None):
1480             self.failUnlessEqual(err, "")
1481             if filenum is not None:
1482                 self.failUnlessEqual(out, datas[filenum])
1483             if data is not None:
1484                 self.failUnlessEqual(out, data)
1485
1486         # test all both forms of put: from a file, and from stdin
1487         #  tahoe put bar FOO
1488         d.addCallback(run, "put", files[0], "tahoe-file0")
1489         def _put_out((out,err)):
1490             self.failUnless("URI:LIT:" in out, out)
1491             self.failUnless("201 Created" in err, err)
1492             uri0 = out.strip()
1493             return run(None, "get", uri0)
1494         d.addCallback(_put_out)
1495         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1496
1497         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1498         #  tahoe put bar tahoe:FOO
1499         d.addCallback(run, "put", files[2], "tahoe:file2")
1500         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1501         def _check_put_mutable((out,err)):
1502             self._mutable_file3_uri = out.strip()
1503         d.addCallback(_check_put_mutable)
1504         d.addCallback(run, "get", "tahoe:file3")
1505         d.addCallback(_check_stdout_against, 3)
1506
1507         #  tahoe put FOO
1508         STDIN_DATA = "This is the file to upload from stdin."
1509         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1510         #  tahoe put tahoe:FOO
1511         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1512                       stdin="Other file from stdin.")
1513
1514         d.addCallback(run, "ls")
1515         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1516                                   "tahoe-file-stdin", "from-stdin"])
1517         d.addCallback(run, "ls", "subdir")
1518         d.addCallback(_check_ls, ["tahoe-file1"])
1519
1520         # tahoe mkdir FOO
1521         d.addCallback(run, "mkdir", "subdir2")
1522         d.addCallback(run, "ls")
1523         # TODO: extract the URI, set an alias with it
1524         d.addCallback(_check_ls, ["subdir2"])
1525
1526         # tahoe get: (to stdin and to a file)
1527         d.addCallback(run, "get", "tahoe-file0")
1528         d.addCallback(_check_stdout_against, 0)
1529         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1530         d.addCallback(_check_stdout_against, 1)
1531         outfile0 = os.path.join(self.basedir, "outfile0")
1532         d.addCallback(run, "get", "file2", outfile0)
1533         def _check_outfile0((out,err)):
1534             data = open(outfile0,"rb").read()
1535             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1536         d.addCallback(_check_outfile0)
1537         outfile1 = os.path.join(self.basedir, "outfile0")
1538         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1539         def _check_outfile1((out,err)):
1540             data = open(outfile1,"rb").read()
1541             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1542         d.addCallback(_check_outfile1)
1543
1544         d.addCallback(run, "rm", "tahoe-file0")
1545         d.addCallback(run, "rm", "tahoe:file2")
1546         d.addCallback(run, "ls")
1547         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1548
1549         d.addCallback(run, "ls", "-l")
1550         def _check_ls_l((out,err)):
1551             lines = out.split("\n")
1552             for l in lines:
1553                 if "tahoe-file-stdin" in l:
1554                     self.failUnless(l.startswith("-r-- "), l)
1555                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1556                 if "file3" in l:
1557                     self.failUnless(l.startswith("-rw- "), l) # mutable
1558         d.addCallback(_check_ls_l)
1559
1560         d.addCallback(run, "ls", "--uri")
1561         def _check_ls_uri((out,err)):
1562             lines = out.split("\n")
1563             for l in lines:
1564                 if "file3" in l:
1565                     self.failUnless(self._mutable_file3_uri in l)
1566         d.addCallback(_check_ls_uri)
1567
1568         d.addCallback(run, "ls", "--readonly-uri")
1569         def _check_ls_rouri((out,err)):
1570             lines = out.split("\n")
1571             for l in lines:
1572                 if "file3" in l:
1573                     rw_uri = self._mutable_file3_uri
1574                     u = uri.from_string_mutable_filenode(rw_uri)
1575                     ro_uri = u.get_readonly().to_string()
1576                     self.failUnless(ro_uri in l)
1577         d.addCallback(_check_ls_rouri)
1578
1579
1580         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1581         d.addCallback(run, "ls")
1582         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1583
1584         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1585         d.addCallback(run, "ls")
1586         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1587
1588         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1589         d.addCallback(run, "ls")
1590         d.addCallback(_check_ls, ["file3", "file3-copy"])
1591         d.addCallback(run, "get", "tahoe:file3-copy")
1592         d.addCallback(_check_stdout_against, 3)
1593
1594         # copy from disk into tahoe
1595         d.addCallback(run, "cp", files[4], "tahoe:file4")
1596         d.addCallback(run, "ls")
1597         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1598         d.addCallback(run, "get", "tahoe:file4")
1599         d.addCallback(_check_stdout_against, 4)
1600
1601         # copy from tahoe into disk
1602         target_filename = os.path.join(self.basedir, "file-out")
1603         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1604         def _check_cp_out((out,err)):
1605             self.failUnless(os.path.exists(target_filename))
1606             got = open(target_filename,"rb").read()
1607             self.failUnlessEqual(got, datas[4])
1608         d.addCallback(_check_cp_out)
1609
1610         # copy from disk to disk (silly case)
1611         target2_filename = os.path.join(self.basedir, "file-out-copy")
1612         d.addCallback(run, "cp", target_filename, target2_filename)
1613         def _check_cp_out2((out,err)):
1614             self.failUnless(os.path.exists(target2_filename))
1615             got = open(target2_filename,"rb").read()
1616             self.failUnlessEqual(got, datas[4])
1617         d.addCallback(_check_cp_out2)
1618
1619         # copy from tahoe into disk, overwriting an existing file
1620         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1621         def _check_cp_out3((out,err)):
1622             self.failUnless(os.path.exists(target_filename))
1623             got = open(target_filename,"rb").read()
1624             self.failUnlessEqual(got, datas[3])
1625         d.addCallback(_check_cp_out3)
1626
1627         # copy from disk into tahoe, overwriting an existing immutable file
1628         d.addCallback(run, "cp", files[5], "tahoe:file4")
1629         d.addCallback(run, "ls")
1630         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1631         d.addCallback(run, "get", "tahoe:file4")
1632         d.addCallback(_check_stdout_against, 5)
1633
1634         # copy from disk into tahoe, overwriting an existing mutable file
1635         d.addCallback(run, "cp", files[5], "tahoe:file3")
1636         d.addCallback(run, "ls")
1637         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1638         d.addCallback(run, "get", "tahoe:file3")
1639         d.addCallback(_check_stdout_against, 5)
1640
1641         # recursive copy: setup
1642         dn = os.path.join(self.basedir, "dir1")
1643         os.makedirs(dn)
1644         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1645         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1646         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1647         sdn2 = os.path.join(dn, "subdir2")
1648         os.makedirs(sdn2)
1649         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1650         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1651
1652         # from disk into tahoe
1653         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1654         d.addCallback(run, "ls")
1655         d.addCallback(_check_ls, ["dir1"])
1656         d.addCallback(run, "ls", "dir1")
1657         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1658                       ["rfile4", "rfile5"])
1659         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1660         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1661                       ["rfile1", "rfile2", "rfile3"])
1662         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1663         d.addCallback(_check_stdout_against, data="rfile4")
1664
1665         # and back out again
1666         dn_copy = os.path.join(self.basedir, "dir1-copy")
1667         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1668         def _check_cp_r_out((out,err)):
1669             def _cmp(name):
1670                 old = open(os.path.join(dn, name), "rb").read()
1671                 newfn = os.path.join(dn_copy, name)
1672                 self.failUnless(os.path.exists(newfn))
1673                 new = open(newfn, "rb").read()
1674                 self.failUnlessEqual(old, new)
1675             _cmp("rfile1")
1676             _cmp("rfile2")
1677             _cmp("rfile3")
1678             _cmp(os.path.join("subdir2", "rfile4"))
1679             _cmp(os.path.join("subdir2", "rfile5"))
1680         d.addCallback(_check_cp_r_out)
1681
1682         # and copy it a second time, which ought to overwrite the same files
1683         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1684
1685         # and tahoe-to-tahoe
1686         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1687         d.addCallback(run, "ls")
1688         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1689         d.addCallback(run, "ls", "dir1-copy")
1690         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1691                       ["rfile4", "rfile5"])
1692         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1693         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1694                       ["rfile1", "rfile2", "rfile3"])
1695         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1696         d.addCallback(_check_stdout_against, data="rfile4")
1697
1698         # and copy it a second time, which ought to overwrite the same files
1699         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1700
1701         # tahoe_ls doesn't currently handle the error correctly: it tries to
1702         # JSON-parse a traceback.
1703 ##         def _ls_missing(res):
1704 ##             argv = ["ls"] + nodeargs + ["bogus"]
1705 ##             return self._run_cli(argv)
1706 ##         d.addCallback(_ls_missing)
1707 ##         def _check_ls_missing((out,err)):
1708 ##             print "OUT", out
1709 ##             print "ERR", err
1710 ##             self.failUnlessEqual(err, "")
1711 ##         d.addCallback(_check_ls_missing)
1712
1713         return d
1714
1715     def _run_cli(self, argv, stdin=""):
1716         #print "CLI:", argv
1717         stdout, stderr = StringIO(), StringIO()
1718         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1719                                   stdin=StringIO(stdin),
1720                                   stdout=stdout, stderr=stderr)
1721         def _done(res):
1722             return stdout.getvalue(), stderr.getvalue()
1723         d.addCallback(_done)
1724         return d
1725
1726     def _test_checker(self, res):
1727         ut = upload.Data("too big to be literal" * 200, convergence=None)
1728         d = self._personal_node.add_file(u"big file", ut)
1729
1730         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1731         def _check_dirnode_results(r):
1732             self.failUnless(r.is_healthy())
1733         d.addCallback(_check_dirnode_results)
1734         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1735         d.addCallback(_check_dirnode_results)
1736
1737         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1738         def _got_chk_filenode(n):
1739             self.failUnless(isinstance(n, filenode.FileNode))
1740             d = n.check(Monitor())
1741             def _check_filenode_results(r):
1742                 self.failUnless(r.is_healthy())
1743             d.addCallback(_check_filenode_results)
1744             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1745             d.addCallback(_check_filenode_results)
1746             return d
1747         d.addCallback(_got_chk_filenode)
1748
1749         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1750         def _got_lit_filenode(n):
1751             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1752             d = n.check(Monitor())
1753             def _check_lit_filenode_results(r):
1754                 self.failUnlessEqual(r, None)
1755             d.addCallback(_check_lit_filenode_results)
1756             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1757             d.addCallback(_check_lit_filenode_results)
1758             return d
1759         d.addCallback(_got_lit_filenode)
1760         return d
1761
1762
class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
    """Exercise the webapi checker/verifier/repairer against mutable
    files: a healthy file, one with a corrupted share, and one with a
    deleted share."""

    def _run_cli(self, argv):
        # run a 'tahoe debug' command synchronously and return its stdout
        stdout, stderr = StringIO(), StringIO()
        # this can only do synchronous operations
        assert argv[0] == "debug"
        runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
        return stdout.getvalue()

    def test_good(self):
        # a freshly-created mutable file should pass the webapi verifier
        # with no complaints
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            si = self.node.get_storage_index() # NOTE(review): 'si' is unused here
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<span>Healthy : Healthy</span>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        d.addErrback(self.explain_web_error)
        return d

    def test_corrupt(self):
        # flip bits in one share of a mutable file, then make sure the
        # webapi verifier notices the damage and the repairer can fix it
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            si = self.node.get_storage_index()
            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                                self.clients[1].basedir])
            files = out.split("\n")
            # corrupt one of them, using the CLI debug command
            f = files[0]
            # the sharefile's basename is its share number
            shnum = os.path.basename(f)
            nodeid = self.clients[1].nodeid
            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
            out = self._run_cli(["debug", "corrupt-share", files[0]])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        # re-check after repair: the file should be healthy again
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d

    def test_delete_share(self):
        # remove one share of a mutable file, then make sure the webapi
        # checker notices the missing share and the repairer can fix it
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            si = self.node.get_storage_index()
            out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                                self.clients[1].basedir])
            files = out.split("\n")
            # delete one of them, so only 9 shares remain
            f = files[0]
            shnum = os.path.basename(f)
            nodeid = self.clients[1].nodeid
            nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
            self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
            os.unlink(files[0])
        d.addCallback(_created)
        # now make sure the webapi checker notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            # a missing share is not a corrupt share
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        # re-check after repair: all 10 shares should be back
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d
1899
1900
1901 class DeepCheckBase(SystemTestMixin, ErrorMixin):
1902
1903     def web_json(self, n, **kwargs):
1904         kwargs["output"] = "json"
1905         d = self.web(n, "POST", **kwargs)
1906         d.addCallback(self.decode_json)
1907         return d
1908
1909     def decode_json(self, (s,url)):
1910         try:
1911             data = simplejson.loads(s)
1912         except ValueError:
1913             self.fail("%s: not JSON: '%s'" % (url, s))
1914         return data
1915
1916     def web(self, n, method="GET", **kwargs):
1917         # returns (data, url)
1918         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
1919                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
1920         d = getPage(url, method=method)
1921         d.addCallback(lambda data: (data,url))
1922         return d
1923
1924     def wait_for_operation(self, ignored, ophandle):
1925         url = self.webish_url + "operations/" + ophandle
1926         url += "?t=status&output=JSON"
1927         d = getPage(url)
1928         def _got(res):
1929             try:
1930                 data = simplejson.loads(res)
1931             except ValueError:
1932                 self.fail("%s: not JSON: '%s'" % (url, res))
1933             if not data["finished"]:
1934                 d = self.stall(delay=1.0)
1935                 d.addCallback(self.wait_for_operation, ophandle)
1936                 return d
1937             return data
1938         d.addCallback(_got)
1939         return d
1940
1941     def get_operation_results(self, ignored, ophandle, output=None):
1942         url = self.webish_url + "operations/" + ophandle
1943         url += "?t=status"
1944         if output:
1945             url += "&output=" + output
1946         d = getPage(url)
1947         def _got(res):
1948             if output and output.lower() == "json":
1949                 try:
1950                     return simplejson.loads(res)
1951                 except ValueError:
1952                     self.fail("%s: not JSON: '%s'" % (url, res))
1953             return res
1954         d.addCallback(_got)
1955         return d
1956
1957     def slow_web(self, n, output=None, **kwargs):
1958         # use ophandle=
1959         handle = base32.b2a(os.urandom(4))
1960         d = self.web(n, "POST", ophandle=handle, **kwargs)
1961         d.addCallback(self.wait_for_operation, handle)
1962         d.addCallback(self.get_operation_results, handle, output=output)
1963         return d
1964
1965
1966 class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
1967     # construct a small directory tree (with one dir, one immutable file, one
1968     # mutable file, one LIT file, and a loop), and then check/examine it in
1969     # various ways.
1970
1971     def set_up_tree(self, ignored):
1972         # 2.9s
1973
1974         # root
1975         #   mutable
1976         #   large
1977         #   small
1978         #   small2
1979         #   loop -> root
1980         c0 = self.clients[0]
1981         d = c0.create_empty_dirnode()
1982         def _created_root(n):
1983             self.root = n
1984             self.root_uri = n.get_uri()
1985         d.addCallback(_created_root)
1986         d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1987         d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1988         def _created_mutable(n):
1989             self.mutable = n
1990             self.mutable_uri = n.get_uri()
1991         d.addCallback(_created_mutable)
1992
1993         large = upload.Data("Lots of data\n" * 1000, None)
1994         d.addCallback(lambda ign: self.root.add_file(u"large", large))
1995         def _created_large(n):
1996             self.large = n
1997             self.large_uri = n.get_uri()
1998         d.addCallback(_created_large)
1999
2000         small = upload.Data("Small enough for a LIT", None)
2001         d.addCallback(lambda ign: self.root.add_file(u"small", small))
2002         def _created_small(n):
2003             self.small = n
2004             self.small_uri = n.get_uri()
2005         d.addCallback(_created_small)
2006
2007         small2 = upload.Data("Small enough for a LIT too", None)
2008         d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
2009         def _created_small2(n):
2010             self.small2 = n
2011             self.small2_uri = n.get_uri()
2012         d.addCallback(_created_small2)
2013
2014         d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
2015         return d
2016
2017     def check_is_healthy(self, cr, n, where, incomplete=False):
2018         self.failUnless(ICheckerResults.providedBy(cr), where)
2019         self.failUnless(cr.is_healthy(), where)
2020         self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2021                              where)
2022         self.failUnlessEqual(cr.get_storage_index_string(),
2023                              base32.b2a(n.get_storage_index()), where)
2024         needs_rebalancing = bool( len(self.clients) < 10 )
2025         if not incomplete:
2026             self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
2027         d = cr.get_data()
2028         self.failUnlessEqual(d["count-shares-good"], 10, where)
2029         self.failUnlessEqual(d["count-shares-needed"], 3, where)
2030         self.failUnlessEqual(d["count-shares-expected"], 10, where)
2031         if not incomplete:
2032             self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2033         self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2034         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2035         if not incomplete:
2036             self.failUnlessEqual(sorted(d["servers-responding"]),
2037                                  sorted([c.nodeid for c in self.clients]),
2038                                  where)
2039             self.failUnless("sharemap" in d, where)
2040             all_serverids = set()
2041             for (shareid, serverids) in d["sharemap"].items():
2042                 all_serverids.update(serverids)
2043             self.failUnlessEqual(sorted(all_serverids),
2044                                  sorted([c.nodeid for c in self.clients]),
2045                                  where)
2046
2047         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2048         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2049         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2050
2051
2052     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2053         self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
2054         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2055         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2056         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2057         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2058         self.failIf(cr.get_repair_attempted(), where)
2059
2060     def deep_check_is_healthy(self, cr, num_healthy, where):
2061         self.failUnless(IDeepCheckResults.providedBy(cr))
2062         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
2063                              num_healthy, where)
2064
2065     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2066         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2067         c = cr.get_counters()
2068         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2069                              num_healthy, where)
2070         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2071                              num_healthy, where)
2072         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
2073
2074     def test_good(self):
2075         self.basedir = self.mktemp()
2076         d = self.set_up_nodes()
2077         d.addCallback(self.set_up_tree)
2078         d.addCallback(self.do_stats)
2079         d.addCallback(self.do_test_check_good)
2080         d.addCallback(self.do_test_web_good)
2081         d.addCallback(self.do_test_cli_good)
2082         d.addErrback(self.explain_web_error)
2083         d.addErrback(self.explain_error)
2084         return d
2085
2086     def do_stats(self, ignored):
2087         d = defer.succeed(None)
2088         d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2089         d.addCallback(self.check_stats_good)
2090         return d
2091
2092     def check_stats_good(self, s):
2093         self.failUnlessEqual(s["count-directories"], 1)
2094         self.failUnlessEqual(s["count-files"], 4)
2095         self.failUnlessEqual(s["count-immutable-files"], 1)
2096         self.failUnlessEqual(s["count-literal-files"], 2)
2097         self.failUnlessEqual(s["count-mutable-files"], 1)
2098         # don't check directories: their size will vary
2099         # s["largest-directory"]
2100         # s["size-directories"]
2101         self.failUnlessEqual(s["largest-directory-children"], 5)
2102         self.failUnlessEqual(s["largest-immutable-file"], 13000)
2103         # to re-use this function for both the local
2104         # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2105         # coerce the result into a list of tuples. dirnode.start_deep_stats()
2106         # returns a list of tuples, but JSON only knows about lists., so
2107         # t=start-deep-stats returns a list of lists.
2108         histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2109         self.failUnlessEqual(histogram, [(11, 31, 2),
2110                                          (10001, 31622, 1),
2111                                          ])
2112         self.failUnlessEqual(s["size-immutable-files"], 13000)
2113         self.failUnlessEqual(s["size-literal-files"], 48)
2114
2115     def do_test_check_good(self, ignored):
2116         d = defer.succeed(None)
2117         # check the individual items
2118         d.addCallback(lambda ign: self.root.check(Monitor()))
2119         d.addCallback(self.check_is_healthy, self.root, "root")
2120         d.addCallback(lambda ign: self.mutable.check(Monitor()))
2121         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2122         d.addCallback(lambda ign: self.large.check(Monitor()))
2123         d.addCallback(self.check_is_healthy, self.large, "large")
2124         d.addCallback(lambda ign: self.small.check(Monitor()))
2125         d.addCallback(self.failUnlessEqual, None, "small")
2126         d.addCallback(lambda ign: self.small2.check(Monitor()))
2127         d.addCallback(self.failUnlessEqual, None, "small2")
2128
2129         # and again with verify=True
2130         d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2131         d.addCallback(self.check_is_healthy, self.root, "root")
2132         d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2133         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2134         d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2135         d.addCallback(self.check_is_healthy, self.large, "large",
2136                       incomplete=True)
2137         d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2138         d.addCallback(self.failUnlessEqual, None, "small")
2139         d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
2140         d.addCallback(self.failUnlessEqual, None, "small2")
2141
2142         # and check_and_repair(), which should be a nop
2143         d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2144         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2145         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2146         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2147         d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2148         d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2149         d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2150         d.addCallback(self.failUnlessEqual, None, "small")
2151         d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
2152         d.addCallback(self.failUnlessEqual, None, "small2")
2153
2154         # check_and_repair(verify=True)
2155         d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2156         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2157         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2158         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2159         d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2160         d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2161                       incomplete=True)
2162         d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2163         d.addCallback(self.failUnlessEqual, None, "small")
2164         d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
2165         d.addCallback(self.failUnlessEqual, None, "small2")
2166
2167
2168         # now deep-check the root, with various verify= and repair= options
2169         d.addCallback(lambda ign:
2170                       self.root.start_deep_check().when_done())
2171         d.addCallback(self.deep_check_is_healthy, 3, "root")
2172         d.addCallback(lambda ign:
2173                       self.root.start_deep_check(verify=True).when_done())
2174         d.addCallback(self.deep_check_is_healthy, 3, "root")
2175         d.addCallback(lambda ign:
2176                       self.root.start_deep_check_and_repair().when_done())
2177         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2178         d.addCallback(lambda ign:
2179                       self.root.start_deep_check_and_repair(verify=True).when_done())
2180         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2181
2182         # and finally, start a deep-check, but then cancel it.
2183         d.addCallback(lambda ign: self.root.start_deep_check())
2184         def _checking(monitor):
2185             monitor.cancel()
2186             d = monitor.when_done()
2187             # this should fire as soon as the next dirnode.list finishes.
2188             # TODO: add a counter to measure how many list() calls are made,
2189             # assert that no more than one gets to run before the cancel()
2190             # takes effect.
2191             def _finished_normally(res):
2192                 self.fail("this was supposed to fail, not finish normally")
2193             def _cancelled(f):
2194                 f.trap(OperationCancelledError)
2195             d.addCallbacks(_finished_normally, _cancelled)
2196             return d
2197         d.addCallback(_checking)
2198
2199         return d
2200
2201     def json_check_is_healthy(self, data, n, where, incomplete=False):
2202
2203         self.failUnlessEqual(data["storage-index"],
2204                              base32.b2a(n.get_storage_index()), where)
2205         r = data["results"]
2206         self.failUnlessEqual(r["healthy"], True, where)
2207         needs_rebalancing = bool( len(self.clients) < 10 )
2208         if not incomplete:
2209             self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2210         self.failUnlessEqual(r["count-shares-good"], 10, where)
2211         self.failUnlessEqual(r["count-shares-needed"], 3, where)
2212         self.failUnlessEqual(r["count-shares-expected"], 10, where)
2213         if not incomplete:
2214             self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2215         self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2216         self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2217         if not incomplete:
2218             self.failUnlessEqual(sorted(r["servers-responding"]),
2219                                  sorted([idlib.nodeid_b2a(c.nodeid)
2220                                          for c in self.clients]), where)
2221             self.failUnless("sharemap" in r, where)
2222             all_serverids = set()
2223             for (shareid, serverids_s) in r["sharemap"].items():
2224                 all_serverids.update(serverids_s)
2225             self.failUnlessEqual(sorted(all_serverids),
2226                                  sorted([idlib.nodeid_b2a(c.nodeid)
2227                                          for c in self.clients]), where)
2228         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2229         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2230         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2231
2232     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2233         self.failUnlessEqual(data["storage-index"],
2234                              base32.b2a(n.get_storage_index()), where)
2235         self.failUnlessEqual(data["repair-attempted"], False, where)
2236         self.json_check_is_healthy(data["pre-repair-results"],
2237                                    n, where, incomplete)
2238         self.json_check_is_healthy(data["post-repair-results"],
2239                                    n, where, incomplete)
2240
2241     def json_full_deepcheck_is_healthy(self, data, n, where):
2242         self.failUnlessEqual(data["root-storage-index"],
2243                              base32.b2a(n.get_storage_index()), where)
2244         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2245         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2246         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2247         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2248         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2249         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2250         self.json_check_stats_good(data["stats"], where)
2251
2252     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2253         self.failUnlessEqual(data["root-storage-index"],
2254                              base32.b2a(n.get_storage_index()), where)
2255         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2256
2257         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2258         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2259         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2260
2261         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2262         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2263         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2264
2265         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2266         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2267         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2268
2269         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2270         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2271         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2272
2273
2274     def json_check_lit(self, data, n, where):
2275         self.failUnlessEqual(data["storage-index"], "", where)
2276         self.failUnlessEqual(data["results"]["healthy"], True, where)
2277
2278     def json_check_stats_good(self, data, where):
2279         self.check_stats_good(data)
2280
2281     def do_test_web_good(self, ignored):
2282         d = defer.succeed(None)
2283
2284         # stats
2285         d.addCallback(lambda ign:
2286                       self.slow_web(self.root,
2287                                     t="start-deep-stats", output="json"))
2288         d.addCallback(self.json_check_stats_good, "deep-stats")
2289
2290         # check, no verify
2291         d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2292         d.addCallback(self.json_check_is_healthy, self.root, "root")
2293         d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2294         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2295         d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2296         d.addCallback(self.json_check_is_healthy, self.large, "large")
2297         d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2298         d.addCallback(self.json_check_lit, self.small, "small")
2299         d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
2300         d.addCallback(self.json_check_lit, self.small2, "small2")
2301
2302         # check and verify
2303         d.addCallback(lambda ign:
2304                       self.web_json(self.root, t="check", verify="true"))
2305         d.addCallback(self.json_check_is_healthy, self.root, "root")
2306         d.addCallback(lambda ign:
2307                       self.web_json(self.mutable, t="check", verify="true"))
2308         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2309         d.addCallback(lambda ign:
2310                       self.web_json(self.large, t="check", verify="true"))
2311         d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2312         d.addCallback(lambda ign:
2313                       self.web_json(self.small, t="check", verify="true"))
2314         d.addCallback(self.json_check_lit, self.small, "small")
2315         d.addCallback(lambda ign:
2316                       self.web_json(self.small2, t="check", verify="true"))
2317         d.addCallback(self.json_check_lit, self.small2, "small2")
2318
2319         # check and repair, no verify
2320         d.addCallback(lambda ign:
2321                       self.web_json(self.root, t="check", repair="true"))
2322         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2323         d.addCallback(lambda ign:
2324                       self.web_json(self.mutable, t="check", repair="true"))
2325         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2326         d.addCallback(lambda ign:
2327                       self.web_json(self.large, t="check", repair="true"))
2328         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2329         d.addCallback(lambda ign:
2330                       self.web_json(self.small, t="check", repair="true"))
2331         d.addCallback(self.json_check_lit, self.small, "small")
2332         d.addCallback(lambda ign:
2333                       self.web_json(self.small2, t="check", repair="true"))
2334         d.addCallback(self.json_check_lit, self.small2, "small2")
2335
2336         # check+verify+repair
2337         d.addCallback(lambda ign:
2338                       self.web_json(self.root, t="check", repair="true", verify="true"))
2339         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2340         d.addCallback(lambda ign:
2341                       self.web_json(self.mutable, t="check", repair="true", verify="true"))
2342         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2343         d.addCallback(lambda ign:
2344                       self.web_json(self.large, t="check", repair="true", verify="true"))
2345         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2346         d.addCallback(lambda ign:
2347                       self.web_json(self.small, t="check", repair="true", verify="true"))
2348         d.addCallback(self.json_check_lit, self.small, "small")
2349         d.addCallback(lambda ign:
2350                       self.web_json(self.small2, t="check", repair="true", verify="true"))
2351         d.addCallback(self.json_check_lit, self.small2, "small2")
2352
2353         # now run a deep-check, with various verify= and repair= flags
2354         d.addCallback(lambda ign:
2355                       self.slow_web(self.root, t="start-deep-check", output="json"))
2356         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2357         d.addCallback(lambda ign:
2358                       self.slow_web(self.root, t="start-deep-check", verify="true",
2359                                     output="json"))
2360         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2361         d.addCallback(lambda ign:
2362                       self.slow_web(self.root, t="start-deep-check", repair="true",
2363                                     output="json"))
2364         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2365         d.addCallback(lambda ign:
2366                       self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2367         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2368
2369         # now look at t=info
2370         d.addCallback(lambda ign: self.web(self.root, t="info"))
2371         # TODO: examine the output
2372         d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2373         d.addCallback(lambda ign: self.web(self.large, t="info"))
2374         d.addCallback(lambda ign: self.web(self.small, t="info"))
2375         d.addCallback(lambda ign: self.web(self.small2, t="info"))
2376
2377         return d
2378
2379     def _run_cli(self, argv, stdin=""):
2380         #print "CLI:", argv
2381         stdout, stderr = StringIO(), StringIO()
2382         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
2383                                   stdin=StringIO(stdin),
2384                                   stdout=stdout, stderr=stderr)
2385         def _done(res):
2386             return stdout.getvalue(), stderr.getvalue()
2387         d.addCallback(_done)
2388         return d
2389
2390     def do_test_cli_good(self, ignored):
2391         basedir = self.getdir("client0")
2392         d = self._run_cli(["manifest",
2393                            "--node-directory", basedir,
2394                            self.root_uri])
2395         def _check((out,err)):
2396             lines = [l for l in out.split("\n") if l]
2397             self.failUnlessEqual(len(lines), 5)
2398             caps = {}
2399             for l in lines:
2400                 try:
2401                     cap, path = l.split(None, 1)
2402                 except ValueError:
2403                     cap = l.strip()
2404                     path = ""
2405                 caps[cap] = path
2406             self.failUnless(self.root.get_uri() in caps)
2407             self.failUnlessEqual(caps[self.root.get_uri()], "")
2408             self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
2409             self.failUnlessEqual(caps[self.large.get_uri()], "large")
2410             self.failUnlessEqual(caps[self.small.get_uri()], "small")
2411             self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
2412         d.addCallback(_check)
2413
2414         d.addCallback(lambda res:
2415                       self._run_cli(["manifest",
2416                                      "--node-directory", basedir,
2417                                      "--storage-index", self.root_uri]))
2418         def _check2((out,err)):
2419             lines = [l for l in out.split("\n") if l]
2420             self.failUnlessEqual(len(lines), 3)
2421             self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
2422             self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
2423             self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
2424         d.addCallback(_check2)
2425
2426         d.addCallback(lambda res:
2427                       self._run_cli(["stats",
2428                                      "--node-directory", basedir,
2429                                      self.root_uri]))
2430         def _check3((out,err)):
2431             lines = [l.strip() for l in out.split("\n") if l]
2432             self.failUnless("count-immutable-files: 1" in lines)
2433             self.failUnless("count-mutable-files: 1" in lines)
2434             self.failUnless("count-literal-files: 2" in lines)
2435             self.failUnless("count-files: 4" in lines)
2436             self.failUnless("count-directories: 1" in lines)
2437             self.failUnless("size-immutable-files: 13000" in lines)
2438             self.failUnless("size-literal-files: 48" in lines)
2439             self.failUnless("   11-31    : 2".strip() in lines)
2440             self.failUnless("10001-31622 : 1".strip() in lines)
2441         d.addCallback(_check3)
2442
2443         d.addCallback(lambda res:
2444                       self._run_cli(["stats",
2445                                      "--node-directory", basedir,
2446                                      "--verbose",
2447                                      self.root_uri]))
2448         def _check4((out,err)):
2449             data = simplejson.loads(out)
2450             self.failUnlessEqual(data["count-immutable-files"], 1)
2451             self.failUnlessEqual(data["count-immutable-files"], 1)
2452             self.failUnlessEqual(data["count-mutable-files"], 1)
2453             self.failUnlessEqual(data["count-literal-files"], 2)
2454             self.failUnlessEqual(data["count-files"], 4)
2455             self.failUnlessEqual(data["count-directories"], 1)
2456             self.failUnlessEqual(data["size-immutable-files"], 13000)
2457             self.failUnlessEqual(data["size-literal-files"], 48)
2458             self.failUnless([11,31,2] in data["size-files-histogram"])
2459             self.failUnless([10001,31622,1] in data["size-files-histogram"])
2460         d.addCallback(_check4)
2461
2462         return d
2463
2464
2465 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
2466
2467     def test_bad(self):
2468         self.basedir = self.mktemp()
2469         d = self.set_up_nodes()
2470         d.addCallback(self.set_up_damaged_tree)
2471         d.addCallback(self.do_test_check_bad)
2472         d.addCallback(self.do_test_deepcheck_bad)
2473         d.addCallback(self.do_test_web_bad)
2474         d.addErrback(self.explain_web_error)
2475         d.addErrback(self.explain_error)
2476         return d
2477
2478
2479
2480     def set_up_damaged_tree(self, ignored):
2481         # 6.4s
2482
2483         # root
2484         #   mutable-good
2485         #   mutable-missing-shares
2486         #   mutable-corrupt-shares
2487         #   mutable-unrecoverable
2488         #   large-good
2489         #   large-missing-shares
2490         #   large-corrupt-shares
2491         #   large-unrecoverable
2492
2493         self.nodes = {}
2494
2495         c0 = self.clients[0]
2496         d = c0.create_empty_dirnode()
2497         def _created_root(n):
2498             self.root = n
2499             self.root_uri = n.get_uri()
2500         d.addCallback(_created_root)
2501         d.addCallback(self.create_mangled, "mutable-good")
2502         d.addCallback(self.create_mangled, "mutable-missing-shares")
2503         d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2504         d.addCallback(self.create_mangled, "mutable-unrecoverable")
2505         d.addCallback(self.create_mangled, "large-good")
2506         d.addCallback(self.create_mangled, "large-missing-shares")
2507         d.addCallback(self.create_mangled, "large-corrupt-shares")
2508         d.addCallback(self.create_mangled, "large-unrecoverable")
2509
2510         return d
2511
2512
2513     def create_mangled(self, ignored, name):
2514         nodetype, mangletype = name.split("-", 1)
2515         if nodetype == "mutable":
2516             d = self.clients[0].create_mutable_file("mutable file contents")
2517             d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2518         elif nodetype == "large":
2519             large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2520             d = self.root.add_file(unicode(name), large)
2521         elif nodetype == "small":
2522             small = upload.Data("Small enough for a LIT", None)
2523             d = self.root.add_file(unicode(name), small)
2524
2525         def _stash_node(node):
2526             self.nodes[name] = node
2527             return node
2528         d.addCallback(_stash_node)
2529
2530         if mangletype == "good":
2531             pass
2532         elif mangletype == "missing-shares":
2533             d.addCallback(self._delete_some_shares)
2534         elif mangletype == "corrupt-shares":
2535             d.addCallback(self._corrupt_some_shares)
2536         else:
2537             assert mangletype == "unrecoverable"
2538             d.addCallback(self._delete_most_shares)
2539
2540         return d
2541
2542     def _run_cli(self, argv):
2543         stdout, stderr = StringIO(), StringIO()
2544         # this can only do synchronous operations
2545         assert argv[0] == "debug"
2546         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2547         return stdout.getvalue()
2548
2549     def _find_shares(self, node):
2550         si = node.get_storage_index()
2551         out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2552                             [c.basedir for c in self.clients])
2553         files = out.split("\n")
2554         return [f for f in files if f]
2555
2556     def _delete_some_shares(self, node):
2557         shares = self._find_shares(node)
2558         os.unlink(shares[0])
2559         os.unlink(shares[1])
2560
2561     def _corrupt_some_shares(self, node):
2562         shares = self._find_shares(node)
2563         self._run_cli(["debug", "corrupt-share", shares[0]])
2564         self._run_cli(["debug", "corrupt-share", shares[1]])
2565
2566     def _delete_most_shares(self, node):
2567         shares = self._find_shares(node)
2568         for share in shares[1:]:
2569             os.unlink(share)
2570
2571
2572     def check_is_healthy(self, cr, where):
2573         self.failUnless(ICheckerResults.providedBy(cr), where)
2574         self.failUnless(cr.is_healthy(), where)
2575         self.failUnless(cr.is_recoverable(), where)
2576         d = cr.get_data()
2577         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2578         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2579         return cr
2580
2581     def check_is_missing_shares(self, cr, where):
2582         self.failUnless(ICheckerResults.providedBy(cr), where)
2583         self.failIf(cr.is_healthy(), where)
2584         self.failUnless(cr.is_recoverable(), where)
2585         d = cr.get_data()
2586         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2587         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2588         return cr
2589
2590     def check_has_corrupt_shares(self, cr, where):
2591         # by "corrupt-shares" we mean the file is still recoverable
2592         self.failUnless(ICheckerResults.providedBy(cr), where)
2593         d = cr.get_data()
2594         self.failIf(cr.is_healthy(), where)
2595         self.failUnless(cr.is_recoverable(), where)
2596         d = cr.get_data()
2597         self.failUnless(d["count-shares-good"] < 10, where)
2598         self.failUnless(d["count-corrupt-shares"], where)
2599         self.failUnless(d["list-corrupt-shares"], where)
2600         return cr
2601
2602     def check_is_unrecoverable(self, cr, where):
2603         self.failUnless(ICheckerResults.providedBy(cr), where)
2604         d = cr.get_data()
2605         self.failIf(cr.is_healthy(), where)
2606         self.failIf(cr.is_recoverable(), where)
2607         self.failUnless(d["count-shares-good"] < d["count-shares-needed"],
2608                         where)
2609         self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2610         self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2611         return cr
2612
2613     def do_test_check_bad(self, ignored):
2614         d = defer.succeed(None)
2615
2616         # check the individual items, without verification. This will not
2617         # detect corrupt shares.
2618         def _check(which, checker):
2619             d = self.nodes[which].check(Monitor())
2620             d.addCallback(checker, which + "--check")
2621             return d
2622
2623         d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2624         d.addCallback(lambda ign: _check("mutable-missing-shares",
2625                                          self.check_is_missing_shares))
2626         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2627                                          self.check_is_healthy))
2628         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2629                                          self.check_is_unrecoverable))
2630         d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2631         d.addCallback(lambda ign: _check("large-missing-shares",
2632                                          self.check_is_missing_shares))
2633         d.addCallback(lambda ign: _check("large-corrupt-shares",
2634                                          self.check_is_healthy))
2635         d.addCallback(lambda ign: _check("large-unrecoverable",
2636                                          self.check_is_unrecoverable))
2637
2638         # and again with verify=True, which *does* detect corrupt shares.
2639         def _checkv(which, checker):
2640             d = self.nodes[which].check(Monitor(), verify=True)
2641             d.addCallback(checker, which + "--check-and-verify")
2642             return d
2643
2644         d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2645         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2646                                          self.check_is_missing_shares))
2647         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2648                                          self.check_has_corrupt_shares))
2649         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2650                                          self.check_is_unrecoverable))
2651         d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2652         # disabled pending immutable verifier
2653         #d.addCallback(lambda ign: _checkv("large-missing-shares",
2654         #                                 self.check_is_missing_shares))
2655         #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2656         #                                 self.check_has_corrupt_shares))
2657         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2658                                          self.check_is_unrecoverable))
2659
2660         return d
2661
2662     def do_test_deepcheck_bad(self, ignored):
2663         d = defer.succeed(None)
2664
2665         # now deep-check the root, with various verify= and repair= options
2666         d.addCallback(lambda ign:
2667                       self.root.start_deep_check().when_done())
2668         def _check1(cr):
2669             self.failUnless(IDeepCheckResults.providedBy(cr))
2670             c = cr.get_counters()
2671             self.failUnlessEqual(c["count-objects-checked"], 9)
2672             self.failUnlessEqual(c["count-objects-healthy"], 5)
2673             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2674             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2675         d.addCallback(_check1)
2676
2677         d.addCallback(lambda ign:
2678                       self.root.start_deep_check(verify=True).when_done())
2679         def _check2(cr):
2680             self.failUnless(IDeepCheckResults.providedBy(cr))
2681             c = cr.get_counters()
2682             self.failUnlessEqual(c["count-objects-checked"], 9)
2683             # until we have a real immutable verifier, these counts will be
2684             # off
2685             #self.failUnlessEqual(c["count-objects-healthy"], 3)
2686             #self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2687             self.failUnlessEqual(c["count-objects-healthy"], 5) # todo
2688             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2689             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2690         d.addCallback(_check2)
2691
2692         return d
2693
2694     def json_is_healthy(self, data, where):
2695         r = data["results"]
2696         self.failUnless(r["healthy"], where)
2697         self.failUnless(r["recoverable"], where)
2698         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2699         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2700
2701     def json_is_missing_shares(self, data, where):
2702         r = data["results"]
2703         self.failIf(r["healthy"], where)
2704         self.failUnless(r["recoverable"], where)
2705         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2706         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2707
2708     def json_has_corrupt_shares(self, data, where):
2709         # by "corrupt-shares" we mean the file is still recoverable
2710         r = data["results"]
2711         self.failIf(r["healthy"], where)
2712         self.failUnless(r["recoverable"], where)
2713         self.failUnless(r["count-shares-good"] < 10, where)
2714         self.failUnless(r["count-corrupt-shares"], where)
2715         self.failUnless(r["list-corrupt-shares"], where)
2716
2717     def json_is_unrecoverable(self, data, where):
2718         r = data["results"]
2719         self.failIf(r["healthy"], where)
2720         self.failIf(r["recoverable"], where)
2721         self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2722                         where)
2723         self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2724         self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
2725
2726     def do_test_web_bad(self, ignored):
2727         d = defer.succeed(None)
2728
2729         # check, no verify
2730         def _check(which, checker):
2731             d = self.web_json(self.nodes[which], t="check")
2732             d.addCallback(checker, which + "--webcheck")
2733             return d
2734
2735         d.addCallback(lambda ign: _check("mutable-good",
2736                                          self.json_is_healthy))
2737         d.addCallback(lambda ign: _check("mutable-missing-shares",
2738                                          self.json_is_missing_shares))
2739         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2740                                          self.json_is_healthy))
2741         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2742                                          self.json_is_unrecoverable))
2743         d.addCallback(lambda ign: _check("large-good",
2744                                          self.json_is_healthy))
2745         d.addCallback(lambda ign: _check("large-missing-shares",
2746                                          self.json_is_missing_shares))
2747         d.addCallback(lambda ign: _check("large-corrupt-shares",
2748                                          self.json_is_healthy))
2749         d.addCallback(lambda ign: _check("large-unrecoverable",
2750                                          self.json_is_unrecoverable))
2751
2752         # check and verify
2753         def _checkv(which, checker):
2754             d = self.web_json(self.nodes[which], t="check", verify="true")
2755             d.addCallback(checker, which + "--webcheck-and-verify")
2756             return d
2757
2758         d.addCallback(lambda ign: _checkv("mutable-good",
2759                                           self.json_is_healthy))
2760         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2761                                          self.json_is_missing_shares))
2762         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2763                                          self.json_has_corrupt_shares))
2764         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2765                                          self.json_is_unrecoverable))
2766         d.addCallback(lambda ign: _checkv("large-good",
2767                                           self.json_is_healthy))
2768         # disabled pending immutable verifier
2769         #d.addCallback(lambda ign: _checkv("large-missing-shares",
2770         #                                 self.json_is_missing_shares))
2771         #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2772         #                                 self.json_has_corrupt_shares))
2773         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2774                                          self.json_is_unrecoverable))
2775
2776         return d