]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
942ffe4b7d0557a69bed45b4968d072015b2650e
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18      IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin, ErrorMixin, \
28      MemoryConsumer, download_to_data
29
# Filler payload for publish tests: it must be large enough to exceed the
# LIT-URI threshold, so that uploading it produces real shares on the
# storage servers rather than inlining the data into the URI itself.
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
34
class CountingDataUploadable(upload.Data):
    """An upload.Data variant that counts how many bytes have been read
    from it, and optionally fires a Deferred once the count passes a
    configured threshold (used to interrupt an upload mid-flight)."""
    bytes_read = 0
    interrupt_after = None     # byte threshold, or None for 'never fire'
    interrupt_after_d = None   # Deferred fired once the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm first so the Deferred fires exactly once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
47
class GrabEverythingConsumer:
    """A minimal IConsumer that simply accumulates every byte written to
    it in self.contents."""
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # we only know how to deal with streaming (push) producers
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents = self.contents + data

    def unregisterProducer(self):
        # nothing to tear down: we never throttle the producer
        pass
63
64 class SystemTest(SystemTestMixin, unittest.TestCase):
65
    def test_connections(self):
        """Bring up the grid plus one extra node, then verify that every
        client sees all numclients+1 peers, both in the raw peerid list
        and in the permuted-peers list for the "storage" service."""
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            # addBoth: shut the extra node down whether _check passed or not
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections
93
    def test_upload_and_download_random_key(self):
        """Run the upload/download suite with a random per-upload key
        (convergence=None), so duplicate uploads are NOT short-circuited."""
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800
98
    def test_upload_and_download_convergent(self):
        """Run the upload/download suite with convergent encryption, which
        enables duplicate-upload short-circuiting and resumption."""
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = list(c.get_all_peerids())
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114                 self.failUnlessEqual(len(permuted_peers), self.numclients)
115         d.addCallback(_check_connections)
116
117         def _do_upload(res):
118             log.msg("UPLOADING")
119             u = self.clients[0].getServiceNamed("uploader")
120             self.uploader = u
121             # we crank the max segsize down to 1024b for the duration of this
122             # test, so we can exercise multiple segments. It is important
123             # that this is not a multiple of the segment size, so that the
124             # tail segment is not the same length as the others. This actualy
125             # gets rounded up to 1025 to be a multiple of the number of
126             # required shares (since we use 25 out of 100 FEC).
127             up = upload.Data(DATA, convergence=convergence)
128             up.max_segment_size = 1024
129             d1 = u.upload(up)
130             return d1
131         d.addCallback(_do_upload)
132         def _upload_done(results):
133             uri = results.uri
134             log.msg("upload finished: uri is %s" % (uri,))
135             self.uri = uri
136             dl = self.clients[1].getServiceNamed("downloader")
137             self.downloader = dl
138         d.addCallback(_upload_done)
139
140         def _upload_again(res):
141             # Upload again. If using convergent encryption then this ought to be
142             # short-circuited, however with the way we currently generate URIs
143             # (i.e. because they include the roothash), we have to do all of the
144             # encoding work, and only get to save on the upload part.
145             log.msg("UPLOADING AGAIN")
146             up = upload.Data(DATA, convergence=convergence)
147             up.max_segment_size = 1024
148             d1 = self.uploader.upload(up)
149         d.addCallback(_upload_again)
150
151         def _download_to_data(res):
152             log.msg("DOWNLOADING")
153             return self.downloader.download_to_data(self.uri)
154         d.addCallback(_download_to_data)
155         def _download_to_data_done(data):
156             log.msg("download finished")
157             self.failUnlessEqual(data, DATA)
158         d.addCallback(_download_to_data_done)
159
160         target_filename = os.path.join(self.basedir, "download.target")
161         def _download_to_filename(res):
162             return self.downloader.download_to_filename(self.uri,
163                                                         target_filename)
164         d.addCallback(_download_to_filename)
165         def _download_to_filename_done(res):
166             newdata = open(target_filename, "rb").read()
167             self.failUnlessEqual(newdata, DATA)
168         d.addCallback(_download_to_filename_done)
169
170         target_filename2 = os.path.join(self.basedir, "download.target2")
171         def _download_to_filehandle(res):
172             fh = open(target_filename2, "wb")
173             return self.downloader.download_to_filehandle(self.uri, fh)
174         d.addCallback(_download_to_filehandle)
175         def _download_to_filehandle_done(fh):
176             fh.close()
177             newdata = open(target_filename2, "rb").read()
178             self.failUnlessEqual(newdata, DATA)
179         d.addCallback(_download_to_filehandle_done)
180
181         consumer = GrabEverythingConsumer()
182         ct = download.ConsumerAdapter(consumer)
183         d.addCallback(lambda res:
184                       self.downloader.download(self.uri, ct))
185         def _download_to_consumer_done(ign):
186             self.failUnlessEqual(consumer.contents, DATA)
187         d.addCallback(_download_to_consumer_done)
188
189         def _test_read(res):
190             n = self.clients[1].create_node_from_uri(self.uri)
191             d = download_to_data(n)
192             def _read_done(data):
193                 self.failUnlessEqual(data, DATA)
194             d.addCallback(_read_done)
195             d.addCallback(lambda ign:
196                           n.read(MemoryConsumer(), offset=1, size=4))
197             def _read_portion_done(mc):
198                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
199             d.addCallback(_read_portion_done)
200             d.addCallback(lambda ign:
201                           n.read(MemoryConsumer(), offset=2, size=None))
202             def _read_tail_done(mc):
203                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
204             d.addCallback(_read_tail_done)
205             d.addCallback(lambda ign:
206                           n.read(MemoryConsumer(), size=len(DATA)+1000))
207             def _read_too_much(mc):
208                 self.failUnlessEqual("".join(mc.chunks), DATA)
209             d.addCallback(_read_too_much)
210
211             return d
212         d.addCallback(_test_read)
213
214         def _test_bad_read(res):
215             bad_u = uri.from_string_filenode(self.uri)
216             bad_u.key = self.flip_bit(bad_u.key)
217             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
218             # this should cause an error during download
219
220             d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
221                                  None,
222                                  bad_n.read, MemoryConsumer(), offset=2)
223             return d
224         d.addCallback(_test_bad_read)
225
226         def _download_nonexistent_uri(res):
227             baduri = self.mangle_uri(self.uri)
228             log.msg("about to download non-existent URI", level=log.UNUSUAL,
229                     facility="tahoe.tests")
230             d1 = self.downloader.download_to_data(baduri)
231             def _baduri_should_fail(res):
232                 log.msg("finished downloading non-existend URI",
233                         level=log.UNUSUAL, facility="tahoe.tests")
234                 self.failUnless(isinstance(res, Failure))
235                 self.failUnless(res.check(NotEnoughSharesError),
236                                 "expected NotEnoughSharesError, got %s" % res)
237                 # TODO: files that have zero peers should get a special kind
238                 # of NotEnoughSharesError, which can be used to suggest that
239                 # the URI might be wrong or that they've never uploaded the
240                 # file in the first place.
241             d1.addBoth(_baduri_should_fail)
242             return d1
243         d.addCallback(_download_nonexistent_uri)
244
245         # add a new node, which doesn't accept shares, and only uses the
246         # helper for upload.
247         d.addCallback(lambda res: self.add_extra_node(self.numclients,
248                                                       self.helper_furl,
249                                                       add_to_sparent=True))
250         def _added(extra_node):
251             self.extra_node = extra_node
252         d.addCallback(_added)
253
254         HELPER_DATA = "Data that needs help to upload" * 1000
255         def _upload_with_helper(res):
256             u = upload.Data(HELPER_DATA, convergence=convergence)
257             d = self.extra_node.upload(u)
258             def _uploaded(results):
259                 uri = results.uri
260                 return self.downloader.download_to_data(uri)
261             d.addCallback(_uploaded)
262             def _check(newdata):
263                 self.failUnlessEqual(newdata, HELPER_DATA)
264             d.addCallback(_check)
265             return d
266         d.addCallback(_upload_with_helper)
267
268         def _upload_duplicate_with_helper(res):
269             u = upload.Data(HELPER_DATA, convergence=convergence)
270             u.debug_stash_RemoteEncryptedUploadable = True
271             d = self.extra_node.upload(u)
272             def _uploaded(results):
273                 uri = results.uri
274                 return self.downloader.download_to_data(uri)
275             d.addCallback(_uploaded)
276             def _check(newdata):
277                 self.failUnlessEqual(newdata, HELPER_DATA)
278                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
279                             "uploadable started uploading, should have been avoided")
280             d.addCallback(_check)
281             return d
282         if convergence is not None:
283             d.addCallback(_upload_duplicate_with_helper)
284
285         def _upload_resumable(res):
286             DATA = "Data that needs help to upload and gets interrupted" * 1000
287             u1 = CountingDataUploadable(DATA, convergence=convergence)
288             u2 = CountingDataUploadable(DATA, convergence=convergence)
289
290             # we interrupt the connection after about 5kB by shutting down
291             # the helper, then restartingit.
292             u1.interrupt_after = 5000
293             u1.interrupt_after_d = defer.Deferred()
294             u1.interrupt_after_d.addCallback(lambda res:
295                                              self.bounce_client(0))
296
297             # sneak into the helper and reduce its chunk size, so that our
298             # debug_interrupt will sever the connection on about the fifth
299             # chunk fetched. This makes sure that we've started to write the
300             # new shares before we abandon them, which exercises the
301             # abort/delete-partial-share code. TODO: find a cleaner way to do
302             # this. I know that this will affect later uses of the helper in
303             # this same test run, but I'm not currently worried about it.
304             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
305
306             d = self.extra_node.upload(u1)
307
308             def _should_not_finish(res):
309                 self.fail("interrupted upload should have failed, not finished"
310                           " with result %s" % (res,))
311             def _interrupted(f):
312                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
313
314                 # make sure we actually interrupted it before finishing the
315                 # file
316                 self.failUnless(u1.bytes_read < len(DATA),
317                                 "read %d out of %d total" % (u1.bytes_read,
318                                                              len(DATA)))
319
320                 log.msg("waiting for reconnect", level=log.NOISY,
321                         facility="tahoe.test.test_system")
322                 # now, we need to give the nodes a chance to notice that this
323                 # connection has gone away. When this happens, the storage
324                 # servers will be told to abort their uploads, removing the
325                 # partial shares. Unfortunately this involves TCP messages
326                 # going through the loopback interface, and we can't easily
327                 # predict how long that will take. If it were all local, we
328                 # could use fireEventually() to stall. Since we don't have
329                 # the right introduction hooks, the best we can do is use a
330                 # fixed delay. TODO: this is fragile.
331                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
332                 return u1.interrupt_after_d
333             d.addCallbacks(_should_not_finish, _interrupted)
334
335             def _disconnected(res):
336                 # check to make sure the storage servers aren't still hanging
337                 # on to the partial share: their incoming/ directories should
338                 # now be empty.
339                 log.msg("disconnected", level=log.NOISY,
340                         facility="tahoe.test.test_system")
341                 for i in range(self.numclients):
342                     incdir = os.path.join(self.getdir("client%d" % i),
343                                           "storage", "shares", "incoming")
344                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
345             d.addCallback(_disconnected)
346
347             # then we need to give the reconnector a chance to
348             # reestablish the connection to the helper.
349             d.addCallback(lambda res:
350                           log.msg("wait_for_connections", level=log.NOISY,
351                                   facility="tahoe.test.test_system"))
352             d.addCallback(lambda res: self.wait_for_connections())
353
354
355             d.addCallback(lambda res:
356                           log.msg("uploading again", level=log.NOISY,
357                                   facility="tahoe.test.test_system"))
358             d.addCallback(lambda res: self.extra_node.upload(u2))
359
360             def _uploaded(results):
361                 uri = results.uri
362                 log.msg("Second upload complete", level=log.NOISY,
363                         facility="tahoe.test.test_system")
364
365                 # this is really bytes received rather than sent, but it's
366                 # convenient and basically measures the same thing
367                 bytes_sent = results.ciphertext_fetched
368
369                 # We currently don't support resumption of upload if the data is
370                 # encrypted with a random key.  (Because that would require us
371                 # to store the key locally and re-use it on the next upload of
372                 # this file, which isn't a bad thing to do, but we currently
373                 # don't do it.)
374                 if convergence is not None:
375                     # Make sure we did not have to read the whole file the
376                     # second time around .
377                     self.failUnless(bytes_sent < len(DATA),
378                                 "resumption didn't save us any work:"
379                                 " read %d bytes out of %d total" %
380                                 (bytes_sent, len(DATA)))
381                 else:
382                     # Make sure we did have to read the whole file the second
383                     # time around -- because the one that we partially uploaded
384                     # earlier was encrypted with a different random key.
385                     self.failIf(bytes_sent < len(DATA),
386                                 "resumption saved us some work even though we were using random keys:"
387                                 " read %d bytes out of %d total" %
388                                 (bytes_sent, len(DATA)))
389                 return self.downloader.download_to_data(uri)
390             d.addCallback(_uploaded)
391
392             def _check(newdata):
393                 self.failUnlessEqual(newdata, DATA)
394                 # If using convergent encryption, then also check that the
395                 # helper has removed the temp file from its directories.
396                 if convergence is not None:
397                     basedir = os.path.join(self.getdir("client0"), "helper")
398                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
399                     self.failUnlessEqual(files, [])
400                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
401                     self.failUnlessEqual(files, [])
402             d.addCallback(_check)
403             return d
404         d.addCallback(_upload_resumable)
405
406         return d
407
408     def _find_shares(self, basedir):
409         shares = []
410         for (dirpath, dirnames, filenames) in os.walk(basedir):
411             if "storage" not in dirpath:
412                 continue
413             if not filenames:
414                 continue
415             pieces = dirpath.split(os.sep)
416             if pieces[-4] == "storage" and pieces[-3] == "shares":
417                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
418                 # are sharefiles here
419                 assert pieces[-5].startswith("client")
420                 client_num = int(pieces[-5][-1])
421                 storage_index_s = pieces[-1]
422                 storage_index = storage.si_a2b(storage_index_s)
423                 for sharename in filenames:
424                     shnum = int(sharename)
425                     filename = os.path.join(dirpath, sharename)
426                     data = (client_num, storage_index, filename, shnum)
427                     shares.append(data)
428         if not shares:
429             self.fail("unable to find any share files in %s" % basedir)
430         return shares
431
    def _corrupt_mutable_share(self, filename, which):
        """Corrupt one field of the mutable share stored in 'filename',
        in place.

        'which' names the field to mangle: "seqnum", "R" (root hash),
        "IV", "segsize", "pubkey", "signature", "share_hash_chain",
        "block_hash_tree", "share_data", or "encprivkey". The share is
        read, unpacked, the chosen field corrupted (bit-flipped or
        bumped), then re-packed and written back over the original.
        """
        msf = storage.MutableShareFile(filename)
        # read more than any share could hold, to get the whole thing
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            # corrupt an arbitrary node of the chain
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack with the (possibly corrupted) fields and overwrite the
        # share file in place
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)
474
475
476     def test_mutable(self):
477         self.basedir = "system/SystemTest/test_mutable"
478         DATA = "initial contents go here."  # 25 bytes % 3 != 0
479         NEWDATA = "new contents yay"
480         NEWERDATA = "this is getting old"
481
482         d = self.set_up_nodes(use_key_generator=True)
483
484         def _create_mutable(res):
485             c = self.clients[0]
486             log.msg("starting create_mutable_file")
487             d1 = c.create_mutable_file(DATA)
488             def _done(res):
489                 log.msg("DONE: %s" % (res,))
490                 self._mutable_node_1 = res
491                 uri = res.get_uri()
492             d1.addCallback(_done)
493             return d1
494         d.addCallback(_create_mutable)
495
496         def _test_debug(res):
497             # find a share. It is important to run this while there is only
498             # one slot in the grid.
499             shares = self._find_shares(self.basedir)
500             (client_num, storage_index, filename, shnum) = shares[0]
501             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
502                     % filename)
503             log.msg(" for clients[%d]" % client_num)
504
505             out,err = StringIO(), StringIO()
506             rc = runner.runner(["debug", "dump-share", "--offsets",
507                                 filename],
508                                stdout=out, stderr=err)
509             output = out.getvalue()
510             self.failUnlessEqual(rc, 0)
511             try:
512                 self.failUnless("Mutable slot found:\n" in output)
513                 self.failUnless("share_type: SDMF\n" in output)
514                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
515                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
516                 self.failUnless(" num_extra_leases: 0\n" in output)
517                 # the pubkey size can vary by a byte, so the container might
518                 # be a bit larger on some runs.
519                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
520                 self.failUnless(m)
521                 container_size = int(m.group(1))
522                 self.failUnless(2037 <= container_size <= 2049, container_size)
523                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
524                 self.failUnless(m)
525                 data_length = int(m.group(1))
526                 self.failUnless(2037 <= data_length <= 2049, data_length)
527                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
528                                 in output)
529                 self.failUnless(" SDMF contents:\n" in output)
530                 self.failUnless("  seqnum: 1\n" in output)
531                 self.failUnless("  required_shares: 3\n" in output)
532                 self.failUnless("  total_shares: 10\n" in output)
533                 self.failUnless("  segsize: 27\n" in output, (output, filename))
534                 self.failUnless("  datalen: 25\n" in output)
535                 # the exact share_hash_chain nodes depends upon the sharenum,
536                 # and is more of a hassle to compute than I want to deal with
537                 # now
538                 self.failUnless("  share_hash_chain: " in output)
539                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
540                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
541                             base32.b2a(storage_index))
542                 self.failUnless(expected in output)
543             except unittest.FailTest:
544                 print
545                 print "dump-share output was:"
546                 print output
547                 raise
548         d.addCallback(_test_debug)
549
550         # test retrieval
551
552         # first, let's see if we can use the existing node to retrieve the
553         # contents. This allows it to use the cached pubkey and maybe the
554         # latest-known sharemap.
555
556         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
557         def _check_download_1(res):
558             self.failUnlessEqual(res, DATA)
559             # now we see if we can retrieve the data from a new node,
560             # constructed using the URI of the original one. We do this test
561             # on the same client that uploaded the data.
562             uri = self._mutable_node_1.get_uri()
563             log.msg("starting retrieve1")
564             newnode = self.clients[0].create_node_from_uri(uri)
565             newnode_2 = self.clients[0].create_node_from_uri(uri)
566             self.failUnlessIdentical(newnode, newnode_2)
567             return newnode.download_best_version()
568         d.addCallback(_check_download_1)
569
570         def _check_download_2(res):
571             self.failUnlessEqual(res, DATA)
572             # same thing, but with a different client
573             uri = self._mutable_node_1.get_uri()
574             newnode = self.clients[1].create_node_from_uri(uri)
575             log.msg("starting retrieve2")
576             d1 = newnode.download_best_version()
577             d1.addCallback(lambda res: (res, newnode))
578             return d1
579         d.addCallback(_check_download_2)
580
581         def _check_download_3((res, newnode)):
582             self.failUnlessEqual(res, DATA)
583             # replace the data
584             log.msg("starting replace1")
585             d1 = newnode.overwrite(NEWDATA)
586             d1.addCallback(lambda res: newnode.download_best_version())
587             return d1
588         d.addCallback(_check_download_3)
589
590         def _check_download_4(res):
591             self.failUnlessEqual(res, NEWDATA)
592             # now create an even newer node and replace the data on it. This
593             # new node has never been used for download before.
594             uri = self._mutable_node_1.get_uri()
595             newnode1 = self.clients[2].create_node_from_uri(uri)
596             newnode2 = self.clients[3].create_node_from_uri(uri)
597             self._newnode3 = self.clients[3].create_node_from_uri(uri)
598             log.msg("starting replace2")
599             d1 = newnode1.overwrite(NEWERDATA)
600             d1.addCallback(lambda res: newnode2.download_best_version())
601             return d1
602         d.addCallback(_check_download_4)
603
604         def _check_download_5(res):
605             log.msg("finished replace2")
606             self.failUnlessEqual(res, NEWERDATA)
607         d.addCallback(_check_download_5)
608
609         def _corrupt_shares(res):
610             # run around and flip bits in all but k of the shares, to test
611             # the hash checks
612             shares = self._find_shares(self.basedir)
613             ## sort by share number
614             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
615             where = dict([ (shnum, filename)
616                            for (client_num, storage_index, filename, shnum)
617                            in shares ])
618             assert len(where) == 10 # this test is designed for 3-of-10
619             for shnum, filename in where.items():
620                 # shares 7,8,9 are left alone. read will check
621                 # (share_hash_chain, block_hash_tree, share_data). New
622                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
623                 # segsize, signature).
624                 if shnum == 0:
625                     # read: this will trigger "pubkey doesn't match
626                     # fingerprint".
627                     self._corrupt_mutable_share(filename, "pubkey")
628                     self._corrupt_mutable_share(filename, "encprivkey")
629                 elif shnum == 1:
630                     # triggers "signature is invalid"
631                     self._corrupt_mutable_share(filename, "seqnum")
632                 elif shnum == 2:
633                     # triggers "signature is invalid"
634                     self._corrupt_mutable_share(filename, "R")
635                 elif shnum == 3:
636                     # triggers "signature is invalid"
637                     self._corrupt_mutable_share(filename, "segsize")
638                 elif shnum == 4:
639                     self._corrupt_mutable_share(filename, "share_hash_chain")
640                 elif shnum == 5:
641                     self._corrupt_mutable_share(filename, "block_hash_tree")
642                 elif shnum == 6:
643                     self._corrupt_mutable_share(filename, "share_data")
644                 # other things to correct: IV, signature
645                 # 7,8,9 are left alone
646
647                 # note that initial_query_count=5 means that we'll hit the
648                 # first 5 servers in effectively random order (based upon
649                 # response time), so we won't necessarily ever get a "pubkey
650                 # doesn't match fingerprint" error (if we hit shnum>=1 before
651                 # shnum=0, we pull the pubkey from there). To get repeatable
652                 # specific failures, we need to set initial_query_count=1,
653                 # but of course that will change the sequencing behavior of
654                 # the retrieval process. TODO: find a reasonable way to make
655                 # this a parameter, probably when we expand this test to test
656                 # for one failure mode at a time.
657
658                 # when we retrieve this, we should get three signature
659                 # failures (where we've mangled seqnum, R, and segsize). The
660                 # pubkey mangling
661         d.addCallback(_corrupt_shares)
662
663         d.addCallback(lambda res: self._newnode3.download_best_version())
664         d.addCallback(_check_download_5)
665
666         def _check_empty_file(res):
667             # make sure we can create empty files, this usually screws up the
668             # segsize math
669             d1 = self.clients[2].create_mutable_file("")
670             d1.addCallback(lambda newnode: newnode.download_best_version())
671             d1.addCallback(lambda res: self.failUnlessEqual("", res))
672             return d1
673         d.addCallback(_check_empty_file)
674
675         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
676         def _created_dirnode(dnode):
677             log.msg("_created_dirnode(%s)" % (dnode,))
678             d1 = dnode.list()
679             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
680             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
681             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
682             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
683             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
684             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
685             d1.addCallback(lambda res: dnode.build_manifest().when_done())
686             d1.addCallback(lambda res:
687                            self.failUnlessEqual(len(res["manifest"]), 1))
688             return d1
689         d.addCallback(_created_dirnode)
690
691         def wait_for_c3_kg_conn():
692             return self.clients[3]._key_generator is not None
693         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
694
695         def check_kg_poolsize(junk, size_delta):
696             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
697                                  self.key_generator_svc.key_generator.pool_size + size_delta)
698
699         d.addCallback(check_kg_poolsize, 0)
700         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
701         d.addCallback(check_kg_poolsize, -1)
702         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
703         d.addCallback(check_kg_poolsize, -2)
704         # use_helper induces use of clients[3], which is the using-key_gen client
705         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
706         d.addCallback(check_kg_poolsize, -3)
707
708         return d
709     # The default 120 second timeout went off when running it under valgrind
710     # on my old Windows laptop, so I'm bumping up the timeout.
711     test_mutable.timeout = 240
712
713     def flip_bit(self, good):
714         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
715
716     def mangle_uri(self, gooduri):
717         # change the key, which changes the storage index, which means we'll
718         # be asking about the wrong file, so nobody will have any shares
719         u = IFileURI(gooduri)
720         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
721                             uri_extension_hash=u.uri_extension_hash,
722                             needed_shares=u.needed_shares,
723                             total_shares=u.total_shares,
724                             size=u.size)
725         return u2.to_string()
726
727     # TODO: add a test which mangles the uri_extension_hash instead, and
728     # should fail due to not being able to get a valid uri_extension block.
729     # Also a test which sneakily mangles the uri_extension block to change
730     # some of the validation data, so it will fail in the post-download phase
731     # when the file's crypttext integrity check fails. Do the same thing for
732     # the key, which should cause the download to fail the post-download
733     # plaintext_hash check.
734
    def test_vdrive(self):
        """End-to-end virtual-drive test: bring up a grid, publish a small
        directory tree, bounce a client, then re-read and exercise the tree
        through the web API, CLI, control port, and checker. The sub-steps
        are ordered; each _test_*/_do_*/_check_* helper below depends on the
        state left behind by the earlier ones."""
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client0 to make sure published data survives a node bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
    # this whole-system test is slow; give trial extra time before it
    # declares a hang
    test_vdrive.timeout = 1100
