]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
test_system: check t=deep-stats too
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, random, struct, sys, time, re, simplejson, urllib
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 from twisted.internet.error import ConnectionDone, ConnectionLost
8 import allmydata
9 from allmydata import uri, storage, offloaded
10 from allmydata.immutable import download, upload, filenode
11 from allmydata.util import idlib, mathutil, testutil
12 from allmydata.util import log, base32
13 from allmydata.scripts import runner
14 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
15      ICheckerResults, ICheckAndRepairResults, IDeepCheckResults, \
16      IDeepCheckAndRepairResults
17 from allmydata.mutable.common import NotMutableError
18 from allmydata.mutable import layout as mutable_layout
19 from foolscap import DeadReferenceError
20 from twisted.python.failure import Failure
21 from twisted.web.client import getPage
22 from twisted.web.error import Error
23
24 from allmydata.test.common import SystemTestMixin, ShareManglingMixin
25
# Payload for tests that need an immutable file big enough to get a real
# CHK URI: anything this large cannot be inlined into a LIT (literal) URI.
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
30
class CountingDataUploadable(upload.Data):
    """An upload.Data that tracks how many bytes have been read from it.

    When 'interrupt_after' is set, the Deferred in 'interrupt_after_d'
    fires (with this uploadable as its result) the first time the running
    byte count exceeds that threshold; the trigger is then disarmed so it
    fires at most once.
    """
    bytes_read = 0            # running total of bytes handed out by read()
    interrupt_after = None    # byte threshold that arms the trigger, or None
    interrupt_after_d = None  # Deferred fired once the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm before firing, so the callback runs exactly once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
