]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
#527: support HTTP 'Range:' requests, using a cachefile. Adds filenode.read(consumer...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18      IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin, WebErrorMixin, \
28      MemoryConsumer, download_to_data
29
30 LARGE_DATA = """
31 This is some data to publish to the virtual drive, which needs to be large
32 enough to not fit inside a LIT uri.
33 """
34
class CountingDataUploadable(upload.Data):
    """An uploadable that counts how many bytes have been requested, and
    can fire a Deferred once that count crosses a threshold (used to
    interrupt an upload partway through)."""
    # running total of bytes requested via read()
    bytes_read = 0
    # when not None, fire interrupt_after_d once bytes_read exceeds this
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm before firing, so the callback runs exactly once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
47
class GrabEverythingConsumer:
    """A minimal IConsumer that accumulates every byte written to it
    into self.contents."""
    implements(IConsumer)

    def __init__(self):
        # everything written so far, in order
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # we only support push (streaming) producers
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents = self.contents + data

    def unregisterProducer(self):
        # nothing to clean up; the accumulated contents remain available
        pass
63
64 class SystemTest(SystemTestMixin, unittest.TestCase):
65
66     def test_connections(self):
67         self.basedir = "system/SystemTest/test_connections"
68         d = self.set_up_nodes()
69         self.extra_node = None
70         d.addCallback(lambda res: self.add_extra_node(self.numclients))
71         def _check(extra_node):
72             self.extra_node = extra_node
73             for c in self.clients:
74                 all_peerids = list(c.get_all_peerids())
75                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
76                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
77                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
78
79         d.addCallback(_check)
80         def _shutdown_extra_node(res):
81             if self.extra_node:
82                 return self.extra_node.stopService()
83             return res
84         d.addBoth(_shutdown_extra_node)
85         return d
86     test_connections.timeout = 300
87     # test_connections is subsumed by test_upload_and_download, and takes
88     # quite a while to run on a slow machine (because of all the TLS
89     # connections that must be established). If we ever rework the introducer
90     # code to such an extent that we're not sure if it works anymore, we can
91     # reinstate this test until it does.
92     del test_connections
93
94     def test_upload_and_download_random_key(self):
95         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
96         return self._test_upload_and_download(convergence=None)
97     test_upload_and_download_random_key.timeout = 4800
98
99     def test_upload_and_download_convergent(self):
100         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
101         return self._test_upload_and_download(convergence="some convergence string")
102     test_upload_and_download_convergent.timeout = 4800
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = list(c.get_all_peerids())
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114                 self.failUnlessEqual(len(permuted_peers), self.numclients)
115         d.addCallback(_check_connections)
116
117         def _do_upload(res):
118             log.msg("UPLOADING")
119             u = self.clients[0].getServiceNamed("uploader")
120             self.uploader = u
121             # we crank the max segsize down to 1024b for the duration of this
122             # test, so we can exercise multiple segments. It is important
123             # that this is not a multiple of the segment size, so that the
124             # tail segment is not the same length as the others. This actualy
125             # gets rounded up to 1025 to be a multiple of the number of
126             # required shares (since we use 25 out of 100 FEC).
127             up = upload.Data(DATA, convergence=convergence)
128             up.max_segment_size = 1024
129             d1 = u.upload(up)
130             return d1
131         d.addCallback(_do_upload)
132         def _upload_done(results):
133             uri = results.uri
134             log.msg("upload finished: uri is %s" % (uri,))
135             self.uri = uri
136             dl = self.clients[1].getServiceNamed("downloader")
137             self.downloader = dl
138         d.addCallback(_upload_done)
139
140         def _upload_again(res):
141             # Upload again. If using convergent encryption then this ought to be
142             # short-circuited, however with the way we currently generate URIs
143             # (i.e. because they include the roothash), we have to do all of the
144             # encoding work, and only get to save on the upload part.
145             log.msg("UPLOADING AGAIN")
146             up = upload.Data(DATA, convergence=convergence)
147             up.max_segment_size = 1024
148             d1 = self.uploader.upload(up)
149         d.addCallback(_upload_again)
150
151         def _download_to_data(res):
152             log.msg("DOWNLOADING")
153             return self.downloader.download_to_data(self.uri)
154         d.addCallback(_download_to_data)
155         def _download_to_data_done(data):
156             log.msg("download finished")
157             self.failUnlessEqual(data, DATA)
158         d.addCallback(_download_to_data_done)
159
160         target_filename = os.path.join(self.basedir, "download.target")
161         def _download_to_filename(res):
162             return self.downloader.download_to_filename(self.uri,
163                                                         target_filename)
164         d.addCallback(_download_to_filename)
165         def _download_to_filename_done(res):
166             newdata = open(target_filename, "rb").read()
167             self.failUnlessEqual(newdata, DATA)
168         d.addCallback(_download_to_filename_done)
169
170         target_filename2 = os.path.join(self.basedir, "download.target2")
171         def _download_to_filehandle(res):
172             fh = open(target_filename2, "wb")
173             return self.downloader.download_to_filehandle(self.uri, fh)
174         d.addCallback(_download_to_filehandle)
175         def _download_to_filehandle_done(fh):
176             fh.close()
177             newdata = open(target_filename2, "rb").read()
178             self.failUnlessEqual(newdata, DATA)
179         d.addCallback(_download_to_filehandle_done)
180
181         consumer = GrabEverythingConsumer()
182         ct = download.ConsumerAdapter(consumer)
183         d.addCallback(lambda res:
184                       self.downloader.download(self.uri, ct))
185         def _download_to_consumer_done(ign):
186             self.failUnlessEqual(consumer.contents, DATA)
187         d.addCallback(_download_to_consumer_done)
188
189         def _test_read(res):
190             n = self.clients[1].create_node_from_uri(self.uri)
191             d = download_to_data(n)
192             def _read_done(data):
193                 self.failUnlessEqual(data, DATA)
194             d.addCallback(_read_done)
195             d.addCallback(lambda ign:
196                           n.read(MemoryConsumer(), offset=1, size=4))
197             def _read_portion_done(mc):
198                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
199             d.addCallback(_read_portion_done)
200             d.addCallback(lambda ign:
201                           n.read(MemoryConsumer(), offset=2, size=None))
202             def _read_tail_done(mc):
203                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
204             d.addCallback(_read_tail_done)
205             return d
206         d.addCallback(_test_read)
207
208         def _download_nonexistent_uri(res):
209             baduri = self.mangle_uri(self.uri)
210             log.msg("about to download non-existent URI", level=log.UNUSUAL,
211                     facility="tahoe.tests")
212             d1 = self.downloader.download_to_data(baduri)
213             def _baduri_should_fail(res):
214                 log.msg("finished downloading non-existend URI",
215                         level=log.UNUSUAL, facility="tahoe.tests")
216                 self.failUnless(isinstance(res, Failure))
217                 self.failUnless(res.check(NotEnoughSharesError),
218                                 "expected NotEnoughSharesError, got %s" % res)
219                 # TODO: files that have zero peers should get a special kind
220                 # of NotEnoughSharesError, which can be used to suggest that
221                 # the URI might be wrong or that they've never uploaded the
222                 # file in the first place.
223             d1.addBoth(_baduri_should_fail)
224             return d1
225         d.addCallback(_download_nonexistent_uri)
226
227         # add a new node, which doesn't accept shares, and only uses the
228         # helper for upload.
229         d.addCallback(lambda res: self.add_extra_node(self.numclients,
230                                                       self.helper_furl,
231                                                       add_to_sparent=True))
232         def _added(extra_node):
233             self.extra_node = extra_node
234             extra_node.getServiceNamed("storage").sizelimit = 0
235         d.addCallback(_added)
236
237         HELPER_DATA = "Data that needs help to upload" * 1000
238         def _upload_with_helper(res):
239             u = upload.Data(HELPER_DATA, convergence=convergence)
240             d = self.extra_node.upload(u)
241             def _uploaded(results):
242                 uri = results.uri
243                 return self.downloader.download_to_data(uri)
244             d.addCallback(_uploaded)
245             def _check(newdata):
246                 self.failUnlessEqual(newdata, HELPER_DATA)
247             d.addCallback(_check)
248             return d
249         d.addCallback(_upload_with_helper)
250
251         def _upload_duplicate_with_helper(res):
252             u = upload.Data(HELPER_DATA, convergence=convergence)
253             u.debug_stash_RemoteEncryptedUploadable = True
254             d = self.extra_node.upload(u)
255             def _uploaded(results):
256                 uri = results.uri
257                 return self.downloader.download_to_data(uri)
258             d.addCallback(_uploaded)
259             def _check(newdata):
260                 self.failUnlessEqual(newdata, HELPER_DATA)
261                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
262                             "uploadable started uploading, should have been avoided")
263             d.addCallback(_check)
264             return d
265         if convergence is not None:
266             d.addCallback(_upload_duplicate_with_helper)
267
268         def _upload_resumable(res):
269             DATA = "Data that needs help to upload and gets interrupted" * 1000
270             u1 = CountingDataUploadable(DATA, convergence=convergence)
271             u2 = CountingDataUploadable(DATA, convergence=convergence)
272
273             # we interrupt the connection after about 5kB by shutting down
274             # the helper, then restartingit.
275             u1.interrupt_after = 5000
276             u1.interrupt_after_d = defer.Deferred()
277             u1.interrupt_after_d.addCallback(lambda res:
278                                              self.bounce_client(0))
279
280             # sneak into the helper and reduce its chunk size, so that our
281             # debug_interrupt will sever the connection on about the fifth
282             # chunk fetched. This makes sure that we've started to write the
283             # new shares before we abandon them, which exercises the
284             # abort/delete-partial-share code. TODO: find a cleaner way to do
285             # this. I know that this will affect later uses of the helper in
286             # this same test run, but I'm not currently worried about it.
287             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
288
289             d = self.extra_node.upload(u1)
290
291             def _should_not_finish(res):
292                 self.fail("interrupted upload should have failed, not finished"
293                           " with result %s" % (res,))
294             def _interrupted(f):
295                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
296
297                 # make sure we actually interrupted it before finishing the
298                 # file
299                 self.failUnless(u1.bytes_read < len(DATA),
300                                 "read %d out of %d total" % (u1.bytes_read,
301                                                              len(DATA)))
302
303                 log.msg("waiting for reconnect", level=log.NOISY,
304                         facility="tahoe.test.test_system")
305                 # now, we need to give the nodes a chance to notice that this
306                 # connection has gone away. When this happens, the storage
307                 # servers will be told to abort their uploads, removing the
308                 # partial shares. Unfortunately this involves TCP messages
309                 # going through the loopback interface, and we can't easily
310                 # predict how long that will take. If it were all local, we
311                 # could use fireEventually() to stall. Since we don't have
312                 # the right introduction hooks, the best we can do is use a
313                 # fixed delay. TODO: this is fragile.
314                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
315                 return u1.interrupt_after_d
316             d.addCallbacks(_should_not_finish, _interrupted)
317
318             def _disconnected(res):
319                 # check to make sure the storage servers aren't still hanging
320                 # on to the partial share: their incoming/ directories should
321                 # now be empty.
322                 log.msg("disconnected", level=log.NOISY,
323                         facility="tahoe.test.test_system")
324                 for i in range(self.numclients):
325                     incdir = os.path.join(self.getdir("client%d" % i),
326                                           "storage", "shares", "incoming")
327                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
328             d.addCallback(_disconnected)
329
330             # then we need to give the reconnector a chance to
331             # reestablish the connection to the helper.
332             d.addCallback(lambda res:
333                           log.msg("wait_for_connections", level=log.NOISY,
334                                   facility="tahoe.test.test_system"))
335             d.addCallback(lambda res: self.wait_for_connections())
336
337
338             d.addCallback(lambda res:
339                           log.msg("uploading again", level=log.NOISY,
340                                   facility="tahoe.test.test_system"))
341             d.addCallback(lambda res: self.extra_node.upload(u2))
342
343             def _uploaded(results):
344                 uri = results.uri
345                 log.msg("Second upload complete", level=log.NOISY,
346                         facility="tahoe.test.test_system")
347
348                 # this is really bytes received rather than sent, but it's
349                 # convenient and basically measures the same thing
350                 bytes_sent = results.ciphertext_fetched
351
352                 # We currently don't support resumption of upload if the data is
353                 # encrypted with a random key.  (Because that would require us
354                 # to store the key locally and re-use it on the next upload of
355                 # this file, which isn't a bad thing to do, but we currently
356                 # don't do it.)
357                 if convergence is not None:
358                     # Make sure we did not have to read the whole file the
359                     # second time around .
360                     self.failUnless(bytes_sent < len(DATA),
361                                 "resumption didn't save us any work:"
362                                 " read %d bytes out of %d total" %
363                                 (bytes_sent, len(DATA)))
364                 else:
365                     # Make sure we did have to read the whole file the second
366                     # time around -- because the one that we partially uploaded
367                     # earlier was encrypted with a different random key.
368                     self.failIf(bytes_sent < len(DATA),
369                                 "resumption saved us some work even though we were using random keys:"
370                                 " read %d bytes out of %d total" %
371                                 (bytes_sent, len(DATA)))
372                 return self.downloader.download_to_data(uri)
373             d.addCallback(_uploaded)
374
375             def _check(newdata):
376                 self.failUnlessEqual(newdata, DATA)
377                 # If using convergent encryption, then also check that the
378                 # helper has removed the temp file from its directories.
379                 if convergence is not None:
380                     basedir = os.path.join(self.getdir("client0"), "helper")
381                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
382                     self.failUnlessEqual(files, [])
383                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
384                     self.failUnlessEqual(files, [])
385             d.addCallback(_check)
386             return d
387         d.addCallback(_upload_resumable)
388
389         return d
390
391     def _find_shares(self, basedir):
392         shares = []
393         for (dirpath, dirnames, filenames) in os.walk(basedir):
394             if "storage" not in dirpath:
395                 continue
396             if not filenames:
397                 continue
398             pieces = dirpath.split(os.sep)
399             if pieces[-4] == "storage" and pieces[-3] == "shares":
400                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
401                 # are sharefiles here
402                 assert pieces[-5].startswith("client")
403                 client_num = int(pieces[-5][-1])
404                 storage_index_s = pieces[-1]
405                 storage_index = storage.si_a2b(storage_index_s)
406                 for sharename in filenames:
407                     shnum = int(sharename)
408                     filename = os.path.join(dirpath, sharename)
409                     data = (client_num, storage_index, filename, shnum)
410                     shares.append(data)
411         if not shares:
412             self.fail("unable to find any share files in %s" % basedir)
413         return shares
414
415     def _corrupt_mutable_share(self, filename, which):
416         msf = storage.MutableShareFile(filename)
417         datav = msf.readv([ (0, 1000000) ])
418         final_share = datav[0]
419         assert len(final_share) < 1000000 # ought to be truncated
420         pieces = mutable_layout.unpack_share(final_share)
421         (seqnum, root_hash, IV, k, N, segsize, datalen,
422          verification_key, signature, share_hash_chain, block_hash_tree,
423          share_data, enc_privkey) = pieces
424
425         if which == "seqnum":
426             seqnum = seqnum + 15
427         elif which == "R":
428             root_hash = self.flip_bit(root_hash)
429         elif which == "IV":
430             IV = self.flip_bit(IV)
431         elif which == "segsize":
432             segsize = segsize + 15
433         elif which == "pubkey":
434             verification_key = self.flip_bit(verification_key)
435         elif which == "signature":
436             signature = self.flip_bit(signature)
437         elif which == "share_hash_chain":
438             nodenum = share_hash_chain.keys()[0]
439             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
440         elif which == "block_hash_tree":
441             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
442         elif which == "share_data":
443             share_data = self.flip_bit(share_data)
444         elif which == "encprivkey":
445             enc_privkey = self.flip_bit(enc_privkey)
446
447         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
448                                             segsize, datalen)
449         final_share = mutable_layout.pack_share(prefix,
450                                                 verification_key,
451                                                 signature,
452                                                 share_hash_chain,
453                                                 block_hash_tree,
454                                                 share_data,
455                                                 enc_privkey)
456         msf.writev( [(0, final_share)], None)
457
458
459     def test_mutable(self):
460         self.basedir = "system/SystemTest/test_mutable"
461         DATA = "initial contents go here."  # 25 bytes % 3 != 0
462         NEWDATA = "new contents yay"
463         NEWERDATA = "this is getting old"
464
465         d = self.set_up_nodes(use_key_generator=True)
466
467         def _create_mutable(res):
468             c = self.clients[0]
469             log.msg("starting create_mutable_file")
470             d1 = c.create_mutable_file(DATA)
471             def _done(res):
472                 log.msg("DONE: %s" % (res,))
473                 self._mutable_node_1 = res
474                 uri = res.get_uri()
475             d1.addCallback(_done)
476             return d1
477         d.addCallback(_create_mutable)
478
479         def _test_debug(res):
480             # find a share. It is important to run this while there is only
481             # one slot in the grid.
482             shares = self._find_shares(self.basedir)
483             (client_num, storage_index, filename, shnum) = shares[0]
484             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
485                     % filename)
486             log.msg(" for clients[%d]" % client_num)
487
488             out,err = StringIO(), StringIO()
489             rc = runner.runner(["debug", "dump-share", "--offsets",
490                                 filename],
491                                stdout=out, stderr=err)
492             output = out.getvalue()
493             self.failUnlessEqual(rc, 0)
494             try:
495                 self.failUnless("Mutable slot found:\n" in output)
496                 self.failUnless("share_type: SDMF\n" in output)
497                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
498                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
499                 self.failUnless(" num_extra_leases: 0\n" in output)
500                 # the pubkey size can vary by a byte, so the container might
501                 # be a bit larger on some runs.
502                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
503                 self.failUnless(m)
504                 container_size = int(m.group(1))
505                 self.failUnless(2037 <= container_size <= 2049, container_size)
506                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
507                 self.failUnless(m)
508                 data_length = int(m.group(1))
509                 self.failUnless(2037 <= data_length <= 2049, data_length)
510                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
511                                 in output)
512                 self.failUnless(" SDMF contents:\n" in output)
513                 self.failUnless("  seqnum: 1\n" in output)
514                 self.failUnless("  required_shares: 3\n" in output)
515                 self.failUnless("  total_shares: 10\n" in output)
516                 self.failUnless("  segsize: 27\n" in output, (output, filename))
517                 self.failUnless("  datalen: 25\n" in output)
518                 # the exact share_hash_chain nodes depends upon the sharenum,
519                 # and is more of a hassle to compute than I want to deal with
520                 # now
521                 self.failUnless("  share_hash_chain: " in output)
522                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
523                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
524                             base32.b2a(storage_index))
525                 self.failUnless(expected in output)
526             except unittest.FailTest:
527                 print
528                 print "dump-share output was:"
529                 print output
530                 raise
531         d.addCallback(_test_debug)
532
533         # test retrieval
534
535         # first, let's see if we can use the existing node to retrieve the
536         # contents. This allows it to use the cached pubkey and maybe the
537         # latest-known sharemap.
538
539         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
540         def _check_download_1(res):
541             self.failUnlessEqual(res, DATA)
542             # now we see if we can retrieve the data from a new node,
543             # constructed using the URI of the original one. We do this test
544             # on the same client that uploaded the data.
545             uri = self._mutable_node_1.get_uri()
546             log.msg("starting retrieve1")
547             newnode = self.clients[0].create_node_from_uri(uri)
548             newnode_2 = self.clients[0].create_node_from_uri(uri)
549             self.failUnlessIdentical(newnode, newnode_2)
550             return newnode.download_best_version()
551         d.addCallback(_check_download_1)
552
553         def _check_download_2(res):
554             self.failUnlessEqual(res, DATA)
555             # same thing, but with a different client
556             uri = self._mutable_node_1.get_uri()
557             newnode = self.clients[1].create_node_from_uri(uri)
558             log.msg("starting retrieve2")
559             d1 = newnode.download_best_version()
560             d1.addCallback(lambda res: (res, newnode))
561             return d1
562         d.addCallback(_check_download_2)
563
564         def _check_download_3((res, newnode)):
565             self.failUnlessEqual(res, DATA)
566             # replace the data
567             log.msg("starting replace1")
568             d1 = newnode.overwrite(NEWDATA)
569             d1.addCallback(lambda res: newnode.download_best_version())
570             return d1
571         d.addCallback(_check_download_3)
572
573         def _check_download_4(res):
574             self.failUnlessEqual(res, NEWDATA)
575             # now create an even newer node and replace the data on it. This
576             # new node has never been used for download before.
577             uri = self._mutable_node_1.get_uri()
578             newnode1 = self.clients[2].create_node_from_uri(uri)
579             newnode2 = self.clients[3].create_node_from_uri(uri)
580             self._newnode3 = self.clients[3].create_node_from_uri(uri)
581             log.msg("starting replace2")
582             d1 = newnode1.overwrite(NEWERDATA)
583             d1.addCallback(lambda res: newnode2.download_best_version())
584             return d1
585         d.addCallback(_check_download_4)
586
587         def _check_download_5(res):
588             log.msg("finished replace2")
589             self.failUnlessEqual(res, NEWERDATA)
590         d.addCallback(_check_download_5)
591
592         def _corrupt_shares(res):
593             # run around and flip bits in all but k of the shares, to test
594             # the hash checks
595             shares = self._find_shares(self.basedir)
596             ## sort by share number
597             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
598             where = dict([ (shnum, filename)
599                            for (client_num, storage_index, filename, shnum)
600                            in shares ])
601             assert len(where) == 10 # this test is designed for 3-of-10
602             for shnum, filename in where.items():
603                 # shares 7,8,9 are left alone. read will check
604                 # (share_hash_chain, block_hash_tree, share_data). New
605                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
606                 # segsize, signature).
607                 if shnum == 0:
608                     # read: this will trigger "pubkey doesn't match
609                     # fingerprint".
610                     self._corrupt_mutable_share(filename, "pubkey")
611                     self._corrupt_mutable_share(filename, "encprivkey")
612                 elif shnum == 1:
613                     # triggers "signature is invalid"
614                     self._corrupt_mutable_share(filename, "seqnum")
615                 elif shnum == 2:
616                     # triggers "signature is invalid"
617                     self._corrupt_mutable_share(filename, "R")
618                 elif shnum == 3:
619                     # triggers "signature is invalid"
620                     self._corrupt_mutable_share(filename, "segsize")
621                 elif shnum == 4:
622                     self._corrupt_mutable_share(filename, "share_hash_chain")
623                 elif shnum == 5:
624                     self._corrupt_mutable_share(filename, "block_hash_tree")
625                 elif shnum == 6:
626                     self._corrupt_mutable_share(filename, "share_data")
627                 # other things to correct: IV, signature
628                 # 7,8,9 are left alone
629
630                 # note that initial_query_count=5 means that we'll hit the
631                 # first 5 servers in effectively random order (based upon
632                 # response time), so we won't necessarily ever get a "pubkey
633                 # doesn't match fingerprint" error (if we hit shnum>=1 before
634                 # shnum=0, we pull the pubkey from there). To get repeatable
635                 # specific failures, we need to set initial_query_count=1,
636                 # but of course that will change the sequencing behavior of
637                 # the retrieval process. TODO: find a reasonable way to make
638                 # this a parameter, probably when we expand this test to test
639                 # for one failure mode at a time.
640
641                 # when we retrieve this, we should get three signature
642                 # failures (where we've mangled seqnum, R, and segsize). The
643                 # pubkey mangling
644         d.addCallback(_corrupt_shares)
645
646         d.addCallback(lambda res: self._newnode3.download_best_version())
647         d.addCallback(_check_download_5)
648
649         def _check_empty_file(res):
650             # make sure we can create empty files, this usually screws up the
651             # segsize math
652             d1 = self.clients[2].create_mutable_file("")
653             d1.addCallback(lambda newnode: newnode.download_best_version())
654             d1.addCallback(lambda res: self.failUnlessEqual("", res))
655             return d1
656         d.addCallback(_check_empty_file)
657
658         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
659         def _created_dirnode(dnode):
660             log.msg("_created_dirnode(%s)" % (dnode,))
661             d1 = dnode.list()
662             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
663             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
664             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
665             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
666             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
667             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
668             d1.addCallback(lambda res: dnode.build_manifest().when_done())
669             d1.addCallback(lambda manifest:
670                            self.failUnlessEqual(len(manifest), 1))
671             return d1
672         d.addCallback(_created_dirnode)
673
674         def wait_for_c3_kg_conn():
675             return self.clients[3]._key_generator is not None
676         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
677
678         def check_kg_poolsize(junk, size_delta):
679             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
680                                  self.key_generator_svc.key_generator.pool_size + size_delta)
681
682         d.addCallback(check_kg_poolsize, 0)
683         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
684         d.addCallback(check_kg_poolsize, -1)
685         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
686         d.addCallback(check_kg_poolsize, -2)
687         # use_helper induces use of clients[3], which is the using-key_gen client
688         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
689         d.addCallback(check_kg_poolsize, -3)
690
691         return d
    # The default trial timeout (120s) was too short when this test ran
    # under valgrind on a slow machine, so it is raised to 240 seconds.
    test_mutable.timeout = 240
