]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
test_system: update test to match web checker results
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18      IDeepCheckAndRepairResults
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin, WebErrorMixin
28
29 LARGE_DATA = """
30 This is some data to publish to the virtual drive, which needs to be large
31 enough to not fit inside a LIT uri.
32 """
33
class CountingDataUploadable(upload.Data):
    """An upload.Data uploadable that tallies the bytes requested from it.

    Once the running total exceeds `interrupt_after`, the Deferred held in
    `interrupt_after_d` is fired with this uploadable, which lets a test
    react part-way through an upload.
    """
    # running total of the `length` arguments passed to read()
    bytes_read = 0
    # byte threshold past which interrupt_after_d is fired; None disables it
    interrupt_after = None
    # Deferred that is fired when the threshold is crossed
    interrupt_after_d = None

    def read(self, length):
        """Count the requested `length`, maybe fire the interrupt Deferred,
        then delegate the actual read to upload.Data.read()."""
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm before firing, so later reads do not fire it again
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
46
class GrabEverythingConsumer:
    """A minimal IConsumer that accumulates everything written to it in
    the `contents` string."""
    implements(IConsumer)

    def __init__(self):
        # everything passed to write() so far, concatenated in order
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # only streaming (push) producers are accepted; the producer itself
        # is not retained
        assert streaming
        is_push_producer = IPushProducer.providedBy(producer)
        assert is_push_producer

    def write(self, data):
        self.contents = self.contents + data

    def unregisterProducer(self):
        # nothing was stored by registerProducer, so nothing to release
        pass