43
44
45 class SystemTest(SystemTestMixin, unittest.TestCase):
46
    def test_connections(self):
        """Bring up the grid plus one extra node and verify that every
        client sees all numclients+1 peers, both in the raw peerid list
        and in the permuted storage-server list.

        NOTE(review): this test is deleted immediately below (see the
        'del' statement), so it never actually runs; it is kept here in
        case the introducer code is reworked and it needs reinstating.
        """
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            # every client should see the original grid plus the new node
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            # addBoth: shut the extra node down on success or failure alike
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections
74
75     def test_upload_and_download_random_key(self):
76         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
77         return self._test_upload_and_download(convergence=None)
78     test_upload_and_download_random_key.timeout = 4800
79
80     def test_upload_and_download_convergent(self):
81         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
82         return self._test_upload_and_download(convergence="some convergence string")
83     test_upload_and_download_convergent.timeout = 4800
84
85     def _test_upload_and_download(self, convergence):
86         # we use 4000 bytes of data, which will result in about 400k written
87         # to disk among all our simulated nodes
88         DATA = "Some data to upload\n" * 200
89         d = self.set_up_nodes()
90         def _check_connections(res):
91             for c in self.clients:
92                 all_peerids = list(c.get_all_peerids())
93                 self.failUnlessEqual(len(all_peerids), self.numclients)
94                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
95                 self.failUnlessEqual(len(permuted_peers), self.numclients)
96         d.addCallback(_check_connections)
97
98         def _do_upload(res):
99             log.msg("UPLOADING")
100             u = self.clients[0].getServiceNamed("uploader")
101             self.uploader = u
102             # we crank the max segsize down to 1024b for the duration of this
103             # test, so we can exercise multiple segments. It is important
104             # that this is not a multiple of the segment size, so that the
105             # tail segment is not the same length as the others. This actualy
106             # gets rounded up to 1025 to be a multiple of the number of
107             # required shares (since we use 25 out of 100 FEC).
108             up = upload.Data(DATA, convergence=convergence)
109             up.max_segment_size = 1024
110             d1 = u.upload(up)
111             return d1
112         d.addCallback(_do_upload)
113         def _upload_done(results):
114             uri = results.uri
115             log.msg("upload finished: uri is %s" % (uri,))
116             self.uri = uri
117             dl = self.clients[1].getServiceNamed("downloader")
118             self.downloader = dl
119         d.addCallback(_upload_done)
120
121         def _upload_again(res):
122             # Upload again. If using convergent encryption then this ought to be
123             # short-circuited, however with the way we currently generate URIs
124             # (i.e. because they include the roothash), we have to do all of the
125             # encoding work, and only get to save on the upload part.
126             log.msg("UPLOADING AGAIN")
127             up = upload.Data(DATA, convergence=convergence)
128             up.max_segment_size = 1024
129             d1 = self.uploader.upload(up)
130         d.addCallback(_upload_again)
131
132         def _download_to_data(res):
133             log.msg("DOWNLOADING")
134             return self.downloader.download_to_data(self.uri)
135         d.addCallback(_download_to_data)
136         def _download_to_data_done(data):
137             log.msg("download finished")
138             self.failUnlessEqual(data, DATA)
139         d.addCallback(_download_to_data_done)
140
141         target_filename = os.path.join(self.basedir, "download.target")
142         def _download_to_filename(res):
143             return self.downloader.download_to_filename(self.uri,
144                                                         target_filename)
145         d.addCallback(_download_to_filename)
146         def _download_to_filename_done(res):
147             newdata = open(target_filename, "rb").read()
148             self.failUnlessEqual(newdata, DATA)
149         d.addCallback(_download_to_filename_done)
150
151         target_filename2 = os.path.join(self.basedir, "download.target2")
152         def _download_to_filehandle(res):
153             fh = open(target_filename2, "wb")
154             return self.downloader.download_to_filehandle(self.uri, fh)
155         d.addCallback(_download_to_filehandle)
156         def _download_to_filehandle_done(fh):
157             fh.close()
158             newdata = open(target_filename2, "rb").read()
159             self.failUnlessEqual(newdata, DATA)
160         d.addCallback(_download_to_filehandle_done)
161
162         def _download_nonexistent_uri(res):
163             baduri = self.mangle_uri(self.uri)
164             log.msg("about to download non-existent URI", level=log.UNUSUAL,
165                     facility="tahoe.tests")
166             d1 = self.downloader.download_to_data(baduri)
167             def _baduri_should_fail(res):
168                 log.msg("finished downloading non-existend URI",
169                         level=log.UNUSUAL, facility="tahoe.tests")
170                 self.failUnless(isinstance(res, Failure))
171                 self.failUnless(res.check(download.NotEnoughSharesError),
172                                 "expected NotEnoughSharesError, got %s" % res)
173                 # TODO: files that have zero peers should get a special kind
174                 # of NotEnoughSharesError, which can be used to suggest that
175                 # the URI might be wrong or that they've never uploaded the
176                 # file in the first place.
177             d1.addBoth(_baduri_should_fail)
178             return d1
179         d.addCallback(_download_nonexistent_uri)
180
181         # add a new node, which doesn't accept shares, and only uses the
182         # helper for upload.
183         d.addCallback(lambda res: self.add_extra_node(self.numclients,
184                                                       self.helper_furl,
185                                                       add_to_sparent=True))
186         def _added(extra_node):
187             self.extra_node = extra_node
188             extra_node.getServiceNamed("storage").sizelimit = 0
189         d.addCallback(_added)
190
191         HELPER_DATA = "Data that needs help to upload" * 1000
192         def _upload_with_helper(res):
193             u = upload.Data(HELPER_DATA, convergence=convergence)
194             d = self.extra_node.upload(u)
195             def _uploaded(results):
196                 uri = results.uri
197                 return self.downloader.download_to_data(uri)
198             d.addCallback(_uploaded)
199             def _check(newdata):
200                 self.failUnlessEqual(newdata, HELPER_DATA)
201             d.addCallback(_check)
202             return d
203         d.addCallback(_upload_with_helper)
204
205         def _upload_duplicate_with_helper(res):
206             u = upload.Data(HELPER_DATA, convergence=convergence)
207             u.debug_stash_RemoteEncryptedUploadable = True
208             d = self.extra_node.upload(u)
209             def _uploaded(results):
210                 uri = results.uri
211                 return self.downloader.download_to_data(uri)
212             d.addCallback(_uploaded)
213             def _check(newdata):
214                 self.failUnlessEqual(newdata, HELPER_DATA)
215                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
216                             "uploadable started uploading, should have been avoided")
217             d.addCallback(_check)
218             return d
219         if convergence is not None:
220             d.addCallback(_upload_duplicate_with_helper)
221
222         def _upload_resumable(res):
223             DATA = "Data that needs help to upload and gets interrupted" * 1000
224             u1 = CountingDataUploadable(DATA, convergence=convergence)
225             u2 = CountingDataUploadable(DATA, convergence=convergence)
226
227             # we interrupt the connection after about 5kB by shutting down
228             # the helper, then restartingit.
229             u1.interrupt_after = 5000
230             u1.interrupt_after_d = defer.Deferred()
231             u1.interrupt_after_d.addCallback(lambda res:
232                                              self.bounce_client(0))
233
234             # sneak into the helper and reduce its chunk size, so that our
235             # debug_interrupt will sever the connection on about the fifth
236             # chunk fetched. This makes sure that we've started to write the
237             # new shares before we abandon them, which exercises the
238             # abort/delete-partial-share code. TODO: find a cleaner way to do
239             # this. I know that this will affect later uses of the helper in
240             # this same test run, but I'm not currently worried about it.
241             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
242
243             d = self.extra_node.upload(u1)
244
245             def _should_not_finish(res):
246                 self.fail("interrupted upload should have failed, not finished"
247                           " with result %s" % (res,))
248             def _interrupted(f):
249                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
250
251                 # make sure we actually interrupted it before finishing the
252                 # file
253                 self.failUnless(u1.bytes_read < len(DATA),
254                                 "read %d out of %d total" % (u1.bytes_read,
255                                                              len(DATA)))
256
257                 log.msg("waiting for reconnect", level=log.NOISY,
258                         facility="tahoe.test.test_system")
259                 # now, we need to give the nodes a chance to notice that this
260                 # connection has gone away. When this happens, the storage
261                 # servers will be told to abort their uploads, removing the
262                 # partial shares. Unfortunately this involves TCP messages
263                 # going through the loopback interface, and we can't easily
264                 # predict how long that will take. If it were all local, we
265                 # could use fireEventually() to stall. Since we don't have
266                 # the right introduction hooks, the best we can do is use a
267                 # fixed delay. TODO: this is fragile.
268                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
269                 return u1.interrupt_after_d
270             d.addCallbacks(_should_not_finish, _interrupted)
271
272             def _disconnected(res):
273                 # check to make sure the storage servers aren't still hanging
274                 # on to the partial share: their incoming/ directories should
275                 # now be empty.
276                 log.msg("disconnected", level=log.NOISY,
277                         facility="tahoe.test.test_system")
278                 for i in range(self.numclients):
279                     incdir = os.path.join(self.getdir("client%d" % i),
280                                           "storage", "shares", "incoming")
281                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
282             d.addCallback(_disconnected)
283
284             # then we need to give the reconnector a chance to
285             # reestablish the connection to the helper.
286             d.addCallback(lambda res:
287                           log.msg("wait_for_connections", level=log.NOISY,
288                                   facility="tahoe.test.test_system"))
289             d.addCallback(lambda res: self.wait_for_connections())
290
291
292             d.addCallback(lambda res:
293                           log.msg("uploading again", level=log.NOISY,
294                                   facility="tahoe.test.test_system"))
295             d.addCallback(lambda res: self.extra_node.upload(u2))
296
297             def _uploaded(results):
298                 uri = results.uri
299                 log.msg("Second upload complete", level=log.NOISY,
300                         facility="tahoe.test.test_system")
301
302                 # this is really bytes received rather than sent, but it's
303                 # convenient and basically measures the same thing
304                 bytes_sent = results.ciphertext_fetched
305
306                 # We currently don't support resumption of upload if the data is
307                 # encrypted with a random key.  (Because that would require us
308                 # to store the key locally and re-use it on the next upload of
309                 # this file, which isn't a bad thing to do, but we currently
310                 # don't do it.)
311                 if convergence is not None:
312                     # Make sure we did not have to read the whole file the
313                     # second time around .
314                     self.failUnless(bytes_sent < len(DATA),
315                                 "resumption didn't save us any work:"
316                                 " read %d bytes out of %d total" %
317                                 (bytes_sent, len(DATA)))
318                 else:
319                     # Make sure we did have to read the whole file the second
320                     # time around -- because the one that we partially uploaded
321                     # earlier was encrypted with a different random key.
322                     self.failIf(bytes_sent < len(DATA),
323                                 "resumption saved us some work even though we were using random keys:"
324                                 " read %d bytes out of %d total" %
325                                 (bytes_sent, len(DATA)))
326                 return self.downloader.download_to_data(uri)
327             d.addCallback(_uploaded)
328
329             def _check(newdata):
330                 self.failUnlessEqual(newdata, DATA)
331                 # If using convergent encryption, then also check that the
332                 # helper has removed the temp file from its directories.
333                 if convergence is not None:
334                     basedir = os.path.join(self.getdir("client0"), "helper")
335                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
336                     self.failUnlessEqual(files, [])
337                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
338                     self.failUnlessEqual(files, [])
339             d.addCallback(_check)
340             return d
341         d.addCallback(_upload_resumable)
342
343         return d
344
345     def _find_shares(self, basedir):
346         shares = []
347         for (dirpath, dirnames, filenames) in os.walk(basedir):
348             if "storage" not in dirpath:
349                 continue
350             if not filenames:
351                 continue
352             pieces = dirpath.split(os.sep)
353             if pieces[-4] == "storage" and pieces[-3] == "shares":
354                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
355                 # are sharefiles here
356                 assert pieces[-5].startswith("client")
357                 client_num = int(pieces[-5][-1])
358                 storage_index_s = pieces[-1]
359                 storage_index = storage.si_a2b(storage_index_s)
360                 for sharename in filenames:
361                     shnum = int(sharename)
362                     filename = os.path.join(dirpath, sharename)
363                     data = (client_num, storage_index, filename, shnum)
364                     shares.append(data)
365         if not shares:
366             self.fail("unable to find any share files in %s" % basedir)
367         return shares
368
    def _corrupt_mutable_share(self, filename, which):
        """Deliberately corrupt one field of the SDMF mutable share stored
        in 'filename', then rewrite the share in place.

        'which' names the field to damage ("seqnum", "R", "IV", "segsize",
        "pubkey", "signature", "share_hash_chain", "block_hash_tree",
        "share_data", or "encprivkey"); any other value re-packs the share
        unchanged. Used to exercise the downloader's integrity checks.
        """
        msf = storage.MutableShareFile(filename)
        # read far more than any share can hold, to get the whole thing
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        # unpack into the SDMF fields; the tuple order here must match
        # mutable_layout.unpack_share exactly
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        # mutate exactly one field, chosen by 'which'
        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            # corrupt an arbitrary node of the share hash chain
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # re-pack with the (possibly corrupted) fields and overwrite the
        # share file in place
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)
411
412
413     def test_mutable(self):
414         self.basedir = "system/SystemTest/test_mutable"
415         DATA = "initial contents go here."  # 25 bytes % 3 != 0
416         NEWDATA = "new contents yay"
417         NEWERDATA = "this is getting old"
418
419         d = self.set_up_nodes(use_key_generator=True)
420
421         def _create_mutable(res):
422             c = self.clients[0]
423             log.msg("starting create_mutable_file")
424             d1 = c.create_mutable_file(DATA)
425             def _done(res):
426                 log.msg("DONE: %s" % (res,))
427                 self._mutable_node_1 = res
428                 uri = res.get_uri()
429             d1.addCallback(_done)
430             return d1
431         d.addCallback(_create_mutable)
432
433         def _test_debug(res):
434             # find a share. It is important to run this while there is only
435             # one slot in the grid.
436             shares = self._find_shares(self.basedir)
437             (client_num, storage_index, filename, shnum) = shares[0]
438             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
439                     % filename)
440             log.msg(" for clients[%d]" % client_num)
441
442             out,err = StringIO(), StringIO()
443             rc = runner.runner(["debug", "dump-share", "--offsets",
444                                 filename],
445                                stdout=out, stderr=err)
446             output = out.getvalue()
447             self.failUnlessEqual(rc, 0)
448             try:
449                 self.failUnless("Mutable slot found:\n" in output)
450                 self.failUnless("share_type: SDMF\n" in output)
451                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
452                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
453                 self.failUnless(" num_extra_leases: 0\n" in output)
454                 # the pubkey size can vary by a byte, so the container might
455                 # be a bit larger on some runs.
456                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
457                 self.failUnless(m)
458                 container_size = int(m.group(1))
459                 self.failUnless(2037 <= container_size <= 2049, container_size)
460                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
461                 self.failUnless(m)
462                 data_length = int(m.group(1))
463                 self.failUnless(2037 <= data_length <= 2049, data_length)
464                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
465                                 in output)
466                 self.failUnless(" SDMF contents:\n" in output)
467                 self.failUnless("  seqnum: 1\n" in output)
468                 self.failUnless("  required_shares: 3\n" in output)
469                 self.failUnless("  total_shares: 10\n" in output)
470                 self.failUnless("  segsize: 27\n" in output, (output, filename))
471                 self.failUnless("  datalen: 25\n" in output)
472                 # the exact share_hash_chain nodes depends upon the sharenum,
473                 # and is more of a hassle to compute than I want to deal with
474                 # now
475                 self.failUnless("  share_hash_chain: " in output)
476                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
477                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
478                             base32.b2a(storage_index))
479                 self.failUnless(expected in output)
480             except unittest.FailTest:
481                 print
482                 print "dump-share output was:"
483                 print output
484                 raise
485         d.addCallback(_test_debug)
486
487         # test retrieval
488
489         # first, let's see if we can use the existing node to retrieve the
490         # contents. This allows it to use the cached pubkey and maybe the
491         # latest-known sharemap.
492
493         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
494         def _check_download_1(res):
495             self.failUnlessEqual(res, DATA)
496             # now we see if we can retrieve the data from a new node,
497             # constructed using the URI of the original one. We do this test
498             # on the same client that uploaded the data.
499             uri = self._mutable_node_1.get_uri()
500             log.msg("starting retrieve1")
501             newnode = self.clients[0].create_node_from_uri(uri)
502             newnode_2 = self.clients[0].create_node_from_uri(uri)
503             self.failUnlessIdentical(newnode, newnode_2)
504             return newnode.download_best_version()
505         d.addCallback(_check_download_1)
506
507         def _check_download_2(res):
508             self.failUnlessEqual(res, DATA)
509             # same thing, but with a different client
510             uri = self._mutable_node_1.get_uri()
511             newnode = self.clients[1].create_node_from_uri(uri)
512             log.msg("starting retrieve2")
513             d1 = newnode.download_best_version()
514             d1.addCallback(lambda res: (res, newnode))
515             return d1
516         d.addCallback(_check_download_2)
517
518         def _check_download_3((res, newnode)):
519             self.failUnlessEqual(res, DATA)
520             # replace the data
521             log.msg("starting replace1")
522             d1 = newnode.overwrite(NEWDATA)
523             d1.addCallback(lambda res: newnode.download_best_version())
524             return d1
525         d.addCallback(_check_download_3)
526
527         def _check_download_4(res):
528             self.failUnlessEqual(res, NEWDATA)
529             # now create an even newer node and replace the data on it. This
530             # new node has never been used for download before.
531             uri = self._mutable_node_1.get_uri()
532             newnode1 = self.clients[2].create_node_from_uri(uri)
533             newnode2 = self.clients[3].create_node_from_uri(uri)
534             self._newnode3 = self.clients[3].create_node_from_uri(uri)
535             log.msg("starting replace2")
536             d1 = newnode1.overwrite(NEWERDATA)
537             d1.addCallback(lambda res: newnode2.download_best_version())
538             return d1
539         d.addCallback(_check_download_4)
540
541         def _check_download_5(res):
542             log.msg("finished replace2")
543             self.failUnlessEqual(res, NEWERDATA)
544         d.addCallback(_check_download_5)
545
546         def _corrupt_shares(res):
547             # run around and flip bits in all but k of the shares, to test
548             # the hash checks
549             shares = self._find_shares(self.basedir)
550             ## sort by share number
551             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
552             where = dict([ (shnum, filename)
553                            for (client_num, storage_index, filename, shnum)
554                            in shares ])
555             assert len(where) == 10 # this test is designed for 3-of-10
556             for shnum, filename in where.items():
557                 # shares 7,8,9 are left alone. read will check
558                 # (share_hash_chain, block_hash_tree, share_data). New
559                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
560                 # segsize, signature).
561                 if shnum == 0:
562                     # read: this will trigger "pubkey doesn't match
563                     # fingerprint".
564                     self._corrupt_mutable_share(filename, "pubkey")
565                     self._corrupt_mutable_share(filename, "encprivkey")
566                 elif shnum == 1:
567                     # triggers "signature is invalid"
568                     self._corrupt_mutable_share(filename, "seqnum")
569                 elif shnum == 2:
570                     # triggers "signature is invalid"
571                     self._corrupt_mutable_share(filename, "R")
572                 elif shnum == 3:
573                     # triggers "signature is invalid"
574                     self._corrupt_mutable_share(filename, "segsize")
575                 elif shnum == 4:
576                     self._corrupt_mutable_share(filename, "share_hash_chain")
577                 elif shnum == 5:
578                     self._corrupt_mutable_share(filename, "block_hash_tree")
579                 elif shnum == 6:
580                     self._corrupt_mutable_share(filename, "share_data")
581                 # other things to correct: IV, signature
582                 # 7,8,9 are left alone
583
584                 # note that initial_query_count=5 means that we'll hit the
585                 # first 5 servers in effectively random order (based upon
586                 # response time), so we won't necessarily ever get a "pubkey
587                 # doesn't match fingerprint" error (if we hit shnum>=1 before
588                 # shnum=0, we pull the pubkey from there). To get repeatable
589                 # specific failures, we need to set initial_query_count=1,
590                 # but of course that will change the sequencing behavior of
591                 # the retrieval process. TODO: find a reasonable way to make
592                 # this a parameter, probably when we expand this test to test
593                 # for one failure mode at a time.
594
595                 # when we retrieve this, we should get three signature
596                 # failures (where we've mangled seqnum, R, and segsize). The
597                 # pubkey mangling
598         d.addCallback(_corrupt_shares)
599
600         d.addCallback(lambda res: self._newnode3.download_best_version())
601         d.addCallback(_check_download_5)
602
603         def _check_empty_file(res):
604             # make sure we can create empty files, this usually screws up the
605             # segsize math
606             d1 = self.clients[2].create_mutable_file("")
607             d1.addCallback(lambda newnode: newnode.download_best_version())
608             d1.addCallback(lambda res: self.failUnlessEqual("", res))
609             return d1
610         d.addCallback(_check_empty_file)
611
612         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
613         def _created_dirnode(dnode):
614             log.msg("_created_dirnode(%s)" % (dnode,))
615             d1 = dnode.list()
616             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
617             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
618             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
619             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
620             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
621             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
622             d1.addCallback(lambda res: dnode.build_manifest())
623             d1.addCallback(lambda manifest:
624                            self.failUnlessEqual(len(manifest), 1))
625             return d1
626         d.addCallback(_created_dirnode)
627
628         def wait_for_c3_kg_conn():
629             return self.clients[3]._key_generator is not None
630         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
631
632         def check_kg_poolsize(junk, size_delta):
633             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
634                                  self.key_generator_svc.key_generator.pool_size + size_delta)
635
636         d.addCallback(check_kg_poolsize, 0)
637         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
638         d.addCallback(check_kg_poolsize, -1)
639         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
640         d.addCallback(check_kg_poolsize, -2)
641         # use_helper induces use of clients[3], which is the using-key_gen client
642         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
643         d.addCallback(check_kg_poolsize, -3)
644
645         return d
646     # The default 120 second timeout went off when running it under valgrind
647     # on my old Windows laptop, so I'm bumping up the timeout.
648     test_mutable.timeout = 240
649
650     def flip_bit(self, good):
651         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
652
653     def mangle_uri(self, gooduri):
654         # change the key, which changes the storage index, which means we'll
655         # be asking about the wrong file, so nobody will have any shares
656         u = IFileURI(gooduri)
657         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
658                             uri_extension_hash=u.uri_extension_hash,
659                             needed_shares=u.needed_shares,
660                             total_shares=u.total_shares,
661                             size=u.size)
662         return u2.to_string()
663
664     # TODO: add a test which mangles the uri_extension_hash instead, and
665     # should fail due to not being able to get a valid uri_extension block.
666     # Also a test which sneakily mangles the uri_extension block to change
667     # some of the validation data, so it will fail in the post-download phase
668     # when the file's crypttext integrity check fails. Do the same thing for
669     # the key, which should cause the download to fail the post-download
670     # plaintext_hash check.
671
672     def test_vdrive(self):
673         self.basedir = "system/SystemTest/test_vdrive"
674         self.data = LARGE_DATA
675         d = self.set_up_nodes(use_stats_gatherer=True)
676         d.addCallback(self._test_introweb)
677         d.addCallback(self.log, "starting publish")
678         d.addCallback(self._do_publish1)
679         d.addCallback(self._test_runner)
680         d.addCallback(self._do_publish2)
681         # at this point, we have the following filesystem (where "R" denotes
682         # self._root_directory_uri):
683         # R
684         # R/subdir1
685         # R/subdir1/mydata567
686         # R/subdir1/subdir2/
687         # R/subdir1/subdir2/mydata992
688
689         d.addCallback(lambda res: self.bounce_client(0))
690         d.addCallback(self.log, "bounced client0")
691
692         d.addCallback(self._check_publish1)
693         d.addCallback(self.log, "did _check_publish1")
694         d.addCallback(self._check_publish2)
695         d.addCallback(self.log, "did _check_publish2")
696         d.addCallback(self._do_publish_private)
697         d.addCallback(self.log, "did _do_publish_private")
698         # now we also have (where "P" denotes a new dir):
699         #  P/personal/sekrit data
700         #  P/s2-rw -> /subdir1/subdir2/
701         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
702         d.addCallback(self._check_publish_private)
703         d.addCallback(self.log, "did _check_publish_private")
704         d.addCallback(self._test_web)
705         d.addCallback(self._test_control)
706         d.addCallback(self._test_cli)
707         # P now has four top-level children:
708         # P/personal/sekrit data
709         # P/s2-ro/
710         # P/s2-rw/
711         # P/test_put/  (empty)
712         d.addCallback(self._test_checker)
713         d.addCallback(self._grab_stats)
714         return d
715     test_vdrive.timeout = 1100
716
717     def _test_introweb(self, res):
718         d = getPage(self.introweb_url, method="GET", followRedirect=True)
719         def _check(res):
720             try:
721                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
722                                 in res)
723                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
724                 self.failUnless("Subscription Summary: storage: 5" in res)
725             except unittest.FailTest:
726                 print
727                 print "GET %s output was:" % self.introweb_url
728                 print res
729                 raise
730         d.addCallback(_check)
731         d.addCallback(lambda res:
732                       getPage(self.introweb_url + "?t=json",
733                               method="GET", followRedirect=True))
734         def _check_json(res):
735             data = simplejson.loads(res)
736             try:
737                 self.failUnlessEqual(data["subscription_summary"],
738                                      {"storage": 5})
739                 self.failUnlessEqual(data["announcement_summary"],
740                                      {"storage": 5, "stub_client": 5})
741             except unittest.FailTest:
742                 print
743                 print "GET %s?t=json output was:" % self.introweb_url
744                 print res
745                 raise
746         d.addCallback(_check_json)
747         return d
748
749     def _do_publish1(self, res):
750         ut = upload.Data(self.data, convergence=None)
751         c0 = self.clients[0]
752         d = c0.create_empty_dirnode()
753         def _made_root(new_dirnode):
754             self._root_directory_uri = new_dirnode.get_uri()
755             return c0.create_node_from_uri(self._root_directory_uri)
756         d.addCallback(_made_root)
757         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
758         def _made_subdir1(subdir1_node):
759             self._subdir1_node = subdir1_node
760             d1 = subdir1_node.add_file(u"mydata567", ut)
761             d1.addCallback(self.log, "publish finished")
762             def _stash_uri(filenode):
763                 self.uri = filenode.get_uri()
764             d1.addCallback(_stash_uri)
765             return d1
766         d.addCallback(_made_subdir1)
767         return d
768
769     def _do_publish2(self, res):
770         ut = upload.Data(self.data, convergence=None)
771         d = self._subdir1_node.create_empty_directory(u"subdir2")
772         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
773         return d
774
    def log(self, res, *args, **kwargs):
        """Pass-through logging callback: emit a log event via
        allmydata.util.log.msg(*args, **kwargs) and return 'res'
        unchanged, so this can be spliced into a Deferred callback chain
        without altering the value being propagated."""
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res
779
780     def _do_publish_private(self, res):
781         self.smalldata = "sssh, very secret stuff"
782         ut = upload.Data(self.smalldata, convergence=None)
783         d = self.clients[0].create_empty_dirnode()
784         d.addCallback(self.log, "GOT private directory")
785         def _got_new_dir(privnode):
786             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
787             d1 = privnode.create_empty_directory(u"personal")
788             d1.addCallback(self.log, "made P/personal")
789             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
790             d1.addCallback(self.log, "made P/personal/sekrit data")
791             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
792             def _got_s2(s2node):
793                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
794                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
795                 return d2
796             d1.addCallback(_got_s2)
797             d1.addCallback(lambda res: privnode)
798             return d1
799         d.addCallback(_got_new_dir)
800         return d
801
802     def _check_publish1(self, res):
803         # this one uses the iterative API
804         c1 = self.clients[1]
805         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
806         d.addCallback(self.log, "check_publish1 got /")
807         d.addCallback(lambda root: root.get(u"subdir1"))
808         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
809         d.addCallback(lambda filenode: filenode.download_to_data())
810         d.addCallback(self.log, "get finished")
811         def _get_done(data):
812             self.failUnlessEqual(data, self.data)
813         d.addCallback(_get_done)
814         return d
815
816     def _check_publish2(self, res):
817         # this one uses the path-based API
818         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
819         d = rootnode.get_child_at_path(u"subdir1")
820         d.addCallback(lambda dirnode:
821                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
822         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
823         d.addCallback(lambda filenode: filenode.download_to_data())
824         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
825
826         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
827         def _got_filenode(filenode):
828             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
829             assert fnode == filenode
830         d.addCallback(_got_filenode)
831         return d
832
    def _check_publish_private(self, resnode):
        """Verify the private directory P built by _do_publish_private.

        Checks that P/personal/sekrit data round-trips, that P/s2-rw is
        mutable while P/s2-ro is a read-only view of the same directory
        (every mutating operation on it must raise NotMutableError),
        then exercises move_child_to between P and P/personal/. Finally
        build_manifest() and deep_stats() are checked against the known
        shape of the tree. Returns a Deferred.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash for the readonly-move tests in _got_s2ro/_got_home
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # s2-ro is mutable-but-readonly: reads succeed, writes raise
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            # move the file around: P/personal/sekrit data -> P/sekrit,
            # rename it in place, then move it back under P/personal/
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            d1.addCallback(lambda res: home.deep_stats())
            def _check_stats(stats):
                # exact values first; size-directories and largest-directory
                # vary with encoding details, so only lower bounds are
                # asserted for those below
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
941
942     def shouldFail(self, res, expected_failure, which, substring=None):
943         if isinstance(res, Failure):
944             res.trap(expected_failure)
945             if substring:
946                 self.failUnless(substring in str(res),
947                                 "substring '%s' not in '%s'"
948                                 % (substring, str(res)))
949         else:
950             self.fail("%s was supposed to raise %s, not get '%s'" %
951                       (which, expected_failure, res))
952
953     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
954         assert substring is None or isinstance(substring, str)
955         d = defer.maybeDeferred(callable, *args, **kwargs)
956         def done(res):
957             if isinstance(res, Failure):
958                 res.trap(expected_failure)
959                 if substring:
960                     self.failUnless(substring in str(res),
961                                     "substring '%s' not in '%s'"
962                                     % (substring, str(res)))
963             else:
964                 self.fail("%s was supposed to raise %s, not get '%s'" %
965                           (which, expected_failure, res))
966         d.addBoth(done)
967         return d
968
969     def PUT(self, urlpath, data):
970         url = self.webish_url + urlpath
971         return getPage(url, method="PUT", postdata=data)
972
973     def GET(self, urlpath, followRedirect=False):
974         url = self.webish_url + urlpath
975         return getPage(url, method="GET", followRedirect=followRedirect)
976
977     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
978         if use_helper:
979             url = self.helper_webish_url + urlpath
980         else:
981             url = self.webish_url + urlpath
982         sepbase = "boogabooga"
983         sep = "--" + sepbase
984         form = []
985         form.append(sep)
986         form.append('Content-Disposition: form-data; name="_charset"')
987         form.append('')
988         form.append('UTF-8')
989         form.append(sep)
990         for name, value in fields.iteritems():
991             if isinstance(value, tuple):
992                 filename, value = value
993                 form.append('Content-Disposition: form-data; name="%s"; '
994                             'filename="%s"' % (name, filename.encode("utf-8")))
995             else:
996                 form.append('Content-Disposition: form-data; name="%s"' % name)
997             form.append('')
998             form.append(str(value))
999             form.append(sep)
1000         form[-1] += "--"
1001         body = "\r\n".join(form) + "\r\n"
1002         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1003                    }
1004         return getPage(url, method="POST", postdata=body,
1005                        headers=headers, followRedirect=followRedirect)
1006
    def _test_web(self, res):
        """Drive the node's webapi over HTTP: check the welcome pages
        (with and without a helper), read the published directory tree
        and file contents, upload via PUT and POST (directly and through
        the helper), and verify the status, helper-status, and
        statistics pages respond sensibly.

        Depends on the tree created by _do_publish1/_do_publish2 and on
        self.uri stashed by _do_publish1. Returns a Deferred that fires
        when all checks have completed.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            # mangle_uri flips a bit in the read key, changing the storage
            # index, so no server holds shares for it: expect a 410
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for ds in self.clients[0].list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in self.clients[0].list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = self.clients[0].list_all_retrieve_statuses()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_all_publish_statuses()[0]
            self._publish_status = ps.get_counter()
            us = self.clients[0].list_all_mapupdate_statuses()[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            # backdate the mtime so the helper's old-file cleanup paths run
            then = time.time() - 86400*3
            now = time.time()
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # sizes match the two "spurious" files written just above
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1213
    def _test_runner(self, res):
        """Exercise the 'tahoe debug' diagnostic tools in runner.py
        (dump-share, find-shares, catalog-shares) against the shares
        created by the earlier upload steps."""
        # exercise some of the diagnostic tools in runner.py

        # find a share
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    break
        else:
            # for/else: only reached if the walk never hit 'break'
            self.fail("unable to find any uri_extension files in %s"
                      % self.basedir)
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            filename],
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        # one line of output per located share; 10 total shares were placed
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        # NOTE(review): 30 assumes exactly three distinct files' shares in
        # the grid at this point of the suite -- confirm against earlier steps
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        matching = [line
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1288
1289     def _test_control(self, res):
1290         # exercise the remote-control-the-client foolscap interfaces in
1291         # allmydata.control (mostly used for performance tests)
1292         c0 = self.clients[0]
1293         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1294         control_furl = open(control_furl_file, "r").read().strip()
1295         # it doesn't really matter which Tub we use to connect to the client,
1296         # so let's just use our IntroducerNode's
1297         d = self.introducer.tub.getReference(control_furl)
1298         d.addCallback(self._test_control2, control_furl_file)
1299         return d
1300     def _test_control2(self, rref, filename):
1301         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1302         downfile = os.path.join(self.basedir, "control.downfile")
1303         d.addCallback(lambda uri:
1304                       rref.callRemote("download_from_uri_to_file",
1305                                       uri, downfile))
1306         def _check(res):
1307             self.failUnlessEqual(res, downfile)
1308             data = open(downfile, "r").read()
1309             expected_data = open(filename, "r").read()
1310             self.failUnlessEqual(data, expected_data)
1311         d.addCallback(_check)
1312         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1313         if sys.platform == "linux2":
1314             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1315         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1316         return d
1317
1318     def _test_cli(self, res):
1319         # run various CLI commands (in a thread, since they use blocking
1320         # network calls)
1321
1322         private_uri = self._private_node.get_uri()
1323         some_uri = self._root_directory_uri
1324         client0_basedir = self.getdir("client0")
1325
1326         nodeargs = [
1327             "--node-directory", client0_basedir,
1328             ]
1329         TESTDATA = "I will not write the same thing over and over.\n" * 100
1330
1331         d = defer.succeed(None)
1332
1333         # for compatibility with earlier versions, private/root_dir.cap is
1334         # supposed to be treated as an alias named "tahoe:". Start by making
1335         # sure that works, before we add other aliases.
1336
1337         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1338         f = open(root_file, "w")
1339         f.write(private_uri)
1340         f.close()
1341
1342         def run(ignored, verb, *args, **kwargs):
1343             stdin = kwargs.get("stdin", "")
1344             newargs = [verb] + nodeargs + list(args)
1345             return self._run_cli(newargs, stdin=stdin)
1346
1347         def _check_ls((out,err), expected_children, unexpected_children=[]):
1348             self.failUnlessEqual(err, "")
1349             for s in expected_children:
1350                 self.failUnless(s in out, (s,out))
1351             for s in unexpected_children:
1352                 self.failIf(s in out, (s,out))
1353
1354         def _check_ls_root((out,err)):
1355             self.failUnless("personal" in out)
1356             self.failUnless("s2-ro" in out)
1357             self.failUnless("s2-rw" in out)
1358             self.failUnlessEqual(err, "")
1359
1360         # this should reference private_uri
1361         d.addCallback(run, "ls")
1362         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1363
1364         d.addCallback(run, "list-aliases")
1365         def _check_aliases_1((out,err)):
1366             self.failUnlessEqual(err, "")
1367             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1368         d.addCallback(_check_aliases_1)
1369
1370         # now that that's out of the way, remove root_dir.cap and work with
1371         # new files
1372         d.addCallback(lambda res: os.unlink(root_file))
1373         d.addCallback(run, "list-aliases")
1374         def _check_aliases_2((out,err)):
1375             self.failUnlessEqual(err, "")
1376             self.failUnlessEqual(out, "")
1377         d.addCallback(_check_aliases_2)
1378
1379         d.addCallback(run, "mkdir")
1380         def _got_dir( (out,err) ):
1381             self.failUnless(uri.from_string_dirnode(out.strip()))
1382             return out.strip()
1383         d.addCallback(_got_dir)
1384         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1385
1386         d.addCallback(run, "list-aliases")
1387         def _check_aliases_3((out,err)):
1388             self.failUnlessEqual(err, "")
1389             self.failUnless("tahoe: " in out)
1390         d.addCallback(_check_aliases_3)
1391
1392         def _check_empty_dir((out,err)):
1393             self.failUnlessEqual(out, "")
1394             self.failUnlessEqual(err, "")
1395         d.addCallback(run, "ls")
1396         d.addCallback(_check_empty_dir)
1397
1398         def _check_missing_dir((out,err)):
1399             # TODO: check that rc==2
1400             self.failUnlessEqual(out, "")
1401             self.failUnlessEqual(err, "No such file or directory\n")
1402         d.addCallback(run, "ls", "bogus")
1403         d.addCallback(_check_missing_dir)
1404
1405         files = []
1406         datas = []
1407         for i in range(10):
1408             fn = os.path.join(self.basedir, "file%d" % i)
1409             files.append(fn)
1410             data = "data to be uploaded: file%d\n" % i
1411             datas.append(data)
1412             open(fn,"wb").write(data)
1413
1414         def _check_stdout_against((out,err), filenum=None, data=None):
1415             self.failUnlessEqual(err, "")
1416             if filenum is not None:
1417                 self.failUnlessEqual(out, datas[filenum])
1418             if data is not None:
1419                 self.failUnlessEqual(out, data)
1420
1421         # test all both forms of put: from a file, and from stdin
1422         #  tahoe put bar FOO
1423         d.addCallback(run, "put", files[0], "tahoe-file0")
1424         def _put_out((out,err)):
1425             self.failUnless("URI:LIT:" in out, out)
1426             self.failUnless("201 Created" in err, err)
1427             uri0 = out.strip()
1428             return run(None, "get", uri0)
1429         d.addCallback(_put_out)
1430         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1431
1432         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1433         #  tahoe put bar tahoe:FOO
1434         d.addCallback(run, "put", files[2], "tahoe:file2")
1435         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1436         def _check_put_mutable((out,err)):
1437             self._mutable_file3_uri = out.strip()
1438         d.addCallback(_check_put_mutable)
1439         d.addCallback(run, "get", "tahoe:file3")
1440         d.addCallback(_check_stdout_against, 3)
1441
1442         #  tahoe put FOO
1443         STDIN_DATA = "This is the file to upload from stdin."
1444         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1445         #  tahoe put tahoe:FOO
1446         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1447                       stdin="Other file from stdin.")
1448
1449         d.addCallback(run, "ls")
1450         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1451                                   "tahoe-file-stdin", "from-stdin"])
1452         d.addCallback(run, "ls", "subdir")
1453         d.addCallback(_check_ls, ["tahoe-file1"])
1454
1455         # tahoe mkdir FOO
1456         d.addCallback(run, "mkdir", "subdir2")
1457         d.addCallback(run, "ls")
1458         # TODO: extract the URI, set an alias with it
1459         d.addCallback(_check_ls, ["subdir2"])
1460
1461         # tahoe get: (to stdin and to a file)
1462         d.addCallback(run, "get", "tahoe-file0")
1463         d.addCallback(_check_stdout_against, 0)
1464         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1465         d.addCallback(_check_stdout_against, 1)
1466         outfile0 = os.path.join(self.basedir, "outfile0")
1467         d.addCallback(run, "get", "file2", outfile0)
1468         def _check_outfile0((out,err)):
1469             data = open(outfile0,"rb").read()
1470             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1471         d.addCallback(_check_outfile0)
1472         outfile1 = os.path.join(self.basedir, "outfile0")
1473         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1474         def _check_outfile1((out,err)):
1475             data = open(outfile1,"rb").read()
1476             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1477         d.addCallback(_check_outfile1)
1478
1479         d.addCallback(run, "rm", "tahoe-file0")
1480         d.addCallback(run, "rm", "tahoe:file2")
1481         d.addCallback(run, "ls")
1482         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1483
1484         d.addCallback(run, "ls", "-l")
1485         def _check_ls_l((out,err)):
1486             lines = out.split("\n")
1487             for l in lines:
1488                 if "tahoe-file-stdin" in l:
1489                     self.failUnless(l.startswith("-r-- "), l)
1490                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1491                 if "file3" in l:
1492                     self.failUnless(l.startswith("-rw- "), l) # mutable
1493         d.addCallback(_check_ls_l)
1494
1495         d.addCallback(run, "ls", "--uri")
1496         def _check_ls_uri((out,err)):
1497             lines = out.split("\n")
1498             for l in lines:
1499                 if "file3" in l:
1500                     self.failUnless(self._mutable_file3_uri in l)
1501         d.addCallback(_check_ls_uri)
1502
1503         d.addCallback(run, "ls", "--readonly-uri")
1504         def _check_ls_rouri((out,err)):
1505             lines = out.split("\n")
1506             for l in lines:
1507                 if "file3" in l:
1508                     rw_uri = self._mutable_file3_uri
1509                     u = uri.from_string_mutable_filenode(rw_uri)
1510                     ro_uri = u.get_readonly().to_string()
1511                     self.failUnless(ro_uri in l)
1512         d.addCallback(_check_ls_rouri)
1513
1514
1515         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1516         d.addCallback(run, "ls")
1517         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1518
1519         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1520         d.addCallback(run, "ls")
1521         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1522
1523         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1524         d.addCallback(run, "ls")
1525         d.addCallback(_check_ls, ["file3", "file3-copy"])
1526         d.addCallback(run, "get", "tahoe:file3-copy")
1527         d.addCallback(_check_stdout_against, 3)
1528
1529         # copy from disk into tahoe
1530         d.addCallback(run, "cp", files[4], "tahoe:file4")
1531         d.addCallback(run, "ls")
1532         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1533         d.addCallback(run, "get", "tahoe:file4")
1534         d.addCallback(_check_stdout_against, 4)
1535
1536         # copy from tahoe into disk
1537         target_filename = os.path.join(self.basedir, "file-out")
1538         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1539         def _check_cp_out((out,err)):
1540             self.failUnless(os.path.exists(target_filename))
1541             got = open(target_filename,"rb").read()
1542             self.failUnlessEqual(got, datas[4])
1543         d.addCallback(_check_cp_out)
1544
1545         # copy from disk to disk (silly case)
1546         target2_filename = os.path.join(self.basedir, "file-out-copy")
1547         d.addCallback(run, "cp", target_filename, target2_filename)
1548         def _check_cp_out2((out,err)):
1549             self.failUnless(os.path.exists(target2_filename))
1550             got = open(target2_filename,"rb").read()
1551             self.failUnlessEqual(got, datas[4])
1552         d.addCallback(_check_cp_out2)
1553
1554         # copy from tahoe into disk, overwriting an existing file
1555         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1556         def _check_cp_out3((out,err)):
1557             self.failUnless(os.path.exists(target_filename))
1558             got = open(target_filename,"rb").read()
1559             self.failUnlessEqual(got, datas[3])
1560         d.addCallback(_check_cp_out3)
1561
1562         # copy from disk into tahoe, overwriting an existing immutable file
1563         d.addCallback(run, "cp", files[5], "tahoe:file4")
1564         d.addCallback(run, "ls")
1565         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1566         d.addCallback(run, "get", "tahoe:file4")
1567         d.addCallback(_check_stdout_against, 5)
1568
1569         # copy from disk into tahoe, overwriting an existing mutable file
1570         d.addCallback(run, "cp", files[5], "tahoe:file3")
1571         d.addCallback(run, "ls")
1572         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1573         d.addCallback(run, "get", "tahoe:file3")
1574         d.addCallback(_check_stdout_against, 5)
1575
1576         # recursive copy: setup
1577         dn = os.path.join(self.basedir, "dir1")
1578         os.makedirs(dn)
1579         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1580         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1581         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1582         sdn2 = os.path.join(dn, "subdir2")
1583         os.makedirs(sdn2)
1584         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1585         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1586
1587         # from disk into tahoe
1588         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1589         d.addCallback(run, "ls")
1590         d.addCallback(_check_ls, ["dir1"])
1591         d.addCallback(run, "ls", "dir1")
1592         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1593                       ["rfile4", "rfile5"])
1594         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1595         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1596                       ["rfile1", "rfile2", "rfile3"])
1597         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1598         d.addCallback(_check_stdout_against, data="rfile4")
1599
1600         # and back out again
1601         dn_copy = os.path.join(self.basedir, "dir1-copy")
1602         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1603         def _check_cp_r_out((out,err)):
1604             def _cmp(name):
1605                 old = open(os.path.join(dn, name), "rb").read()
1606                 newfn = os.path.join(dn_copy, name)
1607                 self.failUnless(os.path.exists(newfn))
1608                 new = open(newfn, "rb").read()
1609                 self.failUnlessEqual(old, new)
1610             _cmp("rfile1")
1611             _cmp("rfile2")
1612             _cmp("rfile3")
1613             _cmp(os.path.join("subdir2", "rfile4"))
1614             _cmp(os.path.join("subdir2", "rfile5"))
1615         d.addCallback(_check_cp_r_out)
1616
1617         # and copy it a second time, which ought to overwrite the same files
1618         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1619
1620         # and tahoe-to-tahoe
1621         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1622         d.addCallback(run, "ls")
1623         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1624         d.addCallback(run, "ls", "dir1-copy")
1625         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1626                       ["rfile4", "rfile5"])
1627         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1628         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1629                       ["rfile1", "rfile2", "rfile3"])
1630         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1631         d.addCallback(_check_stdout_against, data="rfile4")
1632
1633         # and copy it a second time, which ought to overwrite the same files
1634         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1635
1636         # tahoe_ls doesn't currently handle the error correctly: it tries to
1637         # JSON-parse a traceback.
1638 ##         def _ls_missing(res):
1639 ##             argv = ["ls"] + nodeargs + ["bogus"]
1640 ##             return self._run_cli(argv)
1641 ##         d.addCallback(_ls_missing)
1642 ##         def _check_ls_missing((out,err)):
1643 ##             print "OUT", out
1644 ##             print "ERR", err
1645 ##             self.failUnlessEqual(err, "")
1646 ##         d.addCallback(_check_ls_missing)
1647
1648         return d
1649
1650     def _run_cli(self, argv, stdin=""):
1651         #print "CLI:", argv
1652         stdout, stderr = StringIO(), StringIO()
1653         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1654                                   stdin=StringIO(stdin),
1655                                   stdout=stdout, stderr=stderr)
1656         def _done(res):
1657             return stdout.getvalue(), stderr.getvalue()
1658         d.addCallback(_done)
1659         return d
1660
1661     def _test_checker(self, res):
1662         ut = upload.Data("too big to be literal" * 200, convergence=None)
1663         d = self._personal_node.add_file(u"big file", ut)
1664
1665         d.addCallback(lambda res: self._personal_node.check())
1666         def _check_dirnode_results(r):
1667             self.failUnless(r.is_healthy())
1668         d.addCallback(_check_dirnode_results)
1669         d.addCallback(lambda res: self._personal_node.check(verify=True))
1670         d.addCallback(_check_dirnode_results)
1671
1672         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1673         def _got_chk_filenode(n):
1674             self.failUnless(isinstance(n, filenode.FileNode))
1675             d = n.check()
1676             def _check_filenode_results(r):
1677                 self.failUnless(r.is_healthy())
1678             d.addCallback(_check_filenode_results)
1679             d.addCallback(lambda res: n.check(verify=True))
1680             d.addCallback(_check_filenode_results)
1681             return d
1682         d.addCallback(_got_chk_filenode)
1683
1684         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1685         def _got_lit_filenode(n):
1686             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1687             d = n.check()
1688             def _check_lit_filenode_results(r):
1689                 self.failUnlessEqual(r, None)
1690             d.addCallback(_check_lit_filenode_results)
1691             d.addCallback(lambda res: n.check(verify=True))
1692             d.addCallback(_check_lit_filenode_results)
1693             return d
1694         d.addCallback(_got_lit_filenode)
1695         return d
1696
1697
class ImmutableChecker(ShareManglingMixin, unittest.TestCase):
    """Checker tests for immutable (CHK) files: shares are manipulated on
    disk through ShareManglingMixin, and filenode.check() is expected to
    notice (or, without verify=True, to not even read the shares)."""

    def setUp(self):
        # Set self.basedir to a temp dir which has the name of the current test method in its
        # name.
        self.basedir = self.mktemp()
        # one byte over the LIT threshold, so the upload produces real CHK
        # shares instead of a literal URI
        TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)

        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
        d.addCallback(lambda x: self.set_up_nodes())

        def _upload_a_file(ignored):
            d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
            d2.addCallback(lambda u: self.clients[0].create_node_from_uri(u.uri))
            return d2
        d.addCallback(_upload_a_file)

        def _stash_it(filenode):
            # every test in this class operates on self.filenode
            self.filenode = filenode
        d.addCallback(_stash_it)
        return d

    def _corrupt_a_share(self, unused=None):
        """ Exactly one bit of exactly one share on disk will be flipped (randomly selected from
        among the bits of the 'share data' -- the verifiable bits)."""

        shares = self.find_shares()
        ks = shares.keys()
        k = random.choice(ks)
        data = shares[k]

        # first 12 bytes of a share file are three big-endian uint32s:
        # version, data size, lease count; the share data follows
        (version, size, num_leases) = struct.unpack(">LLL", data[:0xc])
        sharedata = data[0xc:0xc+size]

        corruptedsharedata = testutil.flip_one_bit(sharedata)
        # reassemble: header + corrupted payload + trailing (lease) bytes
        corrupteddata = data[:0xc]+corruptedsharedata+data[0xc+size:]
        shares[k] = corrupteddata

        self.replace_shares(shares)

    def test_test_code(self):
        # The following process of stashing the shares, running
        # replace_shares, and asserting that the new set of shares equals the
        # old is more to test this test code than to test the Tahoe code...
        d = defer.succeed(None)
        d.addCallback(self.find_shares)
        # mutable cell so the inner callback can stash across the chain
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res

        d.addCallback(_stash_it)
        d.addCallback(self.replace_shares)

        def _compare(res):
            oldshares = stash[0]
            self.failUnless(isinstance(oldshares, dict), oldshares)
            self.failUnlessEqual(oldshares, res)

        d.addCallback(self.find_shares)
        d.addCallback(_compare)

        # replacing with an empty dict should delete every share
        d.addCallback(lambda ignore: self.replace_shares({}))
        d.addCallback(self.find_shares)
        d.addCallback(lambda x: self.failUnlessEqual(x, {}))

        return d

    def _count_reads(self):
        # total 'storage_server.read' counter across all storage servers,
        # used to prove that check(verify=False) does not read share data
        sum_of_read_counts = 0
        for client in self.clients:
            counters = client.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def test_check_without_verify(self):
        """ Check says the file is healthy when none of the shares have been
        touched.  It says that the file is unhealthy when all of them have
        been removed. It says that the file is healthy if one bit of one share
        has been flipped."""
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(verify=False)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # verify=False must not trigger any share reads
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(self._corrupt_a_share)
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # NOTE(review): no health assertion here, although the
                # docstring promises "healthy if one bit ... flipped" --
                # presumably is_healthy() should be asserted true; confirm
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)
        # NOTE(review): this early return makes everything below (the
        # replace_shares({}) step and _check3) unreachable dead code --
        # possibly disabled deliberately; confirm before removing or
        # re-enabling
        return d

        d.addCallback(lambda ignore: self.replace_shares({}))
        def _check3(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(verify=False)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check3)

        return d

    def test_check_with_verify(self):
        """ Check says the file is healthy when none of the shares have been touched.  It says
        that the file is unhealthy if one bit of one share has been flipped."""
        # allow up to two reads per share; N == 10 total shares
        DELTA_READS = 10 * 2 # N == 10
        d = defer.succeed(self.filenode)
        def _check1(filenode):
            before_check_reads = self._count_reads()

            d2 = filenode.check(verify=True)
            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # print "delta was ", after_check_reads - before_check_reads
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                self.failUnless(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check1)

        d.addCallback(self._corrupt_a_share)
        def _check2(ignored):
            before_check_reads = self._count_reads()
            d2 = self.filenode.check(verify=True)

            def _after_check(checkresults):
                after_check_reads = self._count_reads()
                # print "delta was ", after_check_reads - before_check_reads
                self.failIf(after_check_reads - before_check_reads > DELTA_READS)
                # a verifying check must detect the flipped bit
                self.failIf(checkresults.is_healthy())

            d2.addCallback(_after_check)
            return d2
        d.addCallback(_check2)
        return d
    # marked .todo: trial expects this test to fail until the thorough
    # verifier is implemented
    test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."