695
696     def flip_bit(self, good):
697         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
698
699     def mangle_uri(self, gooduri):
700         # change the key, which changes the storage index, which means we'll
701         # be asking about the wrong file, so nobody will have any shares
702         u = IFileURI(gooduri)
703         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
704                             uri_extension_hash=u.uri_extension_hash,
705                             needed_shares=u.needed_shares,
706                             total_shares=u.total_shares,
707                             size=u.size)
708         return u2.to_string()
709
710     # TODO: add a test which mangles the uri_extension_hash instead, and
711     # should fail due to not being able to get a valid uri_extension block.
712     # Also a test which sneakily mangles the uri_extension block to change
713     # some of the validation data, so it will fail in the post-download phase
714     # when the file's crypttext integrity check fails. Do the same thing for
715     # the key, which should cause the download to fail the post-download
716     # plaintext_hash check.
717
718     def test_vdrive(self):
719         self.basedir = "system/SystemTest/test_vdrive"
720         self.data = LARGE_DATA
721         d = self.set_up_nodes(use_stats_gatherer=True)
722         d.addCallback(self._test_introweb)
723         d.addCallback(self.log, "starting publish")
724         d.addCallback(self._do_publish1)
725         d.addCallback(self._test_runner)
726         d.addCallback(self._do_publish2)
727         # at this point, we have the following filesystem (where "R" denotes
728         # self._root_directory_uri):
729         # R
730         # R/subdir1
731         # R/subdir1/mydata567
732         # R/subdir1/subdir2/
733         # R/subdir1/subdir2/mydata992
734
735         d.addCallback(lambda res: self.bounce_client(0))
736         d.addCallback(self.log, "bounced client0")
737
738         d.addCallback(self._check_publish1)
739         d.addCallback(self.log, "did _check_publish1")
740         d.addCallback(self._check_publish2)
741         d.addCallback(self.log, "did _check_publish2")
742         d.addCallback(self._do_publish_private)
743         d.addCallback(self.log, "did _do_publish_private")
744         # now we also have (where "P" denotes a new dir):
745         #  P/personal/sekrit data
746         #  P/s2-rw -> /subdir1/subdir2/
747         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
748         d.addCallback(self._check_publish_private)
749         d.addCallback(self.log, "did _check_publish_private")
750         d.addCallback(self._test_web)
751         d.addCallback(self._test_control)
752         d.addCallback(self._test_cli)
753         # P now has four top-level children:
754         # P/personal/sekrit data
755         # P/s2-ro/
756         # P/s2-rw/
757         # P/test_put/  (empty)
758         d.addCallback(self._test_checker)
759         d.addCallback(self._grab_stats)
760         return d
761     test_vdrive.timeout = 1100
762
763     def _test_introweb(self, res):
764         d = getPage(self.introweb_url, method="GET", followRedirect=True)
765         def _check(res):
766             try:
767                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
768                                 in res)
769                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
770                 self.failUnless("Subscription Summary: storage: 5" in res)
771             except unittest.FailTest:
772                 print
773                 print "GET %s output was:" % self.introweb_url
774                 print res
775                 raise
776         d.addCallback(_check)
777         d.addCallback(lambda res:
778                       getPage(self.introweb_url + "?t=json",
779                               method="GET", followRedirect=True))
780         def _check_json(res):
781             data = simplejson.loads(res)
782             try:
783                 self.failUnlessEqual(data["subscription_summary"],
784                                      {"storage": 5})
785                 self.failUnlessEqual(data["announcement_summary"],
786                                      {"storage": 5, "stub_client": 5})
787             except unittest.FailTest:
788                 print
789                 print "GET %s?t=json output was:" % self.introweb_url
790                 print res
791                 raise
792         d.addCallback(_check_json)
793         return d
794
795     def _do_publish1(self, res):
796         ut = upload.Data(self.data, convergence=None)
797         c0 = self.clients[0]
798         d = c0.create_empty_dirnode()
799         def _made_root(new_dirnode):
800             self._root_directory_uri = new_dirnode.get_uri()
801             return c0.create_node_from_uri(self._root_directory_uri)
802         d.addCallback(_made_root)
803         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
804         def _made_subdir1(subdir1_node):
805             self._subdir1_node = subdir1_node
806             d1 = subdir1_node.add_file(u"mydata567", ut)
807             d1.addCallback(self.log, "publish finished")
808             def _stash_uri(filenode):
809                 self.uri = filenode.get_uri()
810             d1.addCallback(_stash_uri)
811             return d1
812         d.addCallback(_made_subdir1)
813         return d
814
815     def _do_publish2(self, res):
816         ut = upload.Data(self.data, convergence=None)
817         d = self._subdir1_node.create_empty_directory(u"subdir2")
818         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
819         return d
820
821     def log(self, res, *args, **kwargs):
822         # print "MSG: %s  RES: %s" % (msg, args)
823         log.msg(*args, **kwargs)
824         return res
825
826     def _do_publish_private(self, res):
827         self.smalldata = "sssh, very secret stuff"
828         ut = upload.Data(self.smalldata, convergence=None)
829         d = self.clients[0].create_empty_dirnode()
830         d.addCallback(self.log, "GOT private directory")
831         def _got_new_dir(privnode):
832             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
833             d1 = privnode.create_empty_directory(u"personal")
834             d1.addCallback(self.log, "made P/personal")
835             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
836             d1.addCallback(self.log, "made P/personal/sekrit data")
837             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
838             def _got_s2(s2node):
839                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
840                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
841                 return d2
842             d1.addCallback(_got_s2)
843             d1.addCallback(lambda res: privnode)
844             return d1
845         d.addCallback(_got_new_dir)
846         return d
847
848     def _check_publish1(self, res):
849         # this one uses the iterative API
850         c1 = self.clients[1]
851         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
852         d.addCallback(self.log, "check_publish1 got /")
853         d.addCallback(lambda root: root.get(u"subdir1"))
854         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
855         d.addCallback(lambda filenode: filenode.download_to_data())
856         d.addCallback(self.log, "get finished")
857         def _get_done(data):
858             self.failUnlessEqual(data, self.data)
859         d.addCallback(_get_done)
860         return d
861
862     def _check_publish2(self, res):
863         # this one uses the path-based API
864         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
865         d = rootnode.get_child_at_path(u"subdir1")
866         d.addCallback(lambda dirnode:
867                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
868         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
869         d.addCallback(lambda filenode: filenode.download_to_data())
870         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
871
872         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
873         def _got_filenode(filenode):
874             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
875             assert fnode == filenode
876         d.addCallback(_got_filenode)
877         return d
878
    def _check_publish_private(self, resnode):
        """Verify the private tree built by _do_publish_private via the
        path-based API: read back the file, confirm the read-only link
        rejects all mutations, then exercise move_child_to and check the
        manifest and deep-stats numbers."""
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember P/personal for the read-only move tests below
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # every mutating operation on the read-only link must raise
            # NotMutableError; reads must still work
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move the file around through the writable links and make sure
            # the manifest/stats still describe the expected five objects
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with encoding details, so only check
                # lower bounds for these two
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
988
989     def shouldFail(self, res, expected_failure, which, substring=None):
990         if isinstance(res, Failure):
991             res.trap(expected_failure)
992             if substring:
993                 self.failUnless(substring in str(res),
994                                 "substring '%s' not in '%s'"
995                                 % (substring, str(res)))
996         else:
997             self.fail("%s was supposed to raise %s, not get '%s'" %
998                       (which, expected_failure, res))
999
1000     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1001         assert substring is None or isinstance(substring, str)
1002         d = defer.maybeDeferred(callable, *args, **kwargs)
1003         def done(res):
1004             if isinstance(res, Failure):
1005                 res.trap(expected_failure)
1006                 if substring:
1007                     self.failUnless(substring in str(res),
1008                                     "substring '%s' not in '%s'"
1009                                     % (substring, str(res)))
1010             else:
1011                 self.fail("%s was supposed to raise %s, not get '%s'" %
1012                           (which, expected_failure, res))
1013         d.addBoth(done)
1014         return d
1015
1016     def PUT(self, urlpath, data):
1017         url = self.webish_url + urlpath
1018         return getPage(url, method="PUT", postdata=data)
1019
1020     def GET(self, urlpath, followRedirect=False):
1021         url = self.webish_url + urlpath
1022         return getPage(url, method="GET", followRedirect=followRedirect)
1023
1024     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1025         if use_helper:
1026             url = self.helper_webish_url + urlpath
1027         else:
1028             url = self.webish_url + urlpath
1029         sepbase = "boogabooga"
1030         sep = "--" + sepbase
1031         form = []
1032         form.append(sep)
1033         form.append('Content-Disposition: form-data; name="_charset"')
1034         form.append('')
1035         form.append('UTF-8')
1036         form.append(sep)
1037         for name, value in fields.iteritems():
1038             if isinstance(value, tuple):
1039                 filename, value = value
1040                 form.append('Content-Disposition: form-data; name="%s"; '
1041                             'filename="%s"' % (name, filename.encode("utf-8")))
1042             else:
1043                 form.append('Content-Disposition: form-data; name="%s"' % name)
1044             form.append('')
1045             form.append(str(value))
1046             form.append(sep)
1047         form[-1] += "--"
1048         body = "\r\n".join(form) + "\r\n"
1049         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1050                    }
1051         return getPage(url, method="POST", postdata=body,
1052                        headers=headers, followRedirect=followRedirect)
1053
    def _test_web(self, res):
        """Drive the webish (WUI/WAPI) front end: welcome pages, directory
        listings, downloads by path and by URI, PUT/POST uploads, the
        various status pages, helper status, and the statistics pages."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            then = time.time() - 86400*3
            now = time.time()
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1260
    def _test_runner(self, res):
        """Exercise some of the diagnostic tools in runner.py: 'debug
        dump-share', 'debug find-shares', and 'debug catalog-shares',
        run against the single CHK file (self.data) uploaded earlier.
        """
        # find a share
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    break
        else:
            # for/else: only reached if the loop above never hit 'break'
            self.fail("unable to find any uri_extension files in %s"
                      % self.basedir)
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        # 'debug dump-share' prints a "key: value" description of the share
        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            filename],
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        # the file was encoded into 10 total shares (see total_shares above)
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        # NOTE(review): 30 presumably counts one line per share across all
        # files uploaded by this test -- confirm against the uploads above
        self.failUnlessEqual(len(descriptions), 30)
        matching = [line
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1335
1336     def _test_control(self, res):
1337         # exercise the remote-control-the-client foolscap interfaces in
1338         # allmydata.control (mostly used for performance tests)
1339         c0 = self.clients[0]
1340         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1341         control_furl = open(control_furl_file, "r").read().strip()
1342         # it doesn't really matter which Tub we use to connect to the client,
1343         # so let's just use our IntroducerNode's
1344         d = self.introducer.tub.getReference(control_furl)
1345         d.addCallback(self._test_control2, control_furl_file)
1346         return d
1347     def _test_control2(self, rref, filename):
1348         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1349         downfile = os.path.join(self.basedir, "control.downfile")
1350         d.addCallback(lambda uri:
1351                       rref.callRemote("download_from_uri_to_file",
1352                                       uri, downfile))
1353         def _check(res):
1354             self.failUnlessEqual(res, downfile)
1355             data = open(downfile, "r").read()
1356             expected_data = open(filename, "r").read()
1357             self.failUnlessEqual(data, expected_data)
1358         d.addCallback(_check)
1359         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1360         if sys.platform == "linux2":
1361             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1362         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1363         return d
1364
1365     def _test_cli(self, res):
1366         # run various CLI commands (in a thread, since they use blocking
1367         # network calls)
1368
1369         private_uri = self._private_node.get_uri()
1370         some_uri = self._root_directory_uri
1371         client0_basedir = self.getdir("client0")
1372
1373         nodeargs = [
1374             "--node-directory", client0_basedir,
1375             ]
1376         TESTDATA = "I will not write the same thing over and over.\n" * 100
1377
1378         d = defer.succeed(None)
1379
1380         # for compatibility with earlier versions, private/root_dir.cap is
1381         # supposed to be treated as an alias named "tahoe:". Start by making
1382         # sure that works, before we add other aliases.
1383
1384         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1385         f = open(root_file, "w")
1386         f.write(private_uri)
1387         f.close()
1388
1389         def run(ignored, verb, *args, **kwargs):
1390             stdin = kwargs.get("stdin", "")
1391             newargs = [verb] + nodeargs + list(args)
1392             return self._run_cli(newargs, stdin=stdin)
1393
1394         def _check_ls((out,err), expected_children, unexpected_children=[]):
1395             self.failUnlessEqual(err, "")
1396             for s in expected_children:
1397                 self.failUnless(s in out, (s,out))
1398             for s in unexpected_children:
1399                 self.failIf(s in out, (s,out))
1400
1401         def _check_ls_root((out,err)):
1402             self.failUnless("personal" in out)
1403             self.failUnless("s2-ro" in out)
1404             self.failUnless("s2-rw" in out)
1405             self.failUnlessEqual(err, "")
1406
1407         # this should reference private_uri
1408         d.addCallback(run, "ls")
1409         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1410
1411         d.addCallback(run, "list-aliases")
1412         def _check_aliases_1((out,err)):
1413             self.failUnlessEqual(err, "")
1414             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1415         d.addCallback(_check_aliases_1)
1416
1417         # now that that's out of the way, remove root_dir.cap and work with
1418         # new files
1419         d.addCallback(lambda res: os.unlink(root_file))
1420         d.addCallback(run, "list-aliases")
1421         def _check_aliases_2((out,err)):
1422             self.failUnlessEqual(err, "")
1423             self.failUnlessEqual(out, "")
1424         d.addCallback(_check_aliases_2)
1425
1426         d.addCallback(run, "mkdir")
1427         def _got_dir( (out,err) ):
1428             self.failUnless(uri.from_string_dirnode(out.strip()))
1429             return out.strip()
1430         d.addCallback(_got_dir)
1431         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1432
1433         d.addCallback(run, "list-aliases")
1434         def _check_aliases_3((out,err)):
1435             self.failUnlessEqual(err, "")
1436             self.failUnless("tahoe: " in out)
1437         d.addCallback(_check_aliases_3)
1438
1439         def _check_empty_dir((out,err)):
1440             self.failUnlessEqual(out, "")
1441             self.failUnlessEqual(err, "")
1442         d.addCallback(run, "ls")
1443         d.addCallback(_check_empty_dir)
1444
1445         def _check_missing_dir((out,err)):
1446             # TODO: check that rc==2
1447             self.failUnlessEqual(out, "")
1448             self.failUnlessEqual(err, "No such file or directory\n")
1449         d.addCallback(run, "ls", "bogus")
1450         d.addCallback(_check_missing_dir)
1451
1452         files = []
1453         datas = []
1454         for i in range(10):
1455             fn = os.path.join(self.basedir, "file%d" % i)
1456             files.append(fn)
1457             data = "data to be uploaded: file%d\n" % i
1458             datas.append(data)
1459             open(fn,"wb").write(data)
1460
1461         def _check_stdout_against((out,err), filenum=None, data=None):
1462             self.failUnlessEqual(err, "")
1463             if filenum is not None:
1464                 self.failUnlessEqual(out, datas[filenum])
1465             if data is not None:
1466                 self.failUnlessEqual(out, data)
1467
1468         # test all both forms of put: from a file, and from stdin
1469         #  tahoe put bar FOO
1470         d.addCallback(run, "put", files[0], "tahoe-file0")
1471         def _put_out((out,err)):
1472             self.failUnless("URI:LIT:" in out, out)
1473             self.failUnless("201 Created" in err, err)
1474             uri0 = out.strip()
1475             return run(None, "get", uri0)
1476         d.addCallback(_put_out)
1477         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1478
1479         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1480         #  tahoe put bar tahoe:FOO
1481         d.addCallback(run, "put", files[2], "tahoe:file2")
1482         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1483         def _check_put_mutable((out,err)):
1484             self._mutable_file3_uri = out.strip()
1485         d.addCallback(_check_put_mutable)
1486         d.addCallback(run, "get", "tahoe:file3")
1487         d.addCallback(_check_stdout_against, 3)
1488
1489         #  tahoe put FOO
1490         STDIN_DATA = "This is the file to upload from stdin."
1491         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1492         #  tahoe put tahoe:FOO
1493         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1494                       stdin="Other file from stdin.")
1495
1496         d.addCallback(run, "ls")
1497         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1498                                   "tahoe-file-stdin", "from-stdin"])
1499         d.addCallback(run, "ls", "subdir")
1500         d.addCallback(_check_ls, ["tahoe-file1"])
1501
1502         # tahoe mkdir FOO
1503         d.addCallback(run, "mkdir", "subdir2")
1504         d.addCallback(run, "ls")
1505         # TODO: extract the URI, set an alias with it
1506         d.addCallback(_check_ls, ["subdir2"])
1507
1508         # tahoe get: (to stdin and to a file)
1509         d.addCallback(run, "get", "tahoe-file0")
1510         d.addCallback(_check_stdout_against, 0)
1511         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1512         d.addCallback(_check_stdout_against, 1)
1513         outfile0 = os.path.join(self.basedir, "outfile0")
1514         d.addCallback(run, "get", "file2", outfile0)
1515         def _check_outfile0((out,err)):
1516             data = open(outfile0,"rb").read()
1517             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1518         d.addCallback(_check_outfile0)
1519         outfile1 = os.path.join(self.basedir, "outfile0")
1520         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1521         def _check_outfile1((out,err)):
1522             data = open(outfile1,"rb").read()
1523             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1524         d.addCallback(_check_outfile1)
1525
1526         d.addCallback(run, "rm", "tahoe-file0")
1527         d.addCallback(run, "rm", "tahoe:file2")
1528         d.addCallback(run, "ls")
1529         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1530
1531         d.addCallback(run, "ls", "-l")
1532         def _check_ls_l((out,err)):
1533             lines = out.split("\n")
1534             for l in lines:
1535                 if "tahoe-file-stdin" in l:
1536                     self.failUnless(l.startswith("-r-- "), l)
1537                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1538                 if "file3" in l:
1539                     self.failUnless(l.startswith("-rw- "), l) # mutable
1540         d.addCallback(_check_ls_l)
1541
1542         d.addCallback(run, "ls", "--uri")
1543         def _check_ls_uri((out,err)):
1544             lines = out.split("\n")
1545             for l in lines:
1546                 if "file3" in l:
1547                     self.failUnless(self._mutable_file3_uri in l)
1548         d.addCallback(_check_ls_uri)
1549
1550         d.addCallback(run, "ls", "--readonly-uri")
1551         def _check_ls_rouri((out,err)):
1552             lines = out.split("\n")
1553             for l in lines:
1554                 if "file3" in l:
1555                     rw_uri = self._mutable_file3_uri
1556                     u = uri.from_string_mutable_filenode(rw_uri)
1557                     ro_uri = u.get_readonly().to_string()
1558                     self.failUnless(ro_uri in l)
1559         d.addCallback(_check_ls_rouri)
1560
1561
1562         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1563         d.addCallback(run, "ls")
1564         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1565
1566         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1567         d.addCallback(run, "ls")
1568         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1569
1570         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1571         d.addCallback(run, "ls")
1572         d.addCallback(_check_ls, ["file3", "file3-copy"])
1573         d.addCallback(run, "get", "tahoe:file3-copy")
1574         d.addCallback(_check_stdout_against, 3)
1575
1576         # copy from disk into tahoe
1577         d.addCallback(run, "cp", files[4], "tahoe:file4")
1578         d.addCallback(run, "ls")
1579         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1580         d.addCallback(run, "get", "tahoe:file4")
1581         d.addCallback(_check_stdout_against, 4)
1582
1583         # copy from tahoe into disk
1584         target_filename = os.path.join(self.basedir, "file-out")
1585         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1586         def _check_cp_out((out,err)):
1587             self.failUnless(os.path.exists(target_filename))
1588             got = open(target_filename,"rb").read()
1589             self.failUnlessEqual(got, datas[4])
1590         d.addCallback(_check_cp_out)
1591
1592         # copy from disk to disk (silly case)
1593         target2_filename = os.path.join(self.basedir, "file-out-copy")
1594         d.addCallback(run, "cp", target_filename, target2_filename)
1595         def _check_cp_out2((out,err)):
1596             self.failUnless(os.path.exists(target2_filename))
1597             got = open(target2_filename,"rb").read()
1598             self.failUnlessEqual(got, datas[4])
1599         d.addCallback(_check_cp_out2)
1600
1601         # copy from tahoe into disk, overwriting an existing file
1602         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1603         def _check_cp_out3((out,err)):
1604             self.failUnless(os.path.exists(target_filename))
1605             got = open(target_filename,"rb").read()
1606             self.failUnlessEqual(got, datas[3])
1607         d.addCallback(_check_cp_out3)
1608
1609         # copy from disk into tahoe, overwriting an existing immutable file
1610         d.addCallback(run, "cp", files[5], "tahoe:file4")
1611         d.addCallback(run, "ls")
1612         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1613         d.addCallback(run, "get", "tahoe:file4")
1614         d.addCallback(_check_stdout_against, 5)
1615
1616         # copy from disk into tahoe, overwriting an existing mutable file
1617         d.addCallback(run, "cp", files[5], "tahoe:file3")
1618         d.addCallback(run, "ls")
1619         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1620         d.addCallback(run, "get", "tahoe:file3")
1621         d.addCallback(_check_stdout_against, 5)
1622
1623         # recursive copy: setup
1624         dn = os.path.join(self.basedir, "dir1")
1625         os.makedirs(dn)
1626         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1627         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1628         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1629         sdn2 = os.path.join(dn, "subdir2")
1630         os.makedirs(sdn2)
1631         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1632         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1633
1634         # from disk into tahoe
1635         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1636         d.addCallback(run, "ls")
1637         d.addCallback(_check_ls, ["dir1"])
1638         d.addCallback(run, "ls", "dir1")
1639         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1640                       ["rfile4", "rfile5"])
1641         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1642         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1643                       ["rfile1", "rfile2", "rfile3"])
1644         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1645         d.addCallback(_check_stdout_against, data="rfile4")
1646
1647         # and back out again
1648         dn_copy = os.path.join(self.basedir, "dir1-copy")
1649         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1650         def _check_cp_r_out((out,err)):
1651             def _cmp(name):
1652                 old = open(os.path.join(dn, name), "rb").read()
1653                 newfn = os.path.join(dn_copy, name)
1654                 self.failUnless(os.path.exists(newfn))
1655                 new = open(newfn, "rb").read()
1656                 self.failUnlessEqual(old, new)
1657             _cmp("rfile1")
1658             _cmp("rfile2")
1659             _cmp("rfile3")
1660             _cmp(os.path.join("subdir2", "rfile4"))
1661             _cmp(os.path.join("subdir2", "rfile5"))
1662         d.addCallback(_check_cp_r_out)
1663
1664         # and copy it a second time, which ought to overwrite the same files
1665         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1666
1667         # and tahoe-to-tahoe
1668         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1669         d.addCallback(run, "ls")
1670         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1671         d.addCallback(run, "ls", "dir1-copy")
1672         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1673                       ["rfile4", "rfile5"])
1674         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1675         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1676                       ["rfile1", "rfile2", "rfile3"])
1677         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1678         d.addCallback(_check_stdout_against, data="rfile4")
1679
1680         # and copy it a second time, which ought to overwrite the same files
1681         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1682
1683         # tahoe_ls doesn't currently handle the error correctly: it tries to
1684         # JSON-parse a traceback.
1685 ##         def _ls_missing(res):
1686 ##             argv = ["ls"] + nodeargs + ["bogus"]
1687 ##             return self._run_cli(argv)
1688 ##         d.addCallback(_ls_missing)
1689 ##         def _check_ls_missing((out,err)):
1690 ##             print "OUT", out
1691 ##             print "ERR", err
1692 ##             self.failUnlessEqual(err, "")
1693 ##         d.addCallback(_check_ls_missing)
1694
1695         return d
1696
1697     def _run_cli(self, argv, stdin=""):
1698         #print "CLI:", argv
1699         stdout, stderr = StringIO(), StringIO()
1700         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1701                                   stdin=StringIO(stdin),
1702                                   stdout=stdout, stderr=stderr)
1703         def _done(res):
1704             return stdout.getvalue(), stderr.getvalue()
1705         d.addCallback(_done)
1706         return d
1707
    def _test_checker(self, res):
        """Run the checker (with and without verify=True) against three
        kinds of node: the 'personal' dirnode, a CHK FileNode, and a
        LiteralFileNode.
        """
        # large enough to become a real CHK file rather than a LIT
        ut = upload.Data("too big to be literal" * 200, convergence=None)
        d = self._personal_node.add_file(u"big file", ut)

        # dirnode: both a plain check and a verifying check report healthy
        d.addCallback(lambda res: self._personal_node.check(Monitor()))
        def _check_dirnode_results(r):
            self.failUnless(r.is_healthy())
        d.addCallback(_check_dirnode_results)
        d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
        d.addCallback(_check_dirnode_results)

        # CHK filenode: same pair of checks on the file we just added
        d.addCallback(lambda res: self._personal_node.get(u"big file"))
        def _got_chk_filenode(n):
            self.failUnless(isinstance(n, filenode.FileNode))
            d = n.check(Monitor())
            def _check_filenode_results(r):
                self.failUnless(r.is_healthy())
            d.addCallback(_check_filenode_results)
            d.addCallback(lambda res: n.check(Monitor(), verify=True))
            d.addCallback(_check_filenode_results)
            return d
        d.addCallback(_got_chk_filenode)

        # LIT filenode: check() is expected to fire with None instead of a
        # results object (the data lives entirely in the URI)
        d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
        def _got_lit_filenode(n):
            self.failUnless(isinstance(n, filenode.LiteralFileNode))
            d = n.check(Monitor())
            def _check_lit_filenode_results(r):
                self.failUnlessEqual(r, None)
            d.addCallback(_check_lit_filenode_results)
            d.addCallback(lambda res: n.check(Monitor(), verify=True))
            d.addCallback(_check_lit_filenode_results)
            return d
        d.addCallback(_got_lit_filenode)
        return d
1743
1744
class MutableChecker(SystemTestMixin, unittest.TestCase, WebErrorMixin):
    """Exercise the webapi checker ('?t=check') and repairer on mutable
    files: a healthy file, one with a corrupted share, and one with a
    deleted share. The share-locating code formerly duplicated between
    test_corrupt and test_delete_share now lives in
    _find_share_to_damage().
    """

    def _run_cli(self, argv):
        """Run a CLI command synchronously and return its stdout."""
        stdout, stderr = StringIO(), StringIO()
        runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
        return stdout.getvalue()

    def _find_share_to_damage(self):
        """Use 'debug find-shares' to locate one share of self.node held
        by clients[1], record its '<peerid-prefix>-sh<NUM>' name in
        self.corrupt_shareid, and return the share's filename."""
        si = self.node.get_storage_index()
        out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                            self.clients[1].basedir])
        files = out.split("\n")
        sharefile = files[0]
        shnum = os.path.basename(sharefile)
        nodeid = self.clients[1].nodeid
        nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
        self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
        return sharefile

    def test_good(self):
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<span>Healthy!</span>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        d.addErrback(self.explain_web_error)
        return d

    def test_corrupt(self):
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            # corrupt one share, using the CLI debug command
            self._run_cli(["debug", "corrupt-share",
                           self._find_share_to_damage()])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d

    def test_delete_share(self):
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            # delete one share outright (not just corrupt it)
            os.unlink(self._find_share_to_damage())
        d.addCallback(_created)
        # now make sure the webapi checker notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d
