tahoe-lafs @ 5cbf36c3: src/allmydata/test/test_system.py
1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18      IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28      MemoryConsumer, download_to_data
29
30 LARGE_DATA = """
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
33 """
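# (A "LIT" URI is a literal URI that embeds the file's entire contents in the
#  URI string itself; Tahoe only produces them for very small files, so data
#  meant to exercise real share upload/download must exceed that threshold.)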
34
35 class CountingDataUploadable(upload.Data):
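    """upload.Data subclass that counts how many bytes have been read; once
    bytes_read passes the optional interrupt_after threshold, it fires
    interrupt_after_d so a test can break the connection mid-upload."""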
36     bytes_read = 0
37     interrupt_after = None
38     interrupt_after_d = None
39
40     def read(self, length):
41         self.bytes_read += length
42         if self.interrupt_after is not None:
43             if self.bytes_read > self.interrupt_after:
44                 self.interrupt_after = None
45                 self.interrupt_after_d.callback(self)
46         return upload.Data.read(self, length)
47
48 class GrabEverythingConsumer:
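    """Minimal IConsumer that concatenates every chunk written to it, used to
    exercise the streaming (push-producer) download interface."""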
49     implements(IConsumer)
50
51     def __init__(self):
52         self.contents = ""
53
54     def registerProducer(self, producer, streaming):
55         assert streaming
56         assert IPushProducer.providedBy(producer)
57
58     def write(self, data):
59         self.contents += data
60
61     def unregisterProducer(self):
62         pass
63
64 class SystemTest(SystemTestMixin, unittest.TestCase):
65
66     def test_connections(self):
67         self.basedir = "system/SystemTest/test_connections"
68         d = self.set_up_nodes()
69         self.extra_node = None
70         d.addCallback(lambda res: self.add_extra_node(self.numclients))
71         def _check(extra_node):
72             self.extra_node = extra_node
73             for c in self.clients:
74                 all_peerids = list(c.get_all_peerids())
75                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
76                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
77                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
78
79         d.addCallback(_check)
80         def _shutdown_extra_node(res):
81             if self.extra_node:
82                 return self.extra_node.stopService()
83             return res
84         d.addBoth(_shutdown_extra_node)
85         return d
86     test_connections.timeout = 300
87     # test_connections is subsumed by test_upload_and_download, and takes
88     # quite a while to run on a slow machine (because of all the TLS
89     # connections that must be established). If we ever rework the introducer
90     # code to such an extent that we're not sure if it works anymore, we can
91     # reinstate this test until it does.
92     del test_connections
93
94     def test_upload_and_download_random_key(self):
95         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96         return self._test_upload_and_download(convergence=None)
97     test_upload_and_download_random_key.timeout = 4800
98
99     def test_upload_and_download_convergent(self):
100         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101         return self._test_upload_and_download(convergence="some convergence string")
102     test_upload_and_download_convergent.timeout = 4800
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = list(c.get_all_peerids())
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114                 self.failUnlessEqual(len(permuted_peers), self.numclients)
115         d.addCallback(_check_connections)
116
117         def _do_upload(res):
118             log.msg("UPLOADING")
119             u = self.clients[0].getServiceNamed("uploader")
120             self.uploader = u
121             # we crank the max segsize down to 1024b for the duration of this
122             # test, so we can exercise multiple segments. It is important
123             # that this is not a multiple of the segment size, so that the
124             # tail segment is not the same length as the others. This actually
125             # gets rounded up to 1025 to be a multiple of the number of
126             # required shares (since we use 25 out of 100 FEC).
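            # (With 25 required shares, 1024 is rounded up to the next
            #  multiple of 25: 41 * 25 = 1025.)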
127             up = upload.Data(DATA, convergence=convergence)
128             up.max_segment_size = 1024
129             d1 = u.upload(up)
130             return d1
131         d.addCallback(_do_upload)
132         def _upload_done(results):
133             uri = results.uri
134             log.msg("upload finished: uri is %s" % (uri,))
135             self.uri = uri
136             dl = self.clients[1].getServiceNamed("downloader")
137             self.downloader = dl
138         d.addCallback(_upload_done)
139
140         def _upload_again(res):
141             # Upload again. If using convergent encryption then this ought to be
142             # short-circuited; however, with the way we currently generate URIs
143             # (i.e. because they include the roothash), we have to do all of the
144             # encoding work, and only get to save on the upload part.
145             log.msg("UPLOADING AGAIN")
146             up = upload.Data(DATA, convergence=convergence)
147             up.max_segment_size = 1024
148             return self.uploader.upload(up)
149         d.addCallback(_upload_again)
150
151         def _download_to_data(res):
152             log.msg("DOWNLOADING")
153             return self.downloader.download_to_data(self.uri)
154         d.addCallback(_download_to_data)
155         def _download_to_data_done(data):
156             log.msg("download finished")
157             self.failUnlessEqual(data, DATA)
158         d.addCallback(_download_to_data_done)
159
160         target_filename = os.path.join(self.basedir, "download.target")
161         def _download_to_filename(res):
162             return self.downloader.download_to_filename(self.uri,
163                                                         target_filename)
164         d.addCallback(_download_to_filename)
165         def _download_to_filename_done(res):
166             newdata = open(target_filename, "rb").read()
167             self.failUnlessEqual(newdata, DATA)
168         d.addCallback(_download_to_filename_done)
169
170         target_filename2 = os.path.join(self.basedir, "download.target2")
171         def _download_to_filehandle(res):
172             fh = open(target_filename2, "wb")
173             return self.downloader.download_to_filehandle(self.uri, fh)
174         d.addCallback(_download_to_filehandle)
175         def _download_to_filehandle_done(fh):
176             fh.close()
177             newdata = open(target_filename2, "rb").read()
178             self.failUnlessEqual(newdata, DATA)
179         d.addCallback(_download_to_filehandle_done)
180
181         consumer = GrabEverythingConsumer()
182         ct = download.ConsumerAdapter(consumer)
183         d.addCallback(lambda res:
184                       self.downloader.download(self.uri, ct))
185         def _download_to_consumer_done(ign):
186             self.failUnlessEqual(consumer.contents, DATA)
187         d.addCallback(_download_to_consumer_done)
188
189         def _test_read(res):
190             n = self.clients[1].create_node_from_uri(self.uri)
191             d = download_to_data(n)
192             def _read_done(data):
193                 self.failUnlessEqual(data, DATA)
194             d.addCallback(_read_done)
195             d.addCallback(lambda ign:
196                           n.read(MemoryConsumer(), offset=1, size=4))
197             def _read_portion_done(mc):
198                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
199             d.addCallback(_read_portion_done)
200             d.addCallback(lambda ign:
201                           n.read(MemoryConsumer(), offset=2, size=None))
202             def _read_tail_done(mc):
203                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
204             d.addCallback(_read_tail_done)
205             d.addCallback(lambda ign:
206                           n.read(MemoryConsumer(), size=len(DATA)+1000))
207             def _read_too_much(mc):
208                 self.failUnlessEqual("".join(mc.chunks), DATA)
209             d.addCallback(_read_too_much)
210
211             return d
212         d.addCallback(_test_read)
213
214         def _test_bad_read(res):
215             bad_u = uri.from_string_filenode(self.uri)
216             bad_u.key = self.flip_bit(bad_u.key)
217             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
218             # this should cause an error during download
219
220             d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
221                                  None,
222                                  bad_n.read, MemoryConsumer(), offset=2)
223             return d
224         d.addCallback(_test_bad_read)
225
226         def _download_nonexistent_uri(res):
227             baduri = self.mangle_uri(self.uri)
228             log.msg("about to download non-existent URI", level=log.UNUSUAL,
229                     facility="tahoe.tests")
230             d1 = self.downloader.download_to_data(baduri)
231             def _baduri_should_fail(res):
232                 log.msg("finished downloading non-existent URI",
233                         level=log.UNUSUAL, facility="tahoe.tests")
234                 self.failUnless(isinstance(res, Failure))
235                 self.failUnless(res.check(NotEnoughSharesError),
236                                 "expected NotEnoughSharesError, got %s" % res)
237                 # TODO: files that have zero peers should get a special kind
238                 # of NotEnoughSharesError, which can be used to suggest that
239                 # the URI might be wrong or that they've never uploaded the
240                 # file in the first place.
241             d1.addBoth(_baduri_should_fail)
242             return d1
243         d.addCallback(_download_nonexistent_uri)
244
245         # add a new node, which doesn't accept shares, and only uses the
246         # helper for upload.
247         d.addCallback(lambda res: self.add_extra_node(self.numclients,
248                                                       self.helper_furl,
249                                                       add_to_sparent=True))
250         def _added(extra_node):
251             self.extra_node = extra_node
252             extra_node.getServiceNamed("storage").sizelimit = 0
253         d.addCallback(_added)
254
255         HELPER_DATA = "Data that needs help to upload" * 1000
256         def _upload_with_helper(res):
257             u = upload.Data(HELPER_DATA, convergence=convergence)
258             d = self.extra_node.upload(u)
259             def _uploaded(results):
260                 uri = results.uri
261                 return self.downloader.download_to_data(uri)
262             d.addCallback(_uploaded)
263             def _check(newdata):
264                 self.failUnlessEqual(newdata, HELPER_DATA)
265             d.addCallback(_check)
266             return d
267         d.addCallback(_upload_with_helper)
268
269         def _upload_duplicate_with_helper(res):
270             u = upload.Data(HELPER_DATA, convergence=convergence)
271             u.debug_stash_RemoteEncryptedUploadable = True
272             d = self.extra_node.upload(u)
273             def _uploaded(results):
274                 uri = results.uri
275                 return self.downloader.download_to_data(uri)
276             d.addCallback(_uploaded)
277             def _check(newdata):
278                 self.failUnlessEqual(newdata, HELPER_DATA)
279                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
280                             "uploadable started uploading, should have been avoided")
281             d.addCallback(_check)
282             return d
283         if convergence is not None:
284             d.addCallback(_upload_duplicate_with_helper)
285
286         def _upload_resumable(res):
287             DATA = "Data that needs help to upload and gets interrupted" * 1000
288             u1 = CountingDataUploadable(DATA, convergence=convergence)
289             u2 = CountingDataUploadable(DATA, convergence=convergence)
290
291             # we interrupt the connection after about 5kB by shutting down
292             # the helper, then restarting it.
293             u1.interrupt_after = 5000
294             u1.interrupt_after_d = defer.Deferred()
295             u1.interrupt_after_d.addCallback(lambda res:
296                                              self.bounce_client(0))
297
298             # sneak into the helper and reduce its chunk size, so that our
299             # interrupt_after hook will sever the connection on about the fifth
300             # chunk fetched. This makes sure that we've started to write the
301             # new shares before we abandon them, which exercises the
302             # abort/delete-partial-share code. TODO: find a cleaner way to do
303             # this. I know that this will affect later uses of the helper in
304             # this same test run, but I'm not currently worried about it.
305             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
306
307             d = self.extra_node.upload(u1)
308
309             def _should_not_finish(res):
310                 self.fail("interrupted upload should have failed, not finished"
311                           " with result %s" % (res,))
312             def _interrupted(f):
313                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
314
315                 # make sure we actually interrupted it before finishing the
316                 # file
317                 self.failUnless(u1.bytes_read < len(DATA),
318                                 "read %d out of %d total" % (u1.bytes_read,
319                                                              len(DATA)))
320
321                 log.msg("waiting for reconnect", level=log.NOISY,
322                         facility="tahoe.test.test_system")
323                 # now, we need to give the nodes a chance to notice that this
324                 # connection has gone away. When this happens, the storage
325                 # servers will be told to abort their uploads, removing the
326                 # partial shares. Unfortunately this involves TCP messages
327                 # going through the loopback interface, and we can't easily
328                 # predict how long that will take. If it were all local, we
329                 # could use fireEventually() to stall. Since we don't have
330                 # the right introduction hooks, the best we can do is use a
331                 # fixed delay. TODO: this is fragile.
332                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
333                 return u1.interrupt_after_d
334             d.addCallbacks(_should_not_finish, _interrupted)
335
336             def _disconnected(res):
337                 # check to make sure the storage servers aren't still hanging
338                 # on to the partial share: their incoming/ directories should
339                 # now be empty.
340                 log.msg("disconnected", level=log.NOISY,
341                         facility="tahoe.test.test_system")
342                 for i in range(self.numclients):
343                     incdir = os.path.join(self.getdir("client%d" % i),
344                                           "storage", "shares", "incoming")
345                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
346             d.addCallback(_disconnected)
347
348             # then we need to give the reconnector a chance to
349             # reestablish the connection to the helper.
350             d.addCallback(lambda res:
351                           log.msg("wait_for_connections", level=log.NOISY,
352                                   facility="tahoe.test.test_system"))
353             d.addCallback(lambda res: self.wait_for_connections())
354
355
356             d.addCallback(lambda res:
357                           log.msg("uploading again", level=log.NOISY,
358                                   facility="tahoe.test.test_system"))
359             d.addCallback(lambda res: self.extra_node.upload(u2))
360
361             def _uploaded(results):
362                 uri = results.uri
363                 log.msg("Second upload complete", level=log.NOISY,
364                         facility="tahoe.test.test_system")
365
366                 # this is really bytes received rather than sent, but it's
367                 # convenient and basically measures the same thing
368                 bytes_sent = results.ciphertext_fetched
369
370                 # We currently don't support resumption of upload if the data is
371                 # encrypted with a random key.  (Because that would require us
372                 # to store the key locally and re-use it on the next upload of
373                 # this file, which isn't a bad thing to do, but we currently
374                 # don't do it.)
375                 if convergence is not None:
376                     # Make sure we did not have to read the whole file the
377                     # second time around.
378                     self.failUnless(bytes_sent < len(DATA),
379                                 "resumption didn't save us any work:"
380                                 " read %d bytes out of %d total" %
381                                 (bytes_sent, len(DATA)))
382                 else:
383                     # Make sure we did have to read the whole file the second
384                     # time around -- because the one that we partially uploaded
385                     # earlier was encrypted with a different random key.
386                     self.failIf(bytes_sent < len(DATA),
387                                 "resumption saved us some work even though we were using random keys:"
388                                 " read %d bytes out of %d total" %
389                                 (bytes_sent, len(DATA)))
390                 return self.downloader.download_to_data(uri)
391             d.addCallback(_uploaded)
392
393             def _check(newdata):
394                 self.failUnlessEqual(newdata, DATA)
395                 # If using convergent encryption, then also check that the
396                 # helper has removed the temp file from its directories.
397                 if convergence is not None:
398                     basedir = os.path.join(self.getdir("client0"), "helper")
399                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
400                     self.failUnlessEqual(files, [])
401                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
402                     self.failUnlessEqual(files, [])
403             d.addCallback(_check)
404             return d
405         d.addCallback(_upload_resumable)
406
407         return d
408
409     def _find_shares(self, basedir):
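        """Walk 'basedir' and return a list of (client_num, storage_index,
        filename, shnum) tuples, one for every share file found under a
        client*/storage/shares/ directory; fail the test if none are found."""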
410         shares = []
411         for (dirpath, dirnames, filenames) in os.walk(basedir):
412             if "storage" not in dirpath:
413                 continue
414             if not filenames:
415                 continue
416             pieces = dirpath.split(os.sep)
417             if pieces[-4] == "storage" and pieces[-3] == "shares":
418                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
419                 # are sharefiles here
420                 assert pieces[-5].startswith("client")
421                 client_num = int(pieces[-5][-1])
422                 storage_index_s = pieces[-1]
423                 storage_index = storage.si_a2b(storage_index_s)
424                 for sharename in filenames:
425                     shnum = int(sharename)
426                     filename = os.path.join(dirpath, sharename)
427                     data = (client_num, storage_index, filename, shnum)
428                     shares.append(data)
429         if not shares:
430             self.fail("unable to find any share files in %s" % basedir)
431         return shares
432
433     def _corrupt_mutable_share(self, filename, which):
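        """Read the SDMF mutable share stored in 'filename', damage the field
        named by 'which' (flip a bit, bump a counter, etc.), and write the
        re-packed share back in place."""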
434         msf = storage.MutableShareFile(filename)
435         datav = msf.readv([ (0, 1000000) ])
436         final_share = datav[0]
437         assert len(final_share) < 1000000 # ought to be truncated
438         pieces = mutable_layout.unpack_share(final_share)
439         (seqnum, root_hash, IV, k, N, segsize, datalen,
440          verification_key, signature, share_hash_chain, block_hash_tree,
441          share_data, enc_privkey) = pieces
442
443         if which == "seqnum":
444             seqnum = seqnum + 15
445         elif which == "R":
446             root_hash = self.flip_bit(root_hash)
447         elif which == "IV":
448             IV = self.flip_bit(IV)
449         elif which == "segsize":
450             segsize = segsize + 15
451         elif which == "pubkey":
452             verification_key = self.flip_bit(verification_key)
453         elif which == "signature":
454             signature = self.flip_bit(signature)
455         elif which == "share_hash_chain":
456             nodenum = share_hash_chain.keys()[0]
457             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
458         elif which == "block_hash_tree":
459             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
460         elif which == "share_data":
461             share_data = self.flip_bit(share_data)
462         elif which == "encprivkey":
463             enc_privkey = self.flip_bit(enc_privkey)
464
465         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
466                                             segsize, datalen)
467         final_share = mutable_layout.pack_share(prefix,
468                                                 verification_key,
469                                                 signature,
470                                                 share_hash_chain,
471                                                 block_hash_tree,
472                                                 share_data,
473                                                 enc_privkey)
474         msf.writev( [(0, final_share)], None)
475
476
477     def test_mutable(self):
478         self.basedir = "system/SystemTest/test_mutable"
479         DATA = "initial contents go here."  # 25 bytes % 3 != 0
480         NEWDATA = "new contents yay"
481         NEWERDATA = "this is getting old"
482
483         d = self.set_up_nodes(use_key_generator=True)
484
485         def _create_mutable(res):
486             c = self.clients[0]
487             log.msg("starting create_mutable_file")
488             d1 = c.create_mutable_file(DATA)
489             def _done(res):
490                 log.msg("DONE: %s" % (res,))
491                 self._mutable_node_1 = res
492                 uri = res.get_uri()
493             d1.addCallback(_done)
494             return d1
495         d.addCallback(_create_mutable)
496
497         def _test_debug(res):
498             # find a share. It is important to run this while there is only
499             # one slot in the grid.
500             shares = self._find_shares(self.basedir)
501             (client_num, storage_index, filename, shnum) = shares[0]
502             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
503                     % filename)
504             log.msg(" for clients[%d]" % client_num)
505
506             out,err = StringIO(), StringIO()
507             rc = runner.runner(["debug", "dump-share", "--offsets",
508                                 filename],
509                                stdout=out, stderr=err)
510             output = out.getvalue()
511             self.failUnlessEqual(rc, 0)
512             try:
513                 self.failUnless("Mutable slot found:\n" in output)
514                 self.failUnless("share_type: SDMF\n" in output)
515                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
516                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
517                 self.failUnless(" num_extra_leases: 0\n" in output)
518                 # the pubkey size can vary by a byte, so the container might
519                 # be a bit larger on some runs.
520                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
521                 self.failUnless(m)
522                 container_size = int(m.group(1))
523                 self.failUnless(2037 <= container_size <= 2049, container_size)
524                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
525                 self.failUnless(m)
526                 data_length = int(m.group(1))
527                 self.failUnless(2037 <= data_length <= 2049, data_length)
528                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
529                                 in output)
530                 self.failUnless(" SDMF contents:\n" in output)
531                 self.failUnless("  seqnum: 1\n" in output)
532                 self.failUnless("  required_shares: 3\n" in output)
533                 self.failUnless("  total_shares: 10\n" in output)
534                 self.failUnless("  segsize: 27\n" in output, (output, filename))
535                 self.failUnless("  datalen: 25\n" in output)
536                 # the exact set of share_hash_chain nodes depends upon the sharenum,
537                 # and is more of a hassle to compute than I want to deal with
538                 # now
539                 self.failUnless("  share_hash_chain: " in output)
540                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
541                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
542                             base32.b2a(storage_index))
543                 self.failUnless(expected in output)
544             except unittest.FailTest:
545                 print
546                 print "dump-share output was:"
547                 print output
548                 raise
549         d.addCallback(_test_debug)
550
551         # test retrieval
552
553         # first, let's see if we can use the existing node to retrieve the
554         # contents. This allows it to use the cached pubkey and maybe the
555         # latest-known sharemap.
556
557         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
558         def _check_download_1(res):
559             self.failUnlessEqual(res, DATA)
560             # now we see if we can retrieve the data from a new node,
561             # constructed using the URI of the original one. We do this test
562             # on the same client that uploaded the data.
563             uri = self._mutable_node_1.get_uri()
564             log.msg("starting retrieve1")
565             newnode = self.clients[0].create_node_from_uri(uri)
566             newnode_2 = self.clients[0].create_node_from_uri(uri)
567             self.failUnlessIdentical(newnode, newnode_2)
568             return newnode.download_best_version()
569         d.addCallback(_check_download_1)
570
571         def _check_download_2(res):
572             self.failUnlessEqual(res, DATA)
573             # same thing, but with a different client
574             uri = self._mutable_node_1.get_uri()
575             newnode = self.clients[1].create_node_from_uri(uri)
576             log.msg("starting retrieve2")
577             d1 = newnode.download_best_version()
578             d1.addCallback(lambda res: (res, newnode))
579             return d1
580         d.addCallback(_check_download_2)
581
582         def _check_download_3((res, newnode)):
583             self.failUnlessEqual(res, DATA)
584             # replace the data
585             log.msg("starting replace1")
586             d1 = newnode.overwrite(NEWDATA)
587             d1.addCallback(lambda res: newnode.download_best_version())
588             return d1
589         d.addCallback(_check_download_3)
590
591         def _check_download_4(res):
592             self.failUnlessEqual(res, NEWDATA)
593             # now create an even newer node and replace the data on it. This
594             # new node has never been used for download before.
595             uri = self._mutable_node_1.get_uri()
596             newnode1 = self.clients[2].create_node_from_uri(uri)
597             newnode2 = self.clients[3].create_node_from_uri(uri)
598             self._newnode3 = self.clients[3].create_node_from_uri(uri)
599             log.msg("starting replace2")
600             d1 = newnode1.overwrite(NEWERDATA)
601             d1.addCallback(lambda res: newnode2.download_best_version())
602             return d1
603         d.addCallback(_check_download_4)
604
605         def _check_download_5(res):
606             log.msg("finished replace2")
607             self.failUnlessEqual(res, NEWERDATA)
608         d.addCallback(_check_download_5)
609
610         def _corrupt_shares(res):
611             # run around and flip bits in all but k of the shares, to test
612             # the hash checks
613             shares = self._find_shares(self.basedir)
614             ## sort by share number
615             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
616             where = dict([ (shnum, filename)
617                            for (client_num, storage_index, filename, shnum)
618                            in shares ])
619             assert len(where) == 10 # this test is designed for 3-of-10
620             for shnum, filename in where.items():
621                 # shares 7,8,9 are left alone. read will check
622                 # (share_hash_chain, block_hash_tree, share_data). New
623                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
624                 # segsize, signature).
625                 if shnum == 0:
626                     # read: this will trigger "pubkey doesn't match
627                     # fingerprint".
628                     self._corrupt_mutable_share(filename, "pubkey")
629                     self._corrupt_mutable_share(filename, "encprivkey")
630                 elif shnum == 1:
631                     # triggers "signature is invalid"
632                     self._corrupt_mutable_share(filename, "seqnum")
633                 elif shnum == 2:
634                     # triggers "signature is invalid"
635                     self._corrupt_mutable_share(filename, "R")
636                 elif shnum == 3:
637                     # triggers "signature is invalid"
638                     self._corrupt_mutable_share(filename, "segsize")
639                 elif shnum == 4:
640                     self._corrupt_mutable_share(filename, "share_hash_chain")
641                 elif shnum == 5:
642                     self._corrupt_mutable_share(filename, "block_hash_tree")
643                 elif shnum == 6:
644                     self._corrupt_mutable_share(filename, "share_data")
645                 # other things to corrupt: IV, signature
646                 # 7,8,9 are left alone
647
648                 # note that initial_query_count=5 means that we'll hit the
649                 # first 5 servers in effectively random order (based upon
650                 # response time), so we won't necessarily ever get a "pubkey
651                 # doesn't match fingerprint" error (if we hit shnum>=1 before
652                 # shnum=0, we pull the pubkey from there). To get repeatable
653                 # specific failures, we need to set initial_query_count=1,
654                 # but of course that will change the sequencing behavior of
655                 # the retrieval process. TODO: find a reasonable way to make
656                 # this a parameter, probably when we expand this test to test
657                 # for one failure mode at a time.
658
659                 # when we retrieve this, we should get three signature
660                 # failures (where we've mangled seqnum, R, and segsize); the
661                 # pubkey mangling may or may not be noticed (see above).
662         d.addCallback(_corrupt_shares)
663
664         d.addCallback(lambda res: self._newnode3.download_best_version())
665         d.addCallback(_check_download_5)
666
667         def _check_empty_file(res):
668             # make sure we can create empty files, this usually screws up the
669             # segsize math
670             d1 = self.clients[2].create_mutable_file("")
671             d1.addCallback(lambda newnode: newnode.download_best_version())
672             d1.addCallback(lambda res: self.failUnlessEqual("", res))
673             return d1
674         d.addCallback(_check_empty_file)
675
676         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
677         def _created_dirnode(dnode):
678             log.msg("_created_dirnode(%s)" % (dnode,))
679             d1 = dnode.list()
680             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
681             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
682             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
683             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
684             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
685             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
686             d1.addCallback(lambda res: dnode.build_manifest().when_done())
687             d1.addCallback(lambda res:
688                            self.failUnlessEqual(len(res["manifest"]), 1))
689             return d1
690         d.addCallback(_created_dirnode)
691
692         def wait_for_c3_kg_conn():
693             return self.clients[3]._key_generator is not None
694         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
695
696         def check_kg_poolsize(junk, size_delta):
697             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
698                                  self.key_generator_svc.key_generator.pool_size + size_delta)
699
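        # each create_mutable_file / create_empty_dirnode / mkdir below should
        # consume exactly one key from the key generator's pool, so the
        # expected pool-size delta grows by one per step.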
700         d.addCallback(check_kg_poolsize, 0)
701         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
702         d.addCallback(check_kg_poolsize, -1)
703         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
704         d.addCallback(check_kg_poolsize, -2)
705         # use_helper induces use of clients[3], which is the client that uses the key generator
706         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
707         d.addCallback(check_kg_poolsize, -3)
708
709         return d
710     # The default 120 second timeout went off when running it under valgrind
711     # on my old Windows laptop, so I'm bumping up the timeout.
712     test_mutable.timeout = 240
713
714     def flip_bit(self, good):
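        # flips the low bit of the final byte, e.g. flip_bit("key") == "kex";
        # one flipped bit is enough to invalidate a key or hash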
715         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
716
717     def mangle_uri(self, gooduri):
718         # change the key, which changes the storage index, which means we'll
719         # be asking about the wrong file, so nobody will have any shares
720         u = IFileURI(gooduri)
721         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
722                             uri_extension_hash=u.uri_extension_hash,
723                             needed_shares=u.needed_shares,
724                             total_shares=u.total_shares,
725                             size=u.size)
726         return u2.to_string()
727
728     # TODO: add a test which mangles the uri_extension_hash instead, and
729     # should fail due to not being able to get a valid uri_extension block.
730     # Also a test which sneakily mangles the uri_extension block to change
731     # some of the validation data, so it will fail in the post-download phase
732     # when the file's crypttext integrity check fails. Do the same thing for
733     # the key, which should cause the download to fail the post-download
734     # plaintext_hash check.
735
736     def test_vdrive(self):
737         self.basedir = "system/SystemTest/test_vdrive"
738         self.data = LARGE_DATA
739         d = self.set_up_nodes(use_stats_gatherer=True)
740         d.addCallback(self._test_introweb)
741         d.addCallback(self.log, "starting publish")
742         d.addCallback(self._do_publish1)
743         d.addCallback(self._test_runner)
744         d.addCallback(self._do_publish2)
745         # at this point, we have the following filesystem (where "R" denotes
746         # self._root_directory_uri):
747         # R
748         # R/subdir1
749         # R/subdir1/mydata567
750         # R/subdir1/subdir2/
751         # R/subdir1/subdir2/mydata992
752
753         d.addCallback(lambda res: self.bounce_client(0))
754         d.addCallback(self.log, "bounced client0")
755
756         d.addCallback(self._check_publish1)
757         d.addCallback(self.log, "did _check_publish1")
758         d.addCallback(self._check_publish2)
759         d.addCallback(self.log, "did _check_publish2")
760         d.addCallback(self._do_publish_private)
761         d.addCallback(self.log, "did _do_publish_private")
762         # now we also have (where "P" denotes a new dir):
763         #  P/personal/sekrit data
764         #  P/s2-rw -> /subdir1/subdir2/
765         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
766         d.addCallback(self._check_publish_private)
767         d.addCallback(self.log, "did _check_publish_private")
768         d.addCallback(self._test_web)
769         d.addCallback(self._test_control)
770         d.addCallback(self._test_cli)
771         # P now has four top-level children:
772         # P/personal/sekrit data
773         # P/s2-ro/
774         # P/s2-rw/
775         # P/test_put/  (empty)
776         d.addCallback(self._test_checker)
777         d.addCallback(self._grab_stats)
778         return d
779     test_vdrive.timeout = 1100
780
781     def _test_introweb(self, res):
782         d = getPage(self.introweb_url, method="GET", followRedirect=True)
783         def _check(res):
784             try:
785                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
786                                 in res)
787                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
788                 self.failUnless("Subscription Summary: storage: 5" in res)
789             except unittest.FailTest:
790                 print
791                 print "GET %s output was:" % self.introweb_url
792                 print res
793                 raise
794         d.addCallback(_check)
795         d.addCallback(lambda res:
796                       getPage(self.introweb_url + "?t=json",
797                               method="GET", followRedirect=True))
798         def _check_json(res):
799             data = simplejson.loads(res)
800             try:
801                 self.failUnlessEqual(data["subscription_summary"],
802                                      {"storage": 5})
803                 self.failUnlessEqual(data["announcement_summary"],
804                                      {"storage": 5, "stub_client": 5})
805                 self.failUnlessEqual(data["announcement_distinct_hosts"],
806                                      {"storage": 1, "stub_client": 1})
807             except unittest.FailTest:
808                 print
809                 print "GET %s?t=json output was:" % self.introweb_url
810                 print res
811                 raise
812         d.addCallback(_check_json)
813         return d
814
815     def _do_publish1(self, res):
816         ut = upload.Data(self.data, convergence=None)
817         c0 = self.clients[0]
818         d = c0.create_empty_dirnode()
819         def _made_root(new_dirnode):
820             self._root_directory_uri = new_dirnode.get_uri()
821             return c0.create_node_from_uri(self._root_directory_uri)
822         d.addCallback(_made_root)
823         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
824         def _made_subdir1(subdir1_node):
825             self._subdir1_node = subdir1_node
826             d1 = subdir1_node.add_file(u"mydata567", ut)
827             d1.addCallback(self.log, "publish finished")
828             def _stash_uri(filenode):
829                 self.uri = filenode.get_uri()
830             d1.addCallback(_stash_uri)
831             return d1
832         d.addCallback(_made_subdir1)
833         return d
834
835     def _do_publish2(self, res):
836         ut = upload.Data(self.data, convergence=None)
837         d = self._subdir1_node.create_empty_directory(u"subdir2")
838         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
839         return d
840
841     def log(self, res, *args, **kwargs):
842         # print "MSG: %s  RES: %s" % (msg, args)
843         log.msg(*args, **kwargs)
844         return res
845
846     def _do_publish_private(self, res):
847         self.smalldata = "sssh, very secret stuff"
848         ut = upload.Data(self.smalldata, convergence=None)
849         d = self.clients[0].create_empty_dirnode()
850         d.addCallback(self.log, "GOT private directory")
851         def _got_new_dir(privnode):
852             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
853             d1 = privnode.create_empty_directory(u"personal")
854             d1.addCallback(self.log, "made P/personal")
855             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
856             d1.addCallback(self.log, "made P/personal/sekrit data")
857             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
858             def _got_s2(s2node):
859                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
860                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
861                 return d2
862             d1.addCallback(_got_s2)
863             d1.addCallback(lambda res: privnode)
864             return d1
865         d.addCallback(_got_new_dir)
866         return d
867
868     def _check_publish1(self, res):
869         # this one uses the iterative API
870         c1 = self.clients[1]
871         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
872         d.addCallback(self.log, "check_publish1 got /")
873         d.addCallback(lambda root: root.get(u"subdir1"))
874         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
875         d.addCallback(lambda filenode: filenode.download_to_data())
876         d.addCallback(self.log, "get finished")
877         def _get_done(data):
878             self.failUnlessEqual(data, self.data)
879         d.addCallback(_get_done)
880         return d
881
882     def _check_publish2(self, res):
883         # this one uses the path-based API
884         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
885         d = rootnode.get_child_at_path(u"subdir1")
886         d.addCallback(lambda dirnode:
887                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
888         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
889         d.addCallback(lambda filenode: filenode.download_to_data())
890         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
891
892         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
893         def _got_filenode(filenode):
894             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
895             assert fnode == filenode
896         d.addCallback(_got_filenode)
897         return d
898
899     def _check_publish_private(self, resnode):
900         # this one uses the path-based API
901         self._private_node = resnode
902
903         d = self._private_node.get_child_at_path(u"personal")
904         def _got_personal(personal):
905             self._personal_node = personal
906             return personal
907         d.addCallback(_got_personal)
908
909         d.addCallback(lambda dirnode:
910                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
911         def get_path(path):
912             return self._private_node.get_child_at_path(path)
913
914         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
915         d.addCallback(lambda filenode: filenode.download_to_data())
916         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
917         d.addCallback(lambda res: get_path(u"s2-rw"))
918         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
919         d.addCallback(lambda res: get_path(u"s2-ro"))
920         def _got_s2ro(dirnode):
921             self.failUnless(dirnode.is_mutable(), dirnode)
922             self.failUnless(dirnode.is_readonly(), dirnode)
923             d1 = defer.succeed(None)
924             d1.addCallback(lambda res: dirnode.list())
925             d1.addCallback(self.log, "dirnode.list")
926
927             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
928
929             d1.addCallback(self.log, "doing add_file(ro)")
930             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
931             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
932
933             d1.addCallback(self.log, "doing get(ro)")
934             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
935             d1.addCallback(lambda filenode:
936                            self.failUnless(IFileNode.providedBy(filenode)))
937
938             d1.addCallback(self.log, "doing delete(ro)")
939             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
940
941             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
942
943             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
944
945             personal = self._personal_node
946             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
947
948             d1.addCallback(self.log, "doing move_child_to(ro)2")
949             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
950
951             d1.addCallback(self.log, "finished with _got_s2ro")
952             return d1
953         d.addCallback(_got_s2ro)
954         def _got_home(dummy):
955             home = self._private_node
956             personal = self._personal_node
957             d1 = defer.succeed(None)
958             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
959             d1.addCallback(lambda res:
960                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
961
962             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
963             d1.addCallback(lambda res:
964                            home.move_child_to(u"sekrit", home, u"sekrit data"))
965
966             d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
967             d1.addCallback(lambda res:
968                            home.move_child_to(u"sekrit data", personal))
969
970             d1.addCallback(lambda res: home.build_manifest().when_done())
971             d1.addCallback(self.log, "manifest")
972             #  five items:
973             # P/
974             # P/personal/
975             # P/personal/sekrit data
976             # P/s2-rw  (same as P/s2-ro)
977             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
978             d1.addCallback(lambda res:
979                            self.failUnlessEqual(len(res["manifest"]), 5))
980             d1.addCallback(lambda res: home.start_deep_stats().when_done())
981             def _check_stats(stats):
982                 expected = {"count-immutable-files": 1,
983                             "count-mutable-files": 0,
984                             "count-literal-files": 1,
985                             "count-files": 2,
986                             "count-directories": 3,
987                             "size-immutable-files": 112,
988                             "size-literal-files": 23,
989                             #"size-directories": 616, # varies
990                             #"largest-directory": 616,
991                             "largest-directory-children": 3,
992                             "largest-immutable-file": 112,
993                             }
994                 for k,v in expected.iteritems():
995                     self.failUnlessEqual(stats[k], v,
996                                          "stats[%s] was %s, not %s" %
997                                          (k, stats[k], v))
998                 self.failUnless(stats["size-directories"] > 1300,
999                                 stats["size-directories"])
1000                 self.failUnless(stats["largest-directory"] > 800,
1001                                 stats["largest-directory"])
1002                 self.failUnlessEqual(stats["size-files-histogram"],
1003                                      [ (11, 31, 1), (101, 316, 1) ])
1004             d1.addCallback(_check_stats)
1005             return d1
1006         d.addCallback(_got_home)
1007         return d
1008
1009     def shouldFail(self, res, expected_failure, which, substring=None):
1010         if isinstance(res, Failure):
1011             res.trap(expected_failure)
1012             if substring:
1013                 self.failUnless(substring in str(res),
1014                                 "substring '%s' not in '%s'"
1015                                 % (substring, str(res)))
1016         else:
1017             self.fail("%s was supposed to raise %s, not get '%s'" %
1018                       (which, expected_failure, res))
1019
1020     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
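        """Call callable(*args, **kwargs) via maybeDeferred and assert that it
        fails with expected_failure; 'which' labels the assertion message, and
        'substring', if not None, must appear in the resulting exception."""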
1021         assert substring is None or isinstance(substring, str)
1022         d = defer.maybeDeferred(callable, *args, **kwargs)
1023         def done(res):
1024             if isinstance(res, Failure):
1025                 res.trap(expected_failure)
1026                 if substring:
1027                     self.failUnless(substring in str(res),
1028                                     "substring '%s' not in '%s'"
1029                                     % (substring, str(res)))
1030             else:
1031                 self.fail("%s was supposed to raise %s, not get '%s'" %
1032                           (which, expected_failure, res))
1033         d.addBoth(done)
1034         return d
1035
1036     def PUT(self, urlpath, data):
1037         url = self.webish_url + urlpath
1038         return getPage(url, method="PUT", postdata=data)
1039
1040     def GET(self, urlpath, followRedirect=False):
1041         url = self.webish_url + urlpath
1042         return getPage(url, method="GET", followRedirect=followRedirect)
1043
1044     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1045         if use_helper:
1046             url = self.helper_webish_url + urlpath
1047         else:
1048             url = self.webish_url + urlpath
1049         sepbase = "boogabooga"
1050         sep = "--" + sepbase
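        # hand-rolled multipart/form-data body: each field becomes
        #   --boogabooga\r\n
        #   Content-Disposition: form-data; name="<field>"\r\n
        #   \r\n<value>\r\n
        # and the final separator is closed with a trailing "--"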
1051         form = []
1052         form.append(sep)
1053         form.append('Content-Disposition: form-data; name="_charset"')
1054         form.append('')
1055         form.append('UTF-8')
1056         form.append(sep)
1057         for name, value in fields.iteritems():
1058             if isinstance(value, tuple):
1059                 filename, value = value
1060                 form.append('Content-Disposition: form-data; name="%s"; '
1061                             'filename="%s"' % (name, filename.encode("utf-8")))
1062             else:
1063                 form.append('Content-Disposition: form-data; name="%s"' % name)
1064             form.append('')
1065             form.append(str(value))
1066             form.append(sep)
1067         form[-1] += "--"
1068         body = "\r\n".join(form) + "\r\n"
1069         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1070                    }
1071         return getPage(url, method="POST", postdata=body,
1072                        headers=headers, followRedirect=followRedirect)
1073
1074     def _test_web(self, res):
1075         base = self.webish_url
1076         public = "uri/" + self._root_directory_uri
1077         d = getPage(base)
1078         def _got_welcome(page):
1079             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1080             self.failUnless(expected in page,
1081                             "I didn't see the right 'connected storage servers'"
1082                             " message in: %s" % page
1083                             )
1084             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1085             self.failUnless(expected in page,
1086                             "I didn't see the right 'My nodeid' message "
1087                             "in: %s" % page)
1088             self.failUnless("Helper: 0 active uploads" in page)
1089         d.addCallback(_got_welcome)
1090         d.addCallback(self.log, "done with _got_welcome")
1091
1092         # get the welcome page from the node that uses the helper too
1093         d.addCallback(lambda res: getPage(self.helper_webish_url))
1094         def _got_welcome_helper(page):
1095             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1096                             page)
1097             self.failUnless("Not running helper" in page)
1098         d.addCallback(_got_welcome_helper)
1099
1100         d.addCallback(lambda res: getPage(base + public))
1101         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1102         def _got_subdir1(page):
1103             # there ought to be an href for our file
1104             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1105             self.failUnless(">mydata567</a>" in page)
1106         d.addCallback(_got_subdir1)
1107         d.addCallback(self.log, "done with _got_subdir1")
1108         d.addCallback(lambda res:
1109                       getPage(base + public + "/subdir1/mydata567"))
1110         def _got_data(page):
1111             self.failUnlessEqual(page, self.data)
1112         d.addCallback(_got_data)
1113
1114         # download from a URI embedded in a URL
1115         d.addCallback(self.log, "_get_from_uri")
1116         def _get_from_uri(res):
1117             return getPage(base + "uri/%s?filename=%s"
1118                            % (self.uri, "mydata567"))
1119         d.addCallback(_get_from_uri)
1120         def _got_from_uri(page):
1121             self.failUnlessEqual(page, self.data)
1122         d.addCallback(_got_from_uri)
1123
1124         # download from a URI embedded in a URL, second form
1125         d.addCallback(self.log, "_get_from_uri2")
1126         def _get_from_uri2(res):
1127             return getPage(base + "uri?uri=%s" % (self.uri,))
1128         d.addCallback(_get_from_uri2)
1129         d.addCallback(_got_from_uri)
1130
1131         # download from a bogus URI, make sure we get a reasonable error
1132         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1133         def _get_from_bogus_uri(res):
1134             d1 = getPage(base + "uri/%s?filename=%s"
1135                          % (self.mangle_uri(self.uri), "mydata567"))
1136             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1137                        "410")
1138             return d1
1139         d.addCallback(_get_from_bogus_uri)
1140         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1141
1142         # upload a file with PUT
1143         d.addCallback(self.log, "about to try PUT")
1144         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1145                                            "new.txt contents"))
1146         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1147         d.addCallback(self.failUnlessEqual, "new.txt contents")
1148         # and again with something large enough to use multiple segments,
1149         # and hopefully trigger pauseProducing too
1150         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1151                                            "big" * 500000)) # 1.5MB
1152         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1153         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1154
1155         # can we replace files in place?
1156         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1157                                            "NEWER contents"))
1158         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1159         d.addCallback(self.failUnlessEqual, "NEWER contents")
1160
1161         # test unlinked POST
1162         d.addCallback(lambda res: self.POST("uri", t="upload",
1163                                             file=("new.txt", "data" * 10000)))
1164         # and again using the helper, which exercises different upload-status
1165         # display code
1166         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1167                                             file=("foo.txt", "data2" * 10000)))
1168
1169         # check that the status page exists
1170         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1171         def _got_status(res):
1172             # find an interesting upload and download to look at. LIT files
1173             # are not interesting.
1174             for ds in self.clients[0].list_all_download_statuses():
1175                 if ds.get_size() > 200:
1176                     self._down_status = ds.get_counter()
1177             for us in self.clients[0].list_all_upload_statuses():
1178                 if us.get_size() > 200:
1179                     self._up_status = us.get_counter()
1180             rs = self.clients[0].list_all_retrieve_statuses()[0]
1181             self._retrieve_status = rs.get_counter()
1182             ps = self.clients[0].list_all_publish_statuses()[0]
1183             self._publish_status = ps.get_counter()
1184             us = self.clients[0].list_all_mapupdate_statuses()[0]
1185             self._update_status = us.get_counter()
1186
1187             # and that there are some upload- and download-status pages
1188             return self.GET("status/up-%d" % self._up_status)
1189         d.addCallback(_got_status)
1190         def _got_up(res):
1191             return self.GET("status/down-%d" % self._down_status)
1192         d.addCallback(_got_up)
1193         def _got_down(res):
1194             return self.GET("status/mapupdate-%d" % self._update_status)
1195         d.addCallback(_got_down)
1196         def _got_update(res):
1197             return self.GET("status/publish-%d" % self._publish_status)
1198         d.addCallback(_got_update)
1199         def _got_publish(res):
1200             return self.GET("status/retrieve-%d" % self._retrieve_status)
1201         d.addCallback(_got_publish)
1202
1203         # check that the helper status page exists
1204         d.addCallback(lambda res:
1205                       self.GET("helper_status", followRedirect=True))
1206         def _got_helper_status(res):
1207             self.failUnless("Bytes Fetched:" in res)
1208             # touch a couple of files in the helper's working directory to
1209             # exercise more code paths
1210             workdir = os.path.join(self.getdir("client0"), "helper")
1211             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1212             f = open(incfile, "wb")
1213             f.write("small file")
1214             f.close()
1215             then = time.time() - 86400*3
1216             now = time.time()
1217             os.utime(incfile, (now, then))
1218             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1219             f = open(encfile, "wb")
1220             f.write("less small file")
1221             f.close()
1222             os.utime(encfile, (now, then))
1223         d.addCallback(_got_helper_status)
1224         # and that the json form exists
1225         d.addCallback(lambda res:
1226                       self.GET("helper_status?t=json", followRedirect=True))
1227         def _got_helper_status_json(res):
1228             data = simplejson.loads(res)
1229             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1230                                  1)
1231             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1232             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1233             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1234                                  10)
1235             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1236             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1237             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1238                                  15)
1239         d.addCallback(_got_helper_status_json)
1240
1241         # and check that client[3] (which uses a helper but does not run one
1242         # itself) doesn't explode when you ask for its status
1243         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1244         def _got_non_helper_status(res):
1245             self.failUnless("Upload and Download Status" in res)
1246         d.addCallback(_got_non_helper_status)
1247
1248         # or for helper status with t=json
1249         d.addCallback(lambda res:
1250                       getPage(self.helper_webish_url + "helper_status?t=json"))
1251         def _got_non_helper_status_json(res):
1252             data = simplejson.loads(res)
1253             self.failUnlessEqual(data, {})
1254         d.addCallback(_got_non_helper_status_json)
1255
1256         # see if the statistics page exists
1257         d.addCallback(lambda res: self.GET("statistics"))
1258         def _got_stats(res):
1259             self.failUnless("Node Statistics" in res)
1260             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1261         d.addCallback(_got_stats)
1262         d.addCallback(lambda res: self.GET("statistics?t=json"))
1263         def _got_stats_json(res):
1264             data = simplejson.loads(res)
1265             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1266             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1267         d.addCallback(_got_stats_json)
1268
1269         # TODO: mangle the second segment of a file, to test errors that
1270         # occur after we've already sent some good data, which uses a
1271         # different error path.
1272
1273         # TODO: download a URI with a form
1274         # TODO: create a directory by using a form
1275         # TODO: upload by using a form on the directory page
1276         #    url = base + "somedir/subdir1/freeform_post!!upload"
1277         # TODO: delete a file by using a button on the directory page
1278
1279         return d
1280
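    # Illustrative sketch (not used by the tests) of the two download-by-cap
    # URL forms exercised in _test_web above: /uri/<cap>?filename=<name> and
    # /uri?uri=<cap>. Like the test, it interpolates the cap directly;
    # quoting it with urllib.quote would also be reasonable.
    def _example_download_urls(self, cap, filename="mydata567"):
        url1 = self.webish_url + "uri/%s?filename=%s" % (cap, filename)
        url2 = self.webish_url + "uri?uri=%s" % (cap,)
        return (url1, url2)
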
1281     def _test_runner(self, res):
1282         # exercise some of the diagnostic tools in runner.py
1283
1284         # find a share
1285         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1286             if "storage" not in dirpath:
1287                 continue
1288             if not filenames:
1289                 continue
1290             pieces = dirpath.split(os.sep)
1291             if pieces[-4] == "storage" and pieces[-3] == "shares":
1292                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1293                 # are sharefiles here
1294                 filename = os.path.join(dirpath, filenames[0])
1295                 # peek at the magic to see if it is a chk share
1296                 magic = open(filename, "rb").read(4)
1297                 if magic == '\x00\x00\x00\x01':
1298                     break
1299         else:
1300             self.fail("unable to find any uri_extension files in %s"
1301                       % self.basedir)
1302         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1303
1304         out,err = StringIO(), StringIO()
1305         rc = runner.runner(["debug", "dump-share", "--offsets",
1306                             filename],
1307                            stdout=out, stderr=err)
1308         output = out.getvalue()
1309         self.failUnlessEqual(rc, 0)
1310
1311         # we only upload a single file, so we can assert some things about
1312         # its size and shares.
1313         self.failUnless(("share filename: %s" % filename) in output)
1314         self.failUnless("size: %d\n" % len(self.data) in output)
1315         self.failUnless("num_segments: 1\n" in output)
1316         # segment_size is always a multiple of needed_shares
1317         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1318         self.failUnless("total_shares: 10\n" in output)
1319         # keys which are supposed to be present
1320         for key in ("size", "num_segments", "segment_size",
1321                     "needed_shares", "total_shares",
1322                     "codec_name", "codec_params", "tail_codec_params",
1323                     #"plaintext_hash", "plaintext_root_hash",
1324                     "crypttext_hash", "crypttext_root_hash",
1325                     "share_root_hash", "UEB_hash"):
1326             self.failUnless("%s: " % key in output, key)
1327         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1328
1329         # now use its storage index to find the other shares using the
1330         # 'find-shares' tool
1331         sharedir, shnum = os.path.split(filename)
1332         storagedir, storage_index_s = os.path.split(sharedir)
1333         out,err = StringIO(), StringIO()
1334         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1335         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1336         rc = runner.runner(cmd, stdout=out, stderr=err)
1337         self.failUnlessEqual(rc, 0)
1338         out.seek(0)
1339         sharefiles = [sfn.strip() for sfn in out.readlines()]
1340         self.failUnlessEqual(len(sharefiles), 10)
1341
1342         # also exercise the 'catalog-shares' tool
1343         out,err = StringIO(), StringIO()
1344         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1345         cmd = ["debug", "catalog-shares"] + nodedirs
1346         rc = runner.runner(cmd, stdout=out, stderr=err)
1347         self.failUnlessEqual(rc, 0)
1348         out.seek(0)
1349         descriptions = [sfn.strip() for sfn in out.readlines()]
1350         self.failUnlessEqual(len(descriptions), 30)
1351         matching = [line
1352                     for line in descriptions
1353                     if line.startswith("CHK %s " % storage_index_s)]
1354         self.failUnlessEqual(len(matching), 10)
1355
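    # Illustrative sketch (not used by any test) of the arithmetic behind
    # the "segment_size is always a multiple of needed_shares" assertion in
    # _test_runner above: a single-segment file's segment size is just the
    # data length rounded up to a multiple of k (3 in the default 3-of-10
    # encoding).
    def _example_expected_segment_size(self, data, needed_shares=3):
        return mathutil.next_multiple(len(data), needed_shares)
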
1356     def _test_control(self, res):
1357         # exercise the remote-control-the-client foolscap interfaces in
1358         # allmydata.control (mostly used for performance tests)
1359         c0 = self.clients[0]
1360         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1361         control_furl = open(control_furl_file, "r").read().strip()
1362         # it doesn't really matter which Tub we use to connect to the client,
1363         # so let's just use our IntroducerNode's
1364         d = self.introducer.tub.getReference(control_furl)
1365         d.addCallback(self._test_control2, control_furl_file)
1366         return d
1367     def _test_control2(self, rref, filename):
1368         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1369         downfile = os.path.join(self.basedir, "control.downfile")
1370         d.addCallback(lambda uri:
1371                       rref.callRemote("download_from_uri_to_file",
1372                                       uri, downfile))
1373         def _check(res):
1374             self.failUnlessEqual(res, downfile)
1375             data = open(downfile, "r").read()
1376             expected_data = open(filename, "r").read()
1377             self.failUnlessEqual(data, expected_data)
1378         d.addCallback(_check)
1379         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1380         if sys.platform == "linux2":
1381             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1382         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1383         return d
1384
1385     def _test_cli(self, res):
1386         # run various CLI commands (in a thread, since they use blocking
1387         # network calls)
1388
1389         private_uri = self._private_node.get_uri()
1390         some_uri = self._root_directory_uri
1391         client0_basedir = self.getdir("client0")
1392
1393         nodeargs = [
1394             "--node-directory", client0_basedir,
1395             ]
1396         TESTDATA = "I will not write the same thing over and over.\n" * 100
1397
1398         d = defer.succeed(None)
1399
1400         # for compatibility with earlier versions, private/root_dir.cap is
1401         # supposed to be treated as an alias named "tahoe:". Start by making
1402         # sure that works, before we add other aliases.
1403
1404         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1405         f = open(root_file, "w")
1406         f.write(private_uri)
1407         f.close()
1408
1409         def run(ignored, verb, *args, **kwargs):
1410             stdin = kwargs.get("stdin", "")
1411             newargs = [verb] + nodeargs + list(args)
1412             return self._run_cli(newargs, stdin=stdin)
1413
1414         def _check_ls((out,err), expected_children, unexpected_children=[]):
1415             self.failUnlessEqual(err, "")
1416             for s in expected_children:
1417                 self.failUnless(s in out, (s,out))
1418             for s in unexpected_children:
1419                 self.failIf(s in out, (s,out))
1420
1421         def _check_ls_root((out,err)):
1422             self.failUnless("personal" in out)
1423             self.failUnless("s2-ro" in out)
1424             self.failUnless("s2-rw" in out)
1425             self.failUnlessEqual(err, "")
1426
1427         # this should reference private_uri
1428         d.addCallback(run, "ls")
1429         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1430
1431         d.addCallback(run, "list-aliases")
1432         def _check_aliases_1((out,err)):
1433             self.failUnlessEqual(err, "")
1434             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1435         d.addCallback(_check_aliases_1)
1436
1437         # now that that's out of the way, remove root_dir.cap and work with
1438         # new files
1439         d.addCallback(lambda res: os.unlink(root_file))
1440         d.addCallback(run, "list-aliases")
1441         def _check_aliases_2((out,err)):
1442             self.failUnlessEqual(err, "")
1443             self.failUnlessEqual(out, "")
1444         d.addCallback(_check_aliases_2)
1445
1446         d.addCallback(run, "mkdir")
1447         def _got_dir( (out,err) ):
1448             self.failUnless(uri.from_string_dirnode(out.strip()))
1449             return out.strip()
1450         d.addCallback(_got_dir)
1451         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1452
1453         d.addCallback(run, "list-aliases")
1454         def _check_aliases_3((out,err)):
1455             self.failUnlessEqual(err, "")
1456             self.failUnless("tahoe: " in out)
1457         d.addCallback(_check_aliases_3)
1458
1459         def _check_empty_dir((out,err)):
1460             self.failUnlessEqual(out, "")
1461             self.failUnlessEqual(err, "")
1462         d.addCallback(run, "ls")
1463         d.addCallback(_check_empty_dir)
1464
1465         def _check_missing_dir((out,err)):
1466             # TODO: check that rc==2
1467             self.failUnlessEqual(out, "")
1468             self.failUnlessEqual(err, "No such file or directory\n")
1469         d.addCallback(run, "ls", "bogus")
1470         d.addCallback(_check_missing_dir)
1471
1472         files = []
1473         datas = []
1474         for i in range(10):
1475             fn = os.path.join(self.basedir, "file%d" % i)
1476             files.append(fn)
1477             data = "data to be uploaded: file%d\n" % i
1478             datas.append(data)
1479             open(fn,"wb").write(data)
1480
1481         def _check_stdout_against((out,err), filenum=None, data=None):
1482             self.failUnlessEqual(err, "")
1483             if filenum is not None:
1484                 self.failUnlessEqual(out, datas[filenum])
1485             if data is not None:
1486                 self.failUnlessEqual(out, data)
1487
1488         # test both forms of put: from a file and from stdin
1489         #  tahoe put bar FOO
1490         d.addCallback(run, "put", files[0], "tahoe-file0")
1491         def _put_out((out,err)):
1492             self.failUnless("URI:LIT:" in out, out)
1493             self.failUnless("201 Created" in err, err)
1494             uri0 = out.strip()
1495             return run(None, "get", uri0)
1496         d.addCallback(_put_out)
1497         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1498
1499         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1500         #  tahoe put bar tahoe:FOO
1501         d.addCallback(run, "put", files[2], "tahoe:file2")
1502         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1503         def _check_put_mutable((out,err)):
1504             self._mutable_file3_uri = out.strip()
1505         d.addCallback(_check_put_mutable)
1506         d.addCallback(run, "get", "tahoe:file3")
1507         d.addCallback(_check_stdout_against, 3)
1508
1509         #  tahoe put FOO
1510         STDIN_DATA = "This is the file to upload from stdin."
1511         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1512         #  tahoe put tahoe:FOO
1513         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1514                       stdin="Other file from stdin.")
1515
1516         d.addCallback(run, "ls")
1517         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1518                                   "tahoe-file-stdin", "from-stdin"])
1519         d.addCallback(run, "ls", "subdir")
1520         d.addCallback(_check_ls, ["tahoe-file1"])
1521
1522         # tahoe mkdir FOO
1523         d.addCallback(run, "mkdir", "subdir2")
1524         d.addCallback(run, "ls")
1525         # TODO: extract the URI, set an alias with it
1526         d.addCallback(_check_ls, ["subdir2"])
1527
1528         # tahoe get: (to stdin and to a file)
1529         d.addCallback(run, "get", "tahoe-file0")
1530         d.addCallback(_check_stdout_against, 0)
1531         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1532         d.addCallback(_check_stdout_against, 1)
1533         outfile0 = os.path.join(self.basedir, "outfile0")
1534         d.addCallback(run, "get", "file2", outfile0)
1535         def _check_outfile0((out,err)):
1536             data = open(outfile0,"rb").read()
1537             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1538         d.addCallback(_check_outfile0)
1539         outfile1 = os.path.join(self.basedir, "outfile1")
1540         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1541         def _check_outfile1((out,err)):
1542             data = open(outfile1,"rb").read()
1543             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1544         d.addCallback(_check_outfile1)
1545
1546         d.addCallback(run, "rm", "tahoe-file0")
1547         d.addCallback(run, "rm", "tahoe:file2")
1548         d.addCallback(run, "ls")
1549         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1550
1551         d.addCallback(run, "ls", "-l")
1552         def _check_ls_l((out,err)):
1553             lines = out.split("\n")
1554             for l in lines:
1555                 if "tahoe-file-stdin" in l:
1556                     self.failUnless(l.startswith("-r-- "), l)
1557                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1558                 if "file3" in l:
1559                     self.failUnless(l.startswith("-rw- "), l) # mutable
1560         d.addCallback(_check_ls_l)
1561
1562         d.addCallback(run, "ls", "--uri")
1563         def _check_ls_uri((out,err)):
1564             lines = out.split("\n")
1565             for l in lines:
1566                 if "file3" in l:
1567                     self.failUnless(self._mutable_file3_uri in l)
1568         d.addCallback(_check_ls_uri)
1569
1570         d.addCallback(run, "ls", "--readonly-uri")
1571         def _check_ls_rouri((out,err)):
1572             lines = out.split("\n")
1573             for l in lines:
1574                 if "file3" in l:
1575                     rw_uri = self._mutable_file3_uri
1576                     u = uri.from_string_mutable_filenode(rw_uri)
1577                     ro_uri = u.get_readonly().to_string()
1578                     self.failUnless(ro_uri in l)
1579         d.addCallback(_check_ls_rouri)
1580
1581
1582         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1583         d.addCallback(run, "ls")
1584         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1585
1586         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1587         d.addCallback(run, "ls")
1588         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1589
1590         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1591         d.addCallback(run, "ls")
1592         d.addCallback(_check_ls, ["file3", "file3-copy"])
1593         d.addCallback(run, "get", "tahoe:file3-copy")
1594         d.addCallback(_check_stdout_against, 3)
1595
1596         # copy from disk into tahoe
1597         d.addCallback(run, "cp", files[4], "tahoe:file4")
1598         d.addCallback(run, "ls")
1599         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1600         d.addCallback(run, "get", "tahoe:file4")
1601         d.addCallback(_check_stdout_against, 4)
1602
1603         # copy from tahoe into disk
1604         target_filename = os.path.join(self.basedir, "file-out")
1605         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1606         def _check_cp_out((out,err)):
1607             self.failUnless(os.path.exists(target_filename))
1608             got = open(target_filename,"rb").read()
1609             self.failUnlessEqual(got, datas[4])
1610         d.addCallback(_check_cp_out)
1611
1612         # copy from disk to disk (silly case)
1613         target2_filename = os.path.join(self.basedir, "file-out-copy")
1614         d.addCallback(run, "cp", target_filename, target2_filename)
1615         def _check_cp_out2((out,err)):
1616             self.failUnless(os.path.exists(target2_filename))
1617             got = open(target2_filename,"rb").read()
1618             self.failUnlessEqual(got, datas[4])
1619         d.addCallback(_check_cp_out2)
1620
1621         # copy from tahoe into disk, overwriting an existing file
1622         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1623         def _check_cp_out3((out,err)):
1624             self.failUnless(os.path.exists(target_filename))
1625             got = open(target_filename,"rb").read()
1626             self.failUnlessEqual(got, datas[3])
1627         d.addCallback(_check_cp_out3)
1628
1629         # copy from disk into tahoe, overwriting an existing immutable file
1630         d.addCallback(run, "cp", files[5], "tahoe:file4")
1631         d.addCallback(run, "ls")
1632         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1633         d.addCallback(run, "get", "tahoe:file4")
1634         d.addCallback(_check_stdout_against, 5)
1635
1636         # copy from disk into tahoe, overwriting an existing mutable file
1637         d.addCallback(run, "cp", files[5], "tahoe:file3")
1638         d.addCallback(run, "ls")
1639         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1640         d.addCallback(run, "get", "tahoe:file3")
1641         d.addCallback(_check_stdout_against, 5)
1642
1643         # recursive copy: setup
1644         dn = os.path.join(self.basedir, "dir1")
1645         os.makedirs(dn)
1646         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1647         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1648         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1649         sdn2 = os.path.join(dn, "subdir2")
1650         os.makedirs(sdn2)
1651         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1652         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1653
1654         # from disk into tahoe
1655         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1656         d.addCallback(run, "ls")
1657         d.addCallback(_check_ls, ["dir1"])
1658         d.addCallback(run, "ls", "dir1")
1659         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1660                       ["rfile4", "rfile5"])
1661         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1662         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1663                       ["rfile1", "rfile2", "rfile3"])
1664         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1665         d.addCallback(_check_stdout_against, data="rfile4")
1666
1667         # and back out again
1668         dn_copy = os.path.join(self.basedir, "dir1-copy")
1669         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1670         def _check_cp_r_out((out,err)):
1671             def _cmp(name):
1672                 old = open(os.path.join(dn, name), "rb").read()
1673                 newfn = os.path.join(dn_copy, name)
1674                 self.failUnless(os.path.exists(newfn))
1675                 new = open(newfn, "rb").read()
1676                 self.failUnlessEqual(old, new)
1677             _cmp("rfile1")
1678             _cmp("rfile2")
1679             _cmp("rfile3")
1680             _cmp(os.path.join("subdir2", "rfile4"))
1681             _cmp(os.path.join("subdir2", "rfile5"))
1682         d.addCallback(_check_cp_r_out)
1683
1684         # and copy it a second time, which ought to overwrite the same files
1685         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1686
1687         # and tahoe-to-tahoe
1688         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1689         d.addCallback(run, "ls")
1690         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1691         d.addCallback(run, "ls", "dir1-copy")
1692         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1693                       ["rfile4", "rfile5"])
1694         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1695         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1696                       ["rfile1", "rfile2", "rfile3"])
1697         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1698         d.addCallback(_check_stdout_against, data="rfile4")
1699
1700         # and copy it a second time, which ought to overwrite the same files
1701         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1702
1703         # tahoe_ls doesn't currently handle the error correctly: it tries to
1704         # JSON-parse a traceback.
1705 ##         def _ls_missing(res):
1706 ##             argv = ["ls"] + nodeargs + ["bogus"]
1707 ##             return self._run_cli(argv)
1708 ##         d.addCallback(_ls_missing)
1709 ##         def _check_ls_missing((out,err)):
1710 ##             print "OUT", out
1711 ##             print "ERR", err
1712 ##             self.failUnlessEqual(err, "")
1713 ##         d.addCallback(_check_ls_missing)
1714
1715         return d
1716
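    # Illustrative sketch (not called by anything) of the argv layout the
    # run() helper inside _test_cli builds: the verb first, then the shared
    # --node-directory option, then any positional arguments, e.g.
    # ["ls", "--node-directory", <client0 basedir>, "subdir"].
    def _example_cli_argv(self, verb, *args):
        nodeargs = ["--node-directory", self.getdir("client0")]
        return [verb] + nodeargs + list(args)
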
1717     def _run_cli(self, argv, stdin=""):
1718         #print "CLI:", argv
1719         stdout, stderr = StringIO(), StringIO()
1720         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1721                                   stdin=StringIO(stdin),
1722                                   stdout=stdout, stderr=stderr)
1723         def _done(res):
1724             return stdout.getvalue(), stderr.getvalue()
1725         d.addCallback(_done)
1726         return d
1727
1728     def _test_checker(self, res):
1729         ut = upload.Data("too big to be literal" * 200, convergence=None)
1730         d = self._personal_node.add_file(u"big file", ut)
1731
1732         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1733         def _check_dirnode_results(r):
1734             self.failUnless(r.is_healthy())
1735         d.addCallback(_check_dirnode_results)
1736         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1737         d.addCallback(_check_dirnode_results)
1738
1739         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1740         def _got_chk_filenode(n):
1741             self.failUnless(isinstance(n, filenode.FileNode))
1742             d = n.check(Monitor())
1743             def _check_filenode_results(r):
1744                 self.failUnless(r.is_healthy())
1745             d.addCallback(_check_filenode_results)
1746             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1747             d.addCallback(_check_filenode_results)
1748             return d
1749         d.addCallback(_got_chk_filenode)
1750
1751         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1752         def _got_lit_filenode(n):
1753             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1754             d = n.check(Monitor())
1755             def _check_lit_filenode_results(r):
1756                 self.failUnlessEqual(r, None)
1757             d.addCallback(_check_lit_filenode_results)
1758             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1759             d.addCallback(_check_lit_filenode_results)
1760             return d
1761         d.addCallback(_got_lit_filenode)
1762         return d
1763
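    # Illustrative sketch (not used by any test) of why check() returns None
    # for the "sekrit data" LiteralFileNode above: a LIT cap carries the
    # file's contents inside the URI itself, so there are no shares on any
    # server to check or verify. The prefix test is illustrative only; the
    # test relies on isinstance(n, filenode.LiteralFileNode) instead.
    def _example_is_literal_cap(self, cap):
        return cap.startswith("URI:LIT:")
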
1764
1765 class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
1766
1767     def _run_cli(self, argv):
1768         stdout, stderr = StringIO(), StringIO()
1769         # this can only do synchronous operations
1770         assert argv[0] == "debug"
1771         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1772         return stdout.getvalue()
1773
1774     def test_good(self):
1775         self.basedir = self.mktemp()
1776         d = self.set_up_nodes()
1777         CONTENTS = "a little bit of data"
1778         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1779         def _created(node):
1780             self.node = node
1781             si = self.node.get_storage_index()
1782         d.addCallback(_created)
1783         # now make sure the webapi verifier sees no problems
1784         def _do_check(res):
1785             url = (self.webish_url +
1786                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1787                    "?t=check&verify=true")
1788             return getPage(url, method="POST")
1789         d.addCallback(_do_check)
1790         def _got_results(out):
1791             self.failUnless("<span>Healthy : Healthy</span>" in out, out)
1792             self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1793             self.failIf("Not Healthy!" in out, out)
1794             self.failIf("Unhealthy" in out, out)
1795             self.failIf("Corrupt Shares" in out, out)
1796         d.addCallback(_got_results)
1797         d.addErrback(self.explain_web_error)
1798         return d
1799
1800     def test_corrupt(self):
1801         self.basedir = self.mktemp()
1802         d = self.set_up_nodes()
1803         CONTENTS = "a little bit of data"
1804         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1805         def _created(node):
1806             self.node = node
1807             si = self.node.get_storage_index()
1808             out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1809                                 self.clients[1].basedir])
1810             files = out.split("\n")
1811             # corrupt one of them, using the CLI debug command
1812             f = files[0]
1813             shnum = os.path.basename(f)
1814             nodeid = self.clients[1].nodeid
1815             nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1816             self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1817             out = self._run_cli(["debug", "corrupt-share", files[0]])
1818         d.addCallback(_created)
1819         # now make sure the webapi verifier notices it
1820         def _do_check(res):
1821             url = (self.webish_url +
1822                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1823                    "?t=check&verify=true")
1824             return getPage(url, method="POST")
1825         d.addCallback(_do_check)
1826         def _got_results(out):
1827             self.failUnless("Not Healthy!" in out, out)
1828             self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1829             self.failUnless("Corrupt Shares:" in out, out)
1830         d.addCallback(_got_results)
1831
1832         # now make sure the webapi repairer can fix it
1833         def _do_repair(res):
1834             url = (self.webish_url +
1835                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1836                    "?t=check&verify=true&repair=true")
1837             return getPage(url, method="POST")
1838         d.addCallback(_do_repair)
1839         def _got_repair_results(out):
1840             self.failUnless("<div>Repair successful</div>" in out, out)
1841         d.addCallback(_got_repair_results)
1842         d.addCallback(_do_check)
1843         def _got_postrepair_results(out):
1844             self.failIf("Not Healthy!" in out, out)
1845             self.failUnless("Recoverable Versions: 10*seq" in out, out)
1846         d.addCallback(_got_postrepair_results)
1847         d.addErrback(self.explain_web_error)
1848
1849         return d
1850
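    # Illustrative sketch (not called by any test) of the
    # "<peerid-prefix>-sh<N>" label built in _created above: the share
    # number is the basename of the share file inside its storage-index
    # directory, and the prefix is the abbreviated nodeid of the server
    # holding that share.
    def _example_share_label(self, sharefile, client):
        shnum = os.path.basename(sharefile)
        return "%s-sh%s" % (idlib.shortnodeid_b2a(client.nodeid), shnum)
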
1851     def test_delete_share(self):
1852         self.basedir = self.mktemp()
1853         d = self.set_up_nodes()
1854         CONTENTS = "a little bit of data"
1855         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1856         def _created(node):
1857             self.node = node
1858             si = self.node.get_storage_index()
1859             out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1860                                 self.clients[1].basedir])
1861             files = out.split("\n")
1862             # delete one of them outright, leaving only 9 of the 10 shares
1863             f = files[0]
1864             shnum = os.path.basename(f)
1865             nodeid = self.clients[1].nodeid
1866             nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1867             self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1868             os.unlink(files[0])
1869         d.addCallback(_created)
1870         # now make sure the webapi checker notices it
1871         def _do_check(res):
1872             url = (self.webish_url +
1873                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1874                    "?t=check&verify=false")
1875             return getPage(url, method="POST")
1876         d.addCallback(_do_check)
1877         def _got_results(out):
1878             self.failUnless("Not Healthy!" in out, out)
1879             self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1880             self.failIf("Corrupt Shares" in out, out)
1881         d.addCallback(_got_results)
1882
1883         # now make sure the webapi repairer can fix it
1884         def _do_repair(res):
1885             url = (self.webish_url +
1886                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1887                    "?t=check&verify=false&repair=true")
1888             return getPage(url, method="POST")
1889         d.addCallback(_do_repair)
1890         def _got_repair_results(out):
1891             self.failUnless("Repair successful" in out)
1892         d.addCallback(_got_repair_results)
1893         d.addCallback(_do_check)
1894         def _got_postrepair_results(out):
1895             self.failIf("Not Healthy!" in out, out)
1896             self.failUnless("Recoverable Versions: 10*seq" in out)
1897         d.addCallback(_got_postrepair_results)
1898         d.addErrback(self.explain_web_error)
1899
1900         return d
1901
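    # The checks above build nearly identical webapi URLs; this unused
    # sketch consolidates the pattern. verify and repair are passed as the
    # literal strings "true"/"false", matching the URLs built in _do_check
    # and _do_repair.
    def _example_check_url(self, verify=False, repair=False):
        url = (self.webish_url +
               "uri/%s" % urllib.quote(self.node.get_uri()) +
               "?t=check&verify=%s" % (verify and "true" or "false"))
        if repair:
            url += "&repair=true"
        return url
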
1902
1903 class DeepCheckBase(SystemTestMixin, ErrorMixin):
1904
1905     def web_json(self, n, **kwargs):
1906         kwargs["output"] = "json"
1907         d = self.web(n, "POST", **kwargs)
1908         d.addCallback(self.decode_json)
1909         return d
1910
1911     def decode_json(self, (s,url)):
1912         try:
1913             data = simplejson.loads(s)
1914         except ValueError:
1915             self.fail("%s: not JSON: '%s'" % (url, s))
1916         return data
1917
1918     def web(self, n, method="GET", **kwargs):
1919         # returns (data, url)
1920         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
1921                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
1922         d = getPage(url, method=method)
1923         d.addCallback(lambda data: (data,url))
1924         return d
1925
1926     def wait_for_operation(self, ignored, ophandle):
1927         url = self.webish_url + "operations/" + ophandle
1928         url += "?t=status&output=JSON"
1929         d = getPage(url)
1930         def _got(res):
1931             try:
1932                 data = simplejson.loads(res)
1933             except ValueError:
1934                 self.fail("%s: not JSON: '%s'" % (url, res))
1935             if not data["finished"]:
1936                 d = self.stall(delay=1.0)
1937                 d.addCallback(self.wait_for_operation, ophandle)
1938                 return d
1939             return data
1940         d.addCallback(_got)
1941         return d
1942
1943     def get_operation_results(self, ignored, ophandle, output=None):
1944         url = self.webish_url + "operations/" + ophandle
1945         url += "?t=status"
1946         if output:
1947             url += "&output=" + output
1948         d = getPage(url)
1949         def _got(res):
1950             if output and output.lower() == "json":
1951                 try:
1952                     return simplejson.loads(res)
1953                 except ValueError:
1954                     self.fail("%s: not JSON: '%s'" % (url, res))
1955             return res
1956         d.addCallback(_got)
1957         return d
1958
1959     def slow_web(self, n, output=None, **kwargs):
1960         # use ophandle=
1961         handle = base32.b2a(os.urandom(4))
1962         d = self.web(n, "POST", ophandle=handle, **kwargs)
1963         d.addCallback(self.wait_for_operation, handle)
1964         d.addCallback(self.get_operation_results, handle, output=output)
1965         return d
1966
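    # Illustrative sketch (unused) of how the helpers above compose:
    # slow_web() POSTs the operation with a random ophandle=, polls
    # /operations/<handle>?t=status until the JSON reports finished, then
    # fetches the results (decoded from JSON when output="json").
    # DeepCheckWebGood below drives its deep-stats check exactly this way.
    def _example_deep_stats_json(self, node):
        return self.slow_web(node, t="start-deep-stats", output="json")
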
1967
1968 class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
1969     # construct a small directory tree (with one dir, one immutable file, one
1970     # mutable file, one LIT file, and a loop), and then check/examine it in
1971     # various ways.
1972
1973     def set_up_tree(self, ignored):
1974         # building this tree takes about 2.9s
1975
1976         # root
1977         #   mutable
1978         #   large
1979         #   small
1980         #   small2
1981         #   loop -> root
1982         c0 = self.clients[0]
1983         d = c0.create_empty_dirnode()
1984         def _created_root(n):
1985             self.root = n
1986             self.root_uri = n.get_uri()
1987         d.addCallback(_created_root)
1988         d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1989         d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1990         def _created_mutable(n):
1991             self.mutable = n
1992             self.mutable_uri = n.get_uri()
1993         d.addCallback(_created_mutable)
1994
1995         large = upload.Data("Lots of data\n" * 1000, None)
1996         d.addCallback(lambda ign: self.root.add_file(u"large", large))
1997         def _created_large(n):
1998             self.large = n
1999             self.large_uri = n.get_uri()
2000         d.addCallback(_created_large)
2001
2002         small = upload.Data("Small enough for a LIT", None)
2003         d.addCallback(lambda ign: self.root.add_file(u"small", small))
2004         def _created_small(n):
2005             self.small = n
2006             self.small_uri = n.get_uri()
2007         d.addCallback(_created_small)
2008
2009         small2 = upload.Data("Small enough for a LIT too", None)
2010         d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
2011         def _created_small2(n):
2012             self.small2 = n
2013             self.small2_uri = n.get_uri()
2014         d.addCallback(_created_small2)
2015
2016         d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
2017         return d
2018
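    # An unused summary of the tree set_up_tree() builds above. The "loop"
    # edge points back at the root, so deep traversals see only one
    # directory; these are the same counts check_stats_good asserts below.
    def _example_expected_tree_counts(self):
        return {"count-directories": 1,
                "count-files": 4,
                "count-immutable-files": 1,  # "large"
                "count-literal-files": 2,    # "small", "small2"
                "count-mutable-files": 1}    # "mutable"
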
2019     def check_is_healthy(self, cr, n, where, incomplete=False):
2020         self.failUnless(ICheckerResults.providedBy(cr), where)
2021         self.failUnless(cr.is_healthy(), where)
2022         self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2023                              where)
2024         self.failUnlessEqual(cr.get_storage_index_string(),
2025                              base32.b2a(n.get_storage_index()), where)
2026         needs_rebalancing = bool( len(self.clients) < 10 )
2027         if not incomplete:
2028             self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
2029         d = cr.get_data()
2030         self.failUnlessEqual(d["count-shares-good"], 10, where)
2031         self.failUnlessEqual(d["count-shares-needed"], 3, where)
2032         self.failUnlessEqual(d["count-shares-expected"], 10, where)
2033         if not incomplete:
2034             self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2035         self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2036         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2037         if not incomplete:
2038             self.failUnlessEqual(sorted(d["servers-responding"]),
2039                                  sorted([c.nodeid for c in self.clients]),
2040                                  where)
2041             self.failUnless("sharemap" in d, where)
2042             all_serverids = set()
2043             for (shareid, serverids) in d["sharemap"].items():
2044                 all_serverids.update(serverids)
2045             self.failUnlessEqual(sorted(all_serverids),
2046                                  sorted([c.nodeid for c in self.clients]),
2047                                  where)
2048
2049         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2050         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2051         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2052
2053
2054     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2055         self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
2056         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2057         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2058         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2059         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2060         self.failIf(cr.get_repair_attempted(), where)
2061
2062     def deep_check_is_healthy(self, cr, num_healthy, where):
2063         self.failUnless(IDeepCheckResults.providedBy(cr))
2064         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
2065                              num_healthy, where)
2066
2067     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2068         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2069         c = cr.get_counters()
2070         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2071                              num_healthy, where)
2072         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2073                              num_healthy, where)
2074         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
2075
2076     def test_good(self):
2077         self.basedir = self.mktemp()
2078         d = self.set_up_nodes()
2079         d.addCallback(self.set_up_tree)
2080         d.addCallback(self.do_stats)
2081         d.addCallback(self.do_test_check_good)
2082         d.addCallback(self.do_test_web_good)
2083         d.addCallback(self.do_test_cli_good)
2084         d.addErrback(self.explain_web_error)
2085         d.addErrback(self.explain_error)
2086         return d
2087
2088     def do_stats(self, ignored):
2089         d = defer.succeed(None)
2090         d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2091         d.addCallback(self.check_stats_good)
2092         return d
2093
2094     def check_stats_good(self, s):
2095         self.failUnlessEqual(s["count-directories"], 1)
2096         self.failUnlessEqual(s["count-files"], 4)
2097         self.failUnlessEqual(s["count-immutable-files"], 1)
2098         self.failUnlessEqual(s["count-literal-files"], 2)
2099         self.failUnlessEqual(s["count-mutable-files"], 1)
2100         # don't check directories: their size will vary
2101         # s["largest-directory"]
2102         # s["size-directories"]
2103         self.failUnlessEqual(s["largest-directory-children"], 5)
2104         self.failUnlessEqual(s["largest-immutable-file"], 13000)
2105         # to re-use this function for both the local
2106         # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2107         # coerce the result into a list of tuples. dirnode.start_deep_stats()
2108         # returns a list of tuples, but JSON only knows about lists, so
2109         # t=start-deep-stats returns a list of lists.
2110         histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2111         self.failUnlessEqual(histogram, [(11, 31, 2),
2112                                          (10001, 31622, 1),
2113                                          ])
2114         self.failUnlessEqual(s["size-immutable-files"], 13000)
2115         self.failUnlessEqual(s["size-literal-files"], 48)
2116
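    # Illustrative sketch (unused) of the list-of-lists -> list-of-tuples
    # coercion described in check_stats_good above, which lets the same
    # assertions serve both dirnode.start_deep_stats() (tuples) and the
    # webapi's t=start-deep-stats JSON output (lists).
    def _example_normalize_histogram(self, size_files_histogram):
        # [[11, 31, 2], [10001, 31622, 1]] -> [(11, 31, 2), (10001, 31622, 1)]
        return [tuple(bucket) for bucket in size_files_histogram]
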
2117     def do_test_check_good(self, ignored):
2118         d = defer.succeed(None)
2119         # check the individual items
2120         d.addCallback(lambda ign: self.root.check(Monitor()))
2121         d.addCallback(self.check_is_healthy, self.root, "root")
2122         d.addCallback(lambda ign: self.mutable.check(Monitor()))
2123         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2124         d.addCallback(lambda ign: self.large.check(Monitor()))
2125         d.addCallback(self.check_is_healthy, self.large, "large")
2126         d.addCallback(lambda ign: self.small.check(Monitor()))
2127         d.addCallback(self.failUnlessEqual, None, "small")
2128         d.addCallback(lambda ign: self.small2.check(Monitor()))
2129         d.addCallback(self.failUnlessEqual, None, "small2")
2130
2131         # and again with verify=True
2132         d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2133         d.addCallback(self.check_is_healthy, self.root, "root")
2134         d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2135         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2136         d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2137         d.addCallback(self.check_is_healthy, self.large, "large",
2138                       incomplete=True)
2139         d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2140         d.addCallback(self.failUnlessEqual, None, "small")
2141         d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
2142         d.addCallback(self.failUnlessEqual, None, "small2")
2143
2144         # and check_and_repair(), which should be a nop
2145         d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2146         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2147         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2148         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2149         d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2150         d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2151         d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2152         d.addCallback(self.failUnlessEqual, None, "small")
2153         d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
2154         d.addCallback(self.failUnlessEqual, None, "small2")
2155
2156         # check_and_repair(verify=True)
2157         d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2158         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2159         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2160         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2161         d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2162         d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2163                       incomplete=True)
2164         d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2165         d.addCallback(self.failUnlessEqual, None, "small")
2166         d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
2167         d.addCallback(self.failUnlessEqual, None, "small2")
2168
2169
2170         # now deep-check the root, with various verify= and repair= options
2171         d.addCallback(lambda ign:
2172                       self.root.start_deep_check().when_done())
2173         d.addCallback(self.deep_check_is_healthy, 3, "root")
2174         d.addCallback(lambda ign:
2175                       self.root.start_deep_check(verify=True).when_done())
2176         d.addCallback(self.deep_check_is_healthy, 3, "root")
2177         d.addCallback(lambda ign:
2178                       self.root.start_deep_check_and_repair().when_done())
2179         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2180         d.addCallback(lambda ign:
2181                       self.root.start_deep_check_and_repair(verify=True).when_done())
2182         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2183
2184         # and finally, start a deep-check, but then cancel it.
2185         d.addCallback(lambda ign: self.root.start_deep_check())
2186         def _checking(monitor):
2187             monitor.cancel()
2188             d = monitor.when_done()
2189             # this should fire as soon as the next dirnode.list finishes.
2190             # TODO: add a counter to measure how many list() calls are made,
2191             # assert that no more than one gets to run before the cancel()
2192             # takes effect.
2193             def _finished_normally(res):
2194                 self.fail("this was supposed to fail, not finish normally")
2195             def _cancelled(f):
2196                 f.trap(OperationCancelledError)
2197             d.addCallbacks(_finished_normally, _cancelled)
2198             return d
2199         d.addCallback(_checking)
2200
2201         return d
2202
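    # Illustrative sketch (not called by any test) of the cancellation idiom
    # used at the end of do_test_check_good above: cancel the Monitor, then
    # treat OperationCancelledError as the expected outcome and a normal
    # completion as a test failure.
    def _example_expect_cancelled(self, monitor):
        monitor.cancel()
        d = monitor.when_done()
        d.addCallbacks(lambda res: self.fail("should have been cancelled"),
                       lambda f: f.trap(OperationCancelledError))
        return d
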
2203     def json_check_is_healthy(self, data, n, where, incomplete=False):
2204
2205         self.failUnlessEqual(data["storage-index"],
2206                              base32.b2a(n.get_storage_index()), where)
2207         self.failUnless("summary" in data, (where, data))
2208         self.failUnlessEqual(data["summary"].lower(), "healthy",
2209                              "%s: '%s'" % (where, data["summary"]))
2210         r = data["results"]
2211         self.failUnlessEqual(r["healthy"], True, where)
2212         needs_rebalancing = bool( len(self.clients) < 10 )
2213         if not incomplete:
2214             self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2215         self.failUnlessEqual(r["count-shares-good"], 10, where)
2216         self.failUnlessEqual(r["count-shares-needed"], 3, where)
2217         self.failUnlessEqual(r["count-shares-expected"], 10, where)
2218         if not incomplete:
2219             self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2220         self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2221         self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2222         if not incomplete:
2223             self.failUnlessEqual(sorted(r["servers-responding"]),
2224                                  sorted([idlib.nodeid_b2a(c.nodeid)
2225                                          for c in self.clients]), where)
2226             self.failUnless("sharemap" in r, where)
2227             all_serverids = set()
2228             for (shareid, serverids_s) in r["sharemap"].items():
2229                 all_serverids.update(serverids_s)
2230             self.failUnlessEqual(sorted(all_serverids),
2231                                  sorted([idlib.nodeid_b2a(c.nodeid)
2232                                          for c in self.clients]), where)
2233         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2234         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2235         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2236
2237     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2238         self.failUnlessEqual(data["storage-index"],
2239                              base32.b2a(n.get_storage_index()), where)
2240         self.failUnlessEqual(data["repair-attempted"], False, where)
2241         self.json_check_is_healthy(data["pre-repair-results"],
2242                                    n, where, incomplete)
2243         self.json_check_is_healthy(data["post-repair-results"],
2244                                    n, where, incomplete)
2245
2246     def json_full_deepcheck_is_healthy(self, data, n, where):
2247         self.failUnlessEqual(data["root-storage-index"],
2248                              base32.b2a(n.get_storage_index()), where)
2249         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2250         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2251         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2252         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2253         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2254         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2255         self.json_check_stats_good(data["stats"], where)
2256
2257     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2258         self.failUnlessEqual(data["root-storage-index"],
2259                              base32.b2a(n.get_storage_index()), where)
2260         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2261
2262         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2263         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2264         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2265
2266         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2267         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2268         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2269
2270         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2271         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2272         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2273
2274         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2275         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2276         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2277
2278
2279     def json_check_lit(self, data, n, where):
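        # LIT files carry their data inside the cap itself, so they have no
        # storage index and their check results are trivially healthy.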
2280         self.failUnlessEqual(data["storage-index"], "", where)
2281         self.failUnlessEqual(data["results"]["healthy"], True, where)
2282
2283     def json_check_stats_good(self, data, where):
2284         self.check_stats_good(data)
2285
2286     def do_test_web_good(self, ignored):
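        # Exercise the webapi check/deep-check interface against the healthy
        # tree: t=check and t=start-deep-check with every combination of
        # verify=true and repair=true (all with output=json), and finally
        # t=info on each node.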
2287         d = defer.succeed(None)
2288
2289         # stats
2290         d.addCallback(lambda ign:
2291                       self.slow_web(self.root,
2292                                     t="start-deep-stats", output="json"))
2293         d.addCallback(self.json_check_stats_good, "deep-stats")
2294
2295         # check, no verify
2296         d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2297         d.addCallback(self.json_check_is_healthy, self.root, "root")
2298         d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2299         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2300         d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2301         d.addCallback(self.json_check_is_healthy, self.large, "large")
2302         d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2303         d.addCallback(self.json_check_lit, self.small, "small")
2304         d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
2305         d.addCallback(self.json_check_lit, self.small2, "small2")
2306
2307         # check and verify
2308         d.addCallback(lambda ign:
2309                       self.web_json(self.root, t="check", verify="true"))
2310         d.addCallback(self.json_check_is_healthy, self.root, "root+v")
2311         d.addCallback(lambda ign:
2312                       self.web_json(self.mutable, t="check", verify="true"))
2313         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v")
2314         d.addCallback(lambda ign:
2315                       self.web_json(self.large, t="check", verify="true"))
2316         d.addCallback(self.json_check_is_healthy, self.large, "large+v",
2317                       incomplete=True)
2318         d.addCallback(lambda ign:
2319                       self.web_json(self.small, t="check", verify="true"))
2320         d.addCallback(self.json_check_lit, self.small, "small+v")
2321         d.addCallback(lambda ign:
2322                       self.web_json(self.small2, t="check", verify="true"))
2323         d.addCallback(self.json_check_lit, self.small2, "small2+v")
2324
2325         # check and repair, no verify
2326         d.addCallback(lambda ign:
2327                       self.web_json(self.root, t="check", repair="true"))
2328         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r")
2329         d.addCallback(lambda ign:
2330                       self.web_json(self.mutable, t="check", repair="true"))
2331         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r")
2332         d.addCallback(lambda ign:
2333                       self.web_json(self.large, t="check", repair="true"))
2334         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r")
2335         d.addCallback(lambda ign:
2336                       self.web_json(self.small, t="check", repair="true"))
2337         d.addCallback(self.json_check_lit, self.small, "small+r")
2338         d.addCallback(lambda ign:
2339                       self.web_json(self.small2, t="check", repair="true"))
2340         d.addCallback(self.json_check_lit, self.small2, "small2+r")
2341
2342         # check+verify+repair
2343         d.addCallback(lambda ign:
2344                       self.web_json(self.root, t="check", repair="true", verify="true"))
2345         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr")
2346         d.addCallback(lambda ign:
2347                       self.web_json(self.mutable, t="check", repair="true", verify="true"))
2348         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr")
2349         d.addCallback(lambda ign:
2350                       self.web_json(self.large, t="check", repair="true", verify="true"))
2351         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True)
2352         d.addCallback(lambda ign:
2353                       self.web_json(self.small, t="check", repair="true", verify="true"))
2354         d.addCallback(self.json_check_lit, self.small, "small+vr")
2355         d.addCallback(lambda ign:
2356                       self.web_json(self.small2, t="check", repair="true", verify="true"))
2357         d.addCallback(self.json_check_lit, self.small2, "small2+vr")
2358
2359         # now run a deep-check, with various verify= and repair= flags
2360         d.addCallback(lambda ign:
2361                       self.slow_web(self.root, t="start-deep-check", output="json"))
2362         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d")
2363         d.addCallback(lambda ign:
2364                       self.slow_web(self.root, t="start-deep-check", verify="true",
2365                                     output="json"))
2366         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv")
2367         d.addCallback(lambda ign:
2368                       self.slow_web(self.root, t="start-deep-check", repair="true",
2369                                     output="json"))
2370         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr")
2371         d.addCallback(lambda ign:
2372                       self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2373         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr")
2374
2375         # now look at t=info
2376         d.addCallback(lambda ign: self.web(self.root, t="info"))
2377         # TODO: examine the output
2378         d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2379         d.addCallback(lambda ign: self.web(self.large, t="info"))
2380         d.addCallback(lambda ign: self.web(self.small, t="info"))
2381         d.addCallback(lambda ign: self.web(self.small2, t="info"))
2382
2383         return d
2384
2385     def _run_cli(self, argv, stdin=""):
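        # Run the CLI in a worker thread (deferToThread) so the blocking
        # runner.runner call does not stall the reactor; pass stdin through
        # and fire the Deferred with (stdout, stderr) as strings.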
2386         #print "CLI:", argv
2387         stdout, stderr = StringIO(), StringIO()
2388         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
2389                                   stdin=StringIO(stdin),
2390                                   stdout=stdout, stderr=stderr)
2391         def _done(res):
2392             return stdout.getvalue(), stderr.getvalue()
2393         d.addCallback(_done)
2394         return d
2395
2396     def do_test_cli_good(self, ignored):
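        # Exercise the 'manifest' and 'stats' CLI commands against the root
        # directory: the plain manifest, manifest --storage-index, the
        # human-readable stats output, and stats --raw (JSON).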
2397         basedir = self.getdir("client0")
2398         d = self._run_cli(["manifest",
2399                            "--node-directory", basedir,
2400                            self.root_uri])
2401         def _check((out,err)):
2402             lines = [l for l in out.split("\n") if l]
2403             self.failUnlessEqual(len(lines), 5)
2404             caps = {}
2405             for l in lines:
2406                 try:
2407                     cap, path = l.split(None, 1)
2408                 except ValueError:
2409                     cap = l.strip()
2410                     path = ""
2411                 caps[cap] = path
2412             self.failUnless(self.root.get_uri() in caps)
2413             self.failUnlessEqual(caps[self.root.get_uri()], "")
2414             self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
2415             self.failUnlessEqual(caps[self.large.get_uri()], "large")
2416             self.failUnlessEqual(caps[self.small.get_uri()], "small")
2417             self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
2418         d.addCallback(_check)
2419
2420         d.addCallback(lambda res:
2421                       self._run_cli(["manifest",
2422                                      "--node-directory", basedir,
2423                                      "--storage-index", self.root_uri]))
2424         def _check2((out,err)):
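            # only the root dirnode and the mutable and large files have
            # storage indexes; the two LIT files do not, so the
            # --storage-index manifest has just three lines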
2425             lines = [l for l in out.split("\n") if l]
2426             self.failUnlessEqual(len(lines), 3)
2427             self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
2428             self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
2429             self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
2430         d.addCallback(_check2)
2431
2432         d.addCallback(lambda res:
2433                       self._run_cli(["stats",
2434                                      "--node-directory", basedir,
2435                                      self.root_uri]))
2436         def _check3((out,err)):
2437             lines = [l.strip() for l in out.split("\n") if l]
2438             self.failUnless("count-immutable-files: 1" in lines)
2439             self.failUnless("count-mutable-files: 1" in lines)
2440             self.failUnless("count-literal-files: 2" in lines)
2441             self.failUnless("count-files: 4" in lines)
2442             self.failUnless("count-directories: 1" in lines)
2443             self.failUnless("size-immutable-files: 13000    (13.00 kB, 12.70 kiB)" in lines, lines)
2444             self.failUnless("size-literal-files: 48" in lines)
2445             self.failUnless("   11-31    : 2    (31 B, 31 B)".strip() in lines)
2446             self.failUnless("10001-31622 : 1    (31.62 kB, 30.88 kiB)".strip() in lines)
2447         d.addCallback(_check3)
2448
2449         d.addCallback(lambda res:
2450                       self._run_cli(["stats",
2451                                      "--node-directory", basedir,
2452                                      "--raw",
2453                                      self.root_uri]))
2454         def _check4((out,err)):
2455             data = simplejson.loads(out)
2456             self.failUnlessEqual(data["count-immutable-files"], 1)
2458             self.failUnlessEqual(data["count-mutable-files"], 1)
2459             self.failUnlessEqual(data["count-literal-files"], 2)
2460             self.failUnlessEqual(data["count-files"], 4)
2461             self.failUnlessEqual(data["count-directories"], 1)
2462             self.failUnlessEqual(data["size-immutable-files"], 13000)
2463             self.failUnlessEqual(data["size-literal-files"], 48)
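            # each size-files-histogram entry is [bucket-min, bucket-max,
            # count], matching the "11-31 : 2" and "10001-31622 : 1" lines in
            # the human-readable stats output checked above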
2464             self.failUnless([11,31,2] in data["size-files-histogram"])
2465             self.failUnless([10001,31622,1] in data["size-files-histogram"])
2466         d.addCallback(_check4)
2467
2468         return d
2469
2470
2471 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
2472
2473     def test_bad(self):
2474         self.basedir = self.mktemp()
2475         d = self.set_up_nodes()
2476         d.addCallback(self.set_up_damaged_tree)
2477         d.addCallback(self.do_test_check_bad)
2478         d.addCallback(self.do_test_deepcheck_bad)
2479         d.addCallback(self.do_test_web_bad)
2480         d.addErrback(self.explain_web_error)
2481         d.addErrback(self.explain_error)
2482         return d
2483
2484
2485
2486     def set_up_damaged_tree(self, ignored):
2487         # building this tree takes about 6.4s
2488
2489         # root
2490         #   mutable-good
2491         #   mutable-missing-shares
2492         #   mutable-corrupt-shares
2493         #   mutable-unrecoverable
2494         #   large-good
2495         #   large-missing-shares
2496         #   large-corrupt-shares
2497         #   large-unrecoverable
2498
2499         self.nodes = {}
2500
2501         c0 = self.clients[0]
2502         d = c0.create_empty_dirnode()
2503         def _created_root(n):
2504             self.root = n
2505             self.root_uri = n.get_uri()
2506         d.addCallback(_created_root)
2507         d.addCallback(self.create_mangled, "mutable-good")
2508         d.addCallback(self.create_mangled, "mutable-missing-shares")
2509         d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2510         d.addCallback(self.create_mangled, "mutable-unrecoverable")
2511         d.addCallback(self.create_mangled, "large-good")
2512         d.addCallback(self.create_mangled, "large-missing-shares")
2513         d.addCallback(self.create_mangled, "large-corrupt-shares")
2514         d.addCallback(self.create_mangled, "large-unrecoverable")
2515
2516         return d
2517
2518
2519     def create_mangled(self, ignored, name):
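        # 'name' encodes what to create and how to damage it, e.g.
        # "large-corrupt-shares": the part before the first dash picks the
        # node type (mutable file, large immutable file, or small LIT file)
        # and the rest picks the share mangling to apply.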
2520         nodetype, mangletype = name.split("-", 1)
2521         if nodetype == "mutable":
2522             d = self.clients[0].create_mutable_file("mutable file contents")
2523             d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2524         elif nodetype == "large":
2525             large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2526             d = self.root.add_file(unicode(name), large)
2527         elif nodetype == "small":
2528             small = upload.Data("Small enough for a LIT", None)
2529             d = self.root.add_file(unicode(name), small)
2530
2531         def _stash_node(node):
2532             self.nodes[name] = node
2533             return node
2534         d.addCallback(_stash_node)
2535
2536         if mangletype == "good":
2537             pass
2538         elif mangletype == "missing-shares":
2539             d.addCallback(self._delete_some_shares)
2540         elif mangletype == "corrupt-shares":
2541             d.addCallback(self._corrupt_some_shares)
2542         else:
2543             assert mangletype == "unrecoverable"
2544             d.addCallback(self._delete_most_shares)
2545
2546         return d
2547
2548     def _run_cli(self, argv):
2549         stdout, stderr = StringIO(), StringIO()
2550         # this can only do synchronous operations
2551         assert argv[0] == "debug"
2552         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2553         return stdout.getvalue()
2554
2555     def _find_shares(self, node):
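        # Use the 'debug find-shares' CLI command to list the share files for
        # this node's storage index across all client basedirs, one path per
        # line.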
2556         si = node.get_storage_index()
2557         out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2558                             [c.basedir for c in self.clients])
2559         files = out.split("\n")
2560         return [f for f in files if f]
2561
2562     def _delete_some_shares(self, node):
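        # Remove two of the shares: with 3-of-10 encoding the file is still
        # recoverable, but it is no longer healthy.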
2563         shares = self._find_shares(node)
2564         os.unlink(shares[0])
2565         os.unlink(shares[1])
2566
2567     def _corrupt_some_shares(self, node):
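        # Corrupt two shares in place with the 'debug corrupt-share' command.
        # A plain check still reports the file as healthy; only a check with
        # verify=True notices the corruption (see do_test_check_bad).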
2568         shares = self._find_shares(node)
2569         self._run_cli(["debug", "corrupt-share", shares[0]])
2570         self._run_cli(["debug", "corrupt-share", shares[1]])
2571
2572     def _delete_most_shares(self, node):
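        # Delete every share but one, leaving fewer than the 3 shares needed
        # to recover the file.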
2573         shares = self._find_shares(node)
2574         for share in shares[1:]:
2575             os.unlink(share)
2576
2577
2578     def check_is_healthy(self, cr, where):
2579         self.failUnless(ICheckerResults.providedBy(cr), where)
2580         self.failUnless(cr.is_healthy(), where)
2581         self.failUnless(cr.is_recoverable(), where)
2582         d = cr.get_data()
2583         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2584         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2585         return cr
2586
2587     def check_is_missing_shares(self, cr, where):
2588         self.failUnless(ICheckerResults.providedBy(cr), where)
2589         self.failIf(cr.is_healthy(), where)
2590         self.failUnless(cr.is_recoverable(), where)
2591         d = cr.get_data()
2592         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2593         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2594         return cr
2595
2596     def check_has_corrupt_shares(self, cr, where):
2597         # by "corrupt-shares" we mean that some shares were corrupted, but the file is still recoverable
2598         self.failUnless(ICheckerResults.providedBy(cr), where)
2600         self.failIf(cr.is_healthy(), where)
2601         self.failUnless(cr.is_recoverable(), where)
2602         d = cr.get_data()
2603         self.failUnless(d["count-shares-good"] < 10, where)
2604         self.failUnless(d["count-corrupt-shares"], where)
2605         self.failUnless(d["list-corrupt-shares"], where)
2606         return cr
2607
2608     def check_is_unrecoverable(self, cr, where):
2609         self.failUnless(ICheckerResults.providedBy(cr), where)
2610         d = cr.get_data()
2611         self.failIf(cr.is_healthy(), where)
2612         self.failIf(cr.is_recoverable(), where)
2613         self.failUnless(d["count-shares-good"] < d["count-shares-needed"],
2614                         where)
2615         self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2616         self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2617         return cr
2618
2619     def do_test_check_bad(self, ignored):
2620         d = defer.succeed(None)
2621
2622         # check the individual items, without verification. This will not
2623         # detect corrupt shares.
2624         def _check(which, checker):
2625             d = self.nodes[which].check(Monitor())
2626             d.addCallback(checker, which + "--check")
2627             return d
2628
2629         d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2630         d.addCallback(lambda ign: _check("mutable-missing-shares",
2631                                          self.check_is_missing_shares))
2632         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2633                                          self.check_is_healthy))
2634         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2635                                          self.check_is_unrecoverable))
2636         d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2637         d.addCallback(lambda ign: _check("large-missing-shares",
2638                                          self.check_is_missing_shares))
2639         d.addCallback(lambda ign: _check("large-corrupt-shares",
2640                                          self.check_is_healthy))
2641         d.addCallback(lambda ign: _check("large-unrecoverable",
2642                                          self.check_is_unrecoverable))
2643
2644         # and again with verify=True, which *does* detect corrupt shares.
2645         def _checkv(which, checker):
2646             d = self.nodes[which].check(Monitor(), verify=True)
2647             d.addCallback(checker, which + "--check-and-verify")
2648             return d
2649
2650         d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2651         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2652                                          self.check_is_missing_shares))
2653         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2654                                          self.check_has_corrupt_shares))
2655         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2656                                          self.check_is_unrecoverable))
2657         d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2658         # disabled pending immutable verifier
2659         #d.addCallback(lambda ign: _checkv("large-missing-shares",
2660         #                                 self.check_is_missing_shares))
2661         #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2662         #                                 self.check_has_corrupt_shares))
2663         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2664                                          self.check_is_unrecoverable))
2665
2666         return d
2667
2668     def do_test_deepcheck_bad(self, ignored):
2669         d = defer.succeed(None)
2670
2671         # now deep-check the root, with various verify= and repair= options
2672         d.addCallback(lambda ign:
2673                       self.root.start_deep_check().when_done())
2674         def _check1(cr):
2675             self.failUnless(IDeepCheckResults.providedBy(cr))
2676             c = cr.get_counters()
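            # 9 objects = the root dirnode plus its 8 children. Without
            # verification the two corrupt-shares files still look healthy,
            # so we expect 5 healthy, 4 unhealthy, and 2 unrecoverable.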
2677             self.failUnlessEqual(c["count-objects-checked"], 9)
2678             self.failUnlessEqual(c["count-objects-healthy"], 5)
2679             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2680             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2681         d.addCallback(_check1)
2682
2683         d.addCallback(lambda ign:
2684                       self.root.start_deep_check(verify=True).when_done())
2685         def _check2(cr):
2686             self.failUnless(IDeepCheckResults.providedBy(cr))
2687             c = cr.get_counters()
2688             self.failUnlessEqual(c["count-objects-checked"], 9)
2689             # until we have a real immutable verifier, these counts will be
2690             # off
2691             #self.failUnlessEqual(c["count-objects-healthy"], 3)
2692             #self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2693             self.failUnlessEqual(c["count-objects-healthy"], 5) # todo
2694             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2695             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2696         d.addCallback(_check2)
2697
2698         return d
2699
2700     def json_is_healthy(self, data, where):
2701         r = data["results"]
2702         self.failUnless(r["healthy"], where)
2703         self.failUnless(r["recoverable"], where)
2704         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2705         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2706
2707     def json_is_missing_shares(self, data, where):
2708         r = data["results"]
2709         self.failIf(r["healthy"], where)
2710         self.failUnless(r["recoverable"], where)
2711         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2712         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2713
2714     def json_has_corrupt_shares(self, data, where):
2715         # by "corrupt-shares" we mean that some shares were corrupted, but the file is still recoverable
2716         r = data["results"]
2717         self.failIf(r["healthy"], where)
2718         self.failUnless(r["recoverable"], where)
2719         self.failUnless(r["count-shares-good"] < 10, where)
2720         self.failUnless(r["count-corrupt-shares"], where)
2721         self.failUnless(r["list-corrupt-shares"], where)
2722
2723     def json_is_unrecoverable(self, data, where):
2724         r = data["results"]
2725         self.failIf(r["healthy"], where)
2726         self.failIf(r["recoverable"], where)
2727         self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2728                         where)
2729         self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2730         self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
2731
2732     def do_test_web_bad(self, ignored):
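        # Repeat the check and check+verify cases through the webapi
        # (t=check, with and without verify=true), asserting on the JSON
        # results for each node in the damaged tree.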
2733         d = defer.succeed(None)
2734
2735         # check, no verify
2736         def _check(which, checker):
2737             d = self.web_json(self.nodes[which], t="check")
2738             d.addCallback(checker, which + "--webcheck")
2739             return d
2740
2741         d.addCallback(lambda ign: _check("mutable-good",
2742                                          self.json_is_healthy))
2743         d.addCallback(lambda ign: _check("mutable-missing-shares",
2744                                          self.json_is_missing_shares))
2745         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2746                                          self.json_is_healthy))
2747         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2748                                          self.json_is_unrecoverable))
2749         d.addCallback(lambda ign: _check("large-good",
2750                                          self.json_is_healthy))
2751         d.addCallback(lambda ign: _check("large-missing-shares",
2752                                          self.json_is_missing_shares))
2753         d.addCallback(lambda ign: _check("large-corrupt-shares",
2754                                          self.json_is_healthy))
2755         d.addCallback(lambda ign: _check("large-unrecoverable",
2756                                          self.json_is_unrecoverable))
2757
2758         # check and verify
2759         def _checkv(which, checker):
2760             d = self.web_json(self.nodes[which], t="check", verify="true")
2761             d.addCallback(checker, which + "--webcheck-and-verify")
2762             return d
2763
2764         d.addCallback(lambda ign: _checkv("mutable-good",
2765                                           self.json_is_healthy))
2766         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2767                                          self.json_is_missing_shares))
2768         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2769                                          self.json_has_corrupt_shares))
2770         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2771                                          self.json_is_unrecoverable))
2772         d.addCallback(lambda ign: _checkv("large-good",
2773                                           self.json_is_healthy))
2774         # disabled pending immutable verifier
2775         #d.addCallback(lambda ign: _checkv("large-missing-shares",
2776         #                                 self.json_is_missing_shares))
2777         #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2778         #                                 self.json_has_corrupt_shares))
2779         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2780                                          self.json_is_unrecoverable))
2781
2782         return d