1856
1857 class MutableChecker(SystemTestMixin, unittest.TestCase):
1858
1859     def _run_cli(self, argv):
1860         stdout, stderr = StringIO(), StringIO()
1861         runner.runner(argv, run_by_human=False, stdout=stdout, stderr=stderr)
1862         return stdout.getvalue()
1863
1864     def test_good(self):
1865         self.basedir = self.mktemp()
1866         d = self.set_up_nodes()
1867         CONTENTS = "a little bit of data"
1868         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1869         def _created(node):
1870             self.node = node
1871             si = self.node.get_storage_index()
1872         d.addCallback(_created)
1873         # now make sure the webapi verifier sees no problems
1874         def _do_check(res):
1875             url = (self.webish_url +
1876                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1877                    "?t=check&verify=true")
1878             return getPage(url, method="POST")
1879         d.addCallback(_do_check)
1880         def _got_results(out):
1881             self.failUnless("<div>Healthy!</div>" in out, out)
1882             self.failUnless("Recoverable Versions: 10*seq1-" in out, out)
1883             self.failIf("Not Healthy!" in out, out)
1884             self.failIf("Unhealthy" in out, out)
1885             self.failIf("Corrupt Shares" in out, out)
1886         d.addCallback(_got_results)
1887         return d
1888
1889     def test_corrupt(self):
1890         self.basedir = self.mktemp()
1891         d = self.set_up_nodes()
1892         CONTENTS = "a little bit of data"
1893         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1894         def _created(node):
1895             self.node = node
1896             si = self.node.get_storage_index()
1897             out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1898                                 self.clients[1].basedir])
1899             files = out.split("\n")
1900             # corrupt one of them, using the CLI debug command
1901             f = files[0]
1902             shnum = os.path.basename(f)
1903             nodeid = self.clients[1].nodeid
1904             nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1905             self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1906             out = self._run_cli(["debug", "corrupt-share", files[0]])
1907         d.addCallback(_created)
1908         # now make sure the webapi verifier notices it
1909         def _do_check(res):
1910             url = (self.webish_url +
1911                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1912                    "?t=check&verify=true")
1913             return getPage(url, method="POST")
1914         d.addCallback(_do_check)
1915         def _got_results(out):
1916             self.failUnless("Not Healthy!" in out, out)
1917             self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1918             self.failUnless("Corrupt Shares:" in out, out)
1919         d.addCallback(_got_results)
1920
1921         # now make sure the webapi repairer can fix it
1922         def _do_repair(res):
1923             url = (self.webish_url +
1924                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1925                    "?t=check&verify=true&repair=true")
1926             return getPage(url, method="POST")
1927         d.addCallback(_do_repair)
1928         def _got_repair_results(out):
1929             self.failUnless("<div>Repair successful</div>" in out, out)
1930         d.addCallback(_got_repair_results)
1931         d.addCallback(_do_check)
1932         def _got_postrepair_results(out):
1933             self.failIf("Not Healthy!" in out, out)
1934             self.failUnless("Recoverable Versions: 10*seq" in out, out)
1935         d.addCallback(_got_postrepair_results)
1936
1937         return d
1938
1939     def test_delete_share(self):
1940         self.basedir = self.mktemp()
1941         d = self.set_up_nodes()
1942         CONTENTS = "a little bit of data"
1943         d.addCallback(lambda res: self.clients[0].create_mutable_file(CONTENTS))
1944         def _created(node):
1945             self.node = node
1946             si = self.node.get_storage_index()
1947             out = self._run_cli(["debug", "find-shares", base32.b2a(si),
1948                                 self.clients[1].basedir])
1949             files = out.split("\n")
1950             # corrupt one of them, using the CLI debug command
1951             f = files[0]
1952             shnum = os.path.basename(f)
1953             nodeid = self.clients[1].nodeid
1954             nodeid_prefix = idlib.shortnodeid_b2a(nodeid)
1955             self.corrupt_shareid = "%s-sh%s" % (nodeid_prefix, shnum)
1956             os.unlink(files[0])
1957         d.addCallback(_created)
1958         # now make sure the webapi checker notices it
1959         def _do_check(res):
1960             url = (self.webish_url +
1961                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1962                    "?t=check&verify=false")
1963             return getPage(url, method="POST")
1964         d.addCallback(_do_check)
1965         def _got_results(out):
1966             self.failUnless("Not Healthy!" in out, out)
1967             self.failUnless("Unhealthy: best version has only 9 shares (encoding is 3-of-10)" in out, out)
1968             self.failIf("Corrupt Shares" in out, out)
1969         d.addCallback(_got_results)
1970
1971         # now make sure the webapi repairer can fix it
1972         def _do_repair(res):
1973             url = (self.webish_url +
1974                    "uri/%s" % urllib.quote(self.node.get_uri()) +
1975                    "?t=check&verify=false&repair=true")
1976             return getPage(url, method="POST")
1977         d.addCallback(_do_repair)
1978         def _got_repair_results(out):
1979             self.failUnless("Repair successful" in out)
1980         d.addCallback(_got_repair_results)
1981         d.addCallback(_do_check)
1982         def _got_postrepair_results(out):
1983             self.failIf("Not Healthy!" in out, out)
1984             self.failUnless("Recoverable Versions: 10*seq" in out)
1985         d.addCallback(_got_postrepair_results)
1986
1987         return d
1988
1989 class DeepCheck(SystemTestMixin, unittest.TestCase):
1990     # construct a small directory tree (with one dir, one immutable file, one
1991     # mutable file, one LIT file, and a loop), and then check it in various
1992     # ways.
1993
1994     def set_up_tree(self, ignored):
1995         # 2.9s
1996         c0 = self.clients[0]
1997         d = c0.create_empty_dirnode()
1998         def _created_root(n):
1999             self.root = n
2000             self.root_uri = n.get_uri()
2001         d.addCallback(_created_root)
2002         d.addCallback(lambda ign: c0.create_mutable_file("mutable file contents"))
2003         d.addCallback(lambda n: self.root.set_node(u"mutable", n))
2004         def _created_mutable(n):
2005             self.mutable = n
2006             self.mutable_uri = n.get_uri()
2007         d.addCallback(_created_mutable)
2008
2009         large = upload.Data("Lots of data\n" * 1000, None)
2010         d.addCallback(lambda ign: self.root.add_file(u"large", large))
2011         def _created_large(n):
2012             self.large = n
2013             self.large_uri = n.get_uri()
2014         d.addCallback(_created_large)
2015
2016         small = upload.Data("Small enough for a LIT", None)
2017         d.addCallback(lambda ign: self.root.add_file(u"small", small))
2018         def _created_small(n):
2019             self.small = n
2020             self.small_uri = n.get_uri()
2021         d.addCallback(_created_small)
2022
2023         d.addCallback(lambda ign: self.root.set_node(u"loop", self.root))
2024         return d
2025
2026     def check_is_healthy(self, cr, n, where, incomplete=False):
2027         self.failUnless(ICheckerResults.providedBy(cr), where)
2028         self.failUnless(cr.is_healthy(), where)
2029         self.failUnlessEqual(cr.get_storage_index(), n.get_storage_index(),
2030                              where)
2031         self.failUnlessEqual(cr.get_storage_index_string(),
2032                              base32.b2a(n.get_storage_index()), where)
2033         needs_rebalancing = bool( len(self.clients) < 10 )
2034         if not incomplete:
2035             self.failUnlessEqual(cr.needs_rebalancing(), needs_rebalancing, where)
2036         d = cr.get_data()
2037         self.failUnlessEqual(d["count-shares-good"], 10, where)
2038         self.failUnlessEqual(d["count-shares-needed"], 3, where)
2039         self.failUnlessEqual(d["count-shares-expected"], 10, where)
2040         if not incomplete:
2041             self.failUnlessEqual(d["count-good-share-hosts"], len(self.clients), where)
2042         self.failUnlessEqual(d["count-corrupt-shares"], 0, where)
2043         self.failUnlessEqual(d["list-corrupt-shares"], [], where)
2044         if not incomplete:
2045             self.failUnlessEqual(sorted(d["servers-responding"]),
2046                                  sorted([c.nodeid for c in self.clients]),
2047                                  where)
2048             self.failUnless("sharemap" in d, where)
2049             all_serverids = set()
2050             for (shareid, serverids) in d["sharemap"].items():
2051                 all_serverids.update(serverids)
2052             self.failUnlessEqual(sorted(all_serverids),
2053                                  sorted([c.nodeid for c in self.clients]),
2054                                  where)
2055
2056         self.failUnlessEqual(d["count-wrong-shares"], 0, where)
2057         self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
2058         self.failUnlessEqual(d["count-unrecoverable-versions"], 0, where)
2059
2060
2061     def check_and_repair_is_healthy(self, cr, n, where, incomplete=False):
2062         self.failUnless(ICheckAndRepairResults.providedBy(cr), where)
2063         self.failUnless(cr.get_pre_repair_results().is_healthy(), where)
2064         self.check_is_healthy(cr.get_pre_repair_results(), n, where, incomplete)
2065         self.failUnless(cr.get_post_repair_results().is_healthy(), where)
2066         self.check_is_healthy(cr.get_post_repair_results(), n, where, incomplete)
2067         self.failIf(cr.get_repair_attempted(), where)
2068
2069     def deep_check_is_healthy(self, cr, num_healthy, where):
2070         self.failUnless(IDeepCheckResults.providedBy(cr))
2071         self.failUnlessEqual(cr.get_counters()["count-objects-healthy"],
2072                              num_healthy, where)
2073
2074     def deep_check_and_repair_is_healthy(self, cr, num_healthy, where):
2075         self.failUnless(IDeepCheckAndRepairResults.providedBy(cr), where)
2076         c = cr.get_counters()
2077         self.failUnlessEqual(c["count-objects-healthy-pre-repair"],
2078                              num_healthy, where)
2079         self.failUnlessEqual(c["count-objects-healthy-post-repair"],
2080                              num_healthy, where)
2081         self.failUnlessEqual(c["count-repairs-attempted"], 0, where)
2082
2083     def test_good(self):
2084         self.basedir = self.mktemp()
2085         d = self.set_up_nodes()
2086         d.addCallback(self.set_up_tree)
2087         d.addCallback(self.do_stats)
2088         d.addCallback(self.do_test_good)
2089         d.addCallback(self.do_test_web)
2090         return d
2091
2092     def do_stats(self, ignored):
2093         d = defer.succeed(None)
2094         d.addCallback(lambda ign: self.root.deep_stats())
2095         d.addCallback(self.check_stats)
2096         return d
2097
2098     def check_stats(self, s):
2099         self.failUnlessEqual(s["count-directories"], 1)
2100         self.failUnlessEqual(s["count-files"], 3)
2101         self.failUnlessEqual(s["count-immutable-files"], 1)
2102         self.failUnlessEqual(s["count-literal-files"], 1)
2103         self.failUnlessEqual(s["count-mutable-files"], 1)
2104         # don't check directories: their size will vary
2105         # s["largest-directory"]
2106         # s["size-directories"]
2107         self.failUnlessEqual(s["largest-directory-children"], 4)
2108         self.failUnlessEqual(s["largest-immutable-file"], 13000)
2109         # to re-use this function for both the local dirnode.deep_stats() and
2110         # the webapi t=deep-stats, we coerce the result into a list of
2111         # tuples. dirnode.deep_stats() returns a list of tuples, but JSON
2112         # only knows about lists., so t=deep-stats returns a list of lists.
2113         histogram = [tuple(stuff) for stuff in s["size-files-histogram"]]
2114         self.failUnlessEqual(histogram, [(11, 31, 1),
2115                                          (10001, 31622, 1),
2116                                          ])
2117         self.failUnlessEqual(s["size-immutable-files"], 13000)
2118         self.failUnlessEqual(s["size-literal-files"], 22)
2119
    def do_test_good(self, ignored):
        """Run every node-level checker variant against the tree and assert
        everything reports healthy.

        Covers check(), check(verify=True), check_and_repair() (with and
        without verify), deep_check() and deep_check_and_repair() (with and
        without verify) on the root dir, the mutable file, the large
        immutable file, and the small LIT file. The assertions expect
        check-style calls on the LIT file ('small') to return None, and the
        'large' verify results to be incomplete.
        """
        d = defer.succeed(None)
        # check the individual items
        d.addCallback(lambda ign: self.root.check())
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check())
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check())
        d.addCallback(self.check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check())
        d.addCallback(self.failUnlessEqual, None, "small")

        # and again with verify=True
        d.addCallback(lambda ign: self.root.check(verify=True))
        d.addCallback(self.check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check(verify=True))
        d.addCallback(self.check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check(verify=True))
        d.addCallback(self.check_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check(verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")

        # and check_and_repair(), which should be a nop
        d.addCallback(lambda ign: self.root.check_and_repair())
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair())
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair())
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.small.check_and_repair())
        d.addCallback(self.failUnlessEqual, None, "small")

        # check_and_repair(verify=True)
        d.addCallback(lambda ign: self.root.check_and_repair(verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.mutable.check_and_repair(verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.large.check_and_repair(verify=True))
        d.addCallback(self.check_and_repair_is_healthy, self.large, "large",
                      incomplete=True)
        d.addCallback(lambda ign: self.small.check_and_repair(verify=True))
        d.addCallback(self.failUnlessEqual, None, "small")


        # now deep-check on everything. It should be safe to use deep-check
        # on anything, even a regular file. The expected healthy counts are
        # 3 for the root (root + mutable + large; LIT is not counted), 1 for
        # each real file, and 0 for the LIT file.
        d.addCallback(lambda ign: self.root.deep_check())
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.mutable.deep_check())
        d.addCallback(self.deep_check_is_healthy, 1, "mutable")
        d.addCallback(lambda ign: self.large.deep_check())
        d.addCallback(self.deep_check_is_healthy, 1, "large")
        d.addCallback(lambda ign: self.small.deep_check())
        d.addCallback(self.deep_check_is_healthy, 0, "small")

        # deep-check verify=True
        d.addCallback(lambda ign: self.root.deep_check(verify=True))
        d.addCallback(self.deep_check_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.mutable.deep_check(verify=True))
        d.addCallback(self.deep_check_is_healthy, 1, "mutable")
        d.addCallback(lambda ign: self.large.deep_check(verify=True))
        d.addCallback(self.deep_check_is_healthy, 1, "large")
        d.addCallback(lambda ign: self.small.deep_check(verify=True))
        d.addCallback(self.deep_check_is_healthy, 0, "small")

        # deep-check-and-repair
        d.addCallback(lambda ign: self.root.deep_check_and_repair())
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.mutable.deep_check_and_repair())
        d.addCallback(self.deep_check_and_repair_is_healthy, 1, "mutable")
        d.addCallback(lambda ign: self.large.deep_check_and_repair())
        d.addCallback(self.deep_check_and_repair_is_healthy, 1, "large")
        d.addCallback(lambda ign: self.small.deep_check_and_repair())
        d.addCallback(self.deep_check_and_repair_is_healthy, 0, "small")

        # deep-check-and-repair, verify=True
        d.addCallback(lambda ign: self.root.deep_check_and_repair(verify=True))
        d.addCallback(self.deep_check_and_repair_is_healthy, 3, "root")
        d.addCallback(lambda ign: self.mutable.deep_check_and_repair(verify=True))
        d.addCallback(self.deep_check_and_repair_is_healthy, 1, "mutable")
        d.addCallback(lambda ign: self.large.deep_check_and_repair(verify=True))
        d.addCallback(self.deep_check_and_repair_is_healthy, 1, "large")
        d.addCallback(lambda ign: self.small.deep_check_and_repair(verify=True))
        d.addCallback(self.deep_check_and_repair_is_healthy, 0, "small")

        return d