1879
1880 class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
1881     # construct a small directory tree (with one dir, one immutable file, one
1882     # mutable file, one LIT file, and a loop), and then check/examine it in
1883     # various ways.
1884
    def set_up_tree(self, ignored):
        """Build the small test tree on client[0]: a root dirnode holding
        a mutable file ("mutable"), a large CHK file ("large"), a small
        LIT file ("small"), and a "loop" edge back to the root. The nodes
        and their URIs are stashed on self for later examination.
        """
        # 2.9s
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _created_root(n):
            self.root = n
            self.root_uri = n.get_uri()
        d.addCallback(_created_root)
        d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
        # NOTE(review): _created_mutable receives set_node's result, so this
        # assumes set_node's Deferred fires with the child node -- confirm
        # against IDirectoryNode
        d.addCallback(lambda n: self.root.set_node(u"mutable", n))
        def _created_mutable(n):
            self.mutable = n
            self.mutable_uri = n.get_uri()
        d.addCallback(_created_mutable)

        # large enough to be a real CHK upload
        large = upload.Data("Lots of data\n" * 1000, None)
        d.addCallback(lambda ign: self.root.add_file(u"large", large))
        def _created_large(n):
            self.large = n
            self.large_uri = n.get_uri()
        d.addCallback(_created_large)

        # small enough to be stored as a LIT (literal) URI
        small = upload.Data("Small enough for a LIT", None)
        d.addCallback(lambda ign: self.root.add_file(u"small", small))
        def _created_small(n):
            self.small = n
            self.small_uri = n.get_uri()
        d.addCallback(_created_small)

        # create a directory cycle: root contains a link to itself
        d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
        return d
