]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
#527: respond to GETs with early ranges quickly, without waiting for the whole file...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri, storage, offloaded
12 from allmydata.immutable import download, upload, filenode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
17      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
18      IDeepCheckAndRepairResults, NoSuchChildError, NotEnoughSharesError
19 from allmydata.monitor import Monitor, OperationCancelledError
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin, WebErrorMixin, \
28      MemoryConsumer, download_to_data
29
# Module-level fixture: long enough that an upload of it cannot be stored as
# a LIT (literal, data-embedded-in-the-URI) cap and must produce real shares.
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
34
class CountingDataUploadable(upload.Data):
    # An uploadable that counts how many bytes have been requested from it.
    # If interrupt_after is set, the Deferred in interrupt_after_d is fired
    # (once) as soon as the running count first exceeds that threshold --
    # used to interrupt an upload partway through.
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm before firing, so the callback runs exactly once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
47
class GrabEverythingConsumer:
    # A minimal IConsumer that accumulates every byte written to it into
    # self.contents (a plain string), for later comparison by the tests.
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # only push (streaming) producers are supported here
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, chunk):
        self.contents = self.contents + chunk

    def unregisterProducer(self):
        pass
63
64 class SystemTest(SystemTestMixin, unittest.TestCase):
65
    def test_connections(self):
        # NOTE: this test is disabled via the 'del' below; see the comment
        # there for the rationale.
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            # with the extra node attached, every client should now see
            # numclients+1 storage peers
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            # addBoth: stop the extra node even if the checks above failed
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections
93
    def test_upload_and_download_random_key(self):
        # convergence=None: each upload uses a fresh random encryption key,
        # so duplicate uploads cannot be short-circuited
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800
98
    def test_upload_and_download_convergent(self):
        # a fixed convergence secret: identical plaintext yields identical
        # storage indexes, enabling dedup/resumption paths in the test body
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = list(c.get_all_peerids())
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
114                 self.failUnlessEqual(len(permuted_peers), self.numclients)
115         d.addCallback(_check_connections)
116
117         def _do_upload(res):
118             log.msg("UPLOADING")
119             u = self.clients[0].getServiceNamed("uploader")
120             self.uploader = u
121             # we crank the max segsize down to 1024b for the duration of this
122             # test, so we can exercise multiple segments. It is important
123             # that this is not a multiple of the segment size, so that the
124             # tail segment is not the same length as the others. This actualy
125             # gets rounded up to 1025 to be a multiple of the number of
126             # required shares (since we use 25 out of 100 FEC).
127             up = upload.Data(DATA, convergence=convergence)
128             up.max_segment_size = 1024
129             d1 = u.upload(up)
130             return d1
131         d.addCallback(_do_upload)
132         def _upload_done(results):
133             uri = results.uri
134             log.msg("upload finished: uri is %s" % (uri,))
135             self.uri = uri
136             dl = self.clients[1].getServiceNamed("downloader")
137             self.downloader = dl
138         d.addCallback(_upload_done)
139
140         def _upload_again(res):
141             # Upload again. If using convergent encryption then this ought to be
142             # short-circuited, however with the way we currently generate URIs
143             # (i.e. because they include the roothash), we have to do all of the
144             # encoding work, and only get to save on the upload part.
145             log.msg("UPLOADING AGAIN")
146             up = upload.Data(DATA, convergence=convergence)
147             up.max_segment_size = 1024
148             d1 = self.uploader.upload(up)
149         d.addCallback(_upload_again)
150
151         def _download_to_data(res):
152             log.msg("DOWNLOADING")
153             return self.downloader.download_to_data(self.uri)
154         d.addCallback(_download_to_data)
155         def _download_to_data_done(data):
156             log.msg("download finished")
157             self.failUnlessEqual(data, DATA)
158         d.addCallback(_download_to_data_done)
159
160         target_filename = os.path.join(self.basedir, "download.target")
161         def _download_to_filename(res):
162             return self.downloader.download_to_filename(self.uri,
163                                                         target_filename)
164         d.addCallback(_download_to_filename)
165         def _download_to_filename_done(res):
166             newdata = open(target_filename, "rb").read()
167             self.failUnlessEqual(newdata, DATA)
168         d.addCallback(_download_to_filename_done)
169
170         target_filename2 = os.path.join(self.basedir, "download.target2")
171         def _download_to_filehandle(res):
172             fh = open(target_filename2, "wb")
173             return self.downloader.download_to_filehandle(self.uri, fh)
174         d.addCallback(_download_to_filehandle)
175         def _download_to_filehandle_done(fh):
176             fh.close()
177             newdata = open(target_filename2, "rb").read()
178             self.failUnlessEqual(newdata, DATA)
179         d.addCallback(_download_to_filehandle_done)
180
181         consumer = GrabEverythingConsumer()
182         ct = download.ConsumerAdapter(consumer)
183         d.addCallback(lambda res:
184                       self.downloader.download(self.uri, ct))
185         def _download_to_consumer_done(ign):
186             self.failUnlessEqual(consumer.contents, DATA)
187         d.addCallback(_download_to_consumer_done)
188
189         def _test_read(res):
190             n = self.clients[1].create_node_from_uri(self.uri)
191             d = download_to_data(n)
192             def _read_done(data):
193                 self.failUnlessEqual(data, DATA)
194             d.addCallback(_read_done)
195             d.addCallback(lambda ign:
196                           n.read(MemoryConsumer(), offset=1, size=4))
197             def _read_portion_done(mc):
198                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
199             d.addCallback(_read_portion_done)
200             d.addCallback(lambda ign:
201                           n.read(MemoryConsumer(), offset=2, size=None))
202             def _read_tail_done(mc):
203                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
204             d.addCallback(_read_tail_done)
205
206             return d
207         d.addCallback(_test_read)
208
209         def _test_bad_read(res):
210             bad_u = uri.from_string_filenode(self.uri)
211             bad_u.key = self.flip_bit(bad_u.key)
212             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
213             # this should cause an error during download
214
215             d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
216                                  None,
217                                  bad_n.read, MemoryConsumer(), offset=2)
218             return d
219         d.addCallback(_test_bad_read)
220
221         def _download_nonexistent_uri(res):
222             baduri = self.mangle_uri(self.uri)
223             log.msg("about to download non-existent URI", level=log.UNUSUAL,
224                     facility="tahoe.tests")
225             d1 = self.downloader.download_to_data(baduri)
226             def _baduri_should_fail(res):
227                 log.msg("finished downloading non-existend URI",
228                         level=log.UNUSUAL, facility="tahoe.tests")
229                 self.failUnless(isinstance(res, Failure))
230                 self.failUnless(res.check(NotEnoughSharesError),
231                                 "expected NotEnoughSharesError, got %s" % res)
232                 # TODO: files that have zero peers should get a special kind
233                 # of NotEnoughSharesError, which can be used to suggest that
234                 # the URI might be wrong or that they've never uploaded the
235                 # file in the first place.
236             d1.addBoth(_baduri_should_fail)
237             return d1
238         d.addCallback(_download_nonexistent_uri)
239
240         # add a new node, which doesn't accept shares, and only uses the
241         # helper for upload.
242         d.addCallback(lambda res: self.add_extra_node(self.numclients,
243                                                       self.helper_furl,
244                                                       add_to_sparent=True))
245         def _added(extra_node):
246             self.extra_node = extra_node
247             extra_node.getServiceNamed("storage").sizelimit = 0
248         d.addCallback(_added)
249
250         HELPER_DATA = "Data that needs help to upload" * 1000
251         def _upload_with_helper(res):
252             u = upload.Data(HELPER_DATA, convergence=convergence)
253             d = self.extra_node.upload(u)
254             def _uploaded(results):
255                 uri = results.uri
256                 return self.downloader.download_to_data(uri)
257             d.addCallback(_uploaded)
258             def _check(newdata):
259                 self.failUnlessEqual(newdata, HELPER_DATA)
260             d.addCallback(_check)
261             return d
262         d.addCallback(_upload_with_helper)
263
264         def _upload_duplicate_with_helper(res):
265             u = upload.Data(HELPER_DATA, convergence=convergence)
266             u.debug_stash_RemoteEncryptedUploadable = True
267             d = self.extra_node.upload(u)
268             def _uploaded(results):
269                 uri = results.uri
270                 return self.downloader.download_to_data(uri)
271             d.addCallback(_uploaded)
272             def _check(newdata):
273                 self.failUnlessEqual(newdata, HELPER_DATA)
274                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
275                             "uploadable started uploading, should have been avoided")
276             d.addCallback(_check)
277             return d
278         if convergence is not None:
279             d.addCallback(_upload_duplicate_with_helper)
280
281         def _upload_resumable(res):
282             DATA = "Data that needs help to upload and gets interrupted" * 1000
283             u1 = CountingDataUploadable(DATA, convergence=convergence)
284             u2 = CountingDataUploadable(DATA, convergence=convergence)
285
286             # we interrupt the connection after about 5kB by shutting down
287             # the helper, then restartingit.
288             u1.interrupt_after = 5000
289             u1.interrupt_after_d = defer.Deferred()
290             u1.interrupt_after_d.addCallback(lambda res:
291                                              self.bounce_client(0))
292
293             # sneak into the helper and reduce its chunk size, so that our
294             # debug_interrupt will sever the connection on about the fifth
295             # chunk fetched. This makes sure that we've started to write the
296             # new shares before we abandon them, which exercises the
297             # abort/delete-partial-share code. TODO: find a cleaner way to do
298             # this. I know that this will affect later uses of the helper in
299             # this same test run, but I'm not currently worried about it.
300             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
301
302             d = self.extra_node.upload(u1)
303
304             def _should_not_finish(res):
305                 self.fail("interrupted upload should have failed, not finished"
306                           " with result %s" % (res,))
307             def _interrupted(f):
308                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
309
310                 # make sure we actually interrupted it before finishing the
311                 # file
312                 self.failUnless(u1.bytes_read < len(DATA),
313                                 "read %d out of %d total" % (u1.bytes_read,
314                                                              len(DATA)))
315
316                 log.msg("waiting for reconnect", level=log.NOISY,
317                         facility="tahoe.test.test_system")
318                 # now, we need to give the nodes a chance to notice that this
319                 # connection has gone away. When this happens, the storage
320                 # servers will be told to abort their uploads, removing the
321                 # partial shares. Unfortunately this involves TCP messages
322                 # going through the loopback interface, and we can't easily
323                 # predict how long that will take. If it were all local, we
324                 # could use fireEventually() to stall. Since we don't have
325                 # the right introduction hooks, the best we can do is use a
326                 # fixed delay. TODO: this is fragile.
327                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
328                 return u1.interrupt_after_d
329             d.addCallbacks(_should_not_finish, _interrupted)
330
331             def _disconnected(res):
332                 # check to make sure the storage servers aren't still hanging
333                 # on to the partial share: their incoming/ directories should
334                 # now be empty.
335                 log.msg("disconnected", level=log.NOISY,
336                         facility="tahoe.test.test_system")
337                 for i in range(self.numclients):
338                     incdir = os.path.join(self.getdir("client%d" % i),
339                                           "storage", "shares", "incoming")
340                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
341             d.addCallback(_disconnected)
342
343             # then we need to give the reconnector a chance to
344             # reestablish the connection to the helper.
345             d.addCallback(lambda res:
346                           log.msg("wait_for_connections", level=log.NOISY,
347                                   facility="tahoe.test.test_system"))
348             d.addCallback(lambda res: self.wait_for_connections())
349
350
351             d.addCallback(lambda res:
352                           log.msg("uploading again", level=log.NOISY,
353                                   facility="tahoe.test.test_system"))
354             d.addCallback(lambda res: self.extra_node.upload(u2))
355
356             def _uploaded(results):
357                 uri = results.uri
358                 log.msg("Second upload complete", level=log.NOISY,
359                         facility="tahoe.test.test_system")
360
361                 # this is really bytes received rather than sent, but it's
362                 # convenient and basically measures the same thing
363                 bytes_sent = results.ciphertext_fetched
364
365                 # We currently don't support resumption of upload if the data is
366                 # encrypted with a random key.  (Because that would require us
367                 # to store the key locally and re-use it on the next upload of
368                 # this file, which isn't a bad thing to do, but we currently
369                 # don't do it.)
370                 if convergence is not None:
371                     # Make sure we did not have to read the whole file the
372                     # second time around .
373                     self.failUnless(bytes_sent < len(DATA),
374                                 "resumption didn't save us any work:"
375                                 " read %d bytes out of %d total" %
376                                 (bytes_sent, len(DATA)))
377                 else:
378                     # Make sure we did have to read the whole file the second
379                     # time around -- because the one that we partially uploaded
380                     # earlier was encrypted with a different random key.
381                     self.failIf(bytes_sent < len(DATA),
382                                 "resumption saved us some work even though we were using random keys:"
383                                 " read %d bytes out of %d total" %
384                                 (bytes_sent, len(DATA)))
385                 return self.downloader.download_to_data(uri)
386             d.addCallback(_uploaded)
387
388             def _check(newdata):
389                 self.failUnlessEqual(newdata, DATA)
390                 # If using convergent encryption, then also check that the
391                 # helper has removed the temp file from its directories.
392                 if convergence is not None:
393                     basedir = os.path.join(self.getdir("client0"), "helper")
394                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
395                     self.failUnlessEqual(files, [])
396                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
397                     self.failUnlessEqual(files, [])
398             d.addCallback(_check)
399             return d
400         d.addCallback(_upload_resumable)
401
402         return d
403
404     def _find_shares(self, basedir):
405         shares = []
406         for (dirpath, dirnames, filenames) in os.walk(basedir):
407             if "storage" not in dirpath:
408                 continue
409             if not filenames:
410                 continue
411             pieces = dirpath.split(os.sep)
412             if pieces[-4] == "storage" and pieces[-3] == "shares":
413                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
414                 # are sharefiles here
415                 assert pieces[-5].startswith("client")
416                 client_num = int(pieces[-5][-1])
417                 storage_index_s = pieces[-1]
418                 storage_index = storage.si_a2b(storage_index_s)
419                 for sharename in filenames:
420                     shnum = int(sharename)
421                     filename = os.path.join(dirpath, sharename)
422                     data = (client_num, storage_index, filename, shnum)
423                     shares.append(data)
424         if not shares:
425             self.fail("unable to find any share files in %s" % basedir)
426         return shares
427
    def _corrupt_mutable_share(self, filename, which):
        # Corrupt one field of an on-disk SDMF mutable share in place, to
        # exercise the retriever's integrity checks. 'which' selects the
        # field: "seqnum", "R" (root hash), "IV", "segsize", "pubkey",
        # "signature", "share_hash_chain", "block_hash_tree", "share_data",
        # or "encprivkey". Integer fields are bumped by 15; byte fields get
        # one bit flipped via self.flip_bit.
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        # field order here must match mutable_layout.unpack_share
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            # corrupt an arbitrary node of the hash chain (dict of
            # nodenum -> hash)
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack the (possibly modified) fields and overwrite the share
        # where it lives
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)
470
471
472     def test_mutable(self):
473         self.basedir = "system/SystemTest/test_mutable"
474         DATA = "initial contents go here."  # 25 bytes % 3 != 0
475         NEWDATA = "new contents yay"
476         NEWERDATA = "this is getting old"
477
478         d = self.set_up_nodes(use_key_generator=True)
479
480         def _create_mutable(res):
481             c = self.clients[0]
482             log.msg("starting create_mutable_file")
483             d1 = c.create_mutable_file(DATA)
484             def _done(res):
485                 log.msg("DONE: %s" % (res,))
486                 self._mutable_node_1 = res
487                 uri = res.get_uri()
488             d1.addCallback(_done)
489             return d1
490         d.addCallback(_create_mutable)
491
492         def _test_debug(res):
493             # find a share. It is important to run this while there is only
494             # one slot in the grid.
495             shares = self._find_shares(self.basedir)
496             (client_num, storage_index, filename, shnum) = shares[0]
497             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
498                     % filename)
499             log.msg(" for clients[%d]" % client_num)
500
501             out,err = StringIO(), StringIO()
502             rc = runner.runner(["debug", "dump-share", "--offsets",
503                                 filename],
504                                stdout=out, stderr=err)
505             output = out.getvalue()
506             self.failUnlessEqual(rc, 0)
507             try:
508                 self.failUnless("Mutable slot found:\n" in output)
509                 self.failUnless("share_type: SDMF\n" in output)
510                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
511                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
512                 self.failUnless(" num_extra_leases: 0\n" in output)
513                 # the pubkey size can vary by a byte, so the container might
514                 # be a bit larger on some runs.
515                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
516                 self.failUnless(m)
517                 container_size = int(m.group(1))
518                 self.failUnless(2037 <= container_size <= 2049, container_size)
519                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
520                 self.failUnless(m)
521                 data_length = int(m.group(1))
522                 self.failUnless(2037 <= data_length <= 2049, data_length)
523                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
524                                 in output)
525                 self.failUnless(" SDMF contents:\n" in output)
526                 self.failUnless("  seqnum: 1\n" in output)
527                 self.failUnless("  required_shares: 3\n" in output)
528                 self.failUnless("  total_shares: 10\n" in output)
529                 self.failUnless("  segsize: 27\n" in output, (output, filename))
530                 self.failUnless("  datalen: 25\n" in output)
531                 # the exact share_hash_chain nodes depends upon the sharenum,
532                 # and is more of a hassle to compute than I want to deal with
533                 # now
534                 self.failUnless("  share_hash_chain: " in output)
535                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
536                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
537                             base32.b2a(storage_index))
538                 self.failUnless(expected in output)
539             except unittest.FailTest:
540                 print
541                 print "dump-share output was:"
542                 print output
543                 raise
544         d.addCallback(_test_debug)
545
546         # test retrieval
547
548         # first, let's see if we can use the existing node to retrieve the
549         # contents. This allows it to use the cached pubkey and maybe the
550         # latest-known sharemap.
551
552         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
553         def _check_download_1(res):
554             self.failUnlessEqual(res, DATA)
555             # now we see if we can retrieve the data from a new node,
556             # constructed using the URI of the original one. We do this test
557             # on the same client that uploaded the data.
558             uri = self._mutable_node_1.get_uri()
559             log.msg("starting retrieve1")
560             newnode = self.clients[0].create_node_from_uri(uri)
561             newnode_2 = self.clients[0].create_node_from_uri(uri)
562             self.failUnlessIdentical(newnode, newnode_2)
563             return newnode.download_best_version()
564         d.addCallback(_check_download_1)
565
566         def _check_download_2(res):
567             self.failUnlessEqual(res, DATA)
568             # same thing, but with a different client
569             uri = self._mutable_node_1.get_uri()
570             newnode = self.clients[1].create_node_from_uri(uri)
571             log.msg("starting retrieve2")
572             d1 = newnode.download_best_version()
573             d1.addCallback(lambda res: (res, newnode))
574             return d1
575         d.addCallback(_check_download_2)
576
577         def _check_download_3((res, newnode)):
578             self.failUnlessEqual(res, DATA)
579             # replace the data
580             log.msg("starting replace1")
581             d1 = newnode.overwrite(NEWDATA)
582             d1.addCallback(lambda res: newnode.download_best_version())
583             return d1
584         d.addCallback(_check_download_3)
585
586         def _check_download_4(res):
587             self.failUnlessEqual(res, NEWDATA)
588             # now create an even newer node and replace the data on it. This
589             # new node has never been used for download before.
590             uri = self._mutable_node_1.get_uri()
591             newnode1 = self.clients[2].create_node_from_uri(uri)
592             newnode2 = self.clients[3].create_node_from_uri(uri)
593             self._newnode3 = self.clients[3].create_node_from_uri(uri)
594             log.msg("starting replace2")
595             d1 = newnode1.overwrite(NEWERDATA)
596             d1.addCallback(lambda res: newnode2.download_best_version())
597             return d1
598         d.addCallback(_check_download_4)
599
600         def _check_download_5(res):
601             log.msg("finished replace2")
602             self.failUnlessEqual(res, NEWERDATA)
603         d.addCallback(_check_download_5)
604
605         def _corrupt_shares(res):
606             # run around and flip bits in all but k of the shares, to test
607             # the hash checks
608             shares = self._find_shares(self.basedir)
609             ## sort by share number
610             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
611             where = dict([ (shnum, filename)
612                            for (client_num, storage_index, filename, shnum)
613                            in shares ])
614             assert len(where) == 10 # this test is designed for 3-of-10
615             for shnum, filename in where.items():
616                 # shares 7,8,9 are left alone. read will check
617                 # (share_hash_chain, block_hash_tree, share_data). New
618                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
619                 # segsize, signature).
620                 if shnum == 0:
621                     # read: this will trigger "pubkey doesn't match
622                     # fingerprint".
623                     self._corrupt_mutable_share(filename, "pubkey")
624                     self._corrupt_mutable_share(filename, "encprivkey")
625                 elif shnum == 1:
626                     # triggers "signature is invalid"
627                     self._corrupt_mutable_share(filename, "seqnum")
628                 elif shnum == 2:
629                     # triggers "signature is invalid"
630                     self._corrupt_mutable_share(filename, "R")
631                 elif shnum == 3:
632                     # triggers "signature is invalid"
633                     self._corrupt_mutable_share(filename, "segsize")
634                 elif shnum == 4:
635                     self._corrupt_mutable_share(filename, "share_hash_chain")
636                 elif shnum == 5:
637                     self._corrupt_mutable_share(filename, "block_hash_tree")
638                 elif shnum == 6:
639                     self._corrupt_mutable_share(filename, "share_data")
640                 # other things to correct: IV, signature
641                 # 7,8,9 are left alone
642
643                 # note that initial_query_count=5 means that we'll hit the
644                 # first 5 servers in effectively random order (based upon
645                 # response time), so we won't necessarily ever get a "pubkey
646                 # doesn't match fingerprint" error (if we hit shnum>=1 before
647                 # shnum=0, we pull the pubkey from there). To get repeatable
648                 # specific failures, we need to set initial_query_count=1,
649                 # but of course that will change the sequencing behavior of
650                 # the retrieval process. TODO: find a reasonable way to make
651                 # this a parameter, probably when we expand this test to test
652                 # for one failure mode at a time.
653
654                 # when we retrieve this, we should get three signature
655                 # failures (where we've mangled seqnum, R, and segsize). The
656                 # pubkey mangling
657         d.addCallback(_corrupt_shares)
658
659         d.addCallback(lambda res: self._newnode3.download_best_version())
660         d.addCallback(_check_download_5)
661
662         def _check_empty_file(res):
663             # make sure we can create empty files, this usually screws up the
664             # segsize math
665             d1 = self.clients[2].create_mutable_file("")
666             d1.addCallback(lambda newnode: newnode.download_best_version())
667             d1.addCallback(lambda res: self.failUnlessEqual("", res))
668             return d1
669         d.addCallback(_check_empty_file)
670
671         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
672         def _created_dirnode(dnode):
673             log.msg("_created_dirnode(%s)" % (dnode,))
674             d1 = dnode.list()
675             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
676             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
677             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
678             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
679             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
680             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
681             d1.addCallback(lambda res: dnode.build_manifest().when_done())
682             d1.addCallback(lambda manifest:
683                            self.failUnlessEqual(len(manifest), 1))
684             return d1
685         d.addCallback(_created_dirnode)
686
687         def wait_for_c3_kg_conn():
688             return self.clients[3]._key_generator is not None
689         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
690
691         def check_kg_poolsize(junk, size_delta):
692             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
693                                  self.key_generator_svc.key_generator.pool_size + size_delta)
694
695         d.addCallback(check_kg_poolsize, 0)
696         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
697         d.addCallback(check_kg_poolsize, -1)
698         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
699         d.addCallback(check_kg_poolsize, -2)
700         # use_helper induces use of clients[3], which is the using-key_gen client
701         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
702         d.addCallback(check_kg_poolsize, -3)
703
704         return d
705     # The default 120 second timeout went off when running it under valgrind
706     # on my old Windows laptop, so I'm bumping up the timeout.
707     test_mutable.timeout = 240
708
709     def flip_bit(self, good):
710         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
711
712     def mangle_uri(self, gooduri):
713         # change the key, which changes the storage index, which means we'll
714         # be asking about the wrong file, so nobody will have any shares
715         u = IFileURI(gooduri)
716         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
717                             uri_extension_hash=u.uri_extension_hash,
718                             needed_shares=u.needed_shares,
719                             total_shares=u.total_shares,
720                             size=u.size)
721         return u2.to_string()
722
723     # TODO: add a test which mangles the uri_extension_hash instead, and
724     # should fail due to not being able to get a valid uri_extension block.
725     # Also a test which sneakily mangles the uri_extension block to change
726     # some of the validation data, so it will fail in the post-download phase
727     # when the file's crypttext integrity check fails. Do the same thing for
728     # the key, which should cause the download to fail the post-download
729     # plaintext_hash check.
730
    def test_vdrive(self):
        """End-to-end system test of the virtual drive.

        Brings up a grid of nodes (with a stats gatherer), publishes a small
        directory tree through client0, bounces client0, then verifies the
        tree from other clients and exercises the introducer web page, the
        webapi, the control port, the CLI, and the checker against it.
        The steps below are order-dependent: each callback builds on state
        (directories, URIs) stashed on self by the previous ones.
        """
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client0 to make sure the published data survives a bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._grab_stats)
        return d
    # this test brings up a whole grid and walks many subsystems, so it
    # needs far more than trial's default timeout
    test_vdrive.timeout = 1100
775
    def _test_introweb(self, res):
        """Fetch the introducer's welcome page (HTML, then ?t=json) and
        check that it reports the version plus the announcement and
        subscription summaries expected for this 5-client grid."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the whole page so the failure log is diagnosable
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # same checks again, against the machine-readable JSON form
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
807
808     def _do_publish1(self, res):
809         ut = upload.Data(self.data, convergence=None)
810         c0 = self.clients[0]
811         d = c0.create_empty_dirnode()
812         def _made_root(new_dirnode):
813             self._root_directory_uri = new_dirnode.get_uri()
814             return c0.create_node_from_uri(self._root_directory_uri)
815         d.addCallback(_made_root)
816         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
817         def _made_subdir1(subdir1_node):
818             self._subdir1_node = subdir1_node
819             d1 = subdir1_node.add_file(u"mydata567", ut)
820             d1.addCallback(self.log, "publish finished")
821             def _stash_uri(filenode):
822                 self.uri = filenode.get_uri()
823             d1.addCallback(_stash_uri)
824             return d1
825         d.addCallback(_made_subdir1)
826         return d
827
828     def _do_publish2(self, res):
829         ut = upload.Data(self.data, convergence=None)
830         d = self._subdir1_node.create_empty_directory(u"subdir2")
831         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
832         return d
833
834     def log(self, res, *args, **kwargs):
835         # print "MSG: %s  RES: %s" % (msg, args)
836         log.msg(*args, **kwargs)
837         return res
838
839     def _do_publish_private(self, res):
840         self.smalldata = "sssh, very secret stuff"
841         ut = upload.Data(self.smalldata, convergence=None)
842         d = self.clients[0].create_empty_dirnode()
843         d.addCallback(self.log, "GOT private directory")
844         def _got_new_dir(privnode):
845             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
846             d1 = privnode.create_empty_directory(u"personal")
847             d1.addCallback(self.log, "made P/personal")
848             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
849             d1.addCallback(self.log, "made P/personal/sekrit data")
850             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
851             def _got_s2(s2node):
852                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
853                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
854                 return d2
855             d1.addCallback(_got_s2)
856             d1.addCallback(lambda res: privnode)
857             return d1
858         d.addCallback(_got_new_dir)
859         return d
860
861     def _check_publish1(self, res):
862         # this one uses the iterative API
863         c1 = self.clients[1]
864         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
865         d.addCallback(self.log, "check_publish1 got /")
866         d.addCallback(lambda root: root.get(u"subdir1"))
867         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
868         d.addCallback(lambda filenode: filenode.download_to_data())
869         d.addCallback(self.log, "get finished")
870         def _get_done(data):
871             self.failUnlessEqual(data, self.data)
872         d.addCallback(_get_done)
873         return d
874
875     def _check_publish2(self, res):
876         # this one uses the path-based API
877         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
878         d = rootnode.get_child_at_path(u"subdir1")
879         d.addCallback(lambda dirnode:
880                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
881         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
882         d.addCallback(lambda filenode: filenode.download_to_data())
883         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
884
885         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
886         def _got_filenode(filenode):
887             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
888             assert fnode == filenode
889         d.addCallback(_got_filenode)
890         return d
891
    def _check_publish_private(self, resnode):
        """Verify the private tree P created by _do_publish_private.

        'resnode' is the private root dirnode. Checks the file contents,
        confirms that the s2-ro link is a mutable-but-readonly directory
        which rejects every mutating operation, exercises move_child_to
        between directories, and finally checks build_manifest() and
        start_deep_stats() over the whole tree.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash for the readonly-move checks below
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only link: mutable filesystem object, readonly cap
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            # every mutating operation on the readonly dirnode must fail
            # with NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            # plain reads through the readonly cap must still work
            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            # moves out of, and into, a readonly directory must both fail
            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # shuffle the file around with move_child_to, ending back where
            # it started (P/personal/sekrit data)
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with encoding details, so only check
                # lower bounds
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
1001
1002     def shouldFail(self, res, expected_failure, which, substring=None):
1003         if isinstance(res, Failure):
1004             res.trap(expected_failure)
1005             if substring:
1006                 self.failUnless(substring in str(res),
1007                                 "substring '%s' not in '%s'"
1008                                 % (substring, str(res)))
1009         else:
1010             self.fail("%s was supposed to raise %s, not get '%s'" %
1011                       (which, expected_failure, res))
1012
1013     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1014         assert substring is None or isinstance(substring, str)
1015         d = defer.maybeDeferred(callable, *args, **kwargs)
1016         def done(res):
1017             if isinstance(res, Failure):
1018                 res.trap(expected_failure)
1019                 if substring:
1020                     self.failUnless(substring in str(res),
1021                                     "substring '%s' not in '%s'"
1022                                     % (substring, str(res)))
1023             else:
1024                 self.fail("%s was supposed to raise %s, not get '%s'" %
1025                           (which, expected_failure, res))
1026         d.addBoth(done)
1027         return d
1028
1029     def PUT(self, urlpath, data):
1030         url = self.webish_url + urlpath
1031         return getPage(url, method="PUT", postdata=data)
1032
1033     def GET(self, urlpath, followRedirect=False):
1034         url = self.webish_url + urlpath
1035         return getPage(url, method="GET", followRedirect=followRedirect)
1036
1037     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1038         if use_helper:
1039             url = self.helper_webish_url + urlpath
1040         else:
1041             url = self.webish_url + urlpath
1042         sepbase = "boogabooga"
1043         sep = "--" + sepbase
1044         form = []
1045         form.append(sep)
1046         form.append('Content-Disposition: form-data; name="_charset"')
1047         form.append('')
1048         form.append('UTF-8')
1049         form.append(sep)
1050         for name, value in fields.iteritems():
1051             if isinstance(value, tuple):
1052                 filename, value = value
1053                 form.append('Content-Disposition: form-data; name="%s"; '
1054                             'filename="%s"' % (name, filename.encode("utf-8")))
1055             else:
1056                 form.append('Content-Disposition: form-data; name="%s"' % name)
1057             form.append('')
1058             form.append(str(value))
1059             form.append(sep)
1060         form[-1] += "--"
1061         body = "\r\n".join(form) + "\r\n"
1062         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1063                    }
1064         return getPage(url, method="POST", postdata=body,
1065                        headers=headers, followRedirect=followRedirect)
1066
    def _test_web(self, res):
        """Walk the webapi: welcome pages, directory listings, downloads by
        path and by URI (including a mangled URI), PUT/GET round-trips,
        unlinked POST uploads (direct and via the helper), the various
        status pages, helper status (HTML and JSON), and the statistics
        pages. The exact counts checked near the end depend on all the
        earlier steps having run in this order.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            # mangle_uri changes the storage index, so no shares exist and
            # the server should answer 410 Gone
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            # backdate the files so they look three days old
            then = time.time() - 86400*3
            now = time.time()
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # these counts/sizes reflect the two spurious files written above
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1273
    def _test_runner(self, res):
        # exercise some of the diagnostic tools in runner.py
        #
        # This walks the storage directories of all local test nodes to
        # locate one CHK share on disk, then runs the 'dump-share',
        # 'find-shares', and 'catalog-shares' CLI debug commands against it
        # and asserts on their text output.

        # find a share: walk everything under basedir, looking for a file
        # sitting in a .../storage/shares/$START/$SINDEX directory
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    break
        else:
            # for/else: only reached when the walk finds no CHK share at all
            self.fail("unable to find any uri_extension files in %s"
                      % self.basedir)
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        # 'tahoe debug dump-share' should describe the share's fields
        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            filename],
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool.  The share's parent directory name is the
        # base32 storage index.
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        # 10 total shares were created for the single uploaded file
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        descriptions = [sfn.strip() for sfn in out.readlines()]
        # NOTE(review): 30 appears to assume three files' worth of shares
        # exist across the grid at this point — confirm against earlier
        # test steps if this count changes.
        self.failUnlessEqual(len(descriptions), 30)
        matching = [line
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1348
1349     def _test_control(self, res):
1350         # exercise the remote-control-the-client foolscap interfaces in
1351         # allmydata.control (mostly used for performance tests)
1352         c0 = self.clients[0]
1353         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1354         control_furl = open(control_furl_file, "r").read().strip()
1355         # it doesn't really matter which Tub we use to connect to the client,
1356         # so let's just use our IntroducerNode's
1357         d = self.introducer.tub.getReference(control_furl)
1358         d.addCallback(self._test_control2, control_furl_file)
1359         return d
1360     def _test_control2(self, rref, filename):
1361         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1362         downfile = os.path.join(self.basedir, "control.downfile")
1363         d.addCallback(lambda uri:
1364                       rref.callRemote("download_from_uri_to_file",
1365                                       uri, downfile))
1366         def _check(res):
1367             self.failUnlessEqual(res, downfile)
1368             data = open(downfile, "r").read()
1369             expected_data = open(filename, "r").read()
1370             self.failUnlessEqual(data, expected_data)
1371         d.addCallback(_check)
1372         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1373         if sys.platform == "linux2":
1374             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1375         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1376         return d
1377
1378     def _test_cli(self, res):
1379         # run various CLI commands (in a thread, since they use blocking
1380         # network calls)
1381
1382         private_uri = self._private_node.get_uri()
1383         some_uri = self._root_directory_uri
1384         client0_basedir = self.getdir("client0")
1385
1386         nodeargs = [
1387             "--node-directory", client0_basedir,
1388             ]
1389         TESTDATA = "I will not write the same thing over and over.\n" * 100
1390
1391         d = defer.succeed(None)
1392
1393         # for compatibility with earlier versions, private/root_dir.cap is
1394         # supposed to be treated as an alias named "tahoe:". Start by making
1395         # sure that works, before we add other aliases.
1396
1397         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1398         f = open(root_file, "w")
1399         f.write(private_uri)
1400         f.close()
1401
1402         def run(ignored, verb, *args, **kwargs):
1403             stdin = kwargs.get("stdin", "")
1404             newargs = [verb] + nodeargs + list(args)
1405             return self._run_cli(newargs, stdin=stdin)
1406
1407         def _check_ls((out,err), expected_children, unexpected_children=[]):
1408             self.failUnlessEqual(err, "")
1409             for s in expected_children:
1410                 self.failUnless(s in out, (s,out))
1411             for s in unexpected_children:
1412                 self.failIf(s in out, (s,out))
1413
1414         def _check_ls_root((out,err)):
1415             self.failUnless("personal" in out)
1416             self.failUnless("s2-ro" in out)
1417             self.failUnless("s2-rw" in out)
1418             self.failUnlessEqual(err, "")
1419
1420         # this should reference private_uri
1421         d.addCallback(run, "ls")
1422         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1423
1424         d.addCallback(run, "list-aliases")
1425         def _check_aliases_1((out,err)):
1426             self.failUnlessEqual(err, "")
1427             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1428         d.addCallback(_check_aliases_1)
1429
1430         # now that that's out of the way, remove root_dir.cap and work with
1431         # new files
1432         d.addCallback(lambda res: os.unlink(root_file))
1433         d.addCallback(run, "list-aliases")
1434         def _check_aliases_2((out,err)):
1435             self.failUnlessEqual(err, "")
1436             self.failUnlessEqual(out, "")
1437         d.addCallback(_check_aliases_2)
1438
1439         d.addCallback(run, "mkdir")
1440         def _got_dir( (out,err) ):
1441             self.failUnless(uri.from_string_dirnode(out.strip()))
1442             return out.strip()
1443         d.addCallback(_got_dir)
1444         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1445
1446         d.addCallback(run, "list-aliases")
1447         def _check_aliases_3((out,err)):
1448             self.failUnlessEqual(err, "")
1449             self.failUnless("tahoe: " in out)
1450         d.addCallback(_check_aliases_3)
1451
1452         def _check_empty_dir((out,err)):
1453             self.failUnlessEqual(out, "")
1454             self.failUnlessEqual(err, "")
1455         d.addCallback(run, "ls")
1456         d.addCallback(_check_empty_dir)
1457
1458         def _check_missing_dir((out,err)):
1459             # TODO: check that rc==2
1460             self.failUnlessEqual(out, "")
1461             self.failUnlessEqual(err, "No such file or directory\n")
1462         d.addCallback(run, "ls", "bogus")
1463         d.addCallback(_check_missing_dir)
1464
1465         files = []
1466         datas = []
1467         for i in range(10):
1468             fn = os.path.join(self.basedir, "file%d" % i)
1469             files.append(fn)
1470             data = "data to be uploaded: file%d\n" % i
1471             datas.append(data)
1472             open(fn,"wb").write(data)
1473
1474         def _check_stdout_against((out,err), filenum=None, data=None):
1475             self.failUnlessEqual(err, "")
1476             if filenum is not None:
1477                 self.failUnlessEqual(out, datas[filenum])
1478             if data is not None:
1479                 self.failUnlessEqual(out, data)
1480
1481         # test all both forms of put: from a file, and from stdin
1482         #  tahoe put bar FOO
1483         d.addCallback(run, "put", files[0], "tahoe-file0")
1484         def _put_out((out,err)):
1485             self.failUnless("URI:LIT:" in out, out)
1486             self.failUnless("201 Created" in err, err)
1487             uri0 = out.strip()
1488             return run(None, "get", uri0)
1489         d.addCallback(_put_out)
1490         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1491
1492         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1493         #  tahoe put bar tahoe:FOO
1494         d.addCallback(run, "put", files[2], "tahoe:file2")
1495         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1496         def _check_put_mutable((out,err)):
1497             self._mutable_file3_uri = out.strip()
1498         d.addCallback(_check_put_mutable)
1499         d.addCallback(run, "get", "tahoe:file3")
1500         d.addCallback(_check_stdout_against, 3)
1501
1502         #  tahoe put FOO
1503         STDIN_DATA = "This is the file to upload from stdin."
1504         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1505         #  tahoe put tahoe:FOO
1506         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1507                       stdin="Other file from stdin.")
1508
1509         d.addCallback(run, "ls")
1510         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1511                                   "tahoe-file-stdin", "from-stdin"])
1512         d.addCallback(run, "ls", "subdir")
1513         d.addCallback(_check_ls, ["tahoe-file1"])
1514
1515         # tahoe mkdir FOO
1516         d.addCallback(run, "mkdir", "subdir2")
1517         d.addCallback(run, "ls")
1518         # TODO: extract the URI, set an alias with it
1519         d.addCallback(_check_ls, ["subdir2"])
1520
1521         # tahoe get: (to stdin and to a file)
1522         d.addCallback(run, "get", "tahoe-file0")
1523         d.addCallback(_check_stdout_against, 0)
1524         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1525         d.addCallback(_check_stdout_against, 1)
1526         outfile0 = os.path.join(self.basedir, "outfile0")
1527         d.addCallback(run, "get", "file2", outfile0)
1528         def _check_outfile0((out,err)):
1529             data = open(outfile0,"rb").read()
1530             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1531         d.addCallback(_check_outfile0)
1532         outfile1 = os.path.join(self.basedir, "outfile0")
1533         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1534         def _check_outfile1((out,err)):
1535             data = open(outfile1,"rb").read()
1536             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1537         d.addCallback(_check_outfile1)
1538
1539         d.addCallback(run, "rm", "tahoe-file0")
1540         d.addCallback(run, "rm", "tahoe:file2")
1541         d.addCallback(run, "ls")
1542         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1543
1544         d.addCallback(run, "ls", "-l")
1545         def _check_ls_l((out,err)):
1546             lines = out.split("\n")
1547             for l in lines:
1548                 if "tahoe-file-stdin" in l:
1549                     self.failUnless(l.startswith("-r-- "), l)
1550                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1551                 if "file3" in l:
1552                     self.failUnless(l.startswith("-rw- "), l) # mutable
1553         d.addCallback(_check_ls_l)
1554
1555         d.addCallback(run, "ls", "--uri")
1556         def _check_ls_uri((out,err)):
1557             lines = out.split("\n")
1558             for l in lines:
1559                 if "file3" in l:
1560                     self.failUnless(self._mutable_file3_uri in l)
1561         d.addCallback(_check_ls_uri)
1562
1563         d.addCallback(run, "ls", "--readonly-uri")
1564         def _check_ls_rouri((out,err)):
1565             lines = out.split("\n")
1566             for l in lines:
1567                 if "file3" in l:
1568                     rw_uri = self._mutable_file3_uri
1569                     u = uri.from_string_mutable_filenode(rw_uri)
1570                     ro_uri = u.get_readonly().to_string()
1571                     self.failUnless(ro_uri in l)
1572         d.addCallback(_check_ls_rouri)
1573
1574
1575         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1576         d.addCallback(run, "ls")
1577         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1578
1579         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1580         d.addCallback(run, "ls")
1581         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1582
1583         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1584         d.addCallback(run, "ls")
1585         d.addCallback(_check_ls, ["file3", "file3-copy"])
1586         d.addCallback(run, "get", "tahoe:file3-copy")
1587         d.addCallback(_check_stdout_against, 3)
1588
1589         # copy from disk into tahoe
1590         d.addCallback(run, "cp", files[4], "tahoe:file4")
1591         d.addCallback(run, "ls")
1592         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1593         d.addCallback(run, "get", "tahoe:file4")
1594         d.addCallback(_check_stdout_against, 4)
1595
1596         # copy from tahoe into disk
1597         target_filename = os.path.join(self.basedir, "file-out")
1598         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1599         def _check_cp_out((out,err)):
1600             self.failUnless(os.path.exists(target_filename))
1601             got = open(target_filename,"rb").read()
1602             self.failUnlessEqual(got, datas[4])
1603         d.addCallback(_check_cp_out)
1604
1605         # copy from disk to disk (silly case)
1606         target2_filename = os.path.join(self.basedir, "file-out-copy")
1607         d.addCallback(run, "cp", target_filename, target2_filename)
1608         def _check_cp_out2((out,err)):
1609             self.failUnless(os.path.exists(target2_filename))
1610             got = open(target2_filename,"rb").read()
1611             self.failUnlessEqual(got, datas[4])
1612         d.addCallback(_check_cp_out2)
1613
1614         # copy from tahoe into disk, overwriting an existing file
1615         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1616         def _check_cp_out3((out,err)):
1617             self.failUnless(os.path.exists(target_filename))
1618             got = open(target_filename,"rb").read()
1619             self.failUnlessEqual(got, datas[3])
1620         d.addCallback(_check_cp_out3)
1621
1622         # copy from disk into tahoe, overwriting an existing immutable file
1623         d.addCallback(run, "cp", files[5], "tahoe:file4")
1624         d.addCallback(run, "ls")
1625         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1626         d.addCallback(run, "get", "tahoe:file4")
1627         d.addCallback(_check_stdout_against, 5)
1628
1629         # copy from disk into tahoe, overwriting an existing mutable file
1630         d.addCallback(run, "cp", files[5], "tahoe:file3")
1631         d.addCallback(run, "ls")
1632         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1633         d.addCallback(run, "get", "tahoe:file3")
1634         d.addCallback(_check_stdout_against, 5)
1635
1636         # recursive copy: setup
1637         dn = os.path.join(self.basedir, "dir1")
1638         os.makedirs(dn)
1639         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1640         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1641         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1642         sdn2 = os.path.join(dn, "subdir2")
1643         os.makedirs(sdn2)
1644         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1645         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1646
1647         # from disk into tahoe
1648         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1649         d.addCallback(run, "ls")
1650         d.addCallback(_check_ls, ["dir1"])
1651         d.addCallback(run, "ls", "dir1")
1652         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1653                       ["rfile4", "rfile5"])
1654         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1655         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1656                       ["rfile1", "rfile2", "rfile3"])
1657         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1658         d.addCallback(_check_stdout_against, data="rfile4")
1659
1660         # and back out again
1661         dn_copy = os.path.join(self.basedir, "dir1-copy")
1662         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1663         def _check_cp_r_out((out,err)):
1664             def _cmp(name):
1665                 old = open(os.path.join(dn, name), "rb").read()
1666                 newfn = os.path.join(dn_copy, name)
1667                 self.failUnless(os.path.exists(newfn))
1668                 new = open(newfn, "rb").read()
1669                 self.failUnlessEqual(old, new)
1670             _cmp("rfile1")
1671             _cmp("rfile2")
1672             _cmp("rfile3")
1673             _cmp(os.path.join("subdir2", "rfile4"))
1674             _cmp(os.path.join("subdir2", "rfile5"))
1675         d.addCallback(_check_cp_r_out)
1676
1677         # and copy it a second time, which ought to overwrite the same files
1678         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1679
1680         # and tahoe-to-tahoe
1681         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1682         d.addCallback(run, "ls")
1683         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1684         d.addCallback(run, "ls", "dir1-copy")
1685         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1686                       ["rfile4", "rfile5"])
1687         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1688         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1689                       ["rfile1", "rfile2", "rfile3"])
1690         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1691         d.addCallback(_check_stdout_against, data="rfile4")
1692
1693         # and copy it a second time, which ought to overwrite the same files
1694         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1695
1696         # tahoe_ls doesn't currently handle the error correctly: it tries to
1697         # JSON-parse a traceback.
1698 ##         def _ls_missing(res):
1699 ##             argv = ["ls"] + nodeargs + ["bogus"]
1700 ##             return self._run_cli(argv)
1701 ##         d.addCallback(_ls_missing)
1702 ##         def _check_ls_missing((out,err)):
1703 ##             print "OUT", out
1704 ##             print "ERR", err
1705 ##             self.failUnlessEqual(err, "")
1706 ##         d.addCallback(_check_ls_missing)
1707
1708         return d
1709
1710     def _run_cli(self, argv, stdin=""):
1711         #print "CLI:", argv
1712         stdout, stderr = StringIO(), StringIO()
1713         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1714                                   stdin=StringIO(stdin),
1715                                   stdout=stdout, stderr=stderr)
1716         def _done(res):
1717             return stdout.getvalue(), stderr.getvalue()
1718         d.addCallback(_done)
1719         return d
1720
1721     def _test_checker(self, res):
1722         ut = upload.Data("too big to be literal" * 200, convergence=None)
1723         d = self._personal_node.add_file(u"big file", ut)
1724
1725         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1726         def _check_dirnode_results(r):
1727             self.failUnless(r.is_healthy())
1728         d.addCallback(_check_dirnode_results)
1729         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1730         d.addCallback(_check_dirnode_results)
1731
1732         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1733         def _got_chk_filenode(n):
1734             self.failUnless(isinstance(n, filenode.FileNode))
1735             d = n.check(Monitor())
1736             def _check_filenode_results(r):
1737                 self.failUnless(r.is_healthy())
1738             d.addCallback(_check_filenode_results)
1739             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1740             d.addCallback(_check_filenode_results)
1741             return d
1742         d.addCallback(_got_chk_filenode)
1743
1744         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1745         def _got_lit_filenode(n):
1746             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1747             d = n.check(Monitor())
1748             def _check_lit_filenode_results(r):
1749                 self.failUnlessEqual(r, None)
1750             d.addCallback(_check_lit_filenode_results)
1751             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1752             d.addCallback(_check_lit_filenode_results)
1753             return d
1754         d.addCallback(_got_lit_filenode)
1755         return d
1756
1757
class MutableChecker(SystemTestMixin, unittest.TestCase, WebErrorMixin):
    """Exercise the webapi checker/verifier (?t=check) and repairer
    (&repair=true) against mutable files: one healthy, one with a corrupted
    share, and one with a deleted share.

    The share-locating code that test_corrupt and test_delete_share used to
    duplicate has been factored into _find_first_share().
    """

    def _run_cli(self, argv):
        # Run a CLI command synchronously and return its stdout. The debug
        # commands used here operate on local share files, so blocking in
        # the test process is fine.
        stdout, stderr = StringIO(), StringIO()
        runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
        return stdout.getvalue()

    def _find_first_share(self):
        # Use 'tahoe debug find-shares' to locate self.node's shares held
        # by clients[1]. Record a "<peerid-prefix>-sh<shnum>" label for the
        # first one in self.corrupt_shareid (so the damaged share can be
        # identified later), and return that share's filename.
        si = self.node.get_storage_index()
        out = self._run_cli(["debug", "find-shares", base32.b2a(si),
                             self.clients[1].basedir])
        sharefile = out.split("\n")[0]
        shnum = os.path.basename(sharefile)
        nodeid_prefix = idlib.shortnodeid_b2a(self.clients[1].nodeid)
        self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
        return sharefile

    def test_good(self):
        # an undamaged mutable file should pass the deep verifier
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
        d.addCallback(_created)
        # now make sure the webapi verifier sees no problems
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("<span>Healthy!</span>" in out, out)
            self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
            self.failIf("Not Healthy!" in out, out)
            self.failIf("Unhealthy" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)
        d.addErrback(self.explain_web_error)
        return d

    def test_corrupt(self):
        # a corrupted share should be noticed by the verifier, and the
        # repairer should then restore full health
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            # corrupt one share, using the CLI debug command
            self._run_cli(["debug", "corrupt-share",
                           self._find_first_share()])
        d.addCallback(_created)
        # now make sure the webapi verifier notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failUnless("Corrupt Shares:" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=true&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("<div>Repair successful</div>" in out, out)
        d.addCallback(_got_repair_results)
        # a re-check after repair should find no problems
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out, out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d

    def test_delete_share(self):
        # a deleted share should be noticed even by the cheaper
        # (verify=false) checker, and the repairer should restore it
        self.basedir = self.mktemp()
        d = self.set_up_nodes()
        CONTENTS = "a little bit of data"
        d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
        def _created(node):
            self.node = node
            # delete one share
            os.unlink(self._find_first_share())
        d.addCallback(_created)
        # now make sure the webapi checker notices it
        def _do_check(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false")
            return getPage(url, method="POST")
        d.addCallback(_do_check)
        def _got_results(out):
            self.failUnless("Not Healthy!" in out, out)
            self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
            self.failIf("Corrupt Shares" in out, out)
        d.addCallback(_got_results)

        # now make sure the webapi repairer can fix it
        def _do_repair(res):
            url = (self.webish_url +
                   "uri/%s" % urllib.quote(self.node.get_uri()) +
                   "?t=check&verify=false&repair=true")
            return getPage(url, method="POST")
        d.addCallback(_do_repair)
        def _got_repair_results(out):
            self.failUnless("Repair successful" in out)
        d.addCallback(_got_repair_results)
        # a re-check after repair should find no problems
        d.addCallback(_do_check)
        def _got_postrepair_results(out):
            self.failIf("Not Healthy!" in out, out)
            self.failUnless("Recoverable Versions: 10*seq" in out)
        d.addCallback(_got_postrepair_results)
        d.addErrback(self.explain_web_error)

        return d
1892
1893 class DeepCheckWeb(SystemTestMixin, unittest.TestCase, WebErrorMixin):
1894     # construct a small directory tree (with one dir, one immutable file, one
1895     # mutable file, one LIT file, and a loop), and then check/examine it in
1896     # various ways.
1897
1898     def set_up_tree(self, ignored):
1899         # 2.9s
1900         c0 = self.clients[0]
1901         d = c0.create_empty_dirnode()
1902         def _created_root(n):
1903             self.root = n
1904             self.root_uri = n.get_uri()
1905         d.addCallback(_created_root)
1906         d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
1907         d.addCallback(lambda n: self.root.set_node(u"mutable", n))
1908         def _created_mutable(n):
1909             self.mutable = n
1910             self.mutable_uri = n.get_uri()
1911         d.addCallback(_created_mutable)
1912
1913         large = upload.Data("Lots of data\n" * 1000, None)
1914         d.addCallback(lambda ign: self.root.add_file(u"large", large))
1915         def _created_large(n):
1916             self.large = n
1917             self.large_uri = n.get_uri()
1918         d.addCallback(_created_large)
1919
1920         small = upload.Data("Small enough for a LIT", None)
1921         d.addCallback(lambda ign: self.root.add_file(u"small", small))
1922         def _created_small(n):
1923             self.small = n
1924             self.small_uri = n.get_uri()
1925         d.addCallback(_created_small)
1926
1927         d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
1928         return d
1929
1930     def check_is_healthy(self, cr, n, where, incomplete=False):
1931         self.failUnless(ICheckerResults.providedBy(cr), where)
1932         self.failUnless(cr.is_healthy(), where)
1933         self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
1934                              where)
1935         self.failUnlessEqual(cr.get_storage_index_string(),
1936                              base32.b2a(n.get_storage_index()), where)
1937         needs_rebalancing = bool( len(self.clients) < 10 )
1938         if not incomplete:
1939             self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
1940         d = cr.get_data()
1941         self.failUnlessEqual(d["count-shares-good"], 10, where)
1942         self.failUnlessEqual(d["count-shares-needed"], 3, where)
1943         self.failUnlessEqual(d["count-shares-expected"], 10, where)
1944         if not incomplete:
1945             self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
1946         self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
1947         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
1948         if not incomplete:
1949             self.failUnlessEqual(sorted(d["servers-responding"]),
1950                                  sorted([c.nodeid for c in self.clients]),
1951                                  where)
1952             self.failUnless("sharemap" in d, where)
1953             all_serverids = set()
1954             for (shareid, serverids) in d["sharemap"].items():
1955                 all_serverids.update(serverids)
1956             self.failUnlessEqual(sorted(all_serverids),
1957                                  sorted([c.nodeid for c in self.clients]),
1958                                  where)
1959
1960         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
1961         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
1962         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
1963
1964
1965     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
1966         self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
1967         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
1968         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
1969         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
1970         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
1971         self.failIf(cr.get_repair_attempted(), where)
1972
1973     def deep_check_is_healthy(self, cr, num_healthy, where):
1974         self.failUnless(IDeepCheckResults.providedBy(cr))
1975         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
1976                              num_healthy, where)
1977
1978     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
1979         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
1980         c = cr.get_counters()
1981         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
1982                              num_healthy, where)
1983         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
1984                              num_healthy, where)
1985         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
1986
1987     def test_good(self):
1988         self.basedir = self.mktemp()
1989         d = self.set_up_nodes()
1990         d.addCallback(self.set_up_tree)
1991         d.addCallback(self.do_stats)
1992         d.addCallback(self.do_test_good)
1993         d.addCallback(self.do_test_web)
1994         d.addErrback(self.explain_web_error)
1995         return d
1996
1997     def do_stats(self, ignored):
1998         d = defer.succeed(None)
1999         d.addCallback(lambda ign: self.root.start_deep_stats().when_done())
2000         d.addCallback(self.check_stats)
2001         return d
2002
2003     def check_stats(self, s):
2004         self.failUnlessEqual(s["count-directories"], 1)
2005         self.failUnlessEqual(s["count-files"], 3)
2006         self.failUnlessEqual(s["count-immutable-files"], 1)
2007         self.failUnlessEqual(s["count-literal-files"], 1)
2008         self.failUnlessEqual(s["count-mutable-files"], 1)
2009         # don't check directories: their size will vary
2010         # s["largest-directory"]
2011         # s["size-directories"]
2012         self.failUnlessEqual(s["largest-directory-children"], 4)
2013         self.failUnlessEqual(s["largest-immutable-file"], 13000)
2014         # to re-use this function for both the local
2015         # dirnode.start_deep_stats() and the webapi t=start-deep-stats, we
2016         # coerce the result into a list of tuples. dirnode.start_deep_stats()
2017         # returns a list of tuples, but JSON only knows about lists., so
2018         # t=start-deep-stats returns a list of lists.
2019         histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2020         self.failUnlessEqual(histogram, [(11, 31, 1),
2021                                          (10001, 31622, 1),
2022                                          ])
2023         self.failUnlessEqual(s["size-immutable-files"], 13000)
2024         self.failUnlessEqual(s["size-literal-files"], 22)
2025
    def do_test_good(self, ignored):
        """Check every node of the tree with the local (non-webapi) check
        APIs, in all combinations of verify= and repair=, expecting
        everything to be healthy. Finally starts a deep-check and cancels
        it, expecting OperationCancelledError."""
        d = defer.succeed(None)
        # check the individual items
        d.addCallback(lambda ign: self.root.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor()))
        d.addCallback(self.check_is_healthy, self.large, "large")
        # LIT files have no shares to check: check() fires with None
        d.addCallback(lambda ign: self.small.check(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")

        # and again with verify=True
        d.addCallback(lambda ign: self.root.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(Monitor(), verify=True))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(Monitor(), verify=True))
        # incomplete=True: verify-mode results on the immutable file omit
        # some fields, so check_is_healthy skips them
        d.addCallback(self.check_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")

        # and check_and_repair(), which should be a nop
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor()))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor()))
        d.addCallback(self.failUnlessEqual, None, "small")

        # check_and_repair(verify=True)
        d.addCallback(lambda ign: self.root.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check_and_repair(Monitor(), verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")


        # now deep-check the root, with various verify= and repair= options
        d.addCallback(lambda ign:
                      self.root.start_deep_check().when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check(verify=True).when_done())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair().when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign:
                      self.root.start_deep_check_and_repair(verify=True).when_done())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")

        # and finally, start a deep-check, but then cancel it.
        d.addCallback(lambda ign: self.root.start_deep_check())
        def _checking(monitor):
            # cancel immediately; the operation must then errback with
            # OperationCancelledError instead of finishing
            monitor.cancel()
            d = monitor.when_done()
            # this should fire as soon as the next dirnode.list finishes.
            # TODO: add a counter to measure how many list() calls are made,
            # assert that no more than one gets to run before the cancel()
            # takes effect.
            def _finished_normally(res):
                self.fail("this was supposed to fail, not finish normally")
            def _cancelled(f):
                f.trap(OperationCancelledError)
            d.addCallbacks(_finished_normally, _cancelled)
            return d
        d.addCallback(_checking)

        return d
2103
2104     def web_json(self, n, **kwargs):
2105         kwargs["output"] = "json"
2106         d = self.web(n, "POST", **kwargs)
2107         d.addCallback(self.decode_json)
2108         return d
2109
2110     def decode_json(self, (s,url)):
2111         try:
2112             data = simplejson.loads(s)
2113         except ValueError:
2114             self.fail("%s: not JSON: '%s'" % (url, s))
2115         return data
2116
2117     def web(self, n, method="GET", **kwargs):
2118         # returns (data, url)
2119         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2120                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2121         d = getPage(url, method=method)
2122         d.addCallback(lambda data: (data,url))
2123         return d
2124
2125     def wait_for_operation(self, ignored, ophandle):
2126         url = self.webish_url + "operations/" + ophandle
2127         url += "?t=status&output=JSON"
2128         d = getPage(url)
2129         def _got(res):
2130             try:
2131                 data = simplejson.loads(res)
2132             except ValueError:
2133                 self.fail("%s: not JSON: '%s'" % (url, res))
2134             if not data["finished"]:
2135                 d = self.stall(delay=1.0)
2136                 d.addCallback(self.wait_for_operation, ophandle)
2137                 return d
2138             return data
2139         d.addCallback(_got)
2140         return d
2141
2142     def get_operation_results(self, ignored, ophandle, output=None):
2143         url = self.webish_url + "operations/" + ophandle
2144         url += "?t=status"
2145         if output:
2146             url += "&output=" + output
2147         d = getPage(url)
2148         def _got(res):
2149             if output and output.lower() == "json":
2150                 try:
2151                     return simplejson.loads(res)
2152                 except ValueError:
2153                     self.fail("%s: not JSON: '%s'" % (url, res))
2154             return res
2155         d.addCallback(_got)
2156         return d
2157
2158     def slow_web(self, n, output=None, **kwargs):
2159         # use ophandle=
2160         handle = base32.b2a(os.urandom(4))
2161         d = self.web(n, "POST", ophandle=handle, **kwargs)
2162         d.addCallback(self.wait_for_operation, handle)
2163         d.addCallback(self.get_operation_results, handle, output=output)
2164         return d
2165
2166     def json_check_is_healthy(self, data, n, where, incomplete=False):
2167
2168         self.failUnlessEqual(data["storage-index"],
2169                              base32.b2a(n.get_storage_index()), where)
2170         r = data["results"]
2171         self.failUnlessEqual(r["healthy"], True, where)
2172         needs_rebalancing = bool( len(self.clients) < 10 )
2173         if not incomplete:
2174             self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2175         self.failUnlessEqual(r["count-shares-good"], 10, where)
2176         self.failUnlessEqual(r["count-shares-needed"], 3, where)
2177         self.failUnlessEqual(r["count-shares-expected"], 10, where)
2178         if not incomplete:
2179             self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2180         self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2181         self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2182         if not incomplete:
2183             self.failUnlessEqual(sorted(r["servers-responding"]),
2184                                  sorted([idlib.nodeid_b2a(c.nodeid)
2185                                          for c in self.clients]), where)
2186             self.failUnless("sharemap" in r, where)
2187             all_serverids = set()
2188             for (shareid, serverids_s) in r["sharemap"].items():
2189                 all_serverids.update(serverids_s)
2190             self.failUnlessEqual(sorted(all_serverids),
2191                                  sorted([idlib.nodeid_b2a(c.nodeid)
2192                                          for c in self.clients]), where)
2193         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2194         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2195         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2196
2197     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2198         self.failUnlessEqual(data["storage-index"],
2199                              base32.b2a(n.get_storage_index()), where)
2200         self.failUnlessEqual(data["repair-attempted"], False, where)
2201         self.json_check_is_healthy(data["pre-repair-results"],
2202                                    n, where, incomplete)
2203         self.json_check_is_healthy(data["post-repair-results"],
2204                                    n, where, incomplete)
2205
2206     def json_full_deepcheck_is_healthy(self, data, n, where):
2207         self.failUnlessEqual(data["root-storage-index"],
2208                              base32.b2a(n.get_storage_index()), where)
2209         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2210         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2211         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2212         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2213         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2214         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2215         self.json_check_stats(data["stats"], where)
2216
2217     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2218         self.failUnlessEqual(data["root-storage-index"],
2219                              base32.b2a(n.get_storage_index()), where)
2220         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2221
2222         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2223         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2224         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2225
2226         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2227         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2228         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2229
2230         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2231         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2232         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2233
2234         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2235         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2236         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2237
2238
2239     def json_check_lit(self, data, n, where):
2240         self.failUnlessEqual(data["storage-index"], "", where)
2241         self.failUnlessEqual(data["results"]["healthy"], True, where)
2242
2243     def json_check_stats(self, data, where):
2244         self.check_stats(data)
2245
    def do_test_web(self, ignored):
        """Repeat the health checks through the webapi: deep-stats, t=check
        on every node with all verify=/repair= combinations, slow deep-check
        operations via ophandles, and t=info pages."""
        d = defer.succeed(None)

        # stats
        d.addCallback(lambda ign:
                      self.slow_web(self.root,
                                    t="start-deep-stats", output="json"))
        d.addCallback(self.json_check_stats, "deep-stats")

        # check, no verify
        d.addCallback(lambda ign: self.web_json(self.root, t="check"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.web_json(self.large, t="check"))
        d.addCallback(self.json_check_is_healthy, self.large, "large")
        # LIT files get the special reduced check results
        d.addCallback(lambda ign: self.web_json(self.small, t="check"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check and verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", verify="true"))
        # incomplete=True: verify-mode results on the immutable file omit
        # some fields, so the checker skips them
        d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check and repair, no verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check+verify+repair
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # now run a deep-check, with various verify= and repair= flags
        # (these are long-running operations, so they go through slow_web's
        # ophandle-polling machinery)
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true",
                                    output="json"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", repair="true",
                                    output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.slow_web(self.root, t="start-deep-check", verify="true", repair="true", output="json"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")

        # now look at t=info, just to make sure the pages render
        d.addCallback(lambda ign: self.web(self.root, t="info"))
        # TODO: examine the output
        d.addCallback(lambda ign: self.web(self.mutable, t="info"))
        d.addCallback(lambda ign: self.web(self.large, t="info"))
        d.addCallback(lambda ign: self.web(self.small, t="info"))

        return d