2207
2208     def web_json(self, n, **kwargs):
2209         kwargs["output"] = "json"
2210         return self.web(n, "POST", **kwargs)
2211
2212     def web(self, n, method="GET", **kwargs):
2213         url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
2214                + "?" + "&".join(["%s=%s" % (k,v) for (k,v) in kwargs.items()]))
2215         d = getPage(url, method=method)
2216         def _decode(s):
2217             try:
2218                 data = simplejson.loads(s)
2219             except ValueError:
2220                 self.fail("%s: not JSON: '%s'" % (url, s))
2221             return data
2222         d.addCallback(_decode)
2223         return d
2224
2225     def json_check_is_healthy(self, data, n, where, incomplete=False):
2226
2227         self.failUnlessEqual(data["storage-index"],
2228                              base32.b2a(n.get_storage_index()), where)
2229         r = data["results"]
2230         self.failUnlessEqual(r["healthy"], True, where)
2231         needs_rebalancing = bool( len(self.clients) < 10 )
2232         if not incomplete:
2233             self.failUnlessEqual(r["needs-rebalancing"], needs_rebalancing, where)
2234         self.failUnlessEqual(r["count-shares-good"], 10, where)
2235         self.failUnlessEqual(r["count-shares-needed"], 3, where)
2236         self.failUnlessEqual(r["count-shares-expected"], 10, where)
2237         if not incomplete:
2238             self.failUnlessEqual(r["count-good-share-hosts"], len(self.clients), where)
2239         self.failUnlessEqual(r["count-corrupt-shares"], 0, where)
2240         self.failUnlessEqual(r["list-corrupt-shares"], [], where)
2241         if not incomplete:
2242             self.failUnlessEqual(sorted(r["servers-responding"]),
2243                                  sorted([idlib.nodeid_b2a(c.nodeid)
2244                                          for c in self.clients]), where)
2245             self.failUnless("sharemap" in r, where)
2246             all_serverids = set()
2247             for (shareid, serverids_s) in r["sharemap"].items():
2248                 all_serverids.update(serverids_s)
2249             self.failUnlessEqual(sorted(all_serverids),
2250                                  sorted([idlib.nodeid_b2a(c.nodeid)
2251                                          for c in self.clients]), where)
2252         self.failUnlessEqual(r["count-wrong-shares"], 0, where)
2253         self.failUnlessEqual(r["count-recoverable-versions"], 1, where)
2254         self.failUnlessEqual(r["count-unrecoverable-versions"], 0, where)
2255
2256     def json_check_and_repair_is_healthy(self, data, n, where, incomplete=False):
2257         self.failUnlessEqual(data["storage-index"],
2258                              base32.b2a(n.get_storage_index()), where)
2259         self.failUnlessEqual(data["repair-attempted"], False, where)
2260         self.json_check_is_healthy(data["pre-repair-results"],
2261                                    n, where, incomplete)
2262         self.json_check_is_healthy(data["post-repair-results"],
2263                                    n, where, incomplete)
2264
2265     def json_full_deepcheck_is_healthy(self, data, n, where):
2266         self.failUnlessEqual(data["root-storage-index"],
2267                              base32.b2a(n.get_storage_index()), where)
2268         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2269         self.failUnlessEqual(data["count-objects-healthy"], 3, where)
2270         self.failUnlessEqual(data["count-objects-unhealthy"], 0, where)
2271         self.failUnlessEqual(data["count-corrupt-shares"], 0, where)
2272         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2273         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2274
2275     def json_full_deepcheck_and_repair_is_healthy(self, data, n, where):
2276         self.failUnlessEqual(data["root-storage-index"],
2277                              base32.b2a(n.get_storage_index()), where)
2278         self.failUnlessEqual(data["count-objects-checked"], 3, where)
2279
2280         self.failUnlessEqual(data["count-objects-healthy-pre-repair"], 3, where)
2281         self.failUnlessEqual(data["count-objects-unhealthy-pre-repair"], 0, where)
2282         self.failUnlessEqual(data["count-corrupt-shares-pre-repair"], 0, where)
2283
2284         self.failUnlessEqual(data["count-objects-healthy-post-repair"], 3, where)
2285         self.failUnlessEqual(data["count-objects-unhealthy-post-repair"], 0, where)
2286         self.failUnlessEqual(data["count-corrupt-shares-post-repair"], 0, where)
2287
2288         self.failUnlessEqual(data["list-corrupt-shares"], [], where)
2289         self.failUnlessEqual(data["list-remaining-corrupt-shares"], [], where)
2290         self.failUnlessEqual(data["list-unhealthy-files"], [], where)
2291
2292         self.failUnlessEqual(data["count-repairs-attempted"], 0, where)
2293         self.failUnlessEqual(data["count-repairs-successful"], 0, where)
2294         self.failUnlessEqual(data["count-repairs-unsuccessful"], 0, where)
2295
2296
    def json_check_lit(self, data, n, where):
        # LIT files have no storage index (the webapi reports it as the
        # empty string) and are always reported healthy.
        self.failUnlessEqual(data["storage-index"], "", where)
        self.failUnlessEqual(data["results"]["healthy"], True, where)