1916
    def check_is_healthy(self, cr, n, where, incomplete=False):
        """Assert that checker results 'cr' describe a fully healthy copy
        of node 'n'.

        'where' is a label included in every failure message. With
        incomplete=True (used for the verify=True checks on the immutable
        file, where not all fields are filled in) the rebalancing,
        share-host and sharemap assertions are skipped.
        """
        self.failUnless(ICheckerResults.providedBy(cr), where)
        self.failUnless(cr.is_healthy(), where)
        self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
                             where)
        self.failUnlessEqual(cr.get_storage_index_string(),
                             base32.b2a(n.get_storage_index()), where)
        # with fewer than 10 servers, the 10 shares (asserted below) cannot
        # all land on distinct hosts, so the checker should want rebalancing
        needs_rebalancing = bool( len(self.clients) < 10 )
        if not incomplete:
            self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
        d = cr.get_data()
        # the test grid apparently encodes 3-of-10 -- these counts pin that
        self.failUnlessEqual(d["count-shares-good"], 10, where)
        self.failUnlessEqual(d["count-shares-needed"], 3, where)
        self.failUnlessEqual(d["count-shares-expected"], 10, where)
        if not incomplete:
            self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(d["list-corrupt-shares"], [], where)
        if not incomplete:
            # every client should have responded and hold at least one share
            self.failUnlessEqual(sorted(d["servers-responding"]),
                                 sorted([c.nodeid for c in self.clients]),
                                 where)
            self.failUnless("sharemap" in d, where)
            all_serverids = set()
            for (shareid, serverids) in d["sharemap"].items():
                all_serverids.update(serverids)
            self.failUnlessEqual(sorted(all_serverids),
                                 sorted([c.nodeid for c in self.clients]),
                                 where)

        self.failUnlessEqual(d["count-wrong-shares"], 0, where)
        self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