778
    def _test_introweb(self, res):
        """Fetch the introducer's welcome page (HTML and ?t=json forms) and
        check the announcement/subscription summaries it reports. On a
        FailTest the raw page is printed to aid debugging, then re-raised."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # the same information must also be available in machine-readable form
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
812
813     def _do_publish1(self, res):
814         ut = upload.Data(self.data, convergence=None)
815         c0 = self.clients[0]
816         d = c0.create_empty_dirnode()
817         def _made_root(new_dirnode):
818             self._root_directory_uri = new_dirnode.get_uri()
819             return c0.create_node_from_uri(self._root_directory_uri)
820         d.addCallback(_made_root)
821         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
822         def _made_subdir1(subdir1_node):
823             self._subdir1_node = subdir1_node
824             d1 = subdir1_node.add_file(u"mydata567", ut)
825             d1.addCallback(self.log, "publish finished")
826             def _stash_uri(filenode):
827                 self.uri = filenode.get_uri()
828             d1.addCallback(_stash_uri)
829             return d1
830         d.addCallback(_made_subdir1)
831         return d
832
833     def _do_publish2(self, res):
834         ut = upload.Data(self.data, convergence=None)
835         d = self._subdir1_node.create_empty_directory(u"subdir2")
836         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
837         return d
838
    def log(self, res, *args, **kwargs):
        """Deferred-chain helper: emit a log message via allmydata's logger
        and pass 'res' through unchanged, so it can be inserted with
        d.addCallback(self.log, "message")."""
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res
843
    def _do_publish_private(self, res):
        """Build the private tree P: P/personal/sekrit data (a LIT-sized
        file), plus P/s2-rw and P/s2-ro links to R/subdir1/subdir2 (read-write
        and read-only respectively). Fires with the new private dirnode."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                # link the same directory in twice: once writable, once read-only
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            # hand the private dirnode to the next stage of the chain
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d
865
866     def _check_publish1(self, res):
867         # this one uses the iterative API
868         c1 = self.clients[1]
869         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
870         d.addCallback(self.log, "check_publish1 got /")
871         d.addCallback(lambda root: root.get(u"subdir1"))
872         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
873         d.addCallback(lambda filenode: filenode.download_to_data())
874         d.addCallback(self.log, "get finished")
875         def _get_done(data):
876             self.failUnlessEqual(data, self.data)
877         d.addCallback(_get_done)
878         return d
879
880     def _check_publish2(self, res):
881         # this one uses the path-based API
882         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
883         d = rootnode.get_child_at_path(u"subdir1")
884         d.addCallback(lambda dirnode:
885                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
886         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
887         d.addCallback(lambda filenode: filenode.download_to_data())
888         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
889
890         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
891         def _got_filenode(filenode):
892             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
893             assert fnode == filenode
894         d.addCallback(_got_filenode)
895         return d
896
    def _check_publish_private(self, resnode):
        """Exercise the private tree P created by _do_publish_private: verify
        its contents via the path-based API, check that every mutating
        operation on the read-only s2-ro link fails with NotMutableError,
        then move 'sekrit data' around and validate the deep-manifest and
        deep-stats numbers. 'resnode' is the private dirnode."""
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only link must still be mutable-format but refuse writes
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            # reads through the read-only link must still work
            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move the file around between writable directories, ending with
            # it back in P/personal/, then check deep-manifest and deep-stats
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with encoding details, so only check
                # loose lower bounds
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
1006
1007     def shouldFail(self, res, expected_failure, which, substring=None):
1008         if isinstance(res, Failure):
1009             res.trap(expected_failure)
1010             if substring:
1011                 self.failUnless(substring in str(res),
1012                                 "substring '%s' not in '%s'"
1013                                 % (substring, str(res)))
1014         else:
1015             self.fail("%s was supposed to raise %s, not get '%s'" %
1016                       (which, expected_failure, res))
1017
1018     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1019         assert substring is None or isinstance(substring, str)
1020         d = defer.maybeDeferred(callable, *args, **kwargs)
1021         def done(res):
1022             if isinstance(res, Failure):
1023                 res.trap(expected_failure)
1024                 if substring:
1025                     self.failUnless(substring in str(res),
1026                                     "substring '%s' not in '%s'"
1027                                     % (substring, str(res)))
1028             else:
1029                 self.fail("%s was supposed to raise %s, not get '%s'" %
1030                           (which, expected_failure, res))
1031         d.addBoth(done)
1032         return d
1033
1034     def PUT(self, urlpath, data):
1035         url = self.webish_url + urlpath
1036         return getPage(url, method="PUT", postdata=data)
1037
1038     def GET(self, urlpath, followRedirect=False):
1039         url = self.webish_url + urlpath
1040         return getPage(url, method="GET", followRedirect=followRedirect)
1041
1042     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1043         if use_helper:
1044             url = self.helper_webish_url + urlpath
1045         else:
1046             url = self.webish_url + urlpath
1047         sepbase = "boogabooga"
1048         sep = "--" + sepbase
1049         form = []
1050         form.append(sep)
1051         form.append('Content-Disposition: form-data; name="_charset"')
1052         form.append('')
1053         form.append('UTF-8')
1054         form.append(sep)
1055         for name, value in fields.iteritems():
1056             if isinstance(value, tuple):
1057                 filename, value = value
1058                 form.append('Content-Disposition: form-data; name="%s"; '
1059                             'filename="%s"' % (name, filename.encode("utf-8")))
1060             else:
1061                 form.append('Content-Disposition: form-data; name="%s"' % name)
1062             form.append('')
1063             form.append(str(value))
1064             form.append(sep)
1065         form[-1] += "--"
1066         body = "\r\n".join(form) + "\r\n"
1067         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1068                    }
1069         return getPage(url, method="POST", postdata=body,
1070                        headers=headers, followRedirect=followRedirect)
1071
    def _test_web(self, res):
        """Exercise the webapi against the published tree: welcome pages
        (plain and helper), directory/file reads, URI-based downloads
        (including a mangled URI that must 410), PUT/GET round-trips,
        unlinked POST uploads, the various status pages, helper status
        (HTML and JSON), and the statistics pages. Runs sequentially;
        later steps assume the uploads/downloads done by earlier ones."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            # mangled URI has no shares, so the gateway must answer 410 Gone
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            then = time.time() - 86400*3
            now = time.time()
            # backdate the mtime so the helper sees it as an old leftover
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # the sizes match the two 'spurious' files written just above
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1278
1279     def _test_runner(self, res):
1280         # exercise some of the diagnostic tools in runner.py
1281
1282         # find a share
1283         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1284             if "storage" not in dirpath:
1285                 continue
1286             if not filenames:
1287                 continue
1288             pieces = dirpath.split(os.sep)
1289             if pieces[-4] == "storage" and pieces[-3] == "shares":
1290                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1291                 # are sharefiles here
1292                 filename = os.path.join(dirpath, filenames[0])
1293                 # peek at the magic to see if it is a chk share
1294                 magic = open(filename, "rb").read(4)
1295                 if magic == '\x00\x00\x00\x01':
1296                     break
1297         else:
1298             self.fail("unable to find any uri_extension files in %s"
1299                       % self.basedir)
1300         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1301
1302         out,err = StringIO(), StringIO()
1303         rc = runner.runner(["debug", "dump-share", "--offsets",
1304                             filename],
1305                            stdout=out, stderr=err)
1306         output = out.getvalue()
1307         self.failUnlessEqual(rc, 0)
1308
1309         # we only upload a single file, so we can assert some things about
1310         # its size and shares.
1311         self.failUnless(("share filename: %s" % filename) in output)
1312         self.failUnless("size: %d\n" % len(self.data) in output)
1313         self.failUnless("num_segments: 1\n" in output)
1314         # segment_size is always a multiple of needed_shares
1315         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1316         self.failUnless("total_shares: 10\n" in output)
1317         # keys which are supposed to be present
1318         for key in ("size", "num_segments", "segment_size",
1319                     "needed_shares", "total_shares",
1320                     "codec_name", "codec_params", "tail_codec_params",
1321                     #"plaintext_hash", "plaintext_root_hash",
1322                     "crypttext_hash", "crypttext_root_hash",
1323                     "share_root_hash", "UEB_hash"):
1324             self.failUnless("%s: " % key in output, key)
1325         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1326
1327         # now use its storage index to find the other shares using the
1328         # 'find-shares' tool
1329         sharedir, shnum = os.path.split(filename)
1330         storagedir, storage_index_s = os.path.split(sharedir)
1331         out,err = StringIO(), StringIO()
1332         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1333         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1334         rc = runner.runner(cmd, stdout=out, stderr=err)
1335         self.failUnlessEqual(rc, 0)
1336         out.seek(0)
1337         sharefiles = [sfn.strip() for sfn in out.readlines()]
1338         self.failUnlessEqual(len(sharefiles), 10)
1339
1340         # also exercise the 'catalog-shares' tool
1341         out,err = StringIO(), StringIO()
1342         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1343         cmd = ["debug", "catalog-shares"] + nodedirs
1344         rc = runner.runner(cmd, stdout=out, stderr=err)
1345         self.failUnlessEqual(rc, 0)
1346         out.seek(0)
1347         descriptions = [sfn.strip() for sfn in out.readlines()]
1348         self.failUnlessEqual(len(descriptions), 30)
1349         matching = [line
1350                     for line in descriptions
1351                     if line.startswith("CHK %s " % storage_index_s)]
1352         self.failUnlessEqual(len(matching), 10)
1353
1354     def _test_control(self, res):
1355         # exercise the remote-control-the-client foolscap interfaces in
1356         # allmydata.control (mostly used for performance tests)
1357         c0 = self.clients[0]
1358         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1359         control_furl = open(control_furl_file, "r").read().strip()
1360         # it doesn't really matter which Tub we use to connect to the client,
1361         # so let's just use our IntroducerNode's
1362         d = self.introducer.tub.getReference(control_furl)
1363         d.addCallback(self._test_control2, control_furl_file)
1364         return d
1365     def _test_control2(self, rref, filename):
1366         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1367         downfile = os.path.join(self.basedir, "control.downfile")
1368         d.addCallback(lambda uri:
1369                       rref.callRemote("download_from_uri_to_file",
1370                                       uri, downfile))
1371         def _check(res):
1372             self.failUnlessEqual(res, downfile)
1373             data = open(downfile, "r").read()
1374             expected_data = open(filename, "r").read()
1375             self.failUnlessEqual(data, expected_data)
1376         d.addCallback(_check)
1377         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1378         if sys.platform == "linux2":
1379             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1380         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1381         return d
1382
1383     def _test_cli(self, res):
1384         # run various CLI commands (in a thread, since they use blocking
1385         # network calls)
1386
1387         private_uri = self._private_node.get_uri()
1388         some_uri = self._root_directory_uri
1389         client0_basedir = self.getdir("client0")
1390
1391         nodeargs = [
1392             "--node-directory", client0_basedir,
1393             ]
1394         TESTDATA = "I will not write the same thing over and over.\n" * 100
1395
1396         d = defer.succeed(None)
1397
1398         # for compatibility with earlier versions, private/root_dir.cap is
1399         # supposed to be treated as an alias named "tahoe:". Start by making
1400         # sure that works, before we add other aliases.
1401
1402         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1403         f = open(root_file, "w")
1404         f.write(private_uri)
1405         f.close()
1406
1407         def run(ignored, verb, *args, **kwargs):
1408             stdin = kwargs.get("stdin", "")
1409             newargs = [verb] + nodeargs + list(args)
1410             return self._run_cli(newargs, stdin=stdin)
1411
1412         def _check_ls((out,err), expected_children, unexpected_children=[]):
1413             self.failUnlessEqual(err, "")
1414             for s in expected_children:
1415                 self.failUnless(s in out, (s,out))
1416             for s in unexpected_children:
1417                 self.failIf(s in out, (s,out))
1418
1419         def _check_ls_root((out,err)):
1420             self.failUnless("personal" in out)
1421             self.failUnless("s2-ro" in out)
1422             self.failUnless("s2-rw" in out)
1423             self.failUnlessEqual(err, "")
1424
1425         # this should reference private_uri
1426         d.addCallback(run, "ls")
1427         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1428
1429         d.addCallback(run, "list-aliases")
1430         def _check_aliases_1((out,err)):
1431             self.failUnlessEqual(err, "")
1432             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1433         d.addCallback(_check_aliases_1)
1434
1435         # now that that's out of the way, remove root_dir.cap and work with
1436         # new files
1437         d.addCallback(lambda res: os.unlink(root_file))
1438         d.addCallback(run, "list-aliases")
1439         def _check_aliases_2((out,err)):
1440             self.failUnlessEqual(err, "")
1441             self.failUnlessEqual(out, "")
1442         d.addCallback(_check_aliases_2)
1443
1444         d.addCallback(run, "mkdir")
1445         def _got_dir( (out,err) ):
1446             self.failUnless(uri.from_string_dirnode(out.strip()))
1447             return out.strip()
1448         d.addCallback(_got_dir)
1449         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1450
1451         d.addCallback(run, "list-aliases")
1452         def _check_aliases_3((out,err)):
1453             self.failUnlessEqual(err, "")
1454             self.failUnless("tahoe: " in out)
1455         d.addCallback(_check_aliases_3)
1456
1457         def _check_empty_dir((out,err)):
1458             self.failUnlessEqual(out, "")
1459             self.failUnlessEqual(err, "")
1460         d.addCallback(run, "ls")
1461         d.addCallback(_check_empty_dir)
1462
1463         def _check_missing_dir((out,err)):
1464             # TODO: check that rc==2
1465             self.failUnlessEqual(out, "")
1466             self.failUnlessEqual(err, "No such file or directory\n")
1467         d.addCallback(run, "ls", "bogus")
1468         d.addCallback(_check_missing_dir)
1469
1470         files = []
1471         datas = []
1472         for i in range(10):
1473             fn = os.path.join(self.basedir, "file%d" % i)
1474             files.append(fn)
1475             data = "data to be uploaded: file%d\n" % i
1476             datas.append(data)
1477             open(fn,"wb").write(data)
1478
1479         def _check_stdout_against((out,err), filenum=None, data=None):
1480             self.failUnlessEqual(err, "")
1481             if filenum is not None:
1482                 self.failUnlessEqual(out, datas[filenum])
1483             if data is not None:
1484                 self.failUnlessEqual(out, data)
1485
1486         # test all both forms of put: from a file, and from stdin
1487         #  tahoe put bar FOO
1488         d.addCallback(run, "put", files[0], "tahoe-file0")
1489         def _put_out((out,err)):
1490             self.failUnless("URI:LIT:" in out, out)
1491             self.failUnless("201 Created" in err, err)
1492             uri0 = out.strip()
1493             return run(None, "get", uri0)
1494         d.addCallback(_put_out)
1495         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1496
1497         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1498         #  tahoe put bar tahoe:FOO
1499         d.addCallback(run, "put", files[2], "tahoe:file2")
1500         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1501         def _check_put_mutable((out,err)):
1502             self._mutable_file3_uri = out.strip()
1503         d.addCallback(_check_put_mutable)
1504         d.addCallback(run, "get", "tahoe:file3")
1505         d.addCallback(_check_stdout_against, 3)
1506
1507         #  tahoe put FOO
1508         STDIN_DATA = "This is the file to upload from stdin."
1509         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1510         #  tahoe put tahoe:FOO
1511         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1512                       stdin="Other file from stdin.")
1513
1514         d.addCallback(run, "ls")
1515         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1516                                   "tahoe-file-stdin", "from-stdin"])
1517         d.addCallback(run, "ls", "subdir")
1518         d.addCallback(_check_ls, ["tahoe-file1"])
1519
1520         # tahoe mkdir FOO
1521         d.addCallback(run, "mkdir", "subdir2")
1522         d.addCallback(run, "ls")
1523         # TODO: extract the URI, set an alias with it
1524         d.addCallback(_check_ls, ["subdir2"])
1525
1526         # tahoe get: (to stdin and to a file)
1527         d.addCallback(run, "get", "tahoe-file0")
1528         d.addCallback(_check_stdout_against, 0)
1529         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1530         d.addCallback(_check_stdout_against, 1)
1531         outfile0 = os.path.join(self.basedir, "outfile0")
1532         d.addCallback(run, "get", "file2", outfile0)
1533         def _check_outfile0((out,err)):
1534             data = open(outfile0,"rb").read()
1535             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1536         d.addCallback(_check_outfile0)
1537         outfile1 = os.path.join(self.basedir, "outfile0")
1538         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1539         def _check_outfile1((out,err)):
1540             data = open(outfile1,"rb").read()
1541             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1542         d.addCallback(_check_outfile1)
1543
1544         d.addCallback(run, "rm", "tahoe-file0")
1545         d.addCallback(run, "rm", "tahoe:file2")
1546         d.addCallback(run, "ls")
1547         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1548
1549         d.addCallback(run, "ls", "-l")
1550         def _check_ls_l((out,err)):
1551             lines = out.split("\n")
1552             for l in lines:
1553                 if "tahoe-file-stdin" in l:
1554                     self.failUnless(l.startswith("-r-- "), l)
1555                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1556                 if "file3" in l:
1557                     self.failUnless(l.startswith("-rw- "), l) # mutable
1558         d.addCallback(_check_ls_l)
1559
1560         d.addCallback(run, "ls", "--uri")
1561         def _check_ls_uri((out,err)):
1562             lines = out.split("\n")
1563             for l in lines:
1564                 if "file3" in l:
1565                     self.failUnless(self._mutable_file3_uri in l)
1566         d.addCallback(_check_ls_uri)
1567
1568         d.addCallback(run, "ls", "--readonly-uri")
1569         def _check_ls_rouri((out,err)):
1570             lines = out.split("\n")
1571             for l in lines:
1572                 if "file3" in l:
1573                     rw_uri = self._mutable_file3_uri
1574                     u = uri.from_string_mutable_filenode(rw_uri)
1575                     ro_uri = u.get_readonly().to_string()
1576                     self.failUnless(ro_uri in l)
1577         d.addCallback(_check_ls_rouri)
1578
1579
1580         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1581         d.addCallback(run, "ls")
1582         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1583
1584         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1585         d.addCallback(run, "ls")
1586         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1587
1588         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1589         d.addCallback(run, "ls")
1590         d.addCallback(_check_ls, ["file3", "file3-copy"])
1591         d.addCallback(run, "get", "tahoe:file3-copy")
1592         d.addCallback(_check_stdout_against, 3)
1593
1594         # copy from disk into tahoe
1595         d.addCallback(run, "cp", files[4], "tahoe:file4")
1596         d.addCallback(run, "ls")
1597         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1598         d.addCallback(run, "get", "tahoe:file4")
1599         d.addCallback(_check_stdout_against, 4)
1600
1601         # copy from tahoe into disk
1602         target_filename = os.path.join(self.basedir, "file-out")
1603         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1604         def _check_cp_out((out,err)):
1605             self.failUnless(os.path.exists(target_filename))
1606             got = open(target_filename,"rb").read()
1607             self.failUnlessEqual(got, datas[4])
1608         d.addCallback(_check_cp_out)
1609
1610         # copy from disk to disk (silly case)
1611         target2_filename = os.path.join(self.basedir, "file-out-copy")
1612         d.addCallback(run, "cp", target_filename, target2_filename)
1613         def _check_cp_out2((out,err)):
1614             self.failUnless(os.path.exists(target2_filename))
1615             got = open(target2_filename,"rb").read()
1616             self.failUnlessEqual(got, datas[4])
1617         d.addCallback(_check_cp_out2)
1618
1619         # copy from tahoe into disk, overwriting an existing file
1620         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1621         def _check_cp_out3((out,err)):
1622             self.failUnless(os.path.exists(target_filename))
1623             got = open(target_filename,"rb").read()
1624             self.failUnlessEqual(got, datas[3])
1625         d.addCallback(_check_cp_out3)
1626
1627         # copy from disk into tahoe, overwriting an existing immutable file
1628         d.addCallback(run, "cp", files[5], "tahoe:file4")
1629         d.addCallback(run, "ls")
1630         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1631         d.addCallback(run, "get", "tahoe:file4")
1632         d.addCallback(_check_stdout_against, 5)
1633
1634         # copy from disk into tahoe, overwriting an existing mutable file
1635         d.addCallback(run, "cp", files[5], "tahoe:file3")
1636         d.addCallback(run, "ls")
1637         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1638         d.addCallback(run, "get", "tahoe:file3")
1639         d.addCallback(_check_stdout_against, 5)
1640
1641         # recursive copy: setup
1642         dn = os.path.join(self.basedir, "dir1")
1643         os.makedirs(dn)
1644         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1645         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1646         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1647         sdn2 = os.path.join(dn, "subdir2")
1648         os.makedirs(sdn2)
1649         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1650         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1651
1652         # from disk into tahoe
1653         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1654         d.addCallback(run, "ls")
1655         d.addCallback(_check_ls, ["dir1"])
1656         d.addCallback(run, "ls", "dir1")
1657         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1658                       ["rfile4", "rfile5"])
1659         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1660         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1661                       ["rfile1", "rfile2", "rfile3"])
1662         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1663         d.addCallback(_check_stdout_against, data="rfile4")
1664
1665         # and back out again
1666         dn_copy = os.path.join(self.basedir, "dir1-copy")
1667         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1668         def _check_cp_r_out((out,err)):
1669             def _cmp(name):
1670                 old = open(os.path.join(dn, name), "rb").read()
1671                 newfn = os.path.join(dn_copy, name)
1672                 self.failUnless(os.path.exists(newfn))
1673                 new = open(newfn, "rb").read()
1674                 self.failUnlessEqual(old, new)
1675             _cmp("rfile1")
1676             _cmp("rfile2")
1677             _cmp("rfile3")
1678             _cmp(os.path.join("subdir2", "rfile4"))
1679             _cmp(os.path.join("subdir2", "rfile5"))
1680         d.addCallback(_check_cp_r_out)
1681
1682         # and copy it a second time, which ought to overwrite the same files
1683         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1684
1685         # and tahoe-to-tahoe
1686         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1687         d.addCallback(run, "ls")
1688         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1689         d.addCallback(run, "ls", "dir1-copy")
1690         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1691                       ["rfile4", "rfile5"])
1692         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1693         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1694                       ["rfile1", "rfile2", "rfile3"])
1695         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1696         d.addCallback(_check_stdout_against, data="rfile4")
1697
1698         # and copy it a second time, which ought to overwrite the same files
1699         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1700
1701         # tahoe_ls doesn't currently handle the error correctly: it tries to
1702         # JSON-parse a traceback.
1703 ##         def _ls_missing(res):
1704 ##             argv = ["ls"] + nodeargs + ["bogus"]
1705 ##             return self._run_cli(argv)
1706 ##         d.addCallback(_ls_missing)
1707 ##         def _check_ls_missing((out,err)):
1708 ##             print "OUT", out
1709 ##             print "ERR", err
1710 ##             self.failUnlessEqual(err, "")
1711 ##         d.addCallback(_check_ls_missing)
1712
1713         return d
1714
1715     def _run_cli(self, argv, stdin=""):
1716         #print "CLI:", argv
1717         stdout, stderr = StringIO(), StringIO()
1718         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1719                                   stdin=StringIO(stdin),
1720                                   stdout=stdout, stderr=stderr)
1721         def _done(res):
1722             return stdout.getvalue(), stderr.getvalue()
1723         d.addCallback(_done)
1724         return d
1725
1726     def _test_checker(self, res):
1727         ut = upload.Data("too big to be literal" * 200, convergence=None)
1728         d = self._personal_node.add_file(u"big file", ut)
1729
1730         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1731         def _check_dirnode_results(r):
1732             self.failUnless(r.is_healthy())
1733         d.addCallback(_check_dirnode_results)
1734         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1735         d.addCallback(_check_dirnode_results)
1736
1737         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1738         def _got_chk_filenode(n):
1739             self.failUnless(isinstance(n, filenode.FileNode))
1740             d = n.check(Monitor())
1741             def _check_filenode_results(r):
1742                 self.failUnless(r.is_healthy())
1743             d.addCallback(_check_filenode_results)
1744             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1745             d.addCallback(_check_filenode_results)
1746             return d
1747         d.addCallback(_got_chk_filenode)
1748
1749         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1750         def _got_lit_filenode(n):
1751             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1752             d = n.check(Monitor())
1753             def _check_lit_filenode_results(r):
1754                 self.failUnlessEqual(r, None)
1755             d.addCallback(_check_lit_filenode_results)
1756             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1757             d.addCallback(_check_lit_filenode_results)
1758             return d
1759         d.addCallback(_got_lit_filenode)
1760         return d
1761
1762
class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
    """Exercise the webapi checker/verifier/repairer against a mutable
    file: when it is healthy, after corrupting one share on disk, and
    after deleting one share."""

    def _run_cli(self, argv):
        """Run a synchronous 'tahoe debug' subcommand, returning its stdout."""
        stdout, stderr = StringIO(), StringIO()
        # this can only do synchronous operations
        assert argv[0] == "debug"
        runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
        return stdout.getvalue()

    def _find_shares(self, si):
        """Return the filenames of the shares client[1] holds for storage
        index 'si', located with the 'debug find-shares' CLI tool."""
        out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                            self.clients[1].basedir])
        return out.split("\n")

    def _note_damaged_shareid(self, sharefile):
        """Record a 'peeridprefix-shNUM' label for the share we are about
        to damage, to make failed runs easier to debug."""
        shnum = os.path.basename(sharefile)
        nodeid = self.clients[1].nodeid
        nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
        self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)

    def test_good(self):
        """A freshly created mutable file should verify as healthy."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<span>Healthy : Healthy</span>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        d.addErrback(self.explain_web_error)
        return d

    def test_corrupt(self):
        """Corrupting one share must be detected by the verifier and then
        fixed by the repairer."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            files = self._find_shares(self.node.get_storage_index())
            # corrupt one of them, using the CLI debug command
            self._note_damaged_shareid(files[0])
            self._run_cli(["debug", "corrupt-share", files[0]])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d

    def test_delete_share(self):
        """Deleting one share must be detected by the (non-verifying)
        checker and then fixed by the repairer."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            files = self._find_shares(self.node.get_storage_index())
            # delete one of them, so the checker sees a missing share
            self._note_damaged_shareid(files[0])
            os.unlink(files[0])
        d.addCallback(_created)
        # now make sure the webapi checker notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d