62
63 class SystemTest(SystemTestMixin, unittest.TestCase):
64
65     def test_connections(self):
66         self.basedir = "system/SystemTest/test_connections"
67         d = self.set_up_nodes()
68         self.extra_node = None
69         d.addCallback(lambda res: self.add_extra_node(self.numclients))
70         def _check(extra_node):
71             self.extra_node = extra_node
72             for c in self.clients:
73                 all_peerids = list(c.get_all_peerids())
74                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
75                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
76                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
77
78         d.addCallback(_check)
79         def _shutdown_extra_node(res):
80             if self.extra_node:
81                 return self.extra_node.stopService()
82             return res
83         d.addBoth(_shutdown_extra_node)
84         return d
85     test_connections.timeout = 300
86     # test_connections is subsumed by test_upload_and_download, and takes
87     # quite a while to run on a slow machine (because of all the TLS
88     # connections that must be established). If we ever rework the introducer
89     # code to such an extent that we're not sure if it works anymore, we can
90     # reinstate this test until it does.
91     del test_connections
92
93     def test_upload_and_download_random_key(self):
94         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
95         return self._test_upload_and_download(convergence=None)
96     test_upload_and_download_random_key.timeout = 4800
97
98     def test_upload_and_download_convergent(self):
99         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
100         return self._test_upload_and_download(convergence="some convergence string")
101     test_upload_and_download_convergent.timeout = 4800
102
103     def _test_upload_and_download(self, convergence):
104         # we use 4000 bytes of data, which will result in about 400k written
105         # to disk among all our simulated nodes
106         DATA = "Some data to upload\n" * 200
107         d = self.set_up_nodes()
108         def _check_connections(res):
109             for c in self.clients:
110                 all_peerids = list(c.get_all_peerids())
111                 self.failUnlessEqual(len(all_peerids), self.numclients)
112                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
113                 self.failUnlessEqual(len(permuted_peers), self.numclients)
114         d.addCallback(_check_connections)
115
116         def _do_upload(res):
117             log.msg("UPLOADING")
118             u = self.clients[0].getServiceNamed("uploader")
119             self.uploader = u
120             # we crank the max segsize down to 1024b for the duration of this
121             # test, so we can exercise multiple segments. It is important
122             # that this is not a multiple of the segment size, so that the
123             # tail segment is not the same length as the others. This actualy
124             # gets rounded up to 1025 to be a multiple of the number of
125             # required shares (since we use 25 out of 100 FEC).
126             up = upload.Data(DATA, convergence=convergence)
127             up.max_segment_size = 1024
128             d1 = u.upload(up)
129             return d1
130         d.addCallback(_do_upload)
131         def _upload_done(results):
132             uri = results.uri
133             log.msg("upload finished: uri is %s" % (uri,))
134             self.uri = uri
135             dl = self.clients[1].getServiceNamed("downloader")
136             self.downloader = dl
137         d.addCallback(_upload_done)
138
139         def _upload_again(res):
140             # Upload again. If using convergent encryption then this ought to be
141             # short-circuited, however with the way we currently generate URIs
142             # (i.e. because they include the roothash), we have to do all of the
143             # encoding work, and only get to save on the upload part.
144             log.msg("UPLOADING AGAIN")
145             up = upload.Data(DATA, convergence=convergence)
146             up.max_segment_size = 1024
147             d1 = self.uploader.upload(up)
148         d.addCallback(_upload_again)
149
150         def _download_to_data(res):
151             log.msg("DOWNLOADING")
152             return self.downloader.download_to_data(self.uri)
153         d.addCallback(_download_to_data)
154         def _download_to_data_done(data):
155             log.msg("download finished")
156             self.failUnlessEqual(data, DATA)
157         d.addCallback(_download_to_data_done)
158
159         target_filename = os.path.join(self.basedir, "download.target")
160         def _download_to_filename(res):
161             return self.downloader.download_to_filename(self.uri,
162                                                         target_filename)
163         d.addCallback(_download_to_filename)
164         def _download_to_filename_done(res):
165             newdata = open(target_filename, "rb").read()
166             self.failUnlessEqual(newdata, DATA)
167         d.addCallback(_download_to_filename_done)
168
169         target_filename2 = os.path.join(self.basedir, "download.target2")
170         def _download_to_filehandle(res):
171             fh = open(target_filename2, "wb")
172             return self.downloader.download_to_filehandle(self.uri, fh)
173         d.addCallback(_download_to_filehandle)
174         def _download_to_filehandle_done(fh):
175             fh.close()
176             newdata = open(target_filename2, "rb").read()
177             self.failUnlessEqual(newdata, DATA)
178         d.addCallback(_download_to_filehandle_done)
179
180         consumer = GrabEverythingConsumer()
181         ct = download.ConsumerAdapter(consumer)
182         d.addCallback(lambda res:
183                       self.downloader.download(self.uri, ct))
184         def _download_to_consumer_done(ign):
185             self.failUnlessEqual(consumer.contents, DATA)
186         d.addCallback(_download_to_consumer_done)
187
188         def _download_nonexistent_uri(res):
189             baduri = self.mangle_uri(self.uri)
190             log.msg("about to download non-existent URI", level=log.UNUSUAL,
191                     facility="tahoe.tests")
192             d1 = self.downloader.download_to_data(baduri)
193             def _baduri_should_fail(res):
194                 log.msg("finished downloading non-existend URI",
195                         level=log.UNUSUAL, facility="tahoe.tests")
196                 self.failUnless(isinstance(res, Failure))
197                 self.failUnless(res.check(download.NotEnoughSharesError),
198                                 "expected NotEnoughSharesError, got %s" % res)
199                 # TODO: files that have zero peers should get a special kind
200                 # of NotEnoughSharesError, which can be used to suggest that
201                 # the URI might be wrong or that they've never uploaded the
202                 # file in the first place.
203             d1.addBoth(_baduri_should_fail)
204             return d1
205         d.addCallback(_download_nonexistent_uri)
206
207         # add a new node, which doesn't accept shares, and only uses the
208         # helper for upload.
209         d.addCallback(lambda res: self.add_extra_node(self.numclients,
210                                                       self.helper_furl,
211                                                       add_to_sparent=True))
212         def _added(extra_node):
213             self.extra_node = extra_node
214             extra_node.getServiceNamed("storage").sizelimit = 0
215         d.addCallback(_added)
216
217         HELPER_DATA = "Data that needs help to upload" * 1000
218         def _upload_with_helper(res):
219             u = upload.Data(HELPER_DATA, convergence=convergence)
220             d = self.extra_node.upload(u)
221             def _uploaded(results):
222                 uri = results.uri
223                 return self.downloader.download_to_data(uri)
224             d.addCallback(_uploaded)
225             def _check(newdata):
226                 self.failUnlessEqual(newdata, HELPER_DATA)
227             d.addCallback(_check)
228             return d
229         d.addCallback(_upload_with_helper)
230
231         def _upload_duplicate_with_helper(res):
232             u = upload.Data(HELPER_DATA, convergence=convergence)
233             u.debug_stash_RemoteEncryptedUploadable = True
234             d = self.extra_node.upload(u)
235             def _uploaded(results):
236                 uri = results.uri
237                 return self.downloader.download_to_data(uri)
238             d.addCallback(_uploaded)
239             def _check(newdata):
240                 self.failUnlessEqual(newdata, HELPER_DATA)
241                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
242                             "uploadable started uploading, should have been avoided")
243             d.addCallback(_check)
244             return d
245         if convergence is not None:
246             d.addCallback(_upload_duplicate_with_helper)
247
248         def _upload_resumable(res):
249             DATA = "Data that needs help to upload and gets interrupted" * 1000
250             u1 = CountingDataUploadable(DATA, convergence=convergence)
251             u2 = CountingDataUploadable(DATA, convergence=convergence)
252
253             # we interrupt the connection after about 5kB by shutting down
254             # the helper, then restartingit.
255             u1.interrupt_after = 5000
256             u1.interrupt_after_d = defer.Deferred()
257             u1.interrupt_after_d.addCallback(lambda res:
258                                              self.bounce_client(0))
259
260             # sneak into the helper and reduce its chunk size, so that our
261             # debug_interrupt will sever the connection on about the fifth
262             # chunk fetched. This makes sure that we've started to write the
263             # new shares before we abandon them, which exercises the
264             # abort/delete-partial-share code. TODO: find a cleaner way to do
265             # this. I know that this will affect later uses of the helper in
266             # this same test run, but I'm not currently worried about it.
267             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
268
269             d = self.extra_node.upload(u1)
270
271             def _should_not_finish(res):
272                 self.fail("interrupted upload should have failed, not finished"
273                           " with result %s" % (res,))
274             def _interrupted(f):
275                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
276
277                 # make sure we actually interrupted it before finishing the
278                 # file
279                 self.failUnless(u1.bytes_read < len(DATA),
280                                 "read %d out of %d total" % (u1.bytes_read,
281                                                              len(DATA)))
282
283                 log.msg("waiting for reconnect", level=log.NOISY,
284                         facility="tahoe.test.test_system")
285                 # now, we need to give the nodes a chance to notice that this
286                 # connection has gone away. When this happens, the storage
287                 # servers will be told to abort their uploads, removing the
288                 # partial shares. Unfortunately this involves TCP messages
289                 # going through the loopback interface, and we can't easily
290                 # predict how long that will take. If it were all local, we
291                 # could use fireEventually() to stall. Since we don't have
292                 # the right introduction hooks, the best we can do is use a
293                 # fixed delay. TODO: this is fragile.
294                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
295                 return u1.interrupt_after_d
296             d.addCallbacks(_should_not_finish, _interrupted)
297
298             def _disconnected(res):
299                 # check to make sure the storage servers aren't still hanging
300                 # on to the partial share: their incoming/ directories should
301                 # now be empty.
302                 log.msg("disconnected", level=log.NOISY,
303                         facility="tahoe.test.test_system")
304                 for i in range(self.numclients):
305                     incdir = os.path.join(self.getdir("client%d" % i),
306                                           "storage", "shares", "incoming")
307                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
308             d.addCallback(_disconnected)
309
310             # then we need to give the reconnector a chance to
311             # reestablish the connection to the helper.
312             d.addCallback(lambda res:
313                           log.msg("wait_for_connections", level=log.NOISY,
314                                   facility="tahoe.test.test_system"))
315             d.addCallback(lambda res: self.wait_for_connections())
316
317
318             d.addCallback(lambda res:
319                           log.msg("uploading again", level=log.NOISY,
320                                   facility="tahoe.test.test_system"))
321             d.addCallback(lambda res: self.extra_node.upload(u2))
322
323             def _uploaded(results):
324                 uri = results.uri
325                 log.msg("Second upload complete", level=log.NOISY,
326                         facility="tahoe.test.test_system")
327
328                 # this is really bytes received rather than sent, but it's
329                 # convenient and basically measures the same thing
330                 bytes_sent = results.ciphertext_fetched
331
332                 # We currently don't support resumption of upload if the data is
333                 # encrypted with a random key.  (Because that would require us
334                 # to store the key locally and re-use it on the next upload of
335                 # this file, which isn't a bad thing to do, but we currently
336                 # don't do it.)
337                 if convergence is not None:
338                     # Make sure we did not have to read the whole file the
339                     # second time around .
340                     self.failUnless(bytes_sent < len(DATA),
341                                 "resumption didn't save us any work:"
342                                 " read %d bytes out of %d total" %
343                                 (bytes_sent, len(DATA)))
344                 else:
345                     # Make sure we did have to read the whole file the second
346                     # time around -- because the one that we partially uploaded
347                     # earlier was encrypted with a different random key.
348                     self.failIf(bytes_sent < len(DATA),
349                                 "resumption saved us some work even though we were using random keys:"
350                                 " read %d bytes out of %d total" %
351                                 (bytes_sent, len(DATA)))
352                 return self.downloader.download_to_data(uri)
353             d.addCallback(_uploaded)
354
355             def _check(newdata):
356                 self.failUnlessEqual(newdata, DATA)
357                 # If using convergent encryption, then also check that the
358                 # helper has removed the temp file from its directories.
359                 if convergence is not None:
360                     basedir = os.path.join(self.getdir("client0"), "helper")
361                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
362                     self.failUnlessEqual(files, [])
363                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
364                     self.failUnlessEqual(files, [])
365             d.addCallback(_check)
366             return d
367         d.addCallback(_upload_resumable)
368
369         return d
370
371     def _find_shares(self, basedir):
372         shares = []
373         for (dirpath, dirnames, filenames) in os.walk(basedir):
374             if "storage" not in dirpath:
375                 continue
376             if not filenames:
377                 continue
378             pieces = dirpath.split(os.sep)
379             if pieces[-4] == "storage" and pieces[-3] == "shares":
380                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
381                 # are sharefiles here
382                 assert pieces[-5].startswith("client")
383                 client_num = int(pieces[-5][-1])
384                 storage_index_s = pieces[-1]
385                 storage_index = storage.si_a2b(storage_index_s)
386                 for sharename in filenames:
387                     shnum = int(sharename)
388                     filename = os.path.join(dirpath, sharename)
389                     data = (client_num, storage_index, filename, shnum)
390                     shares.append(data)
391         if not shares:
392             self.fail("unable to find any share files in %s" % basedir)
393         return shares
394
395     def _corrupt_mutable_share(self, filename, which):
396         msf = storage.MutableShareFile(filename)
397         datav = msf.readv([ (0, 1000000) ])
398         final_share = datav[0]
399         assert len(final_share) < 1000000 # ought to be truncated
400         pieces = mutable_layout.unpack_share(final_share)
401         (seqnum, root_hash, IV, k, N, segsize, datalen,
402          verification_key, signature, share_hash_chain, block_hash_tree,
403          share_data, enc_privkey) = pieces
404
405         if which == "seqnum":
406             seqnum = seqnum + 15
407         elif which == "R":
408             root_hash = self.flip_bit(root_hash)
409         elif which == "IV":
410             IV = self.flip_bit(IV)
411         elif which == "segsize":
412             segsize = segsize + 15
413         elif which == "pubkey":
414             verification_key = self.flip_bit(verification_key)
415         elif which == "signature":
416             signature = self.flip_bit(signature)
417         elif which == "share_hash_chain":
418             nodenum = share_hash_chain.keys()[0]
419             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
420         elif which == "block_hash_tree":
421             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
422         elif which == "share_data":
423             share_data = self.flip_bit(share_data)
424         elif which == "encprivkey":
425             enc_privkey = self.flip_bit(enc_privkey)
426
427         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
428                                             segsize, datalen)
429         final_share = mutable_layout.pack_share(prefix,
430                                                 verification_key,
431                                                 signature,
432                                                 share_hash_chain,
433                                                 block_hash_tree,
434                                                 share_data,
435                                                 enc_privkey)
436         msf.writev( [(0, final_share)], None)
437
438
439     def test_mutable(self):
440         self.basedir = "system/SystemTest/test_mutable"
441         DATA = "initial contents go here."  # 25 bytes % 3 != 0
442         NEWDATA = "new contents yay"
443         NEWERDATA = "this is getting old"
444
445         d = self.set_up_nodes(use_key_generator=True)
446
447         def _create_mutable(res):
448             c = self.clients[0]
449             log.msg("starting create_mutable_file")
450             d1 = c.create_mutable_file(DATA)
451             def _done(res):
452                 log.msg("DONE: %s" % (res,))
453                 self._mutable_node_1 = res
454                 uri = res.get_uri()
455             d1.addCallback(_done)
456             return d1
457         d.addCallback(_create_mutable)
458
459         def _test_debug(res):
460             # find a share. It is important to run this while there is only
461             # one slot in the grid.
462             shares = self._find_shares(self.basedir)
463             (client_num, storage_index, filename, shnum) = shares[0]
464             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
465                     % filename)
466             log.msg(" for clients[%d]" % client_num)
467
468             out,err = StringIO(), StringIO()
469             rc = runner.runner(["debug", "dump-share", "--offsets",
470                                 filename],
471                                stdout=out, stderr=err)
472             output = out.getvalue()
473             self.failUnlessEqual(rc, 0)
474             try:
475                 self.failUnless("Mutable slot found:\n" in output)
476                 self.failUnless("share_type: SDMF\n" in output)
477                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
478                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
479                 self.failUnless(" num_extra_leases: 0\n" in output)
480                 # the pubkey size can vary by a byte, so the container might
481                 # be a bit larger on some runs.
482                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
483                 self.failUnless(m)
484                 container_size = int(m.group(1))
485                 self.failUnless(2037 <= container_size <= 2049, container_size)
486                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
487                 self.failUnless(m)
488                 data_length = int(m.group(1))
489                 self.failUnless(2037 <= data_length <= 2049, data_length)
490                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
491                                 in output)
492                 self.failUnless(" SDMF contents:\n" in output)
493                 self.failUnless("  seqnum: 1\n" in output)
494                 self.failUnless("  required_shares: 3\n" in output)
495                 self.failUnless("  total_shares: 10\n" in output)
496                 self.failUnless("  segsize: 27\n" in output, (output, filename))
497                 self.failUnless("  datalen: 25\n" in output)
498                 # the exact share_hash_chain nodes depends upon the sharenum,
499                 # and is more of a hassle to compute than I want to deal with
500                 # now
501                 self.failUnless("  share_hash_chain: " in output)
502                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
503                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
504                             base32.b2a(storage_index))
505                 self.failUnless(expected in output)
506             except unittest.FailTest:
507                 print
508                 print "dump-share output was:"
509                 print output
510                 raise
511         d.addCallback(_test_debug)
512
513         # test retrieval
514
515         # first, let's see if we can use the existing node to retrieve the
516         # contents. This allows it to use the cached pubkey and maybe the
517         # latest-known sharemap.
518
519         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
520         def _check_download_1(res):
521             self.failUnlessEqual(res, DATA)
522             # now we see if we can retrieve the data from a new node,
523             # constructed using the URI of the original one. We do this test
524             # on the same client that uploaded the data.
525             uri = self._mutable_node_1.get_uri()
526             log.msg("starting retrieve1")
527             newnode = self.clients[0].create_node_from_uri(uri)
528             newnode_2 = self.clients[0].create_node_from_uri(uri)
529             self.failUnlessIdentical(newnode, newnode_2)
530             return newnode.download_best_version()
531         d.addCallback(_check_download_1)
532
533         def _check_download_2(res):
534             self.failUnlessEqual(res, DATA)
535             # same thing, but with a different client
536             uri = self._mutable_node_1.get_uri()
537             newnode = self.clients[1].create_node_from_uri(uri)
538             log.msg("starting retrieve2")
539             d1 = newnode.download_best_version()
540             d1.addCallback(lambda res: (res, newnode))
541             return d1
542         d.addCallback(_check_download_2)
543
544         def _check_download_3((res, newnode)):
545             self.failUnlessEqual(res, DATA)
546             # replace the data
547             log.msg("starting replace1")
548             d1 = newnode.overwrite(NEWDATA)
549             d1.addCallback(lambda res: newnode.download_best_version())
550             return d1
551         d.addCallback(_check_download_3)
552
553         def _check_download_4(res):
554             self.failUnlessEqual(res, NEWDATA)
555             # now create an even newer node and replace the data on it. This
556             # new node has never been used for download before.
557             uri = self._mutable_node_1.get_uri()
558             newnode1 = self.clients[2].create_node_from_uri(uri)
559             newnode2 = self.clients[3].create_node_from_uri(uri)
560             self._newnode3 = self.clients[3].create_node_from_uri(uri)
561             log.msg("starting replace2")
562             d1 = newnode1.overwrite(NEWERDATA)
563             d1.addCallback(lambda res: newnode2.download_best_version())
564             return d1
565         d.addCallback(_check_download_4)
566
567         def _check_download_5(res):
568             log.msg("finished replace2")
569             self.failUnlessEqual(res, NEWERDATA)
570         d.addCallback(_check_download_5)
571
572         def _corrupt_shares(res):
573             # run around and flip bits in all but k of the shares, to test
574             # the hash checks
575             shares = self._find_shares(self.basedir)
576             ## sort by share number
577             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
578             where = dict([ (shnum, filename)
579                            for (client_num, storage_index, filename, shnum)
580                            in shares ])
581             assert len(where) == 10 # this test is designed for 3-of-10
582             for shnum, filename in where.items():
583                 # shares 7,8,9 are left alone. read will check
584                 # (share_hash_chain, block_hash_tree, share_data). New
585                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
586                 # segsize, signature).
587                 if shnum == 0:
588                     # read: this will trigger "pubkey doesn't match
589                     # fingerprint".
590                     self._corrupt_mutable_share(filename, "pubkey")
591                     self._corrupt_mutable_share(filename, "encprivkey")
592                 elif shnum == 1:
593                     # triggers "signature is invalid"
594                     self._corrupt_mutable_share(filename, "seqnum")
595                 elif shnum == 2:
596                     # triggers "signature is invalid"
597                     self._corrupt_mutable_share(filename, "R")
598                 elif shnum == 3:
599                     # triggers "signature is invalid"
600                     self._corrupt_mutable_share(filename, "segsize")
601                 elif shnum == 4:
602                     self._corrupt_mutable_share(filename, "share_hash_chain")
603                 elif shnum == 5:
604                     self._corrupt_mutable_share(filename, "block_hash_tree")
605                 elif shnum == 6:
606                     self._corrupt_mutable_share(filename, "share_data")
607                 # other things to correct: IV, signature
608                 # 7,8,9 are left alone
609
610                 # note that initial_query_count=5 means that we'll hit the
611                 # first 5 servers in effectively random order (based upon
612                 # response time), so we won't necessarily ever get a "pubkey
613                 # doesn't match fingerprint" error (if we hit shnum>=1 before
614                 # shnum=0, we pull the pubkey from there). To get repeatable
615                 # specific failures, we need to set initial_query_count=1,
616                 # but of course that will change the sequencing behavior of
617                 # the retrieval process. TODO: find a reasonable way to make
618                 # this a parameter, probably when we expand this test to test
619                 # for one failure mode at a time.
620
621                 # when we retrieve this, we should get three signature
622                 # failures (where we've mangled seqnum, R, and segsize). The
623                 # pubkey mangling
624         d.addCallback(_corrupt_shares)
625
626         d.addCallback(lambda res: self._newnode3.download_best_version())
627         d.addCallback(_check_download_5)
628
629         def _check_empty_file(res):
630             # make sure we can create empty files, this usually screws up the
631             # segsize math
632             d1 = self.clients[2].create_mutable_file("")
633             d1.addCallback(lambda newnode: newnode.download_best_version())
634             d1.addCallback(lambda res: self.failUnlessEqual("", res))
635             return d1
636         d.addCallback(_check_empty_file)
637
638         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
639         def _created_dirnode(dnode):
640             log.msg("_created_dirnode(%s)" % (dnode,))
641             d1 = dnode.list()
642             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
643             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
644             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
645             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
646             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
647             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
648             d1.addCallback(lambda res: dnode.build_manifest().when_done())
649             d1.addCallback(lambda manifest:
650                            self.failUnlessEqual(len(manifest), 1))
651             return d1
652         d.addCallback(_created_dirnode)
653
654         def wait_for_c3_kg_conn():
655             return self.clients[3]._key_generator is not None
656         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
657
658         def check_kg_poolsize(junk, size_delta):
659             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
660                                  self.key_generator_svc.key_generator.pool_size + size_delta)
661
662         d.addCallback(check_kg_poolsize, 0)
663         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
664         d.addCallback(check_kg_poolsize, -1)
665         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
666         d.addCallback(check_kg_poolsize, -2)
667         # use_helper induces use of clients[3], which is the using-key_gen client
668         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
669         d.addCallback(check_kg_poolsize, -3)
670
671         return d
672     # The default 120 second timeout went off when running it under valgrind
673     # on my old Windows laptop, so I'm bumping up the timeout.
674     test_mutable.timeout = 240
675
676     def flip_bit(self, good):
677         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
678
679     def mangle_uri(self, gooduri):
680         # change the key, which changes the storage index, which means we'll
681         # be asking about the wrong file, so nobody will have any shares
682         u = IFileURI(gooduri)
683         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
684                             uri_extension_hash=u.uri_extension_hash,
685                             needed_shares=u.needed_shares,
686                             total_shares=u.total_shares,
687                             size=u.size)
688         return u2.to_string()
689
690     # TODO: add a test which mangles the uri_extension_hash instead, and
691     # should fail due to not being able to get a valid uri_extension block.
692     # Also a test which sneakily mangles the uri_extension block to change
693     # some of the validation data, so it will fail in the post-download phase
694     # when the file's crypttext integrity check fails. Do the same thing for
695     # the key, which should cause the download to fail the post-download
696     # plaintext_hash check.
697
698     def test_vdrive(self):
699         self.basedir = "system/SystemTest/test_vdrive"
700         self.data = LARGE_DATA
701         d = self.set_up_nodes(use_stats_gatherer=True)
702         d.addCallback(self._test_introweb)
703         d.addCallback(self.log, "starting publish")
704         d.addCallback(self._do_publish1)
705         d.addCallback(self._test_runner)
706         d.addCallback(self._do_publish2)
707         # at this point, we have the following filesystem (where "R" denotes
708         # self._root_directory_uri):
709         # R
710         # R/subdir1
711         # R/subdir1/mydata567
712         # R/subdir1/subdir2/
713         # R/subdir1/subdir2/mydata992
714
715         d.addCallback(lambda res: self.bounce_client(0))
716         d.addCallback(self.log, "bounced client0")
717
718         d.addCallback(self._check_publish1)
719         d.addCallback(self.log, "did _check_publish1")
720         d.addCallback(self._check_publish2)
721         d.addCallback(self.log, "did _check_publish2")
722         d.addCallback(self._do_publish_private)
723         d.addCallback(self.log, "did _do_publish_private")
724         # now we also have (where "P" denotes a new dir):
725         #  P/personal/sekrit data
726         #  P/s2-rw -> /subdir1/subdir2/
727         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
728         d.addCallback(self._check_publish_private)
729         d.addCallback(self.log, "did _check_publish_private")
730         d.addCallback(self._test_web)
731         d.addCallback(self._test_control)
732         d.addCallback(self._test_cli)
733         # P now has four top-level children:
734         # P/personal/sekrit data
735         # P/s2-ro/
736         # P/s2-rw/
737         # P/test_put/  (empty)
738         d.addCallback(self._test_checker)
739         d.addCallback(self._grab_stats)
740         return d
741     test_vdrive.timeout = 1100
742
743     def _test_introweb(self, res):
744         d = getPage(self.introweb_url, method="GET", followRedirect=True)
745         def _check(res):
746             try:
747                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
748                                 in res)
749                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
750                 self.failUnless("Subscription Summary: storage: 5" in res)
751             except unittest.FailTest:
752                 print
753                 print "GET %s output was:" % self.introweb_url
754                 print res
755                 raise
756         d.addCallback(_check)
757         d.addCallback(lambda res:
758                       getPage(self.introweb_url + "?t=json",
759                               method="GET", followRedirect=True))
760         def _check_json(res):
761             data = simplejson.loads(res)
762             try:
763                 self.failUnlessEqual(data["subscription_summary"],
764                                      {"storage": 5})
765                 self.failUnlessEqual(data["announcement_summary"],
766                                      {"storage": 5, "stub_client": 5})
767             except unittest.FailTest:
768                 print
769                 print "GET %s?t=json output was:" % self.introweb_url
770                 print res
771                 raise
772         d.addCallback(_check_json)
773         return d
774
775     def _do_publish1(self, res):
776         ut = upload.Data(self.data, convergence=None)
777         c0 = self.clients[0]
778         d = c0.create_empty_dirnode()
779         def _made_root(new_dirnode):
780             self._root_directory_uri = new_dirnode.get_uri()
781             return c0.create_node_from_uri(self._root_directory_uri)
782         d.addCallback(_made_root)
783         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
784         def _made_subdir1(subdir1_node):
785             self._subdir1_node = subdir1_node
786             d1 = subdir1_node.add_file(u"mydata567", ut)
787             d1.addCallback(self.log, "publish finished")
788             def _stash_uri(filenode):
789                 self.uri = filenode.get_uri()
790             d1.addCallback(_stash_uri)
791             return d1
792         d.addCallback(_made_subdir1)
793         return d
794
795     def _do_publish2(self, res):
796         ut = upload.Data(self.data, convergence=None)
797         d = self._subdir1_node.create_empty_directory(u"subdir2")
798         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
799         return d
800
801     def log(self, res, *args, **kwargs):
802         # print "MSG: %s  RES: %s" % (msg, args)
803         log.msg(*args, **kwargs)
804         return res
805
806     def _do_publish_private(self, res):
807         self.smalldata = "sssh, very secret stuff"
808         ut = upload.Data(self.smalldata, convergence=None)
809         d = self.clients[0].create_empty_dirnode()
810         d.addCallback(self.log, "GOT private directory")
811         def _got_new_dir(privnode):
812             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
813             d1 = privnode.create_empty_directory(u"personal")
814             d1.addCallback(self.log, "made P/personal")
815             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
816             d1.addCallback(self.log, "made P/personal/sekrit data")
817             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
818             def _got_s2(s2node):
819                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
820                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
821                 return d2
822             d1.addCallback(_got_s2)
823             d1.addCallback(lambda res: privnode)
824             return d1
825         d.addCallback(_got_new_dir)
826         return d
827
828     def _check_publish1(self, res):
829         # this one uses the iterative API
830         c1 = self.clients[1]
831         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
832         d.addCallback(self.log, "check_publish1 got /")
833         d.addCallback(lambda root: root.get(u"subdir1"))
834         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
835         d.addCallback(lambda filenode: filenode.download_to_data())
836         d.addCallback(self.log, "get finished")
837         def _get_done(data):
838             self.failUnlessEqual(data, self.data)
839         d.addCallback(_get_done)
840         return d
841
842     def _check_publish2(self, res):
843         # this one uses the path-based API
844         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
845         d = rootnode.get_child_at_path(u"subdir1")
846         d.addCallback(lambda dirnode:
847                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
848         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
849         d.addCallback(lambda filenode: filenode.download_to_data())
850         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
851
852         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
853         def _got_filenode(filenode):
854             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
855             assert fnode == filenode
856         d.addCallback(_got_filenode)
857         return d
858
    def _check_publish_private(self, resnode):
        """Verify the private tree rooted at 'resnode' and exercise moves.

        'resnode' is stashed as self._private_node and its "personal" child as
        self._personal_node. The chain then checks the secret file's contents,
        checks that s2-rw is mutable and s2-ro is mutable-but-read-only,
        confirms that every modifying operation on s2-ro fails with
        NotMutableError, moves "sekrit data" around and back, and finally
        checks the manifest length and the deep-stats numbers.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            # look up 'path' relative to the private root
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # 'dirnode' is the read-only link: reads must work, every
            # modification must be refused
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            # a lookup of a missing child is a plain KeyError, not a
            # NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move the secret file out of P/personal, rename it, and move it
            # back, so the tree ends up as it started
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                # exact-match expectations; directory sizes vary, so they are
                # only checked against lower bounds below
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
968
969     def shouldFail(self, res, expected_failure, which, substring=None):
970         if isinstance(res, Failure):
971             res.trap(expected_failure)
972             if substring:
973                 self.failUnless(substring in str(res),
974                                 "substring '%s' not in '%s'"
975                                 % (substring, str(res)))
976         else:
977             self.fail("%s was supposed to raise %s, not get '%s'" %
978                       (which, expected_failure, res))
979
980     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
981         assert substring is None or isinstance(substring, str)
982         d = defer.maybeDeferred(callable, *args, **kwargs)
983         def done(res):
984             if isinstance(res, Failure):
985                 res.trap(expected_failure)
986                 if substring:
987                     self.failUnless(substring in str(res),
988                                     "substring '%s' not in '%s'"
989                                     % (substring, str(res)))
990             else:
991                 self.fail("%s was supposed to raise %s, not get '%s'" %
992                           (which, expected_failure, res))
993         d.addBoth(done)
994         return d
995
996     def PUT(self, urlpath, data):
997         url = self.webish_url + urlpath
998         return getPage(url, method="PUT", postdata=data)
999
1000     def GET(self, urlpath, followRedirect=False):
1001         url = self.webish_url + urlpath
1002         return getPage(url, method="GET", followRedirect=followRedirect)
1003
1004     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1005         if use_helper:
1006             url = self.helper_webish_url + urlpath
1007         else:
1008             url = self.webish_url + urlpath
1009         sepbase = "boogabooga"
1010         sep = "--" + sepbase
1011         form = []
1012         form.append(sep)
1013         form.append('Content-Disposition: form-data; name="_charset"')
1014         form.append('')
1015         form.append('UTF-8')
1016         form.append(sep)
1017         for name, value in fields.iteritems():
1018             if isinstance(value, tuple):
1019                 filename, value = value
1020                 form.append('Content-Disposition: form-data; name="%s"; '
1021                             'filename="%s"' % (name, filename.encode("utf-8")))
1022             else:
1023                 form.append('Content-Disposition: form-data; name="%s"' % name)
1024             form.append('')
1025             form.append(str(value))
1026             form.append(sep)
1027         form[-1] += "--"
1028         body = "\r\n".join(form) + "\r\n"
1029         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1030                    }
1031         return getPage(url, method="POST", postdata=body,
1032                        headers=headers, followRedirect=followRedirect)
1033
    def _test_web(self, res):
        """Exercise the node's web interface end to end.

        In order: the welcome pages of client 0 and of the helper-using node,
        directory and file pages under the public root, downloads by URI
        (both URL forms, plus a mangled URI that must fail with a 410), PUT
        uploads (small, large, and replace-in-place), unlinked POST uploads
        (direct and via the helper), the status pages, the helper status
        pages (HTML and JSON), and the statistics pages.

        The steps are order-dependent: the counters asserted near the end
        reflect the uploads and downloads performed earlier in the chain.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            # the request must fail with a web Error mentioning "410"
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            # (when several statuses qualify, the last one seen is kept)
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        # each of the following callbacks is named for the page that was just
        # fetched, and requests the next status page in turn
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            then = time.time() - 86400*3
            now = time.time()
            # os.utime takes (atime, mtime): the file is given a modification
            # time three days in the past
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            # the sizes below are the lengths of the two "spurious" files
            # written above: 10 and 15 bytes
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1240
1241     def _test_runner(self, res):
1242         # exercise some of the diagnostic tools in runner.py
1243
1244         # find a share
1245         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1246             if "storage" not in dirpath:
1247                 continue
1248             if not filenames:
1249                 continue
1250             pieces = dirpath.split(os.sep)
1251             if pieces[-4] == "storage" and pieces[-3] == "shares":
1252                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1253                 # are sharefiles here
1254                 filename = os.path.join(dirpath, filenames[0])
1255                 # peek at the magic to see if it is a chk share
1256                 magic = open(filename, "rb").read(4)
1257                 if magic == '\x00\x00\x00\x01':
1258                     break
1259         else:
1260             self.fail("unable to find any uri_extension files in %s"
1261                       % self.basedir)
1262         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1263
1264         out,err = StringIO(), StringIO()
1265         rc = runner.runner(["debug", "dump-share", "--offsets",
1266                             filename],
1267                            stdout=out, stderr=err)
1268         output = out.getvalue()
1269         self.failUnlessEqual(rc, 0)
1270
1271         # we only upload a single file, so we can assert some things about
1272         # its size and shares.
1273         self.failUnless(("share filename: %s" % filename) in output)
1274         self.failUnless("size: %d\n" % len(self.data) in output)
1275         self.failUnless("num_segments: 1\n" in output)
1276         # segment_size is always a multiple of needed_shares
1277         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1278         self.failUnless("total_shares: 10\n" in output)
1279         # keys which are supposed to be present
1280         for key in ("size", "num_segments", "segment_size",
1281                     "needed_shares", "total_shares",
1282                     "codec_name", "codec_params", "tail_codec_params",
1283                     #"plaintext_hash", "plaintext_root_hash",
1284                     "crypttext_hash", "crypttext_root_hash",
1285                     "share_root_hash", "UEB_hash"):
1286             self.failUnless("%s: " % key in output, key)
1287         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1288
1289         # now use its storage index to find the other shares using the
1290         # 'find-shares' tool
1291         sharedir, shnum = os.path.split(filename)
1292         storagedir, storage_index_s = os.path.split(sharedir)
1293         out,err = StringIO(), StringIO()
1294         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1295         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1296         rc = runner.runner(cmd, stdout=out, stderr=err)
1297         self.failUnlessEqual(rc, 0)
1298         out.seek(0)
1299         sharefiles = [sfn.strip() for sfn in out.readlines()]
1300         self.failUnlessEqual(len(sharefiles), 10)
1301
1302         # also exercise the 'catalog-shares' tool
1303         out,err = StringIO(), StringIO()
1304         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1305         cmd = ["debug", "catalog-shares"] + nodedirs
1306         rc = runner.runner(cmd, stdout=out, stderr=err)
1307         self.failUnlessEqual(rc, 0)
1308         out.seek(0)
1309         descriptions = [sfn.strip() for sfn in out.readlines()]
1310         self.failUnlessEqual(len(descriptions), 30)
1311         matching = [line
1312                     for line in descriptions
1313                     if line.startswith("CHK %s " % storage_index_s)]
1314         self.failUnlessEqual(len(matching), 10)
1315
1316     def _test_control(self, res):
1317         # exercise the remote-control-the-client foolscap interfaces in
1318         # allmydata.control (mostly used for performance tests)
1319         c0 = self.clients[0]
1320         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1321         control_furl = open(control_furl_file, "r").read().strip()
1322         # it doesn't really matter which Tub we use to connect to the client,
1323         # so let's just use our IntroducerNode's
1324         d = self.introducer.tub.getReference(control_furl)
1325         d.addCallback(self._test_control2, control_furl_file)
1326         return d
1327     def _test_control2(self, rref, filename):
1328         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1329         downfile = os.path.join(self.basedir, "control.downfile")
1330         d.addCallback(lambda uri:
1331                       rref.callRemote("download_from_uri_to_file",
1332                                       uri, downfile))
1333         def _check(res):
1334             self.failUnlessEqual(res, downfile)
1335             data = open(downfile, "r").read()
1336             expected_data = open(filename, "r").read()
1337             self.failUnlessEqual(data, expected_data)
1338         d.addCallback(_check)
1339         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1340         if sys.platform == "linux2":
1341             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1342         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1343         return d
1344
1345     def _test_cli(self, res):
1346         # run various CLI commands (in a thread, since they use blocking
1347         # network calls)
1348
1349         private_uri = self._private_node.get_uri()
1350         some_uri = self._root_directory_uri
1351         client0_basedir = self.getdir("client0")
1352
1353         nodeargs = [
1354             "--node-directory", client0_basedir,
1355             ]
1356         TESTDATA = "I will not write the same thing over and over.\n" * 100
1357
1358         d = defer.succeed(None)
1359
1360         # for compatibility with earlier versions, private/root_dir.cap is
1361         # supposed to be treated as an alias named "tahoe:". Start by making
1362         # sure that works, before we add other aliases.
1363
1364         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1365         f = open(root_file, "w")
1366         f.write(private_uri)
1367         f.close()
1368
1369         def run(ignored, verb, *args, **kwargs):
1370             stdin = kwargs.get("stdin", "")
1371             newargs = [verb] + nodeargs + list(args)
1372             return self._run_cli(newargs, stdin=stdin)
1373
1374         def _check_ls((out,err), expected_children, unexpected_children=[]):
1375             self.failUnlessEqual(err, "")
1376             for s in expected_children:
1377                 self.failUnless(s in out, (s,out))
1378             for s in unexpected_children:
1379                 self.failIf(s in out, (s,out))
1380
1381         def _check_ls_root((out,err)):
1382             self.failUnless("personal" in out)
1383             self.failUnless("s2-ro" in out)
1384             self.failUnless("s2-rw" in out)
1385             self.failUnlessEqual(err, "")
1386
1387         # this should reference private_uri
1388         d.addCallback(run, "ls")
1389         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1390
1391         d.addCallback(run, "list-aliases")
1392         def _check_aliases_1((out,err)):
1393             self.failUnlessEqual(err, "")
1394             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1395         d.addCallback(_check_aliases_1)
1396
1397         # now that that's out of the way, remove root_dir.cap and work with
1398         # new files
1399         d.addCallback(lambda res: os.unlink(root_file))
1400         d.addCallback(run, "list-aliases")
1401         def _check_aliases_2((out,err)):
1402             self.failUnlessEqual(err, "")
1403             self.failUnlessEqual(out, "")
1404         d.addCallback(_check_aliases_2)
1405
1406         d.addCallback(run, "mkdir")
1407         def _got_dir( (out,err) ):
1408             self.failUnless(uri.from_string_dirnode(out.strip()))
1409             return out.strip()
1410         d.addCallback(_got_dir)
1411         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1412
1413         d.addCallback(run, "list-aliases")
1414         def _check_aliases_3((out,err)):
1415             self.failUnlessEqual(err, "")
1416             self.failUnless("tahoe: " in out)
1417         d.addCallback(_check_aliases_3)
1418
1419         def _check_empty_dir((out,err)):
1420             self.failUnlessEqual(out, "")
1421             self.failUnlessEqual(err, "")
1422         d.addCallback(run, "ls")
1423         d.addCallback(_check_empty_dir)
1424
1425         def _check_missing_dir((out,err)):
1426             # TODO: check that rc==2
1427             self.failUnlessEqual(out, "")
1428             self.failUnlessEqual(err, "No such file or directory\n")
1429         d.addCallback(run, "ls", "bogus")
1430         d.addCallback(_check_missing_dir)
1431
1432         files = []
1433         datas = []
1434         for i in range(10):
1435             fn = os.path.join(self.basedir, "file%d" % i)
1436             files.append(fn)
1437             data = "data to be uploaded: file%d\n" % i
1438             datas.append(data)
1439             open(fn,"wb").write(data)
1440
1441         def _check_stdout_against((out,err), filenum=None, data=None):
1442             self.failUnlessEqual(err, "")
1443             if filenum is not None:
1444                 self.failUnlessEqual(out, datas[filenum])
1445             if data is not None:
1446                 self.failUnlessEqual(out, data)
1447
1448         # test all both forms of put: from a file, and from stdin
1449         #  tahoe put bar FOO
1450         d.addCallback(run, "put", files[0], "tahoe-file0")
1451         def _put_out((out,err)):
1452             self.failUnless("URI:LIT:" in out, out)
1453             self.failUnless("201 Created" in err, err)
1454             uri0 = out.strip()
1455             return run(None, "get", uri0)
1456         d.addCallback(_put_out)
1457         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1458
1459         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1460         #  tahoe put bar tahoe:FOO
1461         d.addCallback(run, "put", files[2], "tahoe:file2")
1462         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1463         def _check_put_mutable((out,err)):
1464             self._mutable_file3_uri = out.strip()
1465         d.addCallback(_check_put_mutable)
1466         d.addCallback(run, "get", "tahoe:file3")
1467         d.addCallback(_check_stdout_against, 3)
1468
1469         #  tahoe put FOO
1470         STDIN_DATA = "This is the file to upload from stdin."
1471         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1472         #  tahoe put tahoe:FOO
1473         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1474                       stdin="Other file from stdin.")
1475
1476         d.addCallback(run, "ls")
1477         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1478                                   "tahoe-file-stdin", "from-stdin"])
1479         d.addCallback(run, "ls", "subdir")
1480         d.addCallback(_check_ls, ["tahoe-file1"])
1481
1482         # tahoe mkdir FOO
1483         d.addCallback(run, "mkdir", "subdir2")
1484         d.addCallback(run, "ls")
1485         # TODO: extract the URI, set an alias with it
1486         d.addCallback(_check_ls, ["subdir2"])
1487
1488         # tahoe get: (to stdin and to a file)
1489         d.addCallback(run, "get", "tahoe-file0")
1490         d.addCallback(_check_stdout_against, 0)
1491         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1492         d.addCallback(_check_stdout_against, 1)
1493         outfile0 = os.path.join(self.basedir, "outfile0")
1494         d.addCallback(run, "get", "file2", outfile0)
1495         def _check_outfile0((out,err)):
1496             data = open(outfile0,"rb").read()
1497             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1498         d.addCallback(_check_outfile0)
1499         outfile1 = os.path.join(self.basedir, "outfile0")
1500         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1501         def _check_outfile1((out,err)):
1502             data = open(outfile1,"rb").read()
1503             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1504         d.addCallback(_check_outfile1)
1505
1506         d.addCallback(run, "rm", "tahoe-file0")
1507         d.addCallback(run, "rm", "tahoe:file2")
1508         d.addCallback(run, "ls")
1509         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1510
1511         d.addCallback(run, "ls", "-l")
1512         def _check_ls_l((out,err)):
1513             lines = out.split("\n")
1514             for l in lines:
1515                 if "tahoe-file-stdin" in l:
1516                     self.failUnless(l.startswith("-r-- "), l)
1517                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1518                 if "file3" in l:
1519                     self.failUnless(l.startswith("-rw- "), l) # mutable
1520         d.addCallback(_check_ls_l)
1521
1522         d.addCallback(run, "ls", "--uri")
1523         def _check_ls_uri((out,err)):
1524             lines = out.split("\n")
1525             for l in lines:
1526                 if "file3" in l:
1527                     self.failUnless(self._mutable_file3_uri in l)
1528         d.addCallback(_check_ls_uri)
1529
1530         d.addCallback(run, "ls", "--readonly-uri")
1531         def _check_ls_rouri((out,err)):
1532             lines = out.split("\n")
1533             for l in lines:
1534                 if "file3" in l:
1535                     rw_uri = self._mutable_file3_uri
1536                     u = uri.from_string_mutable_filenode(rw_uri)
1537                     ro_uri = u.get_readonly().to_string()
1538                     self.failUnless(ro_uri in l)
1539         d.addCallback(_check_ls_rouri)
1540
1541
1542         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1543         d.addCallback(run, "ls")
1544         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1545
1546         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1547         d.addCallback(run, "ls")
1548         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1549
1550         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1551         d.addCallback(run, "ls")
1552         d.addCallback(_check_ls, ["file3", "file3-copy"])
1553         d.addCallback(run, "get", "tahoe:file3-copy")
1554         d.addCallback(_check_stdout_against, 3)
1555
1556         # copy from disk into tahoe
1557         d.addCallback(run, "cp", files[4], "tahoe:file4")
1558         d.addCallback(run, "ls")
1559         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1560         d.addCallback(run, "get", "tahoe:file4")
1561         d.addCallback(_check_stdout_against, 4)
1562
1563         # copy from tahoe into disk
1564         target_filename = os.path.join(self.basedir, "file-out")
1565         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1566         def _check_cp_out((out,err)):
1567             self.failUnless(os.path.exists(target_filename))
1568             got = open(target_filename,"rb").read()
1569             self.failUnlessEqual(got, datas[4])
1570         d.addCallback(_check_cp_out)
1571
1572         # copy from disk to disk (silly case)
1573         target2_filename = os.path.join(self.basedir, "file-out-copy")
1574         d.addCallback(run, "cp", target_filename, target2_filename)
1575         def _check_cp_out2((out,err)):
1576             self.failUnless(os.path.exists(target2_filename))
1577             got = open(target2_filename,"rb").read()
1578             self.failUnlessEqual(got, datas[4])
1579         d.addCallback(_check_cp_out2)
1580
1581         # copy from tahoe into disk, overwriting an existing file
1582         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1583         def _check_cp_out3((out,err)):
1584             self.failUnless(os.path.exists(target_filename))
1585             got = open(target_filename,"rb").read()
1586             self.failUnlessEqual(got, datas[3])
1587         d.addCallback(_check_cp_out3)
1588
1589         # copy from disk into tahoe, overwriting an existing immutable file
1590         d.addCallback(run, "cp", files[5], "tahoe:file4")
1591         d.addCallback(run, "ls")
1592         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1593         d.addCallback(run, "get", "tahoe:file4")
1594         d.addCallback(_check_stdout_against, 5)
1595
1596         # copy from disk into tahoe, overwriting an existing mutable file
1597         d.addCallback(run, "cp", files[5], "tahoe:file3")
1598         d.addCallback(run, "ls")
1599         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1600         d.addCallback(run, "get", "tahoe:file3")
1601         d.addCallback(_check_stdout_against, 5)
1602
1603         # recursive copy: setup
1604         dn = os.path.join(self.basedir, "dir1")
1605         os.makedirs(dn)
1606         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1607         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1608         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1609         sdn2 = os.path.join(dn, "subdir2")
1610         os.makedirs(sdn2)
1611         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1612         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1613
1614         # from disk into tahoe
1615         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1616         d.addCallback(run, "ls")
1617         d.addCallback(_check_ls, ["dir1"])
1618         d.addCallback(run, "ls", "dir1")
1619         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1620                       ["rfile4", "rfile5"])
1621         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1622         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1623                       ["rfile1", "rfile2", "rfile3"])
1624         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1625         d.addCallback(_check_stdout_against, data="rfile4")
1626
1627         # and back out again
1628         dn_copy = os.path.join(self.basedir, "dir1-copy")
1629         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1630         def _check_cp_r_out((out,err)):
1631             def _cmp(name):
1632                 old = open(os.path.join(dn, name), "rb").read()
1633                 newfn = os.path.join(dn_copy, name)
1634                 self.failUnless(os.path.exists(newfn))
1635                 new = open(newfn, "rb").read()
1636                 self.failUnlessEqual(old, new)
1637             _cmp("rfile1")
1638             _cmp("rfile2")
1639             _cmp("rfile3")
1640             _cmp(os.path.join("subdir2", "rfile4"))
1641             _cmp(os.path.join("subdir2", "rfile5"))
1642         d.addCallback(_check_cp_r_out)
1643
1644         # and copy it a second time, which ought to overwrite the same files
1645         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1646
1647         # and tahoe-to-tahoe
1648         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1649         d.addCallback(run, "ls")
1650         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1651         d.addCallback(run, "ls", "dir1-copy")
1652         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1653                       ["rfile4", "rfile5"])
1654         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1655         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1656                       ["rfile1", "rfile2", "rfile3"])
1657         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1658         d.addCallback(_check_stdout_against, data="rfile4")
1659
1660         # and copy it a second time, which ought to overwrite the same files
1661         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1662
1663         # tahoe_ls doesn't currently handle the error correctly: it tries to
1664         # JSON-parse a traceback.
1665 ##         def _ls_missing(res):
1666 ##             argv = ["ls"] + nodeargs + ["bogus"]
1667 ##             return self._run_cli(argv)
1668 ##         d.addCallback(_ls_missing)
1669 ##         def _check_ls_missing((out,err)):
1670 ##             print "OUT", out
1671 ##             print "ERR", err
1672 ##             self.failUnlessEqual(err, "")
1673 ##         d.addCallback(_check_ls_missing)
1674
1675         return d
1676
1677     def _run_cli(self, argv, stdin=""):
1678         #print "CLI:", argv
1679         stdout, stderr = StringIO(), StringIO()
1680         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1681                                   stdin=StringIO(stdin),
1682                                   stdout=stdout, stderr=stderr)
1683         def _done(res):
1684             return stdout.getvalue(), stderr.getvalue()
1685         d.addCallback(_done)
1686         return d
1687
1688     def _test_checker(self, res):
1689         ut = upload.Data("too big to be literal" * 200, convergence=None)
1690         d = self._personal_node.add_file(u"big file", ut)
1691
1692         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1693         def _check_dirnode_results(r):
1694             self.failUnless(r.is_healthy())
1695         d.addCallback(_check_dirnode_results)
1696         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1697         d.addCallback(_check_dirnode_results)
1698
1699         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1700         def _got_chk_filenode(n):
1701             self.failUnless(isinstance(n, filenode.FileNode))
1702             d = n.check(Monitor())
1703             def _check_filenode_results(r):
1704                 self.failUnless(r.is_healthy())
1705             d.addCallback(_check_filenode_results)
1706             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1707             d.addCallback(_check_filenode_results)
1708             return d
1709         d.addCallback(_got_chk_filenode)
1710
1711         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1712         def _got_lit_filenode(n):
1713             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1714             d = n.check(Monitor())
1715             def _check_lit_filenode_results(r):
1716                 self.failUnlessEqual(r, None)
1717             d.addCallback(_check_lit_filenode_results)
1718             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1719             d.addCallback(_check_lit_filenode_results)
1720             return d
1721         d.addCallback(_got_lit_filenode)
1722         return d
1723
1724
class MutableChecker(SystemTestMixin, unittest.TestCase, WebErrorMixin):
    # Exercise the webapi checker, verifier and repairer against a mutable
    # file in three states: untouched, with one corrupted share, and with
    # one deleted share.

    def _run_cli(self, argv):
        # run a CLI command synchronously and return what it wrote to stdout
        stdout, stderr = StringIO(), StringIO()
        runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
        return stdout.getvalue()

    def _post_check_request(self, query):
        # POST to the webapi URL of self.node. 'query' is the complete query
        # string, including the leading "?". Returns getPage's Deferred.
        url = (self.webish_url +
               "uri/%s" % urllib.quote(self.node.get_uri()) +
               query)
        return getPage(url, method="POST")

    def _locate_one_share(self, node):
        # Remember 'node' as self.node, use 'debug find-shares' to locate
        # one of its shares in clients[1]'s basedir, record that share's id
        # in self.corrupt_shareid, and return the share's filename.
        self.node = node
        si = self.node.get_storage_index()
        out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                            self.clients[1].basedir])
        files = out.split("\n")
        sharefile = files[0]
        # fail clearly here rather than damaging or unlinking an empty path
        self.failUnless(sharefile, "find-shares found no shares: %r" % (out,))
        shnum = os.path.basename(sharefile)
        nodeid = self.clients[1].nodeid
        nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
        self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
        return sharefile

    def test_good(self):
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        def _do_check(res):
            return self._post_check_request("?t=check&verify=true")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<span>Healthy!</span>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        d.addErrback(self.explain_web_error)
        return d

    def test_corrupt(self):
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            sharefile = self._locate_one_share(node)
            # corrupt that share, using the CLI debug command
            self._run_cli(["debug", "corrupt-share", sharefile])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
        def _do_check(res):
            return self._post_check_request("?t=check&verify=true")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            return self._post_check_request("?t=check&verify=true&repair=true")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        # check again: the file should be healthy after the repair
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d

    def test_delete_share(self):
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            sharefile = self._locate_one_share(node)
            # delete that share outright, by removing its file
            os.unlink(sharefile)
        d.addCallback(_created)
        # now make sure the webapi checker notices it
        def _do_check(res):
            return self._post_check_request("?t=check&verify=false")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            return self._post_check_request("?t=check&verify=false&repair=true")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        # check again: the file should be healthy after the repair
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d
1859
1860 class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
1861     # construct a small directory tree (with one dir, one immutable file, one
1862     # mutable file, one LIT file, and a loop), and then check/examine it in
1863     # various ways.
1864
1865     def set_up_tree(self, ignored):
1866         # 2.9s
1867         c0 = self.clients[0]
1868         d = c0.create_empty_dirnode()
1869         def _created_root(n):
1870             self.root = n
1871             self.root_uri = n.get_uri()
1872         d.addCallback(_created_root)
1873         d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1874         d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1875         def _created_mutable(n):
1876             self.mutable = n
1877             self.mutable_uri = n.get_uri()
1878         d.addCallback(_created_mutable)
1879
1880         large = upload.Data("Lots of data\n" * 1000, None)
1881         d.addCallback(lambda ign: self.root.add_file(u"large", large))
1882         def _created_large(n):
1883             self.large = n
1884             self.large_uri = n.get_uri()
1885         d.addCallback(_created_large)
1886
1887         small = upload.Data("Small enough for a LIT", None)
1888         d.addCallback(lambda ign: self.root.add_file(u"small", small))
1889         def _created_small(n):
1890             self.small = n
1891             self.small_uri = n.get_uri()
1892         d.addCallback(_created_small)
1893
1894         d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
1895         return d
1896
1897     def check_is_healthy(self, cr, n, where, incomplete=False):
1898         self.failUnless(ICheckerResults.providedBy(cr), where)
1899         self.failUnless(cr.is_healthy(), where)
1900         self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
1901                              where)
1902         self.failUnlessEqual(cr.get_storage_index_string(),
1903                              base32.b2a(n.get_storage_index()), where)
1904         needs_rebalancing = bool( len(self.clients) < 10 )
1905         if not incomplete:
1906             self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
1907         d = cr.get_data()
1908         self.failUnlessEqual(d["count-shares-good"], 10, where)
1909         self.failUnlessEqual(d["count-shares-needed"], 3, where)
1910         self.failUnlessEqual(d["count-shares-expected"], 10, where)
1911         if not incomplete:
1912             self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
1913         self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
1914         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
1915         if not incomplete:
1916             self.failUnlessEqual(sorted(d["servers-responding"]),
1917                                  sorted([c.nodeid for c in self.clients]),
1918                                  where)
1919             self.failUnless("sharemap" in d, where)
1920             all_serverids = set()
1921             for (shareid, serverids) in d["sharemap"].items():
1922                 all_serverids.update(serverids)
1923             self.failUnlessEqual(sorted(all_serverids),
1924                                  sorted([c.nodeid for c in self.clients]),
1925                                  where)
1926
1927         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
1928         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
1929         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
1930
1931
1932     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
1933         self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
1934         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
1935         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
1936         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
1937         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
1938         self.failIf(cr.get_repair_attempted(), where)
1939
1940     def deep_check_is_healthy(self, cr, num_healthy, where):
1941         self.failUnless(IDeepCheckResults.providedBy(cr))
1942         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
1943                              num_healthy, where)
1944
1945     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
1946         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
1947         c = cr.get_counters()
1948         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
1949                              num_healthy, where)
1950         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
1951                              num_healthy, where)
1952         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
1953
1954     def test_good(self):
1955         self.basedir = self.mktemp()
1956         d = self.set_up_nodes()
1957         d.addCallback(self.set_up_tree)
1958         d.addCallback(self.do_stats)
1959         d.addCallback(self.do_test_good)
1960         d.addCallback(self.do_test_web)
1961         d.addErrback(self.explain_web_error)
1962         return d
1963
1964     def do_stats(self, ignored):
1965         d = defer.succeed(None)
1966         d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
1967         d.addCallback(self.check_stats)
1968         return d
1969
1970     def check_stats(self, s):
1971         self.failUnlessEqual(s["count-directories"], 1)
1972         self.failUnlessEqual(s["count-files"], 3)
1973         self.failUnlessEqual(s["count-immutable-files"], 1)
1974         self.failUnlessEqual(s["count-literal-files"], 1)
1975         self.failUnlessEqual(s["count-mutable-files"], 1)
1976         # don't check directories: their size will vary
1977         # s["largest-directory"]
1978         # s["size-directories"]
1979         self.failUnlessEqual(s["largest-directory-children"], 4)
1980         self.failUnlessEqual(s["largest-immutable-file"], 13000)
1981         # to re-use this function for both the local
1982         # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
1983         # coerce the result into a list of tuples. dirnode.start_deep_stats()
1984         # returns a list of tuples, but JSON only knows about lists., so
1985         # t=start-deep-stats returns a list of lists.
1986         histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
1987         self.failUnlessEqual(histogram, [(11, 31, 1),
1988                                          (10001, 31622, 1),
1989                                          ])
1990         self.failUnlessEqual(s["size-immutable-files"], 13000)
1991         self.failUnlessEqual(s["size-literal-files"], 22)
1992
1993     def do_test_good(self, ignored):
1994         d = defer.succeed(None)
1995         # check the individual items
1996         d.addCallback(lambda ign: self.root.check(Monitor()))
1997         d.addCallback(self.check_is_healthy, self.root, "root")
1998         d.addCallback(lambda ign: self.mutable.check(Monitor()))
1999         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2000         d.addCallback(lambda ign: self.large.check(Monitor()))
2001         d.addCallback(self.check_is_healthy, self.large, "large")
2002         d.addCallback(lambda ign: self.small.check(Monitor()))
2003         d.addCallback(self.failUnlessEqual, None, "small")
2004
2005         # and again with verify=True
2006         d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
2007         d.addCallback(self.check_is_healthy, self.root, "root")
2008         d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
2009         d.addCallback(self.check_is_healthy, self.mutable, "mutable")
2010         d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
2011         d.addCallback(self.check_is_healthy, self.large, "large",
2012                       incomplete=True)
2013         d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
2014         d.addCallback(self.failUnlessEqual, None, "small")
2015
2016         # and check_and_repair(), which should be a nop
2017         d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
2018         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2019         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
2020         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2021         d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
2022         d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
2023         d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
2024         d.addCallback(self.failUnlessEqual, None, "small")
2025
2026         # check_and_repair(verify=True)
2027         d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
2028         d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
2029         d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
2030         d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
2031         d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
2032         d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
2033                       incomplete=True)
2034         d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
2035         d.addCallback(self.failUnlessEqual, None, "small")
2036
2037
2038         # now deep-check the root, with various verify= and repair= options
2039         d.addCallback(lambda ign:
2040                       self.root.start_deep_check().when_done())
2041         d.addCallback(self.deep_check_is_healthy, 3, "root")
2042         d.addCallback(lambda ign:
2043                       self.root.start_deep_check(verify=True).when_done())
2044         d.addCallback(self.deep_check_is_healthy, 3, "root")
2045         d.addCallback(lambda ign:
2046                       self.root.start_deep_check_and_repair().when_done())
2047         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2048         d.addCallback(lambda ign:
2049                       self.root.start_deep_check_and_repair(verify=True).when_done())
2050         d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
2051
2052         # and finally, start a deep-check, but then cancel it.
2053         d.addCallback(lambda ign: self.root.start_deep_check())
2054         def _checking(monitor):
2055             monitor.cancel()
2056             d = monitor.when_done()
2057             # this should fire as soon as the next dirnode.list finishes.
2058             # TODO: add a counter to measure how many list() calls are made,
2059             # assert that no more than one gets to run before the cancel()
2060             # takes effect.
2061             def _finished_normally(res):
2062                 self.fail("this was supposed to fail, not finish normally")
2063             def _cancelled(f):
2064                 f.trap(OperationCancelledError)
2065             d.addCallbacks(_finished_normally, _cancelled)
2066             return d
2067         d.addCallback(_checking)
2068
2069         return d
2070
2071     def web_json(self, n, **kwargs):
2072         kwargs["output"] = "json"
2073         d = self.web(n, "POST", **kwargs)
2074         d.addCallback(self.decode_json)
2075         return d
2076
2077     def decode_json(self, (s,url)):
2078         try:
2079             data = simplejson.loads(s)
2080         except ValueError:
2081             self.fail("%s: not JSON: '%s'" % (url, s))
2082         return data
2083
2084     def web(self, n, method="GET", **kwargs):
2085         # returns (data, url)
2086         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2087                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2088         d = getPage(url, method=method)
2089         d.addCallback(lambda data: (data,url))
2090         return d
2091
2092     def wait_for_operation(self, ignored, ophandle):
2093         url = self.webish_url + "operations/" + ophandle
2094         url += "?t=status&output=JSON"
2095         d = getPage(url)
2096         def _got(res):
2097             try:
2098                 data = simplejson.loads(res)
2099             except ValueError:
2100                 self.fail("%s: not JSON: '%s'" % (url, res))
2101             if not data["finished"]:
2102                 d = self.stall(delay=1.0)
2103                 d.addCallback(self.wait_for_operation, ophandle)
2104                 return d
2105             return data
2106         d.addCallback(_got)
2107         return d
2108
2109     def get_operation_results(self, ignored, ophandle, output=None):
2110         url = self.webish_url + "operations/" + ophandle
2111         url += "?t=status"
2112         if output:
2113             url += "&output=" + output
2114         d = getPage(url)
2115         def _got(res):
2116             if output and output.lower() == "json":
2117                 try:
2118                     return simplejson.loads(res)
2119                 except ValueError:
2120                     self.fail("%s: not JSON: '%s'" % (url, res))
2121             return res
2122         d.addCallback(_got)
2123         return d
2124
2125     def slow_web(self, n, output=None, **kwargs):
2126         # use ophandle=
2127         handle = base32.b2a(os.urandom(4))
2128         d = self.web(n, "POST", ophandle=handle, **kwargs)
2129         d.addCallback(self.wait_for_operation, handle)
2130         d.addCallback(self.get_operation_results, handle, output=output)
2131         return d
2132
2133     def json_check_is_healthy(self, data, n, where, incomplete=False):
2134
2135         self.failUnlessEqual(data["storage-index"],
2136                              base32.b2a(n.get_storage_index()), where)
2137         r = data["results"]
2138         self.failUnlessEqual(r["healthy"], True, where)
2139         needs_rebalancing = bool( len(self.clients) < 10 )
2140         if not incomplete:
2141             self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2142         self.failUnlessEqual(r["count-shares-good"], 10, where)
2143         self.failUnlessEqual(r["count-shares-needed"], 3, where)
2144         self.failUnlessEqual(r["count-shares-expected"], 10, where)
2145         if not incomplete:
2146             self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2147         self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2148         self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2149         if not incomplete:
2150             self.failUnlessEqual(sorted(r["servers-responding"]),
2151                                  sorted([idlib.nodeid_b2a(c.nodeid)
2152                                          for c in self.clients]), where)
2153             self.failUnless("sharemap" in r, where)
2154             all_serverids = set()
2155             for (shareid, serverids_s) in r["sharemap"].items():
2156                 all_serverids.update(serverids_s)
2157             self.failUnlessEqual(sorted(all_serverids),
2158                                  sorted([idlib.nodeid_b2a(c.nodeid)
2159                                          for c in self.clients]), where)
2160         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2161         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2162         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2163
2164     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2165         self.failUnlessEqual(data["storage-index"],
2166                              base32.b2a(n.get_storage_index()), where)
2167         self.failUnlessEqual(data["repair-attempted"], False, where)
2168         self.json_check_is_healthy(data["pre-repair-results"],
2169                                    n, where, incomplete)
2170         self.json_check_is_healthy(data["post-repair-results"],
2171                                    n, where, incomplete)
2172
2173     def json_full_deepcheck_is_healthy(self, data, n, where):
2174         self.failUnlessEqual(data["root-storage-index"],
2175                              base32.b2a(n.get_storage_index()), where)
2176         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2177         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2178         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2179         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2180         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2181         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2182         self.json_check_stats(data["stats"], where)
2183
2184     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2185         self.failUnlessEqual(data["root-storage-index"],
2186                              base32.b2a(n.get_storage_index()), where)
2187         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2188
2189         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2190         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2191         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2192
2193         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2194         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2195         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2196
2197         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2198         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2199         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2200
2201         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2202         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2203         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2204
2205
2206     def json_check_lit(self, data, n, where):
2207         self.failUnlessEqual(data["storage-index"], "", where)
2208         self.failUnlessEqual(data["results"]["healthy"], True, where)
2209
2210     def json_check_stats(self, data, where):
2211         self.check_stats(data)
2212
2213     def do_test_web(self, ignored):
2214         d = defer.succeed(None)
2215
2216         # stats
2217         d.addCallback(lambda ign:
2218                       self.slow_web(self.root,
2219                                     t="start-deep-stats", output="json"))
2220         d.addCallback(self.json_check_stats, "deep-stats")
2221
2222         # check, no verify
2223         d.addCallback(lambda ign: self.web_json(self.root, t="check"))
2224         d.addCallback(self.json_check_is_healthy, self.root, "root")
2225         d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
2226         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2227         d.addCallback(lambda ign: self.web_json(self.large, t="check"))
2228         d.addCallback(self.json_check_is_healthy, self.large, "large")
2229         d.addCallback(lambda ign: self.web_json(self.small, t="check"))
2230         d.addCallback(self.json_check_lit, self.small, "small")
2231
2232         # check and verify
2233         d.addCallback(lambda ign:
2234                       self.web_json(self.root, t="check", verify="true"))
2235         d.addCallback(self.json_check_is_healthy, self.root, "root")
2236         d.addCallback(lambda ign:
2237                       self.web_json(self.mutable, t="check", verify="true"))
2238         d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
2239         d.addCallback(lambda ign:
2240                       self.web_json(self.large, t="check", verify="true"))
2241         d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
2242         d.addCallback(lambda ign:
2243                       self.web_json(self.small, t="check", verify="true"))
2244         d.addCallback(self.json_check_lit, self.small, "small")
2245
2246         # check and repair, no verify
2247         d.addCallback(lambda ign:
2248                       self.web_json(self.root, t="check", repair="true"))
2249         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2250         d.addCallback(lambda ign:
2251                       self.web_json(self.mutable, t="check", repair="true"))
2252         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2253         d.addCallback(lambda ign:
2254                       self.web_json(self.large, t="check", repair="true"))
2255         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
2256         d.addCallback(lambda ign:
2257                       self.web_json(self.small, t="check", repair="true"))
2258         d.addCallback(self.json_check_lit, self.small, "small")
2259
2260         # check+verify+repair
2261         d.addCallback(lambda ign:
2262                       self.web_json(self.root, t="check", repair="true", verify="true"))
2263         d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
2264         d.addCallback(lambda ign:
2265                       self.web_json(self.mutable, t="check", repair="true", verify="true"))
2266         d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
2267         d.addCallback(lambda ign:
2268                       self.web_json(self.large, t="check", repair="true", verify="true"))
2269         d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
2270         d.addCallback(lambda ign:
2271                       self.web_json(self.small, t="check", repair="true", verify="true"))
2272         d.addCallback(self.json_check_lit, self.small, "small")
2273
2274         # now run a deep-check, with various verify= and repair= flags
2275         d.addCallback(lambda ign:
2276                       self.slow_web(self.root, t="start-deep-check", output="json"))
2277         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2278         d.addCallback(lambda ign:
2279                       self.slow_web(self.root, t="start-deep-check", verify="true",
2280                                     output="json"))
2281         d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
2282         d.addCallback(lambda ign:
2283                       self.slow_web(self.root, t="start-deep-check", repair="true",
2284                                     output="json"))
2285         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2286         d.addCallback(lambda ign:
2287                       self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
2288         d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
2289
2290         # now look at t=info
2291         d.addCallback(lambda ign: self.web(self.root, t="info"))
2292         # TODO: examine the output
2293         d.addCallback(lambda ign: self.web(self.mutable, t="info"))
2294         d.addCallback(lambda ign: self.web(self.large, t="info"))
2295         d.addCallback(lambda ign: self.web(self.small, t="info"))
2296
2297         return d