1950
1951
1952     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
1953         self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
1954         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
1955         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
1956         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
1957         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
1958         self.failIf(cr.get_repair_attempted(), where)
1959
1960     def deep_check_is_healthy(self, cr, num_healthy, where):
1961         self.failUnless(IDeepCheckResults.providedBy(cr))
1962         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
1963                              num_healthy, where)
1964
1965     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
1966         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
1967         c = cr.get_counters()
1968         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
1969                              num_healthy, where)
1970         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
1971                              num_healthy, where)
1972         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
1973
1974     def test_good(self):
1975         self.basedir = self.mktemp()
1976         d = self.set_up_nodes()
1977         d.addCallback(self.set_up_tree)
1978         d.addCallback(self.do_stats)
1979         d.addCallback(self.do_test_good)
1980         d.addCallback(self.do_test_web)
1981         d.addErrback(self.explain_web_error)
1982         return d
1983
1984     def do_stats(self, ignored):
1985         d = defer.succeed(None)
1986         d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
1987         d.addCallback(self.check_stats)
1988         return d
1989
    def check_stats(self, s):
        """Validate the deep-stats dictionary for the tree built by
        set_up_tree: one directory plus three files (one immutable, one
        LIT, one mutable)."""
        self.failUnlessEqual(s["count-directories"], 1)
        self.failUnlessEqual(s["count-files"], 3)
        self.failUnlessEqual(s["count-immutable-files"], 1)
        self.failUnlessEqual(s["count-literal-files"], 1)
        self.failUnlessEqual(s["count-mutable-files"], 1)
        # don't check directories: their size will vary
        # s["largest-directory"]
        # s["size-directories"]
        self.failUnlessEqual(s["largest-directory-children"], 4)
        # the "large" file is "Lots of data\n" * 1000 == 13000 bytes
        self.failUnlessEqual(s["largest-immutable-file"], 13000)
        # to re-use this function for both the local
        # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
        # coerce the result into a list of tuples. dirnode.start_deep_stats()
        # returns a list of tuples, but JSON only knows about lists, so
        # t=start-deep-stats returns a list of lists.
        histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
        # one file in the 11-31 byte bucket (the 22-byte LIT file) and one
        # in the 10001-31622 bucket (the 13000-byte immutable file)
        self.failUnlessEqual(histogram, [(11, 31, 1),
                                         (10001, 31622, 1),
                                         ])
        self.failUnlessEqual(s["size-immutable-files"], 13000)
        self.failUnlessEqual(s["size-literal-files"], 22)
2012
    def do_test_good(self, ignored):
        """Exercise the local (non-webapi) check / check-and-repair /
        deep-check interfaces on every node in the tree, expecting
        everything to be healthy; finishes by starting a deep-check and
        cancelling it."""
        d = defer.succeed(None)
        # check the individual items
        d.addCallback(lambda ign: self.root.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.large, "large")
        # the LIT file's check() is expected to fire with None
        d.addCallback(lambda ign: self.small.check(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")

        # and again with verify=True
        d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
        # incomplete=True: the verifier does not fill in all result fields
        # for the immutable file
        d.addCallback(self.check_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")

        # and check_and_repair(), which should be a nop
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")

        # check_and_repair(verify=True)
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")


        # now deep-check the root, with various verify= and repair= options
        d.addCallback(lambda ign:
                      self.root.start_deep_check().when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check(verify=True).when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair().when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair(verify=True).when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")

        # and finally, start a deep-check, but then cancel it.
        d.addCallback(lambda ign: self.root.start_deep_check())
        def _checking(monitor):
            # cancel immediately, then confirm the operation errbacks with
            # OperationCancelledError rather than finishing normally
            monitor.cancel()
            d = monitor.when_done()
            # this should fire as soon as the next dirnode.list finishes.
            # TODO: add a counter to measure how many list() calls are made,
            # assert that no more than one gets to run before the cancel()
            # takes effect.
            def _finished_normally(res):
                self.fail("this was supposed to fail, not finish normally")
            def _cancelled(f):
                f.trap(OperationCancelledError)
            d.addCallbacks(_finished_normally, _cancelled)
            return d
        d.addCallback(_checking)

        return d
2090
2091     def web_json(self, n, **kwargs):
2092         kwargs["output"] = "json"
2093         d = self.web(n, "POST", **kwargs)
2094         d.addCallback(self.decode_json)
2095         return d
2096
2097     def decode_json(self, (s,url)):
2098         try:
2099             data = simplejson.loads(s)
2100         except ValueError:
2101             self.fail("%s: not JSON: '%s'" % (url, s))
2102         return data
2103
2104     def web(self, n, method="GET", **kwargs):
2105         # returns (data, url)
2106         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2107                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2108         d = getPage(url, method=method)
2109         d.addCallback(lambda data: (data,url))
2110         return d
2111
2112     def wait_for_operation(self, ignored, ophandle):
2113         url = self.webish_url + "operations/" + ophandle
2114         url += "?t=status&output=JSON"
2115         d = getPage(url)
2116         def _got(res):
2117             try:
2118                 data = simplejson.loads(res)
2119             except ValueError:
2120                 self.fail("%s: not JSON: '%s'" % (url, res))
2121             if not data["finished"]:
2122                 d = self.stall(delay=1.0)
2123                 d.addCallback(self.wait_for_operation, ophandle)
2124                 return d
2125             return data
2126         d.addCallback(_got)
2127         return d
2128
2129     def get_operation_results(self, ignored, ophandle, output=None):
2130         url = self.webish_url + "operations/" + ophandle
2131         url += "?t=status"
2132         if output:
2133             url += "&output=" + output
2134         d = getPage(url)
2135         def _got(res):
2136             if output and output.lower() == "json":
2137                 try:
2138                     return simplejson.loads(res)
2139                 except ValueError:
2140                     self.fail("%s: not JSON: '%s'" % (url, res))
2141             return res
2142         d.addCallback(_got)
2143         return d
2144
2145     def slow_web(self, n, output=None, **kwargs):
2146         # use ophandle=
2147         handle = base32.b2a(os.urandom(4))
2148         d = self.web(n, "POST", ophandle=handle, **kwargs)
2149         d.addCallback(self.wait_for_operation, handle)
2150         d.addCallback(self.get_operation_results, handle, output=output)
2151         return d
2152
    def json_check_is_healthy(self, data, n, where, incomplete=False):
        """Assert that webapi JSON check results describe a fully healthy
        copy of node 'n'.

        JSON twin of check_is_healthy(): same fields, but serverids appear
        in their base32 string form. With incomplete=True the rebalancing,
        share-host and sharemap assertions are skipped.
        """
        self.failUnlessEqual(data["storage-index"],
                             base32.b2a(n.get_storage_index()), where)
        r = data["results"]
        self.failUnlessEqual(r["healthy"], True, where)
        # with fewer than 10 servers, the 10 shares (asserted below) cannot
        # all land on distinct hosts, so the checker should want rebalancing
        needs_rebalancing = bool( len(self.clients) < 10 )
        if not incomplete:
            self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
        self.failUnlessEqual(r["count-shares-good"], 10, where)
        self.failUnlessEqual(r["count-shares-needed"], 3, where)
        self.failUnlessEqual(r["count-shares-expected"], 10, where)
        if not incomplete:
            self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
        self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
        self.failUnlessEqual(r["list-corrupt-shares"], [], where)
        if not incomplete:
            # every client should have responded and hold at least one share
            self.failUnlessEqual(sorted(r["servers-responding"]),
                                 sorted([idlib.nodeid_b2a(c.nodeid)
                                         for c in self.clients]), where)
            self.failUnless("sharemap" in r, where)
            all_serverids = set()
            for (shareid, serverids_s) in r["sharemap"].items():
                all_serverids.update(serverids_s)
            self.failUnlessEqual(sorted(all_serverids),
                                 sorted([idlib.nodeid_b2a(c.nodeid)
                                         for c in self.clients]), where)
        self.failUnlessEqual(r["count-wrong-shares"], 0, where)
        self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
        self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2183
2184     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2185         self.failUnlessEqual(data["storage-index"],
2186                              base32.b2a(n.get_storage_index()), where)
2187         self.failUnlessEqual(data["repair-attempted"], False, where)
2188         self.json_check_is_healthy(data["pre-repair-results"],
2189                                    n, where, incomplete)
2190         self.json_check_is_healthy(data["post-repair-results"],
2191                                    n, where, incomplete)
2192
2193     def json_full_deepcheck_is_healthy(self, data, n, where):
2194         self.failUnlessEqual(data["root-storage-index"],
2195                              base32.b2a(n.get_storage_index()), where)
2196         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2197         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2198         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2199         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2200         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2201         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2202         self.json_check_stats(data["stats"], where)
2203
2204     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2205         self.failUnlessEqual(data["root-storage-index"],
2206                              base32.b2a(n.get_storage_index()), where)
2207         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2208
2209         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2210         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2211         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2212
2213         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2214         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2215         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2216
2217         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2218         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2219         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2220
2221         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2222         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2223         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2224
2225
2226     def json_check_lit(self, data, n, where):
2227         self.failUnlessEqual(data["storage-index"], "", where)
2228         self.failUnlessEqual(data["results"]["healthy"], True, where)
2229
    def json_check_stats(self, data, where):
        """Validate webapi deep-stats output. The JSON form uses the same
        keys as the local API, so check_stats() applies directly; 'where'
        is accepted only for signature parity with the other validators."""
        self.check_stats(data)
2232
    def do_test_web(self, ignored):
        """Exercise the webapi equivalents of the local checks: deep-stats,
        t=check with every verify/repair combination, the slow (ophandle)
        deep-check variants, and finally t=info on each node."""
        d = defer.succeed(None)

        # stats
        d.addCallback(lambda ign:
                      self.slow_web(self.root,
                                    t="start-deep-stats", output="json"))
        d.addCallback(self.json_check_stats, "deep-stats")

        # check, no verify
        d.addCallback(lambda ign: self.web_json(self.root, t="check"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.web_json(self.large, t="check"))
        d.addCallback(self.json_check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.web_json(self.small, t="check"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check and verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", verify="true"))
        # incomplete=True: the verifier does not fill in all result fields
        # for the immutable file
        d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check and repair, no verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check+verify+repair
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # now run a deep-check, with various verify= and repair= flags
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true",
                                    output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", repair="true",
                                    output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")

        # now look at t=info
        d.addCallback(lambda ign: self.web(self.root, t="info"))
        # TODO: examine the output
        d.addCallback(lambda ign: self.web(self.mutable, t="info"))
        d.addCallback(lambda ign: self.web(self.large, t="info"))
        d.addCallback(lambda ign: self.web(self.small, t="info"))

        return d