1899
1900
class DeepCheckBase(SystemTestMixin, ErrorMixin):
    """Shared helpers for the deep-check webapi tests: issue check/repair
    requests against a node's webapi page and decode the JSON results."""

    def web_json(self, n, **kwargs):
        """POST a webapi request for node 'n', fire with the decoded JSON."""
        kwargs["output"] = "json"
        return self.web(n, "POST", **kwargs).addCallback(self.decode_json)

    def decode_json(self, response):
        """Parse a (body, url) pair as JSON, failing the test (with the url
        for context) if the body is not valid JSON."""
        s, url = response
        try:
            return simplejson.loads(s)
        except ValueError:
            self.fail("%s: not JSON: '%s'" % (url, s))

    def web(self, n, method="GET", **kwargs):
        """Fetch the webapi page for node 'n'; fires with (data, url).

        Keyword arguments become query parameters on the URL."""
        query = "&".join(["%s=%s" % pair for pair in kwargs.items()])
        url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
               + "?" + query)
        d = getPage(url, method=method)
        d.addCallback(lambda data: (data, url))
        return d

    def wait_for_operation(self, ignored, ophandle):
        """Poll the operations/ status page (once per second) until the
        long-running operation reports finished; fires with its JSON."""
        url = (self.webish_url + "operations/" + ophandle
               + "?t=status&output=JSON")
        d = getPage(url)
        def _poll(res):
            try:
                data = simplejson.loads(res)
            except ValueError:
                self.fail("%s: not JSON: '%s'" % (url, res))
            if data["finished"]:
                return data
            # not done yet: stall a second, then poll again
            stalled = self.stall(delay=1.0)
            stalled.addCallback(self.wait_for_operation, ophandle)
            return stalled
        d.addCallback(_poll)
        return d

    def get_operation_results(self, ignored, ophandle, output=None):
        """Fetch the final results of a finished operation. If output is
        'json' (any case) the body is decoded, otherwise returned raw."""
        url = self.webish_url + "operations/" + ophandle + "?t=status"
        if output:
            url += "&output=" + output
        d = getPage(url)
        def _parse(res):
            if not (output and output.lower() == "json"):
                return res
            try:
                return simplejson.loads(res)
            except ValueError:
                self.fail("%s: not JSON: '%s'" % (url, res))
        d.addCallback(_parse)
        return d

    def slow_web(self, n, output=None, **kwargs):
        """Start a long-running webapi operation (via ophandle=) on node
        'n', wait for it to finish, and fire with its results."""
        op_handle = base32.b2a(os.urandom(4))
        d = self.web(n, "POST", ophandle=op_handle, **kwargs)
        d.addCallback(self.wait_for_operation, op_handle)
        d.addCallback(self.get_operation_results, op_handle, output=output)
        return d
