# src/allmydata/test/test_system.py
from base64 import b32encode
import os, sys, time, re, simplejson, urllib
from cStringIO import StringIO
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri, storage
from allmydata.immutable import download, filenode, offloaded, upload
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
     ICheckResults, ICheckAndRepairResults, IDeepCheckResults, \
     IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
from allmydata.monitor import Monitor, OperationCancelledError
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin, ErrorMixin, \
     MemoryConsumer, download_to_data

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""

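# An upload.Data subclass that counts how many bytes have been requested
# from it; once the count passes interrupt_after it fires interrupt_after_d
# (exactly once). _upload_resumable() below uses this to bounce a client in
# the middle of a helper upload.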
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

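# A minimal IConsumer that expects a streaming IPushProducer and simply
# accumulates every byte written to it, so a test can compare the streamed
# download against the original plaintext.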
class GrabEverythingConsumer:
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents += data

    def unregisterProducer(self):
        pass

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

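    # The next two tests run the same upload/download sequence twice: once
    # with convergence=None (a fresh random encryption key for each upload)
    # and once with a fixed convergence string, which derives the encryption
    # key from the plaintext so that repeated uploads of identical data can
    # be short-circuited.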
    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
            return d1
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

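    # Walk every simulated client's storage directory and return a list of
    # (client_num, storage_index, filename, shnum) tuples, one per share
    # file found; the test fails if no shares exist at all.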
    def _find_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

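    # Read a mutable share off disk, unpack it, damage the single field named
    # by `which` (flip a bit, or bump a counter), then repack it and write it
    # back in place. test_mutable() uses this to exercise the integrity
    # checks performed during retrieval.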
    def _corrupt_mutable_share(self, filename, which):
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may or may not be noticed, for the reasons
                # described above.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

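        # clients[3] is wired to fetch mutable-file keys from the
        # key-generator service, which maintains a pool of pre-generated
        # keys. Each mutable file or directory created through that client
        # below should therefore consume exactly one key from the pool.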
        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses the key generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

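    # Return a copy of `good` with the low-order bit of its final byte
    # flipped, i.e. data that differs from the original in exactly one bit.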
    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

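    # Like shouldFail(), but this invokes the callable itself (via
    # maybeDeferred) and verifies that the resulting Deferred errbacks with
    # expected_failure, optionally checking that `substring` appears in the
    # failure's string form.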
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

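    # Small HTTP helpers for exercising the node's web API. POST assembles a
    # multipart/form-data body by hand, using a fixed boundary string.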
1058     def PUT(self, urlpath, data):
1059         url = self.webish_url + urlpath
1060         return getPage(url, method="PUT", postdata=data)
1061
1062     def GET(self, urlpath, followRedirect=False):
1063         url = self.webish_url + urlpath
1064         return getPage(url, method="GET", followRedirect=followRedirect)
1065
1066     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1067         if use_helper:
1068             url = self.helper_webish_url + urlpath
1069         else:
1070             url = self.webish_url + urlpath
1071         sepbase = "boogabooga"
1072         sep = "--" + sepbase
1073         form = []
1074         form.append(sep)
1075         form.append('Content-Disposition: form-data; name="_charset"')
1076         form.append('')
1077         form.append('UTF-8')
1078         form.append(sep)
1079         for name, value in fields.iteritems():
1080             if isinstance(value, tuple):
1081                 filename, value = value
1082                 form.append('Content-Disposition: form-data; name="%s"; '
1083                             'filename="%s"' % (name, filename.encode("utf-8")))
1084             else:
1085                 form.append('Content-Disposition: form-data; name="%s"' % name)
1086             form.append('')
1087             form.append(str(value))
1088             form.append(sep)
1089         form[-1] += "--"
1090         body = "\r\n".join(form) + "\r\n"
1091         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1092                    }
1093         return getPage(url, method="POST", postdata=body,
1094                        headers=headers, followRedirect=followRedirect)
1095
1096     def _test_web(self, res):
1097         base = self.webish_url
1098         public = "uri/" + self._root_directory_uri
1099         d = getPage(base)
1100         def _got_welcome(page):
1101             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1102             self.failUnless(expected in page,
1103                             "I didn't see the right 'connected storage servers'"
1104                             " message in: %s" % page
1105                             )
1106             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1107             self.failUnless(expected in page,
1108                             "I didn't see the right 'My nodeid' message "
1109                             "in: %s" % page)
1110             self.failUnless("Helper: 0 active uploads" in page)
1111         d.addCallback(_got_welcome)
1112         d.addCallback(self.log, "done with _got_welcome")
1113
1114         # get the welcome page from the node that uses the helper too
1115         d.addCallback(lambda res: getPage(self.helper_webish_url))
1116         def _got_welcome_helper(page):
1117             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1118                             page)
1119             self.failUnless("Not running helper" in page)
1120         d.addCallback(_got_welcome_helper)
1121
1122         d.addCallback(lambda res: getPage(base + public))
1123         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1124         def _got_subdir1(page):
1125             # there ought to be an href for our file
1126             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1127             self.failUnless(">mydata567</a>" in page)
1128         d.addCallback(_got_subdir1)
1129         d.addCallback(self.log, "done with _got_subdir1")
1130         d.addCallback(lambda res:
1131                       getPage(base + public + "/subdir1/mydata567"))
1132         def _got_data(page):
1133             self.failUnlessEqual(page, self.data)
1134         d.addCallback(_got_data)
1135
1136         # download from a URI embedded in a URL
1137         d.addCallback(self.log, "_get_from_uri")
1138         def _get_from_uri(res):
1139             return getPage(base + "uri/%s?filename=%s"
1140                            % (self.uri, "mydata567"))
1141         d.addCallback(_get_from_uri)
1142         def _got_from_uri(page):
1143             self.failUnlessEqual(page, self.data)
1144         d.addCallback(_got_from_uri)
1145
1146         # download from a URI embedded in a URL, second form
1147         d.addCallback(self.log, "_get_from_uri2")
1148         def _get_from_uri2(res):
1149             return getPage(base + "uri?uri=%s" % (self.uri,))
1150         d.addCallback(_get_from_uri2)
1151         d.addCallback(_got_from_uri)
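             # (so the webapi accepts a cap either embedded in the path, as
             # "uri/<cap>?filename=<name>", or as a query argument, as
             # "uri?uri=<cap>"; both forms are exercised above and must
             # return the same bytes)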
1152
1153         # download from a bogus URI, make sure we get a reasonable error
1154         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1155         def _get_from_bogus_uri(res):
1156             d1 = getPage(base + "uri/%s?filename=%s"
1157                          % (self.mangle_uri(self.uri), "mydata567"))
1158             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1159                        "410")
1160             return d1
1161         d.addCallback(_get_from_bogus_uri)
1162         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1163
1164         # upload a file with PUT
1165         d.addCallback(self.log, "about to try PUT")
1166         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1167                                            "new.txt contents"))
1168         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1169         d.addCallback(self.failUnlessEqual, "new.txt contents")
1170         # and again with something large enough to use multiple segments,
1171         # and hopefully trigger pauseProducing too
1172         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1173                                            "big" * 500000)) # 1.5MB
1174         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1175         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1176
1177         # can we replace files in place?
1178         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1179                                            "NEWER contents"))
1180         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1181         d.addCallback(self.failUnlessEqual, "NEWER contents")
1182
1183         # test unlinked POST
1184         d.addCallback(lambda res: self.POST("uri", t="upload",
1185                                             file=("new.txt", "data" * 10000)))
1186         # and again using the helper, which exercises different upload-status
1187         # display code
1188         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1189                                             file=("foo.txt", "data2" * 10000)))
1190
1191         # check that the status page exists
1192         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1193         def _got_status(res):
1194             # find an interesting upload and download to look at. LIT files
1195             # are not interesting.
1196             for ds in self.clients[0].list_all_download_statuses():
1197                 if ds.get_size() > 200:
1198                     self._down_status = ds.get_counter()
1199             for us in self.clients[0].list_all_upload_statuses():
1200                 if us.get_size() > 200:
1201                     self._up_status = us.get_counter()
1202             rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1203             self._retrieve_status = rs.get_counter()
1204             ps = list(self.clients[0].list_all_publish_statuses())[0]
1205             self._publish_status = ps.get_counter()
1206             us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1207             self._update_status = us.get_counter()
1208
1209             # and that there are some upload- and download-status pages
1210             return self.GET("status/up-%d" % self._up_status)
1211         d.addCallback(_got_status)
1212         def _got_up(res):
1213             return self.GET("status/down-%d" % self._down_status)
1214         d.addCallback(_got_up)
1215         def _got_down(res):
1216             return self.GET("status/mapupdate-%d" % self._update_status)
1217         d.addCallback(_got_down)
1218         def _got_update(res):
1219             return self.GET("status/publish-%d" % self._publish_status)
1220         d.addCallback(_got_update)
1221         def _got_publish(res):
1222             return self.GET("status/retrieve-%d" % self._retrieve_status)
1223         d.addCallback(_got_publish)
1224
1225         # check that the helper status page exists
1226         d.addCallback(lambda res:
1227                       self.GET("helper_status", followRedirect=True))
1228         def _got_helper_status(res):
1229             self.failUnless("Bytes Fetched:" in res)
1230             # touch a couple of files in the helper's working directory to
1231             # exercise more code paths
1232             workdir = os.path.join(self.getdir("client0"), "helper")
1233             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1234             f = open(incfile, "wb")
1235             f.write("small file")
1236             f.close()
1237             then = time.time() - 86400*3
1238             now = time.time()
1239             os.utime(incfile, (now, then))
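                 # backdating the mtime by three days is presumably what
                 # pushes this file into the helper's "_old" counters;
                 # _got_helper_status_json below expects
                 # incoming_size_old == 10, the length of "small file"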
1240             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1241             f = open(encfile, "wb")
1242             f.write("less small file")
1243             f.close()
1244             os.utime(encfile, (now, then))
1245         d.addCallback(_got_helper_status)
1246         # and that the json form exists
1247         d.addCallback(lambda res:
1248                       self.GET("helper_status?t=json", followRedirect=True))
1249         def _got_helper_status_json(res):
1250             data = simplejson.loads(res)
1251             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1252                                  1)
1253             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1254             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1255             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1256                                  10)
1257             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1258             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1259             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1260                                  15)
1261         d.addCallback(_got_helper_status_json)
1262
1263         # and check that client[3] (which uses a helper but does not run one
1264         # itself) doesn't explode when you ask for its status
1265         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1266         def _got_non_helper_status(res):
1267             self.failUnless("Upload and Download Status" in res)
1268         d.addCallback(_got_non_helper_status)
1269
1270         # or for helper status with t=json
1271         d.addCallback(lambda res:
1272                       getPage(self.helper_webish_url + "helper_status?t=json"))
1273         def _got_non_helper_status_json(res):
1274             data = simplejson.loads(res)
1275             self.failUnlessEqual(data, {})
1276         d.addCallback(_got_non_helper_status_json)
1277
1278         # see if the statistics page exists
1279         d.addCallback(lambda res: self.GET("statistics"))
1280         def _got_stats(res):
1281             self.failUnless("Node Statistics" in res)
1282             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1283         d.addCallback(_got_stats)
1284         d.addCallback(lambda res: self.GET("statistics?t=json"))
1285         def _got_stats_json(res):
1286             data = simplejson.loads(res)
1287             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1288             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1289         d.addCallback(_got_stats_json)
1290
1291         # TODO: mangle the second segment of a file, to test errors that
1292         # occur after we've already sent some good data, which uses a
1293         # different error path.
1294
1295         # TODO: download a URI with a form
1296         # TODO: create a directory by using a form
1297         # TODO: upload by using a form on the directory page
1298         #    url = base + "somedir/subdir1/freeform_post!!upload"
1299         # TODO: delete a file by using a button on the directory page
1300
1301         return d
1302
1303     def _test_runner(self, res):
1304         # exercise some of the diagnostic tools in runner.py
1305
1306         # find a share
1307         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1308             if "storage" not in dirpath:
1309                 continue
1310             if not filenames:
1311                 continue
1312             pieces = dirpath.split(os.sep)
1313             if pieces[-4] == "storage" and pieces[-3] == "shares":
1314                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1315                 # are sharefiles here
1316                 filename = os.path.join(dirpath, filenames[0])
1317                 # peek at the magic to see if it is a chk share
1318                 magic = open(filename, "rb").read(4)
1319                 if magic == '\x00\x00\x00\x01':
1320                     break
1321         else:
1322             self.fail("unable to find any uri_extension files in %s"
1323                       % self.basedir)
1324         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1325
1326         out,err = StringIO(), StringIO()
1327         rc = runner.runner(["debug", "dump-share", "--offsets",
1328                             filename],
1329                            stdout=out, stderr=err)
1330         output = out.getvalue()
1331         self.failUnlessEqual(rc, 0)
1332
1333         # we only upload a single file, so we can assert some things about
1334         # its size and shares.
1335         self.failUnless(("share filename: %s" % filename) in output)
1336         self.failUnless("size: %d\n" % len(self.data) in output)
1337         self.failUnless("num_segments: 1\n" in output)
1338         # segment_size is always a multiple of needed_shares
1339         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
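             # a worked example of that rounding (illustrative only): with
             # 3-of-10 encoding, a one-segment file of 113 bytes would show
             # segment_size = mathutil.next_multiple(113, 3) = 114, i.e. the
             # file size rounded up to the next multiple of needed_shares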
1340         self.failUnless("total_shares: 10\n" in output)
1341         # keys which are supposed to be present
1342         for key in ("size", "num_segments", "segment_size",
1343                     "needed_shares", "total_shares",
1344                     "codec_name", "codec_params", "tail_codec_params",
1345                     #"plaintext_hash", "plaintext_root_hash",
1346                     "crypttext_hash", "crypttext_root_hash",
1347                     "share_root_hash", "UEB_hash"):
1348             self.failUnless("%s: " % key in output, key)
1349         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1350
1351         # now use its storage index to find the other shares using the
1352         # 'find-shares' tool
1353         sharedir, shnum = os.path.split(filename)
1354         storagedir, storage_index_s = os.path.split(sharedir)
1355         out,err = StringIO(), StringIO()
1356         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1357         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
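             # roughly the same as running
             #   tahoe debug find-shares <storage-index> NODEDIR1 NODEDIR2 ...
             # from a shell: it prints one matching share filename per line,
             # which is why the output is split into lines below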
1358         rc = runner.runner(cmd, stdout=out, stderr=err)
1359         self.failUnlessEqual(rc, 0)
1360         out.seek(0)
1361         sharefiles = [sfn.strip() for sfn in out.readlines()]
1362         self.failUnlessEqual(len(sharefiles), 10)
1363
1364         # also exercise the 'catalog-shares' tool
1365         out,err = StringIO(), StringIO()
1366         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1367         cmd = ["debug", "catalog-shares"] + nodedirs
1368         rc = runner.runner(cmd, stdout=out, stderr=err)
1369         self.failUnlessEqual(rc, 0)
1370         out.seek(0)
1371         descriptions = [sfn.strip() for sfn in out.readlines()]
1372         self.failUnlessEqual(len(descriptions), 30)
1373         matching = [line
1374                     for line in descriptions
1375                     if line.startswith("CHK %s " % storage_index_s)]
1376         self.failUnlessEqual(len(matching), 10)
1377
1378     def _test_control(self, res):
1379         # exercise the remote-control-the-client foolscap interfaces in
1380         # allmydata.control (mostly used for performance tests)
1381         c0 = self.clients[0]
1382         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1383         control_furl = open(control_furl_file, "r").read().strip()
1384         # it doesn't really matter which Tub we use to connect to the client,
1385         # so let's just use our IntroducerNode's
1386         d = self.introducer.tub.getReference(control_furl)
1387         d.addCallback(self._test_control2, control_furl_file)
1388         return d
1389     def _test_control2(self, rref, filename):
1390         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1391         downfile = os.path.join(self.basedir, "control.downfile")
1392         d.addCallback(lambda uri:
1393                       rref.callRemote("download_from_uri_to_file",
1394                                       uri, downfile))
1395         def _check(res):
1396             self.failUnlessEqual(res, downfile)
1397             data = open(downfile, "r").read()
1398             expected_data = open(filename, "r").read()
1399             self.failUnlessEqual(data, expected_data)
1400         d.addCallback(_check)
1401         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1402         if sys.platform == "linux2":
1403             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1404         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1405         return d
1406
1407     def _test_cli(self, res):
1408         # run various CLI commands (in a thread, since they use blocking
1409         # network calls)
1410
1411         private_uri = self._private_node.get_uri()
1412         some_uri = self._root_directory_uri
1413         client0_basedir = self.getdir("client0")
1414
1415         nodeargs = [
1416             "--node-directory", client0_basedir,
1417             ]
1418         TESTDATA = "I will not write the same thing over and over.\n" * 100
1419
1420         d = defer.succeed(None)
1421
1422         # for compatibility with earlier versions, private/root_dir.cap is
1423         # supposed to be treated as an alias named "tahoe:". Start by making
1424         # sure that works, before we add other aliases.
1425
1426         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1427         f = open(root_file, "w")
1428         f.write(private_uri)
1429         f.close()
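             # with that cap sitting in private/root_dir.cap, paths such as
             # "tahoe:file2" (and bare paths, which default to the "tahoe:"
             # alias) should resolve relative to private_uri; the "ls" and
             # "list-aliases" calls just below check exactly that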
1430
1431         def run(ignored, verb, *args, **kwargs):
1432             stdin = kwargs.get("stdin", "")
1433             newargs = [verb] + nodeargs + list(args)
1434             return self._run_cli(newargs, stdin=stdin)
1435
1436         def _check_ls((out,err), expected_children, unexpected_children=[]):
1437             self.failUnlessEqual(err, "")
1438             for s in expected_children:
1439                 self.failUnless(s in out, (s,out))
1440             for s in unexpected_children:
1441                 self.failIf(s in out, (s,out))
1442
1443         def _check_ls_root((out,err)):
1444             self.failUnless("personal" in out)
1445             self.failUnless("s2-ro" in out)
1446             self.failUnless("s2-rw" in out)
1447             self.failUnlessEqual(err, "")
1448
1449         # this should reference private_uri
1450         d.addCallback(run, "ls")
1451         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1452
1453         d.addCallback(run, "list-aliases")
1454         def _check_aliases_1((out,err)):
1455             self.failUnlessEqual(err, "")
1456             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1457         d.addCallback(_check_aliases_1)
1458
1459         # now that that's out of the way, remove root_dir.cap and work with
1460         # new files
1461         d.addCallback(lambda res: os.unlink(root_file))
1462         d.addCallback(run, "list-aliases")
1463         def _check_aliases_2((out,err)):
1464             self.failUnlessEqual(err, "")
1465             self.failUnlessEqual(out, "")
1466         d.addCallback(_check_aliases_2)
1467
1468         d.addCallback(run, "mkdir")
1469         def _got_dir( (out,err) ):
1470             self.failUnless(uri.from_string_dirnode(out.strip()))
1471             return out.strip()
1472         d.addCallback(_got_dir)
1473         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1474
1475         d.addCallback(run, "list-aliases")
1476         def _check_aliases_3((out,err)):
1477             self.failUnlessEqual(err, "")
1478             self.failUnless("tahoe: " in out)
1479         d.addCallback(_check_aliases_3)
1480
1481         def _check_empty_dir((out,err)):
1482             self.failUnlessEqual(out, "")
1483             self.failUnlessEqual(err, "")
1484         d.addCallback(run, "ls")
1485         d.addCallback(_check_empty_dir)
1486
1487         def _check_missing_dir((out,err)):
1488             # TODO: check that rc==2
1489             self.failUnlessEqual(out, "")
1490             self.failUnlessEqual(err, "No such file or directory\n")
1491         d.addCallback(run, "ls", "bogus")
1492         d.addCallback(_check_missing_dir)
1493
1494         files = []
1495         datas = []
1496         for i in range(10):
1497             fn = os.path.join(self.basedir, "file%d" % i)
1498             files.append(fn)
1499             data = "data to be uploaded: file%d\n" % i
1500             datas.append(data)
1501             open(fn,"wb").write(data)
1502
1503         def _check_stdout_against((out,err), filenum=None, data=None):
1504             self.failUnlessEqual(err, "")
1505             if filenum is not None:
1506                 self.failUnlessEqual(out, datas[filenum])
1507             if data is not None:
1508                 self.failUnlessEqual(out, data)
1509
1510         # test both forms of put: from a file, and from stdin
1511         #  tahoe put bar FOO
1512         d.addCallback(run, "put", files[0], "tahoe-file0")
1513         def _put_out((out,err)):
1514             self.failUnless("URI:LIT:" in out, out)
1515             self.failUnless("201 Created" in err, err)
1516             uri0 = out.strip()
1517             return run(None, "get", uri0)
1518         d.addCallback(_put_out)
1519         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1520
1521         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1522         #  tahoe put bar tahoe:FOO
1523         d.addCallback(run, "put", files[2], "tahoe:file2")
1524         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1525         def _check_put_mutable((out,err)):
1526             self._mutable_file3_uri = out.strip()
1527         d.addCallback(_check_put_mutable)
1528         d.addCallback(run, "get", "tahoe:file3")
1529         d.addCallback(_check_stdout_against, 3)
1530
1531         #  tahoe put FOO
1532         STDIN_DATA = "This is the file to upload from stdin."
1533         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1534         #  tahoe put tahoe:FOO
1535         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1536                       stdin="Other file from stdin.")
1537
1538         d.addCallback(run, "ls")
1539         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1540                                   "tahoe-file-stdin", "from-stdin"])
1541         d.addCallback(run, "ls", "subdir")
1542         d.addCallback(_check_ls, ["tahoe-file1"])
1543
1544         # tahoe mkdir FOO
1545         d.addCallback(run, "mkdir", "subdir2")
1546         d.addCallback(run, "ls")
1547         # TODO: extract the URI, set an alias with it
1548         d.addCallback(_check_ls, ["subdir2"])
1549
1550         # tahoe get: (to stdin and to a file)
1551         d.addCallback(run, "get", "tahoe-file0")
1552         d.addCallback(_check_stdout_against, 0)
1553         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1554         d.addCallback(_check_stdout_against, 1)
1555         outfile0 = os.path.join(self.basedir, "outfile0")
1556         d.addCallback(run, "get", "file2", outfile0)
1557         def _check_outfile0((out,err)):
1558             data = open(outfile0,"rb").read()
1559             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1560         d.addCallback(_check_outfile0)
1561         outfile1 = os.path.join(self.basedir, "outfile1")
1562         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1563         def _check_outfile1((out,err)):
1564             data = open(outfile1,"rb").read()
1565             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1566         d.addCallback(_check_outfile1)
1567
1568         d.addCallback(run, "rm", "tahoe-file0")
1569         d.addCallback(run, "rm", "tahoe:file2")
1570         d.addCallback(run, "ls")
1571         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1572
1573         d.addCallback(run, "ls", "-l")
1574         def _check_ls_l((out,err)):
1575             lines = out.split("\n")
1576             for l in lines:
1577                 if "tahoe-file-stdin" in l:
1578                     self.failUnless(l.startswith("-r-- "), l)
1579                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1580                 if "file3" in l:
1581                     self.failUnless(l.startswith("-rw- "), l) # mutable
1582         d.addCallback(_check_ls_l)
1583
1584         d.addCallback(run, "ls", "--uri")
1585         def _check_ls_uri((out,err)):
1586             lines = out.split("\n")
1587             for l in lines:
1588                 if "file3" in l:
1589                     self.failUnless(self._mutable_file3_uri in l)
1590         d.addCallback(_check_ls_uri)
1591
1592         d.addCallback(run, "ls", "--readonly-uri")
1593         def _check_ls_rouri((out,err)):
1594             lines = out.split("\n")
1595             for l in lines:
1596                 if "file3" in l:
1597                     rw_uri = self._mutable_file3_uri
1598                     u = uri.from_string_mutable_filenode(rw_uri)
1599                     ro_uri = u.get_readonly().to_string()
1600                     self.failUnless(ro_uri in l)
1601         d.addCallback(_check_ls_rouri)
1602
1603
1604         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1605         d.addCallback(run, "ls")
1606         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1607
1608         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1609         d.addCallback(run, "ls")
1610         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1611
1612         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1613         d.addCallback(run, "ls")
1614         d.addCallback(_check_ls, ["file3", "file3-copy"])
1615         d.addCallback(run, "get", "tahoe:file3-copy")
1616         d.addCallback(_check_stdout_against, 3)
1617
1618         # copy from disk into tahoe
1619         d.addCallback(run, "cp", files[4], "tahoe:file4")
1620         d.addCallback(run, "ls")
1621         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1622         d.addCallback(run, "get", "tahoe:file4")
1623         d.addCallback(_check_stdout_against, 4)
1624
1625         # copy from tahoe into disk
1626         target_filename = os.path.join(self.basedir, "file-out")
1627         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1628         def _check_cp_out((out,err)):
1629             self.failUnless(os.path.exists(target_filename))
1630             got = open(target_filename,"rb").read()
1631             self.failUnlessEqual(got, datas[4])
1632         d.addCallback(_check_cp_out)
1633
1634         # copy from disk to disk (silly case)
1635         target2_filename = os.path.join(self.basedir, "file-out-copy")
1636         d.addCallback(run, "cp", target_filename, target2_filename)
1637         def _check_cp_out2((out,err)):
1638             self.failUnless(os.path.exists(target2_filename))
1639             got = open(target2_filename,"rb").read()
1640             self.failUnlessEqual(got, datas[4])
1641         d.addCallback(_check_cp_out2)
1642
1643         # copy from tahoe into disk, overwriting an existing file
1644         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1645         def _check_cp_out3((out,err)):
1646             self.failUnless(os.path.exists(target_filename))
1647             got = open(target_filename,"rb").read()
1648             self.failUnlessEqual(got, datas[3])
1649         d.addCallback(_check_cp_out3)
1650
1651         # copy from disk into tahoe, overwriting an existing immutable file
1652         d.addCallback(run, "cp", files[5], "tahoe:file4")
1653         d.addCallback(run, "ls")
1654         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1655         d.addCallback(run, "get", "tahoe:file4")
1656         d.addCallback(_check_stdout_against, 5)
1657
1658         # copy from disk into tahoe, overwriting an existing mutable file
1659         d.addCallback(run, "cp", files[5], "tahoe:file3")
1660         d.addCallback(run, "ls")
1661         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1662         d.addCallback(run, "get", "tahoe:file3")
1663         d.addCallback(_check_stdout_against, 5)
1664
1665         # recursive copy: setup
1666         dn = os.path.join(self.basedir, "dir1")
1667         os.makedirs(dn)
1668         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1669         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1670         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1671         sdn2 = os.path.join(dn, "subdir2")
1672         os.makedirs(sdn2)
1673         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1674         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1675
1676         # from disk into tahoe
1677         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1678         d.addCallback(run, "ls")
1679         d.addCallback(_check_ls, ["dir1"])
1680         d.addCallback(run, "ls", "dir1")
1681         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1682                       ["rfile4", "rfile5"])
1683         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1684         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1685                       ["rfile1", "rfile2", "rfile3"])
1686         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1687         d.addCallback(_check_stdout_against, data="rfile4")
1688
1689         # and back out again
1690         dn_copy = os.path.join(self.basedir, "dir1-copy")
1691         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1692         def _check_cp_r_out((out,err)):
1693             def _cmp(name):
1694                 old = open(os.path.join(dn, name), "rb").read()
1695                 newfn = os.path.join(dn_copy, name)
1696                 self.failUnless(os.path.exists(newfn))
1697                 new = open(newfn, "rb").read()
1698                 self.failUnlessEqual(old, new)
1699             _cmp("rfile1")
1700             _cmp("rfile2")
1701             _cmp("rfile3")
1702             _cmp(os.path.join("subdir2", "rfile4"))
1703             _cmp(os.path.join("subdir2", "rfile5"))
1704         d.addCallback(_check_cp_r_out)
1705
1706         # and copy it a second time, which ought to overwrite the same files
1707         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1708
1709         # and tahoe-to-tahoe
1710         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1711         d.addCallback(run, "ls")
1712         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1713         d.addCallback(run, "ls", "dir1-copy")
1714         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1715                       ["rfile4", "rfile5"])
1716         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1717         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1718                       ["rfile1", "rfile2", "rfile3"])
1719         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1720         d.addCallback(_check_stdout_against, data="rfile4")
1721
1722         # and copy it a second time, which ought to overwrite the same files
1723         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1724
1725         # tahoe_ls doesn't currently handle the error correctly: it tries to
1726         # JSON-parse a traceback.
1727 ##         def _ls_missing(res):
1728 ##             argv = ["ls"] + nodeargs + ["bogus"]
1729 ##             return self._run_cli(argv)
1730 ##         d.addCallback(_ls_missing)
1731 ##         def _check_ls_missing((out,err)):
1732 ##             print "OUT", out
1733 ##             print "ERR", err
1734 ##             self.failUnlessEqual(err, "")
1735 ##         d.addCallback(_check_ls_missing)
1736
1737         return d
1738
1739     def _run_cli(self, argv, stdin=""):
1740         #print "CLI:", argv
1741         stdout, stderr = StringIO(), StringIO()
1742         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1743                                   stdin=StringIO(stdin),
1744                                   stdout=stdout, stderr=stderr)
1745         def _done(res):
1746             return stdout.getvalue(), stderr.getvalue()
1747         d.addCallback(_done)
1748         return d
1749
1750     def _test_checker(self, res):
1751         ut = upload.Data("too big to be literal" * 200, convergence=None)
1752         d = self._personal_node.add_file(u"big file", ut)
1753
1754         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1755         def _check_dirnode_results(r):
1756             self.failUnless(r.is_healthy())
1757         d.addCallback(_check_dirnode_results)
1758         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1759         d.addCallback(_check_dirnode_results)
1760
1761         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1762         def _got_chk_filenode(n):
1763             self.failUnless(isinstance(n, filenode.FileNode))
1764             d = n.check(Monitor())
1765             def _check_filenode_results(r):
1766                 self.failUnless(r.is_healthy())
1767             d.addCallback(_check_filenode_results)
1768             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1769             d.addCallback(_check_filenode_results)
1770             return d
1771         d.addCallback(_got_chk_filenode)
1772
1773         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1774         def _got_lit_filenode(n):
1775             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1776             d = n.check(Monitor())
1777             def _check_lit_filenode_results(r):
1778                 self.failUnlessEqual(r, None)
1779             d.addCallback(_check_lit_filenode_results)
1780             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1781             d.addCallback(_check_lit_filenode_results)
1782             return d
1783         d.addCallback(_got_lit_filenode)
1784         return d
1785
1786
1787 class MutableChecker(SystemTestMixin, unittest.TestCase, ErrorMixin):
1788
1789     def _run_cli(self, argv):
1790         stdout, stderr = StringIO(), StringIO()
1791         # this can only do synchronous operations
1792         assert argv[0] == "debug"
1793         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1794         return stdout.getvalue()
1795
1796     def test_good(self):
1797         self.basedir = self.mktemp()
1798         d = self.set_up_nodes()
1799         CONTENTS = "a little bit of data"
1800         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1801         def _created(node):
1802             self.node = node
1803             si = self.node.get_storage_index()
1804         d.addCallback(_created)
1805         # now make sure the webapi verifier sees no problems
1806         def _do_check(res):
1807             url = (self.webish_url +
1808                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1809                    "?t=check&verify=true")
1810             return getPage(url, method="POST")
1811         d.addCallback(_do_check)
1812         def _got_results(out):
1813             self.failUnless("<span>Healthy : Healthy</span>" in out, out)
1814             self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1815             self.failIf("Not Healthy!" in out, out)
1816             self.failIf("Unhealthy" in out, out)
1817             self.failIf("Corrupt Shares" in out, out)
1818         d.addCallback(_got_results)
1819         d.addErrback(self.explain_web_error)
1820         return d
1821
1822     def test_corrupt(self):
1823         self.basedir = self.mktemp()
1824         d = self.set_up_nodes()
1825         CONTENTS = "a little bit of data"
1826         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1827         def _created(node):
1828             self.node = node
1829             si = self.node.get_storage_index()
1830             out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1831                                 self.clients[1].basedir])
1832             files = out.split("\n")
1833             # corrupt one of them, using the CLI debug command
1834             f = files[0]
1835             shnum = os.path.basename(f)
1836             nodeid = self.clients[1].nodeid
1837             nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1838             self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1839             out = self._run_cli(["debug", "corrupt-share", files[0]])
1840         d.addCallback(_created)
1841         # now make sure the webapi verifier notices it
1842         def _do_check(res):
1843             url = (self.webish_url +
1844                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1845                    "?t=check&verify=true")
1846             return getPage(url, method="POST")
1847         d.addCallback(_do_check)
1848         def _got_results(out):
1849             self.failUnless("Not Healthy!" in out, out)
1850             self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1851             self.failUnless("Corrupt Shares:" in out, out)
1852         d.addCallback(_got_results)
1853
1854         # now make sure the webapi repairer can fix it
1855         def _do_repair(res):
1856             url = (self.webish_url +
1857                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1858                    "?t=check&verify=true&repair=true")
1859             return getPage(url, method="POST")
1860         d.addCallback(_do_repair)
1861         def _got_repair_results(out):
1862             self.failUnless("<div>Repair successful</div>" in out, out)
1863         d.addCallback(_got_repair_results)
1864         d.addCallback(_do_check)
1865         def _got_postrepair_results(out):
1866             self.failIf("Not Healthy!" in out, out)
1867             self.failUnless("Recoverable Versions: 10*seq" in out, out)
1868         d.addCallback(_got_postrepair_results)
1869         d.addErrback(self.explain_web_error)
1870
1871         return d
1872
1873     def test_delete_share(self):
1874         self.basedir = self.mktemp()
1875         d = self.set_up_nodes()
1876         CONTENTS = "a little bit of data"
1877         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1878         def _created(node):
1879             self.node = node
1880             si = self.node.get_storage_index()
1881             out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1882                                 self.clients[1].basedir])
1883             files = out.split("\n")
1884             # delete one of them
1885             f = files[0]
1886             shnum = os.path.basename(f)
1887             nodeid = self.clients[1].nodeid
1888             nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1889             self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1890             os.unlink(files[0])
1891         d.addCallback(_created)
1892         # now make sure the webapi checker notices it
1893         def _do_check(res):
1894             url = (self.webish_url +
1895                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1896                    "?t=check&verify=false")
1897             return getPage(url, method="POST")
1898         d.addCallback(_do_check)
1899         def _got_results(out):
1900             self.failUnless("Not Healthy!" in out, out)
1901             self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1902             self.failIf("Corrupt Shares" in out, out)
1903         d.addCallback(_got_results)
1904
1905         # now make sure the webapi repairer can fix it
1906         def _do_repair(res):
1907             url = (self.webish_url +
1908                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1909                    "?t=check&verify=false&repair=true")
1910             return getPage(url, method="POST")
1911         d.addCallback(_do_repair)
1912         def _got_repair_results(out):
1913             self.failUnless("Repair successful" in out)
1914         d.addCallback(_got_repair_results)
1915         d.addCallback(_do_check)
1916         def _got_postrepair_results(out):
1917             self.failIf("Not Healthy!" in out, out)
1918             self.failUnless("Recoverable Versions: 10*seq" in out)
1919         d.addCallback(_got_postrepair_results)
1920         d.addErrback(self.explain_web_error)
1921
1922         return d
1923
1924
1925 class DeepCheckBase(SystemTestMixin, ErrorMixin):
1926
1927     def web_json(self, n, **kwargs):
1928         kwargs["output"] = "json"
1929         d = self.web(n, "POST", **kwargs)
1930         d.addCallback(self.decode_json)
1931         return d
1932
1933     def decode_json(self, (s,url)):
1934         try:
1935             data = simplejson.loads(s)
1936         except ValueError:
1937             self.fail("%s: not JSON: '%s'" % (url, s))
1938         return data
1939
1940     def parse_streamed_json(self, s):
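             # the streamed form is newline-delimited JSON: each line is one
             # complete JSON object and the stream ends with a trailing
             # newline, e.g. (illustrative only):
             #   {"type": "directory", ...}
             #   {"type": "file", ...}
             #   {"type": "stats", ...}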
1941         for unit in s.split("\n"):
1942             if not unit:
1943                 # stream should end with a newline, so split returns ""
1944                 continue
1945             yield simplejson.loads(unit)
1946
1947     def web(self, n, method="GET", **kwargs):
1948         # returns (data, url)
1949         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
1950                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
1951         d = getPage(url, method=method)
1952         d.addCallback(lambda data: (data,url))
1953         return d
1954
1955     def wait_for_operation(self, ignored, ophandle):
1956         url = self.webish_url + "operations/" + ophandle
1957         url += "?t=status&output=JSON"
1958         d = getPage(url)
1959         def _got(res):
1960             try:
1961                 data = simplejson.loads(res)
1962             except ValueError:
1963                 self.fail("%s: not JSON: '%s'" % (url, res))
1964             if not data["finished"]:
1965                 d = self.stall(delay=1.0)
1966                 d.addCallback(self.wait_for_operation, ophandle)
1967                 return d
1968             return data
1969         d.addCallback(_got)
1970         return d
1971
1972     def get_operation_results(self, ignored, ophandle, output=None):
1973         url = self.webish_url + "operations/" + ophandle
1974         url += "?t=status"
1975         if output:
1976             url += "&output=" + output
1977         d = getPage(url)
1978         def _got(res):
1979             if output and output.lower() == "json":
1980                 try:
1981                     return simplejson.loads(res)
1982                 except ValueError:
1983                     self.fail("%s: not JSON: '%s'" % (url, res))
1984             return res
1985         d.addCallback(_got)
1986         return d
1987
1988     def slow_web(self, n, output=None, **kwargs):
1989         # use ophandle=
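             # the flow here: POST with a random ophandle=, poll
             # "operations/<handle>?t=status&output=JSON" (wait_for_operation)
             # until the response reports finished=true, then fetch the final
             # results with get_operation_results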
1990         handle = base32.b2a(os.urandom(4))
1991         d = self.web(n, "POST", ophandle=handle, **kwargs)
1992         d.addCallback(self.wait_for_operation, handle)
1993         d.addCallback(self.get_operation_results, handle, output=output)
1994         return d
1995
1996
1997 class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
1998     # construct a small directory tree (with one dir, one immutable file, one
1999     # mutable file, two LIT files, and a loop), and then check/examine it in
2000     # various ways.
2001
2002     def set_up_tree(self, ignored):
2003         # 2.9s
2004
2005         # root
2006         #   mutable
2007         #   large
2008         #   small
2009         #   small2
2010         #   loop -> root
2011         c0 = self.clients[0]
2012         d = c0.create_empty_dirnode()
2013         def _created_root(n):
2014             self.root = n
2015             self.root_uri = n.get_uri()
2016         d.addCallback(_created_root)
2017         d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
2018         d.addCallback(lambda n: self.root.set_node(u"mutable", n))
2019         def _created_mutable(n):
2020             self.mutable = n
2021             self.mutable_uri = n.get_uri()
2022         d.addCallback(_created_mutable)
2023
2024         large = upload.Data("Lots of data\n" * 1000, None)
2025         d.addCallback(lambda ign: self.root.add_file(u"large", large))
2026         def _created_large(n):
2027             self.large = n
2028             self.large_uri = n.get_uri()
2029         d.addCallback(_created_large)
2030
2031         small = upload.Data("Small enough for a LIT", None)
2032         d.addCallback(lambda ign: self.root.add_file(u"small", small))
2033         def _created_small(n):
2034             self.small = n
2035             self.small_uri = n.get_uri()
2036         d.addCallback(_created_small)
2037
2038         small2 = upload.Data("Small enough for a LIT too", None)
2039         d.addCallback(lambda ign: self.root.add_file(u"small2", small2))
2040         def _created_small2(n):
2041             self.small2 = n
2042             self.small2_uri = n.get_uri()
2043         d.addCallback(_created_small2)
2044
2045         d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
2046         return d
2047
2048     def check_is_healthy(self, cr, n, where, incomplete=False):
2049         self.failUnless(ICheckResults.providedBy(cr), where)
2050         self.failUnless(cr.is_healthy(), where)
2051         self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2052                              where)
2053         self.failUnlessEqual(cr.get_storage_index_string(),
2054                              base32.b2a(n.get_storage_index()), where)
2055         needs_rebalancing = bool( len(self.clients) < 10 )
2056         if not incomplete:
2057             self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, str((where, cr, cr.get_data())))
2058         d = cr.get_data()
2059         self.failUnlessEqual(d["count-shares-good"], 10, where)
2060         self.failUnlessEqual(d["count-shares-needed"], 3, where)
2061         self.failUnlessEqual(d["count-shares-expected"], 10, where)
2062         if not incomplete:
2063             self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2064         self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2065         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2066         if not incomplete:
2067             self.failUnlessEqual(sorted(d["servers-responding"]),
2068                                  sorted([c.nodeid for c in self.clients]),
2069                                  where)
2070             self.failUnless("sharemap" in d, str((where, d)))
2071             all_serverids = set()
2072             for (shareid, serverids) in d["sharemap"].items():
2073                 all_serverids.update(serverids)
2074             self.failUnlessEqual(sorted(all_serverids),
2075                                  sorted([c.nodeid for c in self.clients]),
2076                                  where)
2077
2078         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2079         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2080         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2081
2082
2083     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2084         self.failUnless(ICheckAndRepairResults.providedBy(cr), (where, cr))
2085         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2086         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2087         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2088         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2089         self.failIf(cr.get_repair_attempted(), where)
2090
2091     def deep_check_is_healthy(self, cr, num_healthy, where):
2092         self.failUnless(IDeepCheckResults.providedBy(cr))
2093         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
2094                              num_healthy, where)
2095
2096     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2097         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2098         c = cr.get_counters()
2099         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2100                              num_healthy, where)
2101         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2102                              num_healthy, where)
2103         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
2104
2105     def test_good(self):
2106         self.basedir = self.mktemp()
2107         d = self.set_up_nodes()
2108         d.addCallback(self.set_up_tree)
2109         d.addCallback(self.do_stats)
2110         d.addCallback(self.do_web_stream_manifest)
2111         d.addCallback(self.do_test_check_good)
2112         d.addCallback(self.do_test_web_good)
2113         d.addCallback(self.do_test_cli_good)
2114         d.addErrback(self.explain_web_error)
2115         d.addErrback(self.explain_error)
2116         return d
2117
2118     def do_stats(self, ignored):
2119         d = defer.succeed(None)
2120         d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2121         d.addCallback(self.check_stats_good)
2122         return d
2123
2124     def check_stats_good(self, s):
2125         self.failUnlessEqual(s["count-directories"], 1)
2126         self.failUnlessEqual(s["count-files"], 4)
2127         self.failUnlessEqual(s["count-immutable-files"], 1)
2128         self.failUnlessEqual(s["count-literal-files"], 2)
2129         self.failUnlessEqual(s["count-mutable-files"], 1)
2130         # don't check directories: their size will vary
2131         # s["largest-directory"]
2132         # s["size-directories"]
2133         self.failUnlessEqual(s["largest-directory-children"], 5)
2134         self.failUnlessEqual(s["largest-immutable-file"], 13000)
2135         # to re-use this function for both the local
2136         # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2137         # coerce the result into a list of tuples. dirnode.start_deep_stats()
2138         # returns a list of tuples, but JSON only knows about lists, so
2139         # t=start-deep-stats returns a list of lists.
2140         histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
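             # e.g. a JSON value of [[11, 31, 2], [10001, 31622, 1]] becomes
             # [(11, 31, 2), (10001, 31622, 1)], matching the expected value
             # just below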
2141         self.failUnlessEqual(histogram, [(11, 31, 2),
2142                                          (10001, 31622, 1),
2143                                          ])
2144         self.failUnlessEqual(s["size-immutable-files"], 13000)
2145         self.failUnlessEqual(s["size-literal-files"], 48)
2146
2147     def do_web_stream_manifest(self, ignored):
2148         d = self.web(self.root, method="POST", t="stream-manifest")
2149         def _check((res,url)):
2150             units = list(self.parse_streamed_json(res))
2151             files = [u for u in units if u["type"] in ("file", "directory")]
2152             assert units[-1]["type"] == "stats"
2153             stats = units[-1]["stats"]
2154             self.failUnlessEqual(len(files), 5)
2155             # [root,mutable,large] are distributed, [small,small2] are not
2156             self.failUnlessEqual(len([f for f in files
2157                                       if f["verifycap"] is not None]), 3)
2158             self.failUnlessEqual(len([f for f in files
2159                                       if f["verifycap"] is None]), 2)
2160             self.failUnlessEqual(len([f for f in files
2161                                       if f["repaircap"] is not None]), 3)
2162             self.failUnlessEqual(len([f for f in files
2163                                       if f["repaircap"] is None]), 2)
2164             self.failUnlessEqual(len([f for f in files
2165                                       if f["storage-index"] is not None]), 3)
2166             self.failUnlessEqual(len([f for f in files
2167                                       if f["storage-index"] is None]), 2)
2168             # make sure that a mutable file has filecap==repaircap!=verifycap
2169             mutable = [f for f in files
2170                        if f["cap"] is not None
2171                        and f["cap"].startswith("URI:SSK:")][0]
2172             self.failUnlessEqual(mutable["cap"], self.mutable_uri)
2173             self.failIfEqual(mutable["cap"], mutable["verifycap"])
2174             self.failUnlessEqual(mutable["cap"], mutable["repaircap"])
2175             # for an immutable file, verifycap==repaircap!=filecap
2176             large = [f for f in files
2177                        if f["cap"] is not None
2178                        and f["cap"].startswith("URI:CHK:")][0]
2179             self.failUnlessEqual(large["cap"], self.large_uri)
2180             self.failIfEqual(large["cap"], large["verifycap"])
2181             self.failUnlessEqual(large["verifycap"], large["repaircap"])
2182             self.check_stats_good(stats)
2183         d.addCallback(_check)
2184         return d
2185
2186     def do_test_check_good(self, ignored):
2187         d = defer.succeed(None)
2188         # check the individual items
2189         d.addCallback(lambda ign: self.root.check(Monitor()))
2190         d.addCallback(self.check_is_healthy, self.root, "root")
2191         d.addCallback(lambda ign: self.mutable.check(Monitor()))
2192         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2193         d.addCallback(lambda ign: self.large.check(Monitor()))
2194         d.addCallback(self.check_is_healthy, self.large, "large")
2195         d.addCallback(lambda ign: self.small.check(Monitor()))
2196         d.addCallback(self.failUnlessEqual, None, "small")
2197         d.addCallback(lambda ign: self.small2.check(Monitor()))
2198         d.addCallback(self.failUnlessEqual, None, "small2")
2199
2200         # and again with verify=True
2201         d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2202         d.addCallback(self.check_is_healthy, self.root, "root")
2203         d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2204         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2205         d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2206         d.addCallback(self.check_is_healthy, self.large, "large", incomplete=True)
2207         d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2208         d.addCallback(self.failUnlessEqual, None, "small")
2209         d.addCallback(lambda ign: self.small2.check(Monitor(), verify=True))
2210         d.addCallback(self.failUnlessEqual, None, "small2")
2211
2212         # and check_and_repair(), which should be a nop
2213         d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2214         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2215         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2216         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2217         #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2218         #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2219         #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2220         #TODO d.addCallback(self.failUnlessEqual, None, "small")
2221         #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor()))
2222         #TODO d.addCallback(self.failUnlessEqual, None, "small2")
2223
2224         # check_and_repair(verify=True)
2225         d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2226         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2227         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2228         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2229         #TODO d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2230         #TODO d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2231         #TODO               incomplete=True)
2232         #TODO d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2233         #TODO d.addCallback(self.failUnlessEqual, None, "small")
2234         #TODO d.addCallback(lambda ign: self.small2.check_and_repair(Monitor(), verify=True))
2235         #TODO d.addCallback(self.failUnlessEqual, None, "small2")
2236
2237
2238         # now deep-check the root, with various verify= and repair= options
2239         d.addCallback(lambda ign:
2240                       self.root.start_deep_check().when_done())
2241         d.addCallback(self.deep_check_is_healthy, 3, "root")
2242         d.addCallback(lambda ign:
2243                       self.root.start_deep_check(verify=True).when_done())
2244         d.addCallback(self.deep_check_is_healthy, 3, "root")
2245         d.addCallback(lambda ign:
2246                       self.root.start_deep_check_and_repair().when_done())
2247         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2248         d.addCallback(lambda ign:
2249                       self.root.start_deep_check_and_repair(verify=True).when_done())
2250         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2251
2252         # and finally, start a deep-check, but then cancel it.
2253         d.addCallback(lambda ign: self.root.start_deep_check())
2254         def _checking(monitor):
2255             monitor.cancel()
2256             d = monitor.when_done()
2257             # this should fire as soon as the next dirnode.list finishes.
2258             # TODO: add a counter to measure how many list() calls are made,
2259             # assert that no more than one gets to run before the cancel()
2260             # takes effect.
2261             def _finished_normally(res):
2262                 self.fail("this was supposed to fail, not finish normally")
2263             def _cancelled(f):
2264                 f.trap(OperationCancelledError)
2265             d.addCallbacks(_finished_normally, _cancelled)
2266             return d
2267         d.addCallback(_checking)
2268
2269         return d
2270
2271     def json_check_is_healthy(self, data, n, where, incomplete=False):
2272
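         # (illustrative) 'data' is the parsed JSON body of a t=check response;
         # with the 3-of-10 encoding these tests use, it should look roughly like:
         #   {"storage-index": "<base32 SI>",
         #    "summary": "Healthy",
         #    "results": {"healthy": true,
         #                "count-shares-good": 10,
         #                "count-shares-needed": 3,
         #                "count-shares-expected": 10,
         #                "sharemap": {shareid: [serverids...], ...},
         #                ...}}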
2273         self.failUnlessEqual(data["storage-index"],
2274                              base32.b2a(n.get_storage_index()), where)
2275         self.failUnless("summary" in data, (where, data))
2276         self.failUnlessEqual(data["summary"].lower(), "healthy",
2277                              "%s: '%s'" % (where, data["summary"]))
2278         r = data["results"]
2279         self.failUnlessEqual(r["healthy"], True, where)
2280         needs_rebalancing = bool(len(self.clients) < 10)
2281         if not incomplete:
2282             self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2283         self.failUnlessEqual(r["count-shares-good"], 10, where)
2284         self.failUnlessEqual(r["count-shares-needed"], 3, where)
2285         self.failUnlessEqual(r["count-shares-expected"], 10, where)
2286         if not incomplete:
2287             self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2288         self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2289         self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2290         if not incomplete:
2291             self.failUnlessEqual(sorted(r["servers-responding"]),
2292                                  sorted([idlib.nodeid_b2a(c.nodeid)
2293                                          for c in self.clients]), where)
2294             self.failUnless("sharemap" in r, where)
2295             all_serverids = set()
2296             for (shareid, serverids_s) in r["sharemap"].items():
2297                 all_serverids.update(serverids_s)
2298             self.failUnlessEqual(sorted(all_serverids),
2299                                  sorted([idlib.nodeid_b2a(c.nodeid)
2300                                          for c in self.clients]), where)
2301         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2302         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2303         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2304
2305     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2306         self.failUnlessEqual(data["storage-index"],
2307                              base32.b2a(n.get_storage_index()), where)
2308         self.failUnlessEqual(data["repair-attempted"], False, where)
2309         self.json_check_is_healthy(data["pre-repair-results"],
2310                                    n, where, incomplete)
2311         self.json_check_is_healthy(data["post-repair-results"],
2312                                    n, where, incomplete)
2313
2314     def json_full_deepcheck_is_healthy(self, data, n, where):
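         # the healthy tree holds five objects (root, mutable, large, small,
         # small2), but only three are checkable: the two small files are LIT
         # caps with no shares, so the deep-check counters below stop at 3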
2315         self.failUnlessEqual(data["root-storage-index"],
2316                              base32.b2a(n.get_storage_index()), where)
2317         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2318         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2319         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2320         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2321         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2322         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2323         self.json_check_stats_good(data["stats"], where)
2324
2325     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2326         self.failUnlessEqual(data["root-storage-index"],
2327                              base32.b2a(n.get_storage_index()), where)
2328         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2329
2330         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2331         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2332         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2333
2334         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2335         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2336         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2337
2338         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2339         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2340         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2341
2342         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2343         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2344         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2345
2346
2347     def json_check_lit(self, data, n, where):
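         # LIT (literal) files embed their data in the cap itself: no shares,
         # no storage index, so a check is trivially healthy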
2348         self.failUnlessEqual(data["storage-index"], "", where)
2349         self.failUnlessEqual(data["results"]["healthy"], True, where)
2350
2351     def json_check_stats_good(self, data, where):
2352         self.check_stats_good(data)
2353
2354     def do_test_web_good(self, ignored):
2355         d = defer.succeed(None)
2356
2357         # stats
2358         d.addCallback(lambda ign:
2359                       self.slow_web(self.root,
2360                                     t="start-deep-stats", output="json"))
2361         d.addCallback(self.json_check_stats_good, "deep-stats")
2362
2363         # check, no verify
2364         d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2365         d.addCallback(self.json_check_is_healthy, self.root, "root")
2366         d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2367         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2368         d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2369         d.addCallback(self.json_check_is_healthy, self.large, "large")
2370         d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2371         d.addCallback(self.json_check_lit, self.small, "small")
2372         d.addCallback(lambda ign: self.web_json(self.small2, t="check"))
2373         d.addCallback(self.json_check_lit, self.small2, "small2")
2374
2375         # check and verify
2376         d.addCallback(lambda ign:
2377                       self.web_json(self.root, t="check", verify="true"))
2378         d.addCallback(self.json_check_is_healthy, self.root, "root+v")
2379         d.addCallback(lambda ign:
2380                       self.web_json(self.mutable, t="check", verify="true"))
2381         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable+v")
2382         d.addCallback(lambda ign:
2383                       self.web_json(self.large, t="check", verify="true"))
2384         d.addCallback(self.json_check_is_healthy, self.large, "large+v",
2385                       incomplete=True)
2386         d.addCallback(lambda ign:
2387                       self.web_json(self.small, t="check", verify="true"))
2388         d.addCallback(self.json_check_lit, self.small, "small+v")
2389         d.addCallback(lambda ign:
2390                       self.web_json(self.small2, t="check", verify="true"))
2391         d.addCallback(self.json_check_lit, self.small2, "small2+v")
2392
2393         # check and repair, no verify
2394         d.addCallback(lambda ign:
2395                       self.web_json(self.root, t="check", repair="true"))
2396         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+r")
2397         d.addCallback(lambda ign:
2398                       self.web_json(self.mutable, t="check", repair="true"))
2399         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+r")
2400         d.addCallback(lambda ign:
2401                       self.web_json(self.large, t="check", repair="true"))
2402         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+r")
2403         d.addCallback(lambda ign:
2404                       self.web_json(self.small, t="check", repair="true"))
2405         d.addCallback(self.json_check_lit, self.small, "small+r")
2406         d.addCallback(lambda ign:
2407                       self.web_json(self.small2, t="check", repair="true"))
2408         d.addCallback(self.json_check_lit, self.small2, "small2+r")
2409
2410         # check+verify+repair
2411         d.addCallback(lambda ign:
2412                       self.web_json(self.root, t="check", repair="true", verify="true"))
2413         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root+vr")
2414         d.addCallback(lambda ign:
2415                       self.web_json(self.mutable, t="check", repair="true", verify="true"))
2416         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable+vr")
2417         d.addCallback(lambda ign:
2418                       self.web_json(self.large, t="check", repair="true", verify="true"))
2419         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large+vr", incomplete=True)
2420         d.addCallback(lambda ign:
2421                       self.web_json(self.small, t="check", repair="true", verify="true"))
2422         d.addCallback(self.json_check_lit, self.small, "small+vr")
2423         d.addCallback(lambda ign:
2424                       self.web_json(self.small2, t="check", repair="true", verify="true"))
2425         d.addCallback(self.json_check_lit, self.small2, "small2+vr")
2426
2427         # now run a deep-check, with various verify= and repair= flags
2428         d.addCallback(lambda ign:
2429                       self.slow_web(self.root, t="start-deep-check", output="json"))
2430         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+d")
2431         d.addCallback(lambda ign:
2432                       self.slow_web(self.root, t="start-deep-check", verify="true",
2433                                     output="json"))
2434         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root+dv")
2435         d.addCallback(lambda ign:
2436                       self.slow_web(self.root, t="start-deep-check", repair="true",
2437                                     output="json"))
2438         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dr")
2439         d.addCallback(lambda ign:
2440                       self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2441         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root+dvr")
2442
2443         # now look at t=info
2444         d.addCallback(lambda ign: self.web(self.root, t="info"))
2445         # TODO: examine the output
2446         d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2447         d.addCallback(lambda ign: self.web(self.large, t="info"))
2448         d.addCallback(lambda ign: self.web(self.small, t="info"))
2449         d.addCallback(lambda ign: self.web(self.small2, t="info"))
2450
2451         return d
2452
2453     def _run_cli(self, argv, stdin=""):
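         # run a tahoe CLI command in a worker thread (runner.runner blocks)
         # and return a Deferred that fires with (stdout, stderr) strings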
2454         #print "CLI:", argv
2455         stdout, stderr = StringIO(), StringIO()
2456         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
2457                                   stdin=StringIO(stdin),
2458                                   stdout=stdout, stderr=stderr)
2459         def _done(res):
2460             return stdout.getvalue(), stderr.getvalue()
2461         d.addCallback(_done)
2462         return d
2463
2464     def do_test_cli_good(self, ignored):
2465         basedir = self.getdir("client0")
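         # "tahoe manifest" prints one "<cap> <path>" line per reachable
         # object; the root directory itself is listed with an empty path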
2466         d = self._run_cli(["manifest",
2467                            "--node-directory", basedir,
2468                            self.root_uri])
2469         def _check((out,err)):
2470             self.failUnlessEqual(err, "")
2471             lines = [l for l in out.split("\n") if l]
2472             self.failUnlessEqual(len(lines), 5)
2473             caps = {}
2474             for l in lines:
2475                 try:
2476                     cap, path = l.split(None, 1)
2477                 except ValueError:
2478                     cap = l.strip()
2479                     path = ""
2480                 caps[cap] = path
2481             self.failUnless(self.root.get_uri() in caps)
2482             self.failUnlessEqual(caps[self.root.get_uri()], "")
2483             self.failUnlessEqual(caps[self.mutable.get_uri()], "mutable")
2484             self.failUnlessEqual(caps[self.large.get_uri()], "large")
2485             self.failUnlessEqual(caps[self.small.get_uri()], "small")
2486             self.failUnlessEqual(caps[self.small2.get_uri()], "small2")
2487         d.addCallback(_check)
2488
2489         d.addCallback(lambda res:
2490                       self._run_cli(["manifest",
2491                                      "--node-directory", basedir,
2492                                      "--storage-index", self.root_uri]))
2493         def _check2((out,err)):
2494             self.failUnlessEqual(err, "")
2495             lines = [l for l in out.split("\n") if l]
2496             self.failUnlessEqual(len(lines), 3)
2497             self.failUnless(base32.b2a(self.root.get_storage_index()) in lines)
2498             self.failUnless(base32.b2a(self.mutable.get_storage_index()) in lines)
2499             self.failUnless(base32.b2a(self.large.get_storage_index()) in lines)
2500         d.addCallback(_check2)
2501
2502         d.addCallback(lambda res:
2503                       self._run_cli(["manifest",
2504                                      "--node-directory", basedir,
2505                                      "--raw", self.root_uri]))
2506         def _check2r((out,err)):
2507             self.failUnlessEqual(err, "")
2508             data = simplejson.loads(out)
2509             sis = data["storage-index"]
2510             self.failUnlessEqual(len(sis), 3)
2511             self.failUnless(base32.b2a(self.root.get_storage_index()) in sis)
2512             self.failUnless(base32.b2a(self.mutable.get_storage_index()) in sis)
2513             self.failUnless(base32.b2a(self.large.get_storage_index()) in sis)
2514             self.failUnlessEqual(data["stats"]["count-files"], 4)
2515             self.failUnlessEqual(data["origin"],
2516                                  base32.b2a(self.root.get_storage_index()))
2517             verifycaps = data["verifycaps"]
2518             self.failUnlessEqual(len(verifycaps), 3)
2519             self.failUnless(self.root.get_verify_cap().to_string() in verifycaps)
2520             self.failUnless(self.mutable.get_verify_cap().to_string() in verifycaps)
2521             self.failUnless(self.large.get_verify_cap().to_string() in verifycaps)
2522         d.addCallback(_check2r)
2523
2524         d.addCallback(lambda res:
2525                       self._run_cli(["stats",
2526                                      "--node-directory", basedir,
2527                                      self.root_uri]))
2528         def _check3((out,err)):
2529             lines = [l.strip() for l in out.split("\n") if l]
2530             self.failUnless("count-immutable-files: 1" in lines)
2531             self.failUnless("count-mutable-files: 1" in lines)
2532             self.failUnless("count-literal-files: 2" in lines)
2533             self.failUnless("count-files: 4" in lines)
2534             self.failUnless("count-directories: 1" in lines)
2535             self.failUnless("size-immutable-files: 13000    (13.00 kB, 12.70 kiB)" in lines, lines)
2536             self.failUnless("size-literal-files: 48" in lines)
2537             self.failUnless("   11-31    : 2    (31 B, 31 B)".strip() in lines)
2538             self.failUnless("10001-31622 : 1    (31.62 kB, 30.88 kiB)".strip() in lines)
2539         d.addCallback(_check3)
2540
2541         d.addCallback(lambda res:
2542                       self._run_cli(["stats",
2543                                      "--node-directory", basedir,
2544                                      "--raw",
2545                                      self.root_uri]))
2546         def _check4((out,err)):
2547             data = simplejson.loads(out)
2548             self.failUnlessEqual(data["count-immutable-files"], 1)
2549             self.failUnlessEqual(data["count-immutable-files"], 1)
2550             self.failUnlessEqual(data["count-mutable-files"], 1)
2551             self.failUnlessEqual(data["count-literal-files"], 2)
2552             self.failUnlessEqual(data["count-files"], 4)
2553             self.failUnlessEqual(data["count-directories"], 1)
2554             self.failUnlessEqual(data["size-immutable-files"], 13000)
2555             self.failUnlessEqual(data["size-literal-files"], 48)
2556             self.failUnless([11,31,2] in data["size-files-histogram"])
2557             self.failUnless([10001,31622,1] in data["size-files-histogram"])
2558         d.addCallback(_check4)
2559
2560         return d
2561
2562
2563 class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
2564
2565     def test_bad(self):
2566         self.basedir = self.mktemp()
2567         d = self.set_up_nodes()
2568         d.addCallback(self.set_up_damaged_tree)
2569         d.addCallback(self.do_check)
2570         d.addCallback(self.do_deepcheck)
2571         d.addCallback(self.do_test_web_bad)
2572         d.addErrback(self.explain_web_error)
2573         d.addErrback(self.explain_error)
2574         return d
2575
2576
2577
2578     def set_up_damaged_tree(self, ignored):
2579         # building this damaged tree takes about 6.4s
2580
2581         # root
2582         #   mutable-good
2583         #   mutable-missing-shares
2584         #   mutable-corrupt-shares
2585         #   mutable-unrecoverable
2586         #   large-good
2587         #   large-missing-shares
2588         #   large-corrupt-shares
2589         #   large-unrecoverable
2590
2591         self.nodes = {}
2592
2593         c0 = self.clients[0]
2594         d = c0.create_empty_dirnode()
2595         def _created_root(n):
2596             self.root = n
2597             self.root_uri = n.get_uri()
2598         d.addCallback(_created_root)
2599         d.addCallback(self.create_mangled, "mutable-good")
2600         d.addCallback(self.create_mangled, "mutable-missing-shares")
2601         d.addCallback(self.create_mangled, "mutable-corrupt-shares")
2602         d.addCallback(self.create_mangled, "mutable-unrecoverable")
2603         d.addCallback(self.create_mangled, "large-good")
2604         d.addCallback(self.create_mangled, "large-missing-shares")
2605         d.addCallback(self.create_mangled, "large-corrupt-shares")
2606         d.addCallback(self.create_mangled, "large-unrecoverable")
2607
2608         return d
2609
2610
2611     def create_mangled(self, ignored, name):
2612         nodetype, mangletype = name.split("-", 1)
2613         if nodetype == "mutable":
2614             d = self.clients[0].create_mutable_file("mutable file contents")
2615             d.addCallback(lambda n: self.root.set_node(unicode(name), n))
2616         elif nodetype == "large":
2617             large = upload.Data("Lots of data\n" * 1000 + name + "\n", None)
2618             d = self.root.add_file(unicode(name), large)
2619         elif nodetype == "small":
2620             small = upload.Data("Small enough for a LIT", None)
2621             d = self.root.add_file(unicode(name), small)
2622
2623         def _stash_node(node):
2624             self.nodes[name] = node
2625             return node
2626         d.addCallback(_stash_node)
2627
2628         if mangletype == "good":
2629             pass
2630         elif mangletype == "missing-shares":
2631             d.addCallback(self._delete_some_shares)
2632         elif mangletype == "corrupt-shares":
2633             d.addCallback(self._corrupt_some_shares)
2634         else:
2635             assert mangletype == "unrecoverable"
2636             d.addCallback(self._delete_most_shares)
2637
2638         return d
2639
2640     def _run_cli(self, argv):
2641         stdout, stderr = StringIO(), StringIO()
2642         # this can only do synchronous operations
2643         assert argv[0] == "debug"
2644         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
2645         return stdout.getvalue()
2646
2647     def _find_shares(self, node):
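         # "tahoe debug find-shares <SI> <basedir>.." prints one share filename
         # per line; return them as a list of paths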
2648         si = node.get_storage_index()
2649         out = self._run_cli(["debug", "find-shares", base32.b2a(si)] +
2650                             [c.basedir for c in self.clients])
2651         files = out.split("\n")
2652         return [f for f in files if f]
2653
2654     def _delete_some_shares(self, node):
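         # delete two of the ten shares: still recoverable with the default
         # 3-of-10 encoding, but no longer healthy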
2655         shares = self._find_shares(node)
2656         os.unlink(shares[0])
2657         os.unlink(shares[1])
2658
2659     def _corrupt_some_shares(self, node):
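         # corrupt two shares in place: a plain check will not notice, but a
         # verify=True check should report them as corrupt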
2660         shares = self._find_shares(node)
2661         self._run_cli(["debug", "corrupt-share", shares[0]])
2662         self._run_cli(["debug", "corrupt-share", shares[1]])
2663
2664     def _delete_most_shares(self, node):
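         # keep only shares[0]: fewer than the 3 shares needed, so the file
         # becomes unrecoverable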
2665         shares = self._find_shares(node)
2666         for share in shares[1:]:
2667             os.unlink(share)
2668
2669
2670     def check_is_healthy(self, cr, where):
2671         try:
2672             self.failUnless(ICheckResults.providedBy(cr), (cr, type(cr), where))
2673             self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where))
2674             self.failUnless(cr.is_recoverable(), where)
2675             d = cr.get_data()
2676             self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2677             self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2678             return cr
2679         except Exception, le:
2680             le.args = tuple(le.args + (where,))
2681             raise
2682
2683     def check_is_missing_shares(self, cr, where):
2684         self.failUnless(ICheckResults.providedBy(cr), where)
2685         self.failIf(cr.is_healthy(), where)
2686         self.failUnless(cr.is_recoverable(), where)
2687         d = cr.get_data()
2688         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2689         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2690         return cr
2691
2692     def check_has_corrupt_shares(self, cr, where):
2693         # by "corrupt-shares" we mean the file is still recoverable
2694         self.failUnless(ICheckResults.providedBy(cr), where)
2695         d = cr.get_data()
2696         self.failIf(cr.is_healthy(), (where, cr))
2697         self.failUnless(cr.is_recoverable(), where)
2698         d = cr.get_data()
2699         self.failUnless(d["count-shares-good"] < 10, where)
2700         self.failUnless(d["count-corrupt-shares"], where)
2701         self.failUnless(d["list-corrupt-shares"], where)
2702         return cr
2703
2704     def check_is_unrecoverable(self, cr, where):
2705         self.failUnless(ICheckResults.providedBy(cr), where)
2706         d = cr.get_data()
2707         self.failIf(cr.is_healthy(), where)
2708         self.failIf(cr.is_recoverable(), where)
2709         self.failUnless(d["count-shares-good"] < d["count-shares-needed"], (d["count-shares-good"], d["count-shares-needed"], where))
2710         self.failUnlessEqual(d["count-recoverable-versions"], 0, where)
2711         self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
2712         return cr
2713
2714     def do_check(self, ignored):
2715         d = defer.succeed(None)
2716
2717         # check the individual items, without verification. This will not
2718         # detect corrupt shares.
2719         def _check(which, checker):
2720             d = self.nodes[which].check(Monitor())
2721             d.addCallback(checker, which + "--check")
2722             return d
2723
2724         d.addCallback(lambda ign: _check("mutable-good", self.check_is_healthy))
2725         d.addCallback(lambda ign: _check("mutable-missing-shares",
2726                                          self.check_is_missing_shares))
2727         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2728                                          self.check_is_healthy))
2729         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2730                                          self.check_is_unrecoverable))
2731         d.addCallback(lambda ign: _check("large-good", self.check_is_healthy))
2732         d.addCallback(lambda ign: _check("large-missing-shares",
2733                                          self.check_is_missing_shares))
2734         d.addCallback(lambda ign: _check("large-corrupt-shares",
2735                                          self.check_is_healthy))
2736         d.addCallback(lambda ign: _check("large-unrecoverable",
2737                                          self.check_is_unrecoverable))
2738
2739         # and again with verify=True, which *does* detect corrupt shares.
2740         def _checkv(which, checker):
2741             d = self.nodes[which].check(Monitor(), verify=True)
2742             d.addCallback(checker, which + "--check-and-verify")
2743             return d
2744
2745         d.addCallback(lambda ign: _checkv("mutable-good", self.check_is_healthy))
2746         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2747                                          self.check_is_missing_shares))
2748         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2749                                          self.check_has_corrupt_shares))
2750         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2751                                          self.check_is_unrecoverable))
2752         d.addCallback(lambda ign: _checkv("large-good", self.check_is_healthy))
2753         d.addCallback(lambda ign: _checkv("large-missing-shares", self.check_is_missing_shares))
2754         d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.check_has_corrupt_shares))
2755         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2756                                          self.check_is_unrecoverable))
2757
2758         return d
2759
2760     def do_deepcheck(self, ignored):
2761         d = defer.succeed(None)
2762
2763         # now deep-check the root, with various verify= and repair= options
2764         d.addCallback(lambda ign:
2765                       self.root.start_deep_check().when_done())
2766         def _check1(cr):
2767             self.failUnless(IDeepCheckResults.providedBy(cr))
2768             c = cr.get_counters()
2769             self.failUnlessEqual(c["count-objects-checked"], 9)
2770             self.failUnlessEqual(c["count-objects-healthy"], 5)
2771             self.failUnlessEqual(c["count-objects-unhealthy"], 4)
2772             self.failUnlessEqual(c["count-objects-unrecoverable"], 2)
2773         d.addCallback(_check1)
2774
2775         d.addCallback(lambda ign:
2776                       self.root.start_deep_check(verify=True).when_done())
2777         def _check2(cr):
2778             self.failUnless(IDeepCheckResults.providedBy(cr))
2779             c = cr.get_counters()
2780             self.failUnlessEqual(c["count-objects-checked"], 9)
2781             self.failUnlessEqual(c["count-objects-healthy"], 3)
2782             self.failUnlessEqual(c["count-objects-unhealthy"], 6)
2783             self.failUnlessEqual(c["count-objects-healthy"], 3) # root, mutable good, large good
2784             self.failUnlessEqual(c["count-objects-unrecoverable"], 2) # mutable unrecoverable, large unrecoverable
2785         d.addCallback(_check2)
2786
2787         return d
2788
2789     def json_is_healthy(self, data, where):
2790         r = data["results"]
2791         self.failUnless(r["healthy"], where)
2792         self.failUnless(r["recoverable"], where)
2793         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2794         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2795
2796     def json_is_missing_shares(self, data, where):
2797         r = data["results"]
2798         self.failIf(r["healthy"], where)
2799         self.failUnless(r["recoverable"], where)
2800         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2801         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2802
2803     def json_has_corrupt_shares(self, data, where):
2804         # by "corrupt-shares" we mean the file is still recoverable
2805         r = data["results"]
2806         self.failIf(r["healthy"], where)
2807         self.failUnless(r["recoverable"], where)
2808         self.failUnless(r["count-shares-good"] < 10, where)
2809         self.failUnless(r["count-corrupt-shares"], where)
2810         self.failUnless(r["list-corrupt-shares"], where)
2811
2812     def json_is_unrecoverable(self, data, where):
2813         r = data["results"]
2814         self.failIf(r["healthy"], where)
2815         self.failIf(r["recoverable"], where)
2816         self.failUnless(r["count-shares-good"] < r["count-shares-needed"],
2817                         where)
2818         self.failUnlessEqual(r["count-recoverable-versions"], 0, where)
2819         self.failUnlessEqual(r["count-unrecoverable-versions"], 1, where)
2820
2821     def do_test_web_bad(self, ignored):
2822         d = defer.succeed(None)
2823
2824         # check, no verify
2825         def _check(which, checker):
2826             d = self.web_json(self.nodes[which], t="check")
2827             d.addCallback(checker, which + "--webcheck")
2828             return d
2829
2830         d.addCallback(lambda ign: _check("mutable-good",
2831                                          self.json_is_healthy))
2832         d.addCallback(lambda ign: _check("mutable-missing-shares",
2833                                          self.json_is_missing_shares))
2834         d.addCallback(lambda ign: _check("mutable-corrupt-shares",
2835                                          self.json_is_healthy))
2836         d.addCallback(lambda ign: _check("mutable-unrecoverable",
2837                                          self.json_is_unrecoverable))
2838         d.addCallback(lambda ign: _check("large-good",
2839                                          self.json_is_healthy))
2840         d.addCallback(lambda ign: _check("large-missing-shares",
2841                                          self.json_is_missing_shares))
2842         d.addCallback(lambda ign: _check("large-corrupt-shares",
2843                                          self.json_is_healthy))
2844         d.addCallback(lambda ign: _check("large-unrecoverable",
2845                                          self.json_is_unrecoverable))
2846
2847         # check and verify
2848         def _checkv(which, checker):
2849             d = self.web_json(self.nodes[which], t="check", verify="true")
2850             d.addCallback(checker, which + "--webcheck-and-verify")
2851             return d
2852
2853         d.addCallback(lambda ign: _checkv("mutable-good",
2854                                           self.json_is_healthy))
2855         d.addCallback(lambda ign: _checkv("mutable-missing-shares",
2856                                          self.json_is_missing_shares))
2857         d.addCallback(lambda ign: _checkv("mutable-corrupt-shares",
2858                                          self.json_has_corrupt_shares))
2859         d.addCallback(lambda ign: _checkv("mutable-unrecoverable",
2860                                          self.json_is_unrecoverable))
2861         d.addCallback(lambda ign: _checkv("large-good",
2862                                           self.json_is_healthy))
2863         d.addCallback(lambda ign: _checkv("large-missing-shares", self.json_is_missing_shares))
2864         d.addCallback(lambda ign: _checkv("large-corrupt-shares", self.json_has_corrupt_shares))
2865         d.addCallback(lambda ign: _checkv("large-unrecoverable",
2866                                          self.json_is_unrecoverable))
2867
2868         return d