2300
    def json_check_stats(self, data, where):
        # t=deep-stats returns the same dictionary that dirnode.deep_stats()
        # produces (check_stats tolerates the JSON list-vs-tuple histogram
        # difference). `where` keeps the signature parallel to the other
        # json_* helpers but is not used here.
        self.check_stats(data)
2303
    def do_test_web(self, ignored):
        """Exercise the webapi equivalents of the node-level checks:
        t=deep-stats, t=check (with every combination of verify/repair) on
        each node, and t=deep-check / t=deep-check+repair on the root.

        The LIT file ('small') gets the trivial LIT response, and 'large'
        with verify=true is validated as incomplete, matching do_test_good.
        """
        d = defer.succeed(None)

        # stats
        d.addCallback(lambda ign: self.web(self.root, t="deep-stats"))
        d.addCallback(self.json_check_stats, "deep-stats")

        # check, no verify
        d.addCallback(lambda ign: self.web_json(self.root, t="check"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign: self.web_json(self.mutable, t="check"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign: self.web_json(self.large, t="check"))
        d.addCallback(self.json_check_is_healthy, self.large, "large")
        d.addCallback(lambda ign: self.web_json(self.small, t="check"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check and verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", verify="true"))
        d.addCallback(self.json_check_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check and repair, no verify
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large")
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # check+verify+repair
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.mutable, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.mutable, "mutable")
        d.addCallback(lambda ign:
                      self.web_json(self.large, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_and_repair_is_healthy, self.large, "large", incomplete=True)
        d.addCallback(lambda ign:
                      self.web_json(self.small, t="check", repair="true", verify="true"))
        d.addCallback(self.json_check_lit, self.small, "small")

        # now run a deep-check. When done through the web, this can only be
        # run on a directory.
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="deep-check"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="deep-check", verify="true"))
        d.addCallback(self.json_full_deepcheck_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="deep-check", repair="true"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")
        d.addCallback(lambda ign:
                      self.web_json(self.root, t="deep-check", verify="true", repair="true"))
        d.addCallback(self.json_full_deepcheck_and_repair_is_healthy, self.root, "root")

        return d