1964
1965
class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
    # construct a small directory tree (with one dir, one immutable file, one
    # mutable file, one LIT file, and a loop), and then check/examine it in
    # various ways.

    def set_up_tree(self, ignored):
        """Build the test tree under self.root and stash each node (and its
        URI) on self for later assertions. Takes roughly 2.9s."""
        # root
        #   mutable
        #   large
        #   small
        #   small2
        #   loop -> root
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _created_root(n):
            self.root = n
            self.root_uri = n.get_uri()
        d.addCallback(_created_root)
        d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
        d.addCallback(lambda n: self.root.set_node(u"mutable", n))
        def _created_mutable(n):
            self.mutable = n
            self.mutable_uri = n.get_uri()
        d.addCallback(_created_mutable)

        # CHK file: large enough to get real shares
        large = upload.Data("Lots of data\n" * 1000, None)
        d.addCallback(lambda ign: self.root.add_file(u"large", large))
        def _created_large(n):
            self.large = n
            self.large_uri = n.get_uri()
        d.addCallback(_created_large)

        # two LIT files: small enough to live entirely in their URIs
        small = upload.Data("Small enough for a LIT", None)
        d.addCallback(lambda ign: self.root.add_file(u"small", small))
        def _created_small(n):
            self.small = n
            self.small_uri = n.get_uri()
        d.addCallback(_created_small)

        small2 = upload.Data("Small enough for a LIT too", None)
        d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
        def _created_small2(n):
            self.small2 = n
            self.small2_uri = n.get_uri()
        d.addCallback(_created_small2)

        # a loop back to the root, to exercise cycle handling in deep-checks
        d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
        return d

    def check_is_healthy(self, cr, n, where, incomplete=False):
        """Assert that checker results 'cr' describe a healthy 3-of-10 file
        stored on all of our clients. 'where' labels failure messages.
        With incomplete=True, skip the server/sharemap checks that a
        verify=True pass does not fill in."""
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failUnless(cr.is_healthy(), where)
        self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
                             where)
        self.failUnlessEqual(cr.get_storage_index_string(),
                             base32.b2a(n.get_storage_index()), where)
        # with fewer than 10 servers, some servers hold multiple shares
        needs_rebalancing = bool( len(self.clients) < 10 )
        if not incomplete:
            self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
        d = cr.get_data()
        self.failUnlessEqual(d["count-shares-good"], 10, where)
        self.failUnlessEqual(d["count-shares-needed"], 3, where)
        self.failUnlessEqual(d["count-shares-expected"], 10, where)
        if not incomplete:
            self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(d["list-corrupt-shares"], [], where)
        if not incomplete:
            self.failUnlessEqual(sorted(d["servers-responding"]),
                                 sorted([c.nodeid for c in self.clients]),
                                 where)
            self.failUnless("sharemap" in d, where)
            all_serverids = set()
            for (shareid, serverids) in d["sharemap"].items():
                all_serverids.update(serverids)
            self.failUnlessEqual(sorted(all_serverids),
                                 sorted([c.nodeid for c in self.clients]),
                                 where)

        self.failUnlessEqual(d["count-wrong-shares"], 0, where)
        self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)


    def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
        """Assert check-and-repair results show a healthy file both before
        and after, and that no repair was attempted."""
        self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
        self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
        self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
        self.failUnless(cr.get_post_repair_results().is_healthy(), where)
        self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
        self.failIf(cr.get_repair_attempted(), where)

    def deep_check_is_healthy(self, cr, num_healthy, where):
        """Assert a deep-check found exactly num_healthy healthy objects."""
        self.failUnless(IDeepCheckResults.providedBy(cr))
        self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
                             num_healthy, where)

    def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
        """Assert a deep-check-and-repair found num_healthy healthy objects
        pre- and post-repair, with no repairs attempted."""
        self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
        c = cr.get_counters()
        self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
                             num_healthy, where)
        self.failUnlessEqual(c["count-objects-healthy-post-repair"],
                             num_healthy, where)
        self.failUnlessEqual(c["count-repairs-attempted"], 0, where)

    def test_good(self):
        """Exercise stats, checker, webapi, and CLI against a healthy tree."""
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        d.addCallback(self.set_up_tree)
        d.addCallback(self.do_stats)
        d.addCallback(self.do_test_check_good)
        d.addCallback(self.do_test_web_good)
        d.addCallback(self.do_test_cli_good)
        d.addErrback(self.explain_web_error)
        d.addErrback(self.explain_error)
        return d

    def do_stats(self, ignored):
        """Run dirnode.start_deep_stats() on the root and validate it."""
        d = defer.succeed(None)
        d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
        d.addCallback(self.check_stats_good)
        return d

    def check_stats_good(self, s):
        """Assert the deep-stats for the tree built by set_up_tree."""
        self.failUnlessEqual(s["count-directories"], 1)
        self.failUnlessEqual(s["count-files"], 4)
        self.failUnlessEqual(s["count-immutable-files"], 1)
        self.failUnlessEqual(s["count-literal-files"], 2)
        self.failUnlessEqual(s["count-mutable-files"], 1)
        # don't check directories: their size will vary
        # s["largest-directory"]
        # s["size-directories"]
        self.failUnlessEqual(s["largest-directory-children"], 5)
        self.failUnlessEqual(s["largest-immutable-file"], 13000)
        # to re-use this function for both the local
        # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
        # coerce the result into a list of tuples. dirnode.start_deep_stats()
        # returns a list of tuples, but JSON only knows about lists., so
        # t=start-deep-stats returns a list of lists.
        histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
        self.failUnlessEqual(histogram, [(11, 31, 2),
                                         (10001, 31622, 1),
                                         ])
        self.failUnlessEqual(s["size-immutable-files"], 13000)
        self.failUnlessEqual(s["size-literal-files"], 48)

    def do_test_check_good(self, ignored):
        """Run check/check_and_repair/deep-check on every node, with and
        without verify=, and make sure everything looks healthy. LIT files
        have no shares, so their check() returns None."""
        d = defer.succeed(None)
        # check the individual items
        d.addCallback(lambda ign: self.root.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small2")

        # and again with verify=True
        d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small2")

        # and check_and_repair(), which should be a nop
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small2")

        # check_and_repair(verify=True)
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")
        d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small2")


        # now deep-check the root, with various verify= and repair= options
        d.addCallback(lambda ign:
                      self.root.start_deep_check().when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check(verify=True).when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair().when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair(verify=True).when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")

        # and finally, start a deep-check, but then cancel it.
        d.addCallback(lambda ign: self.root.start_deep_check())
        def _checking(monitor):
            monitor.cancel()
            d = monitor.when_done()
            # this should fire as soon as the next dirnode.list finishes.
            # TODO: add a counter to measure how many list() calls are made,
            # assert that no more than one gets to run before the cancel()
            # takes effect.
            def _finished_normally(res):
                self.fail("this was supposed to fail, not finish normally")
            def _cancelled(f):
                f.trap(OperationCancelledError)
            d.addCallbacks(_finished_normally, _cancelled)
            return d
        d.addCallback(_checking)

        return d

    def json_check_is_healthy(self, data, n, where, incomplete=False):
        """Assert that webapi JSON check results 'data' describe a healthy
        3-of-10 file; mirrors check_is_healthy but for the JSON form
        (serverids are base32 strings here, not binary nodeids)."""
        self.failUnlessEqual(data["storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnless("summary" in data, (where, data))
        self.failUnlessEqual(data["summary"].lower(), "healthy",
                             "%s: '%s'" % (where, data["summary"]))
        r = data["results"]
        self.failUnlessEqual(r["healthy"], True, where)
        needs_rebalancing = bool( len(self.clients) < 10 )
        if not incomplete:
            self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
        self.failUnlessEqual(r["count-shares-good"], 10, where)
        self.failUnlessEqual(r["count-shares-needed"], 3, where)
        self.failUnlessEqual(r["count-shares-expected"], 10, where)
        if not incomplete:
            self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(r["list-corrupt-shares"], [], where)
        if not incomplete:
            self.failUnlessEqual(sorted(r["servers-responding"]),
                                 sorted([idlib.nodeid_b2a(c.nodeid)
                                         for c in self.clients]), where)
            self.failUnless("sharemap" in r, where)
            all_serverids = set()
            for (shareid, serverids_s) in r["sharemap"].items():
                all_serverids.update(serverids_s)
            self.failUnlessEqual(sorted(all_serverids),
                                 sorted([idlib.nodeid_b2a(c.nodeid)
                                         for c in self.clients]), where)
        self.failUnlessEqual(r["count-wrong-shares"], 0, where)
        self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)

    def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
        """Assert JSON check+repair results are healthy before and after,
        with no repair attempted."""
        self.failUnlessEqual(data["storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["repair-attempted"], False, where)
        self.json_check_is_healthy(data["pre-repair-results"],
                                   n, where, incomplete)
        self.json_check_is_healthy(data["post-repair-results"],
                                   n, where, incomplete)

    def json_full_deepcheck_is_healthy(self, data, n, where):
        """Assert JSON deep-check results: 3 objects, all healthy."""
        self.failUnlessEqual(data["root-storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["count-objects-checked"], 3, where)
        self.failUnlessEqual(data["count-objects-healthy"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(data["list-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-unhealthy-files"], [], where)
        self.json_check_stats_good(data["stats"], where)

    def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
        """Assert JSON deep-check-and-repair results: 3 healthy objects
        pre- and post-repair, no corruption, no repairs attempted."""
        self.failUnlessEqual(data["root-storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        self.failUnlessEqual(data["count-objects-checked"], 3, where)

        self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)

        self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
        self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
        self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)

        self.failUnlessEqual(data["list-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
        self.failUnlessEqual(data["list-unhealthy-files"], [], where)

        self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
        self.failUnlessEqual(data["count-repairs-successful"], 0, where)
        self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)


    def json_check_lit(self, data, n, where):
        """LIT files have no storage index and are always healthy."""
        self.failUnlessEqual(data["storage-index"], "", where)
        self.failUnlessEqual(data["results"]["healthy"], True, where)

    def json_check_stats_good(self, data, where):
        # 'where' is accepted for symmetry with the other json_* asserters
        self.check_stats_good(data)

    def do_test_web_good(self, ignored):
        """Drive the same checks as do_test_check_good, but through the
        webapi (t=check / t=start-deep-check / t=start-deep-stats)."""
        d = defer.succeed(None)

        # stats
        d.addCallback(lambda ign:
                      self.slow_web(self.root,
                                    t="start-deep-stats", output="json"))
        d.addCallback(self.json_check_stats_good, "deep-stats")

        # check, no verify
        d.addCallback(lambda ign: self.web_json(self.root, t="check"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.web_json(self.large, t="check"))
        d.addCallback(self.json_check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.web_json(self.small, t="check"))
        d.addCallback(self.json_check_lit, self.small, "small")
        d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
        d.addCallback(self.json_check_lit, self.small2, "small2")

        # check and verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.root, "root+v")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.large, "large+v",
                      incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small+v")
        d.addCallback(lambda ign:
                      self.web_json(self.small2, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small2, "small2+v")

        # check and repair, no verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r")
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small, "small+r")
        d.addCallback(lambda ign:
                      self.web_json(self.small2, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small2, "small2+r")

        # check+verify+repair
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small+vr")
        d.addCallback(lambda ign:
                      self.web_json(self.small2, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small2, "small2+vr")

        # now run a deep-check, with various verify= and repair= flags
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true",
                                    output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", repair="true",
                                    output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr")

        # now look at t=info: just make sure the pages render without error
        d.addCallback(lambda ign: self.web(self.root, t="info"))
        # TODO: examine the output
        d.addCallback(lambda ign: self.web(self.mutable, t="info"))
        d.addCallback(lambda ign: self.web(self.large, t="info"))
        d.addCallback(lambda ign: self.web(self.small, t="info"))
        d.addCallback(lambda ign: self.web(self.small2, t="info"))

        return d

    def _run_cli(self, argv, stdin=""):
        """Run the tahoe CLI in a thread; fires with (stdout, stderr)."""
        #print "CLI:", argv
        stdout, stderr = StringIO(), StringIO()
        d = threads.deferToThread(runner.runner, argv, run_by_human=False,
                                  stdin=StringIO(stdin),
                                  stdout=stdout, stderr=stderr)
        def _done(res):
            return stdout.getvalue(), stderr.getvalue()
        d.addCallback(_done)
        return d

    def do_test_cli_good(self, ignored):
        """Exercise 'tahoe manifest' and 'tahoe stats' (plain, --storage-index,
        and --raw forms) against the healthy tree."""
        basedir = self.getdir("client0")
        d = self._run_cli(["manifest",
                           "--node-directory", basedir,
                           self.root_uri])
        def _check(res):
            out, err = res
            self.failUnlessEqual(err, "")
            lines = [l for l in out.split("\n") if l]
            self.failUnlessEqual(len(lines), 5)
            caps = {}
            for l in lines:
                try:
                    cap, path = l.split(None, 1)
                except ValueError:
                    # the root appears with no path component
                    cap = l.strip()
                    path = ""
                caps[cap] = path
            self.failUnless(self.root.get_uri() in caps)
            self.failUnlessEqual(caps[self.root.get_uri()], "")
            self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
            self.failUnlessEqual(caps[self.large.get_uri()], "large")
            self.failUnlessEqual(caps[self.small.get_uri()], "small")
            self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
        d.addCallback(_check)

        # --storage-index: one base32 SI per line, LIT files excluded
        d.addCallback(lambda res:
                      self._run_cli(["manifest",
                                     "--node-directory", basedir,
                                     "--storage-index", self.root_uri]))
        def _check2(res):
            out, err = res
            self.failUnlessEqual(err, "")
            lines = [l for l in out.split("\n") if l]
            self.failUnlessEqual(len(lines), 3)
            self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
            self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
            self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
        d.addCallback(_check2)

        # --raw: one JSON object with SIs, verifycaps, and stats
        d.addCallback(lambda res:
                      self._run_cli(["manifest",
                                     "--node-directory", basedir,
                                     "--raw", self.root_uri]))
        def _check2r(res):
            out, err = res
            self.failUnlessEqual(err, "")
            data = simplejson.loads(out)
            sis = data["storage-index"]
            self.failUnlessEqual(len(sis), 3)
            self.failUnless(base32.b2a(self.root.get_storage_index()) in sis)
            self.failUnless(base32.b2a(self.mutable.get_storage_index()) in sis)
            self.failUnless(base32.b2a(self.large.get_storage_index()) in sis)
            self.failUnlessEqual(data["stats"]["count-files"], 4)
            self.failUnlessEqual(data["origin"],
                                 base32.b2a(self.root.get_storage_index()))
            verifycaps = data["verifycaps"]
            self.failUnlessEqual(len(verifycaps), 3)
            self.failUnless(self.root.get_verifier().to_string() in verifycaps)
            self.failUnless(self.mutable.get_verifier().to_string() in verifycaps)
            self.failUnless(self.large.get_verifier().to_string() in verifycaps)
        d.addCallback(_check2r)

        d.addCallback(lambda res:
                      self._run_cli(["stats",
                                     "--node-directory", basedir,
                                     self.root_uri]))
        def _check3(res):
            out, err = res
            lines = [l.strip() for l in out.split("\n") if l]
            self.failUnless("count-immutable-files: 1" in lines)
            self.failUnless("count-mutable-files: 1" in lines)
            self.failUnless("count-literal-files: 2" in lines)
            self.failUnless("count-files: 4" in lines)
            self.failUnless("count-directories: 1" in lines)
            self.failUnless("size-immutable-files: 13000    (13.00 kB, 12.70 kiB)" in lines, lines)
            self.failUnless("size-literal-files: 48" in lines)
            self.failUnless("   11-31    : 2    (31 B, 31 B)".strip() in lines)
            self.failUnless("10001-31622 : 1    (31.62 kB, 30.88 kiB)".strip() in lines)
        d.addCallback(_check3)

        d.addCallback(lambda res:
                      self._run_cli(["stats",
                                     "--node-directory", basedir,
                                     "--raw",
                                     self.root_uri]))
        def _check4(res):
            out, err = res
            data = simplejson.loads(out)
            self.failUnlessEqual(data["count-immutable-files"], 1)
            self.failUnlessEqual(data["count-mutable-files"], 1)
            self.failUnlessEqual(data["count-literal-files"], 2)
            self.failUnlessEqual(data["count-files"], 4)
            self.failUnlessEqual(data["count-directories"], 1)
            self.failUnlessEqual(data["size-immutable-files"], 13000)
            self.failUnlessEqual(data["size-literal-files"], 48)
            self.failUnless([11,31,2] in data["size-files-histogram"])
            self.failUnless([10001,31622,1] in data["size-files-histogram"])
        d.addCallback(_check4)

        return d
2491
2492
2493 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
2494
2495     def test_bad(self):
2496         self.basedir = self.mktemp()
2497         d = self.set_up_nodes()
2498         d.addCallback(self.set_up_damaged_tree)
2499         d.addCallback(self.do_test_check_bad)
2500         d.addCallback(self.do_test_deepcheck_bad)
2501         d.addCallback(self.do_test_web_bad)
2502         d.addErrback(self.explain_web_error)
2503         d.addErrback(self.explain_error)
2504         return d
2505
2506
2507
2508     def set_up_damaged_tree(self, ignored):
2509         # 6.4s
2510
2511         # root
2512         #   mutable-good
2513         #   mutable-missing-shares
2514         #   mutable-corrupt-shares
2515         #   mutable-unrecoverable
2516         #   large-good
2517         #   large-missing-shares
2518         #   large-corrupt-shares
2519         #   large-unrecoverable
2520
2521         self.nodes = {}
2522
2523         c0 = self.clients[0]
2524         d = c0.create_empty_dirnode()
2525         def _created_root(n):
2526             self.root = n
2527             self.root_uri = n.get_uri()
2528         d.addCallback(_created_root)
2529         d.addCallback(self.create_mangled, "mutable-good")
2530         d.addCallback(self.create_mangled, "mutable-missing-shares")
2531         d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2532         d.addCallback(self.create_mangled, "mutable-unrecoverable")
2533         d.addCallback(self.create_mangled, "large-good")
2534         d.addCallback(self.create_mangled, "large-missing-shares")
2535         d.addCallback(self.create_mangled, "large-corrupt-shares")
2536         d.addCallback(self.create_mangled, "large-unrecoverable")
2537
2538         return d
2539
2540
2541     def create_mangled(self, ignored, name):
2542         nodetype, mangletype = name.split("-", 1)
2543         if nodetype == "mutable":
2544             d = self.clients[0].create_mutable_file("mutable file contents")
2545             d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2546         elif nodetype == "large":
2547             large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2548             d = self.root.add_file(unicode(name), large)
2549         elif nodetype == "small":
2550             small = upload.Data("Small enough for a LIT", None)
2551             d = self.root.add_file(unicode(name), small)
2552
2553         def _stash_node(node):
2554             self.nodes[name] = node
2555             return node
2556         d.addCallback(_stash_node)
2557
2558         if mangletype == "good":
2559             pass
2560         elif mangletype == "missing-shares":
2561             d.addCallback(self._delete_some_shares)
2562         elif mangletype == "corrupt-shares":
2563             d.addCallback(self._corrupt_some_shares)
2564         else:
2565             assert mangletype == "unrecoverable"
2566             d.addCallback(self._delete_most_shares)
2567
2568         return d
2569
2570     def _run_cli(self, argv):
2571         stdout, stderr = StringIO(), StringIO()
2572         # this can only do synchronous operations
2573         assert argv[0] == "debug"
2574         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2575         return stdout.getvalue()
2576
2577     def _find_shares(self, node):
2578         si = node.get_storage_index()
2579         out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2580                             [c.basedir for c in self.clients])
2581         files = out.split("\n")
2582         return [f for f in files if f]
2583
2584     def _delete_some_shares(self, node):
2585         shares = self._find_shares(node)
2586         os.unlink(shares[0])
2587         os.unlink(shares[1])
2588
2589     def _corrupt_some_shares(self, node):
2590         shares = self._find_shares(node)
2591         self._run_cli(["debug", "corrupt-share", shares[0]])
2592         self._run_cli(["debug", "corrupt-share", shares[1]])
2593
2594     def _delete_most_shares(self, node):
2595         shares = self._find_shares(node)
2596         for share in shares[1:]:
2597             os.unlink(share)
2598
2599
2600     def check_is_healthy(self, cr, where):
2601         self.failUnless(ICheckerResults.providedBy(cr), where)
2602         self.failUnless(cr.is_healthy(), where)
2603         self.failUnless(cr.is_recoverable(), where)
2604         d = cr.get_data()
2605         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2606         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2607         return cr
2608
2609     def check_is_missing_shares(self, cr, where):
2610         self.failUnless(ICheckerResults.providedBy(cr), where)
2611         self.failIf(cr.is_healthy(), where)
2612         self.failUnless(cr.is_recoverable(), where)
2613         d = cr.get_data()
2614         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2615         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2616         return cr
2617
2618     def check_has_corrupt_shares(self, cr, where):
2619         # by "corrupt-shares" we mean the file is still recoverable
2620         self.failUnless(ICheckerResults.providedBy(cr), where)
2621         d = cr.get_data()
2622         self.failIf(cr.is_healthy(), where)
2623         self.failUnless(cr.is_recoverable(), where)
2624         d = cr.get_data()
2625         self.failUnless(d["count-shares-good"] < 10, where)
2626         self.failUnless(d["count-corrupt-shares"], where)
2627         self.failUnless(d["list-corrupt-shares"], where)
2628         return cr
2629
2630     def check_is_unrecoverable(self, cr, where):
2631         self.failUnless(ICheckerResults.providedBy(cr), where)
2632         d = cr.get_data()
2633         self.failIf(cr.is_healthy(), where)
2634         self.failIf(cr.is_recoverable(), where)
2635         self.failUnless(d["count-shares-good"] < d["count-shares-needed"],
2636                         where)
2637         self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2638         self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2639         return cr
2640
2641     def do_test_check_bad(self, ignored):
2642         d = defer.succeed(None)
2643
2644         # check the individual items, without verification. This will not
2645         # detect corrupt shares.
2646         def _check(which, checker):
2647             d = self.nodes[which].check(Monitor())
2648             d.addCallback(checker, which + "--check")
2649             return d
2650
2651         d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2652         d.addCallback(lambda ign: _check("mutable-missing-shares",
2653                                          self.check_is_missing_shares))
2654         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2655                                          self.check_is_healthy))
2656         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2657                                          self.check_is_unrecoverable))
2658         d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2659         d.addCallback(lambda ign: _check("large-missing-shares",
2660                                          self.check_is_missing_shares))
2661         d.addCallback(lambda ign: _check("large-corrupt-shares",
2662                                          self.check_is_healthy))
2663         d.addCallback(lambda ign: _check("large-unrecoverable",
2664                                          self.check_is_unrecoverable))
2665
2666         # and again with verify=True, which *does* detect corrupt shares.
2667         def _checkv(which, checker):
2668             d = self.nodes[which].check(Monitor(), verify=True)
2669             d.addCallback(checker, which + "--check-and-verify")
2670             return d
2671
2672         d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2673         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2674                                          self.check_is_missing_shares))
2675         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2676                                          self.check_has_corrupt_shares))
2677         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2678                                          self.check_is_unrecoverable))
2679         d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2680         # disabled pending immutable verifier
2681         #d.addCallback(lambda ign: _checkv("large-missing-shares",
2682         #                                 self.check_is_missing_shares))
2683         #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2684         #                                 self.check_has_corrupt_shares))
2685         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2686                                          self.check_is_unrecoverable))
2687
2688         return d
2689
2690     def do_test_deepcheck_bad(self, ignored):
2691         d = defer.succeed(None)
2692
2693         # now deep-check the root, with various verify= and repair= options
2694         d.addCallback(lambda ign:
2695                       self.root.start_deep_check().when_done())
2696         def _check1(cr):
2697             self.failUnless(IDeepCheckResults.providedBy(cr))
2698             c = cr.get_counters()
2699             self.failUnlessEqual(c["count-objects-checked"], 9)
2700             self.failUnlessEqual(c["count-objects-healthy"], 5)
2701             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2702             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2703         d.addCallback(_check1)
2704
2705         d.addCallback(lambda ign:
2706                       self.root.start_deep_check(verify=True).when_done())
2707         def _check2(cr):
2708             self.failUnless(IDeepCheckResults.providedBy(cr))
2709             c = cr.get_counters()
2710             self.failUnlessEqual(c["count-objects-checked"], 9)
2711             # until we have a real immutable verifier, these counts will be
2712             # off
2713             #self.failUnlessEqual(c["count-objects-healthy"], 3)
2714             #self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2715             self.failUnlessEqual(c["count-objects-healthy"], 5) # todo
2716             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2717             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2718         d.addCallback(_check2)
2719
2720         return d
2721
2722     def json_is_healthy(self, data, where):
2723         r = data["results"]
2724         self.failUnless(r["healthy"], where)
2725         self.failUnless(r["recoverable"], where)
2726         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2727         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2728
2729     def json_is_missing_shares(self, data, where):
2730         r = data["results"]
2731         self.failIf(r["healthy"], where)
2732         self.failUnless(r["recoverable"], where)
2733         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2734         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2735
2736     def json_has_corrupt_shares(self, data, where):
2737         # by "corrupt-shares" we mean the file is still recoverable
2738         r = data["results"]
2739         self.failIf(r["healthy"], where)
2740         self.failUnless(r["recoverable"], where)
2741         self.failUnless(r["count-shares-good"] < 10, where)
2742         self.failUnless(r["count-corrupt-shares"], where)
2743         self.failUnless(r["list-corrupt-shares"], where)
2744
2745     def json_is_unrecoverable(self, data, where):
2746         r = data["results"]
2747         self.failIf(r["healthy"], where)
2748         self.failIf(r["recoverable"], where)
2749         self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2750                         where)
2751         self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2752         self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
2753
2754     def do_test_web_bad(self, ignored):
2755         d = defer.succeed(None)
2756
2757         # check, no verify
2758         def _check(which, checker):
2759             d = self.web_json(self.nodes[which], t="check")
2760             d.addCallback(checker, which + "--webcheck")
2761             return d
2762
2763         d.addCallback(lambda ign: _check("mutable-good",
2764                                          self.json_is_healthy))
2765         d.addCallback(lambda ign: _check("mutable-missing-shares",
2766                                          self.json_is_missing_shares))
2767         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2768                                          self.json_is_healthy))
2769         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2770                                          self.json_is_unrecoverable))
2771         d.addCallback(lambda ign: _check("large-good",
2772                                          self.json_is_healthy))
2773         d.addCallback(lambda ign: _check("large-missing-shares",
2774                                          self.json_is_missing_shares))
2775         d.addCallback(lambda ign: _check("large-corrupt-shares",
2776                                          self.json_is_healthy))
2777         d.addCallback(lambda ign: _check("large-unrecoverable",
2778                                          self.json_is_unrecoverable))
2779
2780         # check and verify
2781         def _checkv(which, checker):
2782             d = self.web_json(self.nodes[which], t="check", verify="true")
2783             d.addCallback(checker, which + "--webcheck-and-verify")
2784             return d
2785
2786         d.addCallback(lambda ign: _checkv("mutable-good",
2787                                           self.json_is_healthy))
2788         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2789                                          self.json_is_missing_shares))
2790         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2791                                          self.json_has_corrupt_shares))
2792         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2793                                          self.json_is_unrecoverable))
2794         d.addCallback(lambda ign: _checkv("large-good",
2795                                           self.json_is_healthy))
2796         # disabled pending immutable verifier
2797         #d.addCallback(lambda ign: _checkv("large-missing-shares",
2798         #                                 self.json_is_missing_shares))
2799         #d.addCallback(lambda ign: _checkv("large-corrupt-shares",
2800         #                                 self.json_has_corrupt_shares))
2801         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2802                                          self.json_is_unrecoverable))
2803
2804         return d