src/allmydata/test/test_system.py
1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 import allmydata
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.encodingutil import quote_output
16 from allmydata.util.fileutil import abspath_expanduser_unicode
17 from allmydata.util.consumer import MemoryConsumer, download_to_data
18 from allmydata.scripts import runner
19 from allmydata.interfaces import IDirectoryNode, IFileNode, \
20      NoSuchChildError, NoSharesError
21 from allmydata.monitor import Monitor
22 from allmydata.mutable.common import NotWriteableError
23 from allmydata.mutable import layout as mutable_layout
24 from foolscap.api import DeadReferenceError
25 from twisted.python.failure import Failure
26 from twisted.web.client import getPage
27 from twisted.web.error import Error
28
29 from allmydata.test.common import SystemTestMixin
30
31 LARGE_DATA = """
32 This is some data to publish to the remote grid.., which needs to be large
33 enough to not fit inside a LIT uri.
34 """
35
36 class CountingDataUploadable(upload.Data):
37     bytes_read = 0
38     interrupt_after = None
39     interrupt_after_d = None
40
41     def read(self, length):
42         self.bytes_read += length
43         if self.interrupt_after is not None:
44             if self.bytes_read > self.interrupt_after:
45                 self.interrupt_after = None
46                 self.interrupt_after_d.callback(self)
47         return upload.Data.read(self, length)
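# CountingDataUploadable wraps upload.Data so a test can observe how many
# bytes have been pulled from it and fire a Deferred once a threshold is
# crossed. A rough usage sketch (mirroring _upload_resumable below;
# bounce_client() is provided by the surrounding test fixture, not by this
# class):
#
#   u = CountingDataUploadable(DATA, convergence=None)
#   u.interrupt_after = 5000                # let roughly 5kB through first
#   u.interrupt_after_d = defer.Deferred()  # fires with the uploadable
#   u.interrupt_after_d.addCallback(lambda _: self.bounce_client(0))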
48
49 class SystemTest(SystemTestMixin, unittest.TestCase):
50     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
51
52     def test_connections(self):
53         self.basedir = "system/SystemTest/test_connections"
54         d = self.set_up_nodes()
55         self.extra_node = None
56         d.addCallback(lambda res: self.add_extra_node(self.numclients))
57         def _check(extra_node):
58             self.extra_node = extra_node
59             for c in self.clients:
60                 all_peerids = c.get_storage_broker().get_all_serverids()
61                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
62                 sb = c.storage_broker
63                 permuted_peers = sb.get_servers_for_index("a")
64                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
65
66         d.addCallback(_check)
67         def _shutdown_extra_node(res):
68             if self.extra_node:
69                 return self.extra_node.stopService()
70             return res
71         d.addBoth(_shutdown_extra_node)
72         return d
73     # test_connections is subsumed by test_upload_and_download, and takes
74     # quite a while to run on a slow machine (because of all the TLS
75     # connections that must be established). If we ever rework the introducer
76     # code to such an extent that we're not sure if it works anymore, we can
77     # reinstate this test until it does.
78     del test_connections
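    # (The `del` above removes test_connections from the class namespace, so
    # trial will not collect or run it; the method body is kept only so it can
    # be reinstated easily, per the comment.)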
79
80     def test_upload_and_download_random_key(self):
81         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
82         return self._test_upload_and_download(convergence=None)
83
84     def test_upload_and_download_convergent(self):
85         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
86         return self._test_upload_and_download(convergence="some convergence string")
87
88     def _test_upload_and_download(self, convergence):
89         # we use 4000 bytes of data, which will result in about 400k written
90         # to disk among all our simulated nodes
91         DATA = "Some data to upload\n" * 200
92         d = self.set_up_nodes()
93         def _check_connections(res):
94             for c in self.clients:
95                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
96                 all_peerids = c.get_storage_broker().get_all_serverids()
97                 self.failUnlessEqual(len(all_peerids), self.numclients)
98                 sb = c.storage_broker
99                 permuted_peers = sb.get_servers_for_index("a")
100                 self.failUnlessEqual(len(permuted_peers), self.numclients)
101         d.addCallback(_check_connections)
102
103         def _do_upload(res):
104             log.msg("UPLOADING")
105             u = self.clients[0].getServiceNamed("uploader")
106             self.uploader = u
107             # we crank the max segsize down to 1024b for the duration of this
108             # test, so we can exercise multiple segments. It is important
109             # that this is not a multiple of the segment size, so that the
110             # tail segment is not the same length as the others. This actually
111             # gets rounded up to 1025 to be a multiple of the number of
112             # required shares (since we use 25 out of 100 FEC).
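            # (That rounding is just "round up to the next multiple of the
            # number of required shares": with k=25, ceil(1024/25) * 25 ==
            # 41 * 25 == 1025.)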
113             up = upload.Data(DATA, convergence=convergence)
114             up.max_segment_size = 1024
115             d1 = u.upload(up)
116             return d1
117         d.addCallback(_do_upload)
118         def _upload_done(results):
119             theuri = results.uri
120             log.msg("upload finished: uri is %s" % (theuri,))
121             self.uri = theuri
122             assert isinstance(self.uri, str), self.uri
123             self.cap = uri.from_string(self.uri)
124             self.n = self.clients[1].create_node_from_uri(self.uri)
125         d.addCallback(_upload_done)
126
127         def _upload_again(res):
128             # Upload again. If using convergent encryption then this ought to be
129             # short-circuited, however with the way we currently generate URIs
130             # (i.e. because they include the roothash), we have to do all of the
131             # encoding work, and only get to save on the upload part.
132             log.msg("UPLOADING AGAIN")
133             up = upload.Data(DATA, convergence=convergence)
134             up.max_segment_size = 1024
135             return self.uploader.upload(up)
136         d.addCallback(_upload_again)
137
138         def _download_to_data(res):
139             log.msg("DOWNLOADING")
140             return download_to_data(self.n)
141         d.addCallback(_download_to_data)
142         def _download_to_data_done(data):
143             log.msg("download finished")
144             self.failUnlessEqual(data, DATA)
145         d.addCallback(_download_to_data_done)
146
147         def _test_read(res):
148             n = self.clients[1].create_node_from_uri(self.uri)
149             d = download_to_data(n)
150             def _read_done(data):
151                 self.failUnlessEqual(data, DATA)
152             d.addCallback(_read_done)
153             d.addCallback(lambda ign:
154                           n.read(MemoryConsumer(), offset=1, size=4))
155             def _read_portion_done(mc):
156                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
157             d.addCallback(_read_portion_done)
158             d.addCallback(lambda ign:
159                           n.read(MemoryConsumer(), offset=2, size=None))
160             def _read_tail_done(mc):
161                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
162             d.addCallback(_read_tail_done)
163             d.addCallback(lambda ign:
164                           n.read(MemoryConsumer(), size=len(DATA)+1000))
165             def _read_too_much(mc):
166                 self.failUnlessEqual("".join(mc.chunks), DATA)
167             d.addCallback(_read_too_much)
168
169             return d
170         d.addCallback(_test_read)
171
172         def _test_bad_read(res):
173             bad_u = uri.from_string_filenode(self.uri)
174             bad_u.key = self.flip_bit(bad_u.key)
175             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
176             # this should cause an error during download
177
178             d = self.shouldFail2(NoSharesError, "'download bad node'",
179                                  None,
180                                  bad_n.read, MemoryConsumer(), offset=2)
181             return d
182         d.addCallback(_test_bad_read)
183
184         def _download_nonexistent_uri(res):
185             baduri = self.mangle_uri(self.uri)
186             badnode = self.clients[1].create_node_from_uri(baduri)
187             log.msg("about to download non-existent URI", level=log.UNUSUAL,
188                     facility="tahoe.tests")
189             d1 = download_to_data(badnode)
190             def _baduri_should_fail(res):
191                 log.msg("finished downloading non-existent URI",
192                         level=log.UNUSUAL, facility="tahoe.tests")
193                 self.failUnless(isinstance(res, Failure))
194                 self.failUnless(res.check(NoSharesError),
195                                 "expected NoSharesError, got %s" % res)
196             d1.addBoth(_baduri_should_fail)
197             return d1
198         d.addCallback(_download_nonexistent_uri)
199
200         # add a new node, which doesn't accept shares, and only uses the
201         # helper for upload.
202         d.addCallback(lambda res: self.add_extra_node(self.numclients,
203                                                       self.helper_furl,
204                                                       add_to_sparent=True))
205         def _added(extra_node):
206             self.extra_node = extra_node
207             self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
208         d.addCallback(_added)
209
210         HELPER_DATA = "Data that needs help to upload" * 1000
211         def _upload_with_helper(res):
212             u = upload.Data(HELPER_DATA, convergence=convergence)
213             d = self.extra_node.upload(u)
214             def _uploaded(results):
215                 n = self.clients[1].create_node_from_uri(results.uri)
216                 return download_to_data(n)
217             d.addCallback(_uploaded)
218             def _check(newdata):
219                 self.failUnlessEqual(newdata, HELPER_DATA)
220             d.addCallback(_check)
221             return d
222         d.addCallback(_upload_with_helper)
223
224         def _upload_duplicate_with_helper(res):
225             u = upload.Data(HELPER_DATA, convergence=convergence)
226             u.debug_stash_RemoteEncryptedUploadable = True
227             d = self.extra_node.upload(u)
228             def _uploaded(results):
229                 n = self.clients[1].create_node_from_uri(results.uri)
230                 return download_to_data(n)
231             d.addCallback(_uploaded)
232             def _check(newdata):
233                 self.failUnlessEqual(newdata, HELPER_DATA)
234                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
235                             "uploadable started uploading, should have been avoided")
236             d.addCallback(_check)
237             return d
238         if convergence is not None:
239             d.addCallback(_upload_duplicate_with_helper)
240
241         def _upload_resumable(res):
242             DATA = "Data that needs help to upload and gets interrupted" * 1000
243             u1 = CountingDataUploadable(DATA, convergence=convergence)
244             u2 = CountingDataUploadable(DATA, convergence=convergence)
245
246             # we interrupt the connection after about 5kB by shutting down
247             # the helper, then restarting it.
248             u1.interrupt_after = 5000
249             u1.interrupt_after_d = defer.Deferred()
250             u1.interrupt_after_d.addCallback(lambda res:
251                                              self.bounce_client(0))
252
253             # sneak into the helper and reduce its chunk size, so that our
254             # debug_interrupt will sever the connection on about the fifth
255             # chunk fetched. This makes sure that we've started to write the
256             # new shares before we abandon them, which exercises the
257             # abort/delete-partial-share code. TODO: find a cleaner way to do
258             # this. I know that this will affect later uses of the helper in
259             # this same test run, but I'm not currently worried about it.
260             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
261
262             d = self.extra_node.upload(u1)
263
264             def _should_not_finish(res):
265                 self.fail("interrupted upload should have failed, not finished"
266                           " with result %s" % (res,))
267             def _interrupted(f):
268                 f.trap(DeadReferenceError)
269
270                 # make sure we actually interrupted it before finishing the
271                 # file
272                 self.failUnless(u1.bytes_read < len(DATA),
273                                 "read %d out of %d total" % (u1.bytes_read,
274                                                              len(DATA)))
275
276                 log.msg("waiting for reconnect", level=log.NOISY,
277                         facility="tahoe.test.test_system")
278                 # now, we need to give the nodes a chance to notice that this
279                 # connection has gone away. When this happens, the storage
280                 # servers will be told to abort their uploads, removing the
281                 # partial shares. Unfortunately this involves TCP messages
282                 # going through the loopback interface, and we can't easily
283                 # predict how long that will take. If it were all local, we
284                 # could use fireEventually() to stall. Since we don't have
285                 # the right introduction hooks, the best we can do is use a
286                 # fixed delay. TODO: this is fragile.
287                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
288                 return u1.interrupt_after_d
289             d.addCallbacks(_should_not_finish, _interrupted)
290
291             def _disconnected(res):
292                 # check to make sure the storage servers aren't still hanging
293                 # on to the partial share: their incoming/ directories should
294                 # now be empty.
295                 log.msg("disconnected", level=log.NOISY,
296                         facility="tahoe.test.test_system")
297                 for i in range(self.numclients):
298                     incdir = os.path.join(self.getdir("client%d" % i),
299                                           "storage", "shares", "incoming")
300                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
301             d.addCallback(_disconnected)
302
303             # then we need to give the reconnector a chance to
304             # reestablish the connection to the helper.
305             d.addCallback(lambda res:
306                           log.msg("wait_for_connections", level=log.NOISY,
307                                   facility="tahoe.test.test_system"))
308             d.addCallback(lambda res: self.wait_for_connections())
309
310
311             d.addCallback(lambda res:
312                           log.msg("uploading again", level=log.NOISY,
313                                   facility="tahoe.test.test_system"))
314             d.addCallback(lambda res: self.extra_node.upload(u2))
315
316             def _uploaded(results):
317                 cap = results.uri
318                 log.msg("Second upload complete", level=log.NOISY,
319                         facility="tahoe.test.test_system")
320
321                 # this is really bytes received rather than sent, but it's
322                 # convenient and basically measures the same thing
323                 bytes_sent = results.ciphertext_fetched
324                 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
325
326                 # We currently don't support resumption of upload if the data is
327                 # encrypted with a random key.  (Because that would require us
328                 # to store the key locally and re-use it on the next upload of
329                 # this file, which isn't a bad thing to do, but we currently
330                 # don't do it.)
331                 if convergence is not None:
332                     # Make sure we did not have to read the whole file the
333                     # second time around.
334                     self.failUnless(bytes_sent < len(DATA),
335                                     "resumption didn't save us any work:"
336                                     " read %r bytes out of %r total" %
337                                     (bytes_sent, len(DATA)))
338                 else:
339                     # Make sure we did have to read the whole file the second
340                     # time around -- because the one that we partially uploaded
341                     # earlier was encrypted with a different random key.
342                     self.failIf(bytes_sent < len(DATA),
343                                 "resumption saved us some work even though we were using random keys:"
344                                 " read %r bytes out of %r total" %
345                                 (bytes_sent, len(DATA)))
346                 n = self.clients[1].create_node_from_uri(cap)
347                 return download_to_data(n)
348             d.addCallback(_uploaded)
349
350             def _check(newdata):
351                 self.failUnlessEqual(newdata, DATA)
352                 # If using convergent encryption, then also check that the
353                 # helper has removed the temp file from its directories.
354                 if convergence is not None:
355                     basedir = os.path.join(self.getdir("client0"), "helper")
356                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
357                     self.failUnlessEqual(files, [])
358                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
359                     self.failUnlessEqual(files, [])
360             d.addCallback(_check)
361             return d
362         d.addCallback(_upload_resumable)
363
364         def _grab_stats(ignored):
365             # the StatsProvider doesn't normally publish a FURL:
366             # instead it passes a live reference to the StatsGatherer
367             # (if and when it connects). To exercise the remote stats
368             # interface, we manually publish client0's StatsProvider
369             # and use client1 to query it.
370             sp = self.clients[0].stats_provider
371             sp_furl = self.clients[0].tub.registerReference(sp)
372             d = self.clients[1].tub.getReference(sp_furl)
373             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
374             def _got_stats(stats):
375                 #print "STATS"
376                 #from pprint import pprint
377                 #pprint(stats)
378                 s = stats["stats"]
379                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
380                 c = stats["counters"]
381                 self.failUnless("storage_server.allocate" in c)
382             d.addCallback(_got_stats)
383             return d
384         d.addCallback(_grab_stats)
385
386         return d
387
388     def _find_all_shares(self, basedir):
389         shares = []
390         for (dirpath, dirnames, filenames) in os.walk(basedir):
391             if "storage" not in dirpath:
392                 continue
393             if not filenames:
394                 continue
395             pieces = dirpath.split(os.sep)
396             if (len(pieces) >= 5
397                 and pieces[-4] == "storage"
398                 and pieces[-3] == "shares"):
399                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
400                 # are sharefiles here
401                 assert pieces[-5].startswith("client")
402                 client_num = int(pieces[-5][-1])
403                 storage_index_s = pieces[-1]
404                 storage_index = si_a2b(storage_index_s)
405                 for sharename in filenames:
406                     shnum = int(sharename)
407                     filename = os.path.join(dirpath, sharename)
408                     data = (client_num, storage_index, filename, shnum)
409                     shares.append(data)
410         if not shares:
411             self.fail("unable to find any share files in %s" % basedir)
412         return shares
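    # The walk above depends on the storage servers' on-disk layout, in which
    # each share lives at roughly:
    #
    #   <basedir>/client<N>/storage/shares/$START/$SINDEX/<shnum>
    #
    # where $SINDEX is the base32-encoded storage index (and $START typically
    # a short prefix of it), which is why si_a2b() can recover the binary
    # storage index from the directory name.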
413
414     def _corrupt_mutable_share(self, filename, which):
415         msf = MutableShareFile(filename)
416         datav = msf.readv([ (0, 1000000) ])
417         final_share = datav[0]
418         assert len(final_share) < 1000000 # ought to be truncated
419         pieces = mutable_layout.unpack_share(final_share)
420         (seqnum, root_hash, IV, k, N, segsize, datalen,
421          verification_key, signature, share_hash_chain, block_hash_tree,
422          share_data, enc_privkey) = pieces
423
424         if which == "seqnum":
425             seqnum = seqnum + 15
426         elif which == "R":
427             root_hash = self.flip_bit(root_hash)
428         elif which == "IV":
429             IV = self.flip_bit(IV)
430         elif which == "segsize":
431             segsize = segsize + 15
432         elif which == "pubkey":
433             verification_key = self.flip_bit(verification_key)
434         elif which == "signature":
435             signature = self.flip_bit(signature)
436         elif which == "share_hash_chain":
437             nodenum = share_hash_chain.keys()[0]
438             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
439         elif which == "block_hash_tree":
440             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
441         elif which == "share_data":
442             share_data = self.flip_bit(share_data)
443         elif which == "encprivkey":
444             enc_privkey = self.flip_bit(enc_privkey)
445
446         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
447                                             segsize, datalen)
448         final_share = mutable_layout.pack_share(prefix,
449                                                 verification_key,
450                                                 signature,
451                                                 share_hash_chain,
452                                                 block_hash_tree,
453                                                 share_data,
454                                                 enc_privkey)
455         msf.writev( [(0, final_share)], None)
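    # _corrupt_mutable_share reads the whole share with readv(), unpacks it,
    # flips exactly one field, re-packs, and writes the result back in place.
    # Typical uses from the tests below:
    #
    #   self._corrupt_mutable_share(filename, "seqnum")  # bump the sequence number
    #   self._corrupt_mutable_share(filename, "R")       # flip a bit in the root hash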
456
457
458     def test_mutable(self):
459         self.basedir = "system/SystemTest/test_mutable"
460         DATA = "initial contents go here."  # 25 bytes % 3 != 0
461         NEWDATA = "new contents yay"
462         NEWERDATA = "this is getting old"
463
464         d = self.set_up_nodes(use_key_generator=True)
465
466         def _create_mutable(res):
467             c = self.clients[0]
468             log.msg("starting create_mutable_file")
469             d1 = c.create_mutable_file(DATA)
470             def _done(res):
471                 log.msg("DONE: %s" % (res,))
472                 self._mutable_node_1 = res
473             d1.addCallback(_done)
474             return d1
475         d.addCallback(_create_mutable)
476
477         def _test_debug(res):
478             # find a share. It is important to run this while there is only
479             # one slot in the grid.
480             shares = self._find_all_shares(self.basedir)
481             (client_num, storage_index, filename, shnum) = shares[0]
482             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
483                     % filename)
484             log.msg(" for clients[%d]" % client_num)
485
486             out,err = StringIO(), StringIO()
487             rc = runner.runner(["debug", "dump-share", "--offsets",
488                                 filename],
489                                stdout=out, stderr=err)
490             output = out.getvalue()
491             self.failUnlessEqual(rc, 0)
492             try:
493                 self.failUnless("Mutable slot found:\n" in output)
494                 self.failUnless("share_type: SDMF\n" in output)
495                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
496                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
497                 self.failUnless(" num_extra_leases: 0\n" in output)
498                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
499                                 in output)
500                 self.failUnless(" SDMF contents:\n" in output)
501                 self.failUnless("  seqnum: 1\n" in output)
502                 self.failUnless("  required_shares: 3\n" in output)
503                 self.failUnless("  total_shares: 10\n" in output)
504                 self.failUnless("  segsize: 27\n" in output, (output, filename))
505                 self.failUnless("  datalen: 25\n" in output)
506                 # the exact share_hash_chain nodes depend upon the sharenum,
507                 # and is more of a hassle to compute than I want to deal with
508                 # now
509                 self.failUnless("  share_hash_chain: " in output)
510                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
511                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
512                             base32.b2a(storage_index))
513                 self.failUnless(expected in output)
514             except unittest.FailTest:
515                 print
516                 print "dump-share output was:"
517                 print output
518                 raise
519         d.addCallback(_test_debug)
520
521         # test retrieval
522
523         # first, let's see if we can use the existing node to retrieve the
524         # contents. This allows it to use the cached pubkey and maybe the
525         # latest-known sharemap.
526
527         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
528         def _check_download_1(res):
529             self.failUnlessEqual(res, DATA)
530             # now we see if we can retrieve the data from a new node,
531             # constructed using the URI of the original one. We do this test
532             # on the same client that uploaded the data.
533             uri = self._mutable_node_1.get_uri()
534             log.msg("starting retrieve1")
535             newnode = self.clients[0].create_node_from_uri(uri)
536             newnode_2 = self.clients[0].create_node_from_uri(uri)
537             self.failUnlessIdentical(newnode, newnode_2)
538             return newnode.download_best_version()
539         d.addCallback(_check_download_1)
540
541         def _check_download_2(res):
542             self.failUnlessEqual(res, DATA)
543             # same thing, but with a different client
544             uri = self._mutable_node_1.get_uri()
545             newnode = self.clients[1].create_node_from_uri(uri)
546             log.msg("starting retrieve2")
547             d1 = newnode.download_best_version()
548             d1.addCallback(lambda res: (res, newnode))
549             return d1
550         d.addCallback(_check_download_2)
551
552         def _check_download_3((res, newnode)):
553             self.failUnlessEqual(res, DATA)
554             # replace the data
555             log.msg("starting replace1")
556             d1 = newnode.overwrite(NEWDATA)
557             d1.addCallback(lambda res: newnode.download_best_version())
558             return d1
559         d.addCallback(_check_download_3)
560
561         def _check_download_4(res):
562             self.failUnlessEqual(res, NEWDATA)
563             # now create an even newer node and replace the data on it. This
564             # new node has never been used for download before.
565             uri = self._mutable_node_1.get_uri()
566             newnode1 = self.clients[2].create_node_from_uri(uri)
567             newnode2 = self.clients[3].create_node_from_uri(uri)
568             self._newnode3 = self.clients[3].create_node_from_uri(uri)
569             log.msg("starting replace2")
570             d1 = newnode1.overwrite(NEWERDATA)
571             d1.addCallback(lambda res: newnode2.download_best_version())
572             return d1
573         d.addCallback(_check_download_4)
574
575         def _check_download_5(res):
576             log.msg("finished replace2")
577             self.failUnlessEqual(res, NEWERDATA)
578         d.addCallback(_check_download_5)
579
580         def _corrupt_shares(res):
581             # run around and flip bits in all but k of the shares, to test
582             # the hash checks
583             shares = self._find_all_shares(self.basedir)
584             ## sort by share number
585             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
586             where = dict([ (shnum, filename)
587                            for (client_num, storage_index, filename, shnum)
588                            in shares ])
589             assert len(where) == 10 # this test is designed for 3-of-10
590             for shnum, filename in where.items():
591                 # shares 7,8,9 are left alone. read will check
592                 # (share_hash_chain, block_hash_tree, share_data). New
593                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
594                 # segsize, signature).
595                 if shnum == 0:
596                     # read: this will trigger "pubkey doesn't match
597                     # fingerprint".
598                     self._corrupt_mutable_share(filename, "pubkey")
599                     self._corrupt_mutable_share(filename, "encprivkey")
600                 elif shnum == 1:
601                     # triggers "signature is invalid"
602                     self._corrupt_mutable_share(filename, "seqnum")
603                 elif shnum == 2:
604                     # triggers "signature is invalid"
605                     self._corrupt_mutable_share(filename, "R")
606                 elif shnum == 3:
607                     # triggers "signature is invalid"
608                     self._corrupt_mutable_share(filename, "segsize")
609                 elif shnum == 4:
610                     self._corrupt_mutable_share(filename, "share_hash_chain")
611                 elif shnum == 5:
612                     self._corrupt_mutable_share(filename, "block_hash_tree")
613                 elif shnum == 6:
614                     self._corrupt_mutable_share(filename, "share_data")
615                 # other things to corrupt: IV, signature
616                 # 7,8,9 are left alone
617
618                 # note that initial_query_count=5 means that we'll hit the
619                 # first 5 servers in effectively random order (based upon
620                 # response time), so we won't necessarily ever get a "pubkey
621                 # doesn't match fingerprint" error (if we hit shnum>=1 before
622                 # shnum=0, we pull the pubkey from there). To get repeatable
623                 # specific failures, we need to set initial_query_count=1,
624                 # but of course that will change the sequencing behavior of
625                 # the retrieval process. TODO: find a reasonable way to make
626                 # this a parameter, probably when we expand this test to test
627                 # for one failure mode at a time.
628
629                 # when we retrieve this, we should get three signature
630                 # failures (where we've mangled seqnum, R, and segsize). The
631                 # pubkey mangling (shnum 0) may or may not be noticed, depending on query order (see above)
632         d.addCallback(_corrupt_shares)
633
634         d.addCallback(lambda res: self._newnode3.download_best_version())
635         d.addCallback(_check_download_5)
636
637         def _check_empty_file(res):
638             # make sure we can create empty files, this usually screws up the
639             # segsize math
640             d1 = self.clients[2].create_mutable_file("")
641             d1.addCallback(lambda newnode: newnode.download_best_version())
642             d1.addCallback(lambda res: self.failUnlessEqual("", res))
643             return d1
644         d.addCallback(_check_empty_file)
645
646         d.addCallback(lambda res: self.clients[0].create_dirnode())
647         def _created_dirnode(dnode):
648             log.msg("_created_dirnode(%s)" % (dnode,))
649             d1 = dnode.list()
650             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
651             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
652             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
653             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
654             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
655             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
656             d1.addCallback(lambda res: dnode.build_manifest().when_done())
657             d1.addCallback(lambda res:
658                            self.failUnlessEqual(len(res["manifest"]), 1))
659             return d1
660         d.addCallback(_created_dirnode)
661
662         def wait_for_c3_kg_conn():
663             return self.clients[3]._key_generator is not None
664         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
665
666         def check_kg_poolsize(junk, size_delta):
667             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
668                                  self.key_generator_svc.key_generator.pool_size + size_delta)
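        # Each mutable-file or directory creation on clients[3] should consume
        # exactly one pre-generated key from the key generator's pool (the
        # checks below assume the pool is not topped up in between), so the
        # expected deltas run 0, -1, -2, -3 as objects are created.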
669
670         d.addCallback(check_kg_poolsize, 0)
671         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
672         d.addCallback(check_kg_poolsize, -1)
673         d.addCallback(lambda junk: self.clients[3].create_dirnode())
674         d.addCallback(check_kg_poolsize, -2)
675         # use_helper induces use of clients[3], which is the client that uses the key generator
676         d.addCallback(lambda junk:
677                       self.POST("uri?t=mkdir&name=george", use_helper=True))
678         d.addCallback(check_kg_poolsize, -3)
679
680         return d
681
682     def flip_bit(self, good):
683         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
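    # flip_bit flips the low-order bit of the last byte, e.g.
    # flip_bit("abc") == "abb"  (ord('c') == 0x63, 0x63 ^ 0x01 == 0x62 == 'b')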
684
685     def mangle_uri(self, gooduri):
686         # change the key, which changes the storage index, which means we'll
687         # be asking about the wrong file, so nobody will have any shares
688         u = uri.from_string(gooduri)
689         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
690                             uri_extension_hash=u.uri_extension_hash,
691                             needed_shares=u.needed_shares,
692                             total_shares=u.total_shares,
693                             size=u.size)
694         return u2.to_string()
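    # The mangled cap keeps the same UEB hash, k, N, and size; only the key
    # changes. Since the storage index is derived from the key, the new URI
    # names a storage index no server has ever seen, which is what
    # _download_nonexistent_uri above relies on to provoke NoSharesError.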
695
696     # TODO: add a test which mangles the uri_extension_hash instead, and
697     # should fail due to not being able to get a valid uri_extension block.
698     # Also a test which sneakily mangles the uri_extension block to change
699     # some of the validation data, so it will fail in the post-download phase
700     # when the file's crypttext integrity check fails. Do the same thing for
701     # the key, which should cause the download to fail the post-download
702     # plaintext_hash check.
703
704     def test_filesystem(self):
705         self.basedir = "system/SystemTest/test_filesystem"
706         self.data = LARGE_DATA
707         d = self.set_up_nodes(use_stats_gatherer=True)
708         def _new_happy_semantics(ign):
709             for c in self.clients:
710                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
711         d.addCallback(_new_happy_semantics)
712         d.addCallback(self._test_introweb)
713         d.addCallback(self.log, "starting publish")
714         d.addCallback(self._do_publish1)
715         d.addCallback(self._test_runner)
716         d.addCallback(self._do_publish2)
717         # at this point, we have the following filesystem (where "R" denotes
718         # self._root_directory_uri):
719         # R
720         # R/subdir1
721         # R/subdir1/mydata567
722         # R/subdir1/subdir2/
723         # R/subdir1/subdir2/mydata992
724
725         d.addCallback(lambda res: self.bounce_client(0))
726         d.addCallback(self.log, "bounced client0")
727
728         d.addCallback(self._check_publish1)
729         d.addCallback(self.log, "did _check_publish1")
730         d.addCallback(self._check_publish2)
731         d.addCallback(self.log, "did _check_publish2")
732         d.addCallback(self._do_publish_private)
733         d.addCallback(self.log, "did _do_publish_private")
734         # now we also have (where "P" denotes a new dir):
735         #  P/personal/sekrit data
736         #  P/s2-rw -> /subdir1/subdir2/
737         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
738         d.addCallback(self._check_publish_private)
739         d.addCallback(self.log, "did _check_publish_private")
740         d.addCallback(self._test_web)
741         d.addCallback(self._test_control)
742         d.addCallback(self._test_cli)
743         # P now has four top-level children:
744         # P/personal/sekrit data
745         # P/s2-ro/
746         # P/s2-rw/
747         # P/test_put/  (empty)
748         d.addCallback(self._test_checker)
749         return d
750
751     def _test_introweb(self, res):
752         d = getPage(self.introweb_url, method="GET", followRedirect=True)
753         def _check(res):
754             try:
755                 self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__)
756                                 in res)
757                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
758                 self.failUnless("Subscription Summary: storage: 5" in res)
759             except unittest.FailTest:
760                 print
761                 print "GET %s output was:" % self.introweb_url
762                 print res
763                 raise
764         d.addCallback(_check)
765         d.addCallback(lambda res:
766                       getPage(self.introweb_url + "?t=json",
767                               method="GET", followRedirect=True))
768         def _check_json(res):
769             data = simplejson.loads(res)
770             try:
771                 self.failUnlessEqual(data["subscription_summary"],
772                                      {"storage": 5})
773                 self.failUnlessEqual(data["announcement_summary"],
774                                      {"storage": 5, "stub_client": 5})
775                 self.failUnlessEqual(data["announcement_distinct_hosts"],
776                                      {"storage": 1, "stub_client": 1})
777             except unittest.FailTest:
778                 print
779                 print "GET %s?t=json output was:" % self.introweb_url
780                 print res
781                 raise
782         d.addCallback(_check_json)
783         return d
784
785     def _do_publish1(self, res):
786         ut = upload.Data(self.data, convergence=None)
787         c0 = self.clients[0]
788         d = c0.create_dirnode()
789         def _made_root(new_dirnode):
790             self._root_directory_uri = new_dirnode.get_uri()
791             return c0.create_node_from_uri(self._root_directory_uri)
792         d.addCallback(_made_root)
793         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
794         def _made_subdir1(subdir1_node):
795             self._subdir1_node = subdir1_node
796             d1 = subdir1_node.add_file(u"mydata567", ut)
797             d1.addCallback(self.log, "publish finished")
798             def _stash_uri(filenode):
799                 self.uri = filenode.get_uri()
800                 assert isinstance(self.uri, str), (self.uri, filenode)
801             d1.addCallback(_stash_uri)
802             return d1
803         d.addCallback(_made_subdir1)
804         return d
805
806     def _do_publish2(self, res):
807         ut = upload.Data(self.data, convergence=None)
808         d = self._subdir1_node.create_subdirectory(u"subdir2")
809         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
810         return d
811
812     def log(self, res, *args, **kwargs):
813         # print "MSG: %s  RES: %s" % (msg, args)
814         log.msg(*args, **kwargs)
815         return res
816
817     def _do_publish_private(self, res):
818         self.smalldata = "sssh, very secret stuff"
819         ut = upload.Data(self.smalldata, convergence=None)
820         d = self.clients[0].create_dirnode()
821         d.addCallback(self.log, "GOT private directory")
822         def _got_new_dir(privnode):
823             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
824             d1 = privnode.create_subdirectory(u"personal")
825             d1.addCallback(self.log, "made P/personal")
826             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
827             d1.addCallback(self.log, "made P/personal/sekrit data")
828             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
829             def _got_s2(s2node):
830                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
831                                       s2node.get_readonly_uri())
832                 d2.addCallback(lambda node:
833                                privnode.set_uri(u"s2-ro",
834                                                 s2node.get_readonly_uri(),
835                                                 s2node.get_readonly_uri()))
836                 return d2
837             d1.addCallback(_got_s2)
838             d1.addCallback(lambda res: privnode)
839             return d1
840         d.addCallback(_got_new_dir)
841         return d
842
843     def _check_publish1(self, res):
844         # this one uses the iterative API
845         c1 = self.clients[1]
846         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
847         d.addCallback(self.log, "check_publish1 got /")
848         d.addCallback(lambda root: root.get(u"subdir1"))
849         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
850         d.addCallback(lambda filenode: download_to_data(filenode))
851         d.addCallback(self.log, "get finished")
852         def _get_done(data):
853             self.failUnlessEqual(data, self.data)
854         d.addCallback(_get_done)
855         return d
856
857     def _check_publish2(self, res):
858         # this one uses the path-based API
859         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
860         d = rootnode.get_child_at_path(u"subdir1")
861         d.addCallback(lambda dirnode:
862                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
863         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
864         d.addCallback(lambda filenode: download_to_data(filenode))
865         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
866
867         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
868         def _got_filenode(filenode):
869             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
870             assert fnode == filenode
871         d.addCallback(_got_filenode)
872         return d
873
874     def _check_publish_private(self, resnode):
875         # this one uses the path-based API
876         self._private_node = resnode
877
878         d = self._private_node.get_child_at_path(u"personal")
879         def _got_personal(personal):
880             self._personal_node = personal
881             return personal
882         d.addCallback(_got_personal)
883
884         d.addCallback(lambda dirnode:
885                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
886         def get_path(path):
887             return self._private_node.get_child_at_path(path)
888
889         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
890         d.addCallback(lambda filenode: download_to_data(filenode))
891         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
892         d.addCallback(lambda res: get_path(u"s2-rw"))
893         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
894         d.addCallback(lambda res: get_path(u"s2-ro"))
895         def _got_s2ro(dirnode):
896             self.failUnless(dirnode.is_mutable(), dirnode)
897             self.failUnless(dirnode.is_readonly(), dirnode)
898             d1 = defer.succeed(None)
899             d1.addCallback(lambda res: dirnode.list())
900             d1.addCallback(self.log, "dirnode.list")
901
902             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
903
904             d1.addCallback(self.log, "doing add_file(ro)")
905             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
906             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
907
908             d1.addCallback(self.log, "doing get(ro)")
909             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
910             d1.addCallback(lambda filenode:
911                            self.failUnless(IFileNode.providedBy(filenode)))
912
913             d1.addCallback(self.log, "doing delete(ro)")
914             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
915
916             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
917
918             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
919
920             personal = self._personal_node
921             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
922
923             d1.addCallback(self.log, "doing move_child_to(ro)2")
924             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
925
926             d1.addCallback(self.log, "finished with _got_s2ro")
927             return d1
928         d.addCallback(_got_s2ro)
929         def _got_home(dummy):
930             home = self._private_node
931             personal = self._personal_node
932             d1 = defer.succeed(None)
933             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
934             d1.addCallback(lambda res:
935                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
936
937             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
938             d1.addCallback(lambda res:
939                            home.move_child_to(u"sekrit", home, u"sekrit data"))
940
941             d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
942             d1.addCallback(lambda res:
943                            home.move_child_to(u"sekrit data", personal))
944
945             d1.addCallback(lambda res: home.build_manifest().when_done())
946             d1.addCallback(self.log, "manifest")
947             #  five items:
948             # P/
949             # P/personal/
950             # P/personal/sekrit data
951             # P/s2-rw  (same as P/s2-ro)
952             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
953             d1.addCallback(lambda res:
954                            self.failUnlessEqual(len(res["manifest"]), 5))
955             d1.addCallback(lambda res: home.start_deep_stats().when_done())
956             def _check_stats(stats):
957                 expected = {"count-immutable-files": 1,
958                             "count-mutable-files": 0,
959                             "count-literal-files": 1,
960                             "count-files": 2,
961                             "count-directories": 3,
962                             "size-immutable-files": 112,
963                             "size-literal-files": 23,
964                             #"size-directories": 616, # varies
965                             #"largest-directory": 616,
966                             "largest-directory-children": 3,
967                             "largest-immutable-file": 112,
968                             }
969                 for k,v in expected.iteritems():
970                     self.failUnlessEqual(stats[k], v,
971                                          "stats[%s] was %s, not %s" %
972                                          (k, stats[k], v))
973                 self.failUnless(stats["size-directories"] > 1300,
974                                 stats["size-directories"])
975                 self.failUnless(stats["largest-directory"] > 800,
976                                 stats["largest-directory"])
977                 self.failUnlessEqual(stats["size-files-histogram"],
978                                      [ (11, 31, 1), (101, 316, 1) ])
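                # (The size-files-histogram entries appear to be
                # (min, max, count) buckets: the 23-byte literal file lands in
                # (11, 31) and the 112-byte immutable file in (101, 316).)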
979             d1.addCallback(_check_stats)
980             return d1
981         d.addCallback(_got_home)
982         return d
983
984     def shouldFail(self, res, expected_failure, which, substring=None):
985         if isinstance(res, Failure):
986             res.trap(expected_failure)
987             if substring:
988                 self.failUnless(substring in str(res),
989                                 "substring '%s' not in '%s'"
990                                 % (substring, str(res)))
991         else:
992             self.fail("%s was supposed to raise %s, not get '%s'" %
993                       (which, expected_failure, res))
994
995     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
996         assert substring is None or isinstance(substring, str)
997         d = defer.maybeDeferred(callable, *args, **kwargs)
998         def done(res):
999             if isinstance(res, Failure):
1000                 res.trap(expected_failure)
1001                 if substring:
1002                     self.failUnless(substring in str(res),
1003                                     "substring '%s' not in '%s'"
1004                                     % (substring, str(res)))
1005             else:
1006                 self.fail("%s was supposed to raise %s, not get '%s'" %
1007                           (which, expected_failure, res))
1008         d.addBoth(done)
1009         return d
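    # shouldFail2 is the Deferred-flavored variant: it calls the callable,
    # then asserts that the resulting Deferred errbacks with expected_failure
    # (optionally containing substring). For example, later in this file:
    #
    #   self.shouldFail2(NoSuchChildError, "get(missing)", "missing",
    #                    dirnode.get, u"missing")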
1010
1011     def PUT(self, urlpath, data):
1012         url = self.webish_url + urlpath
1013         return getPage(url, method="PUT", postdata=data)
1014
1015     def GET(self, urlpath, followRedirect=False):
1016         url = self.webish_url + urlpath
1017         return getPage(url, method="GET", followRedirect=followRedirect)
1018
1019     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1020         sepbase = "boogabooga"
1021         sep = "--" + sepbase
1022         form = []
1023         form.append(sep)
1024         form.append('Content-Disposition: form-data; name="_charset"')
1025         form.append('')
1026         form.append('UTF-8')
1027         form.append(sep)
1028         for name, value in fields.iteritems():
1029             if isinstance(value, tuple):
1030                 filename, value = value
1031                 form.append('Content-Disposition: form-data; name="%s"; '
1032                             'filename="%s"' % (name, filename.encode("utf-8")))
1033             else:
1034                 form.append('Content-Disposition: form-data; name="%s"' % name)
1035             form.append('')
1036             form.append(str(value))
1037             form.append(sep)
1038         form[-1] += "--"
1039         body = ""
1040         headers = {}
1041         if fields:
1042             body = "\r\n".join(form) + "\r\n"
1043             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1044         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
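    # For a single field, the loop above builds a body along these lines
    # (boundary "boogabooga"; the final join adds CRLF line endings):
    #
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   upload
    #   --boogabooga--
    #
    # and the request is sent with
    # "content-type: multipart/form-data; boundary=boogabooga".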
1045
1046     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1047               use_helper=False):
1048         if use_helper:
1049             url = self.helper_webish_url + urlpath
1050         else:
1051             url = self.webish_url + urlpath
1052         return getPage(url, method="POST", postdata=body, headers=headers,
1053                        followRedirect=followRedirect)
1054
1055     def _test_web(self, res):
1056         base = self.webish_url
1057         public = "uri/" + self._root_directory_uri
1058         d = getPage(base)
1059         def _got_welcome(page):
1060             # XXX This test is oversensitive to formatting
1061             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1062             self.failUnless(expected in page,
1063                             "I didn't see the right 'connected storage servers'"
1064                             " message in: %s" % page
1065                             )
1066             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1067             self.failUnless(expected in page,
1068                             "I didn't see the right 'My nodeid' message "
1069                             "in: %s" % page)
1070             self.failUnless("Helper: 0 active uploads" in page)
1071         d.addCallback(_got_welcome)
1072         d.addCallback(self.log, "done with _got_welcome")
1073
1074         # get the welcome page from the node that uses the helper too
1075         d.addCallback(lambda res: getPage(self.helper_webish_url))
1076         def _got_welcome_helper(page):
1077             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1078                             page)
1079             self.failUnless("Not running helper" in page)
1080         d.addCallback(_got_welcome_helper)
1081
1082         d.addCallback(lambda res: getPage(base + public))
1083         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1084         def _got_subdir1(page):
1085             # there ought to be an href for our file
1086             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1087             self.failUnless(">mydata567</a>" in page)
1088         d.addCallback(_got_subdir1)
1089         d.addCallback(self.log, "done with _got_subdir1")
1090         d.addCallback(lambda res:
1091                       getPage(base + public + "/subdir1/mydata567"))
1092         def _got_data(page):
1093             self.failUnlessEqual(page, self.data)
1094         d.addCallback(_got_data)
1095
1096         # download from a URI embedded in a URL
1097         d.addCallback(self.log, "_get_from_uri")
1098         def _get_from_uri(res):
1099             return getPage(base + "uri/%s?filename=%s"
1100                            % (self.uri, "mydata567"))
1101         d.addCallback(_get_from_uri)
1102         def _got_from_uri(page):
1103             self.failUnlessEqual(page, self.data)
1104         d.addCallback(_got_from_uri)
1105
1106         # download from a URI embedded in a URL, second form
1107         d.addCallback(self.log, "_get_from_uri2")
1108         def _get_from_uri2(res):
1109             return getPage(base + "uri?uri=%s" % (self.uri,))
1110         d.addCallback(_get_from_uri2)
1111         d.addCallback(_got_from_uri)
1112
1113         # download from a bogus URI, make sure we get a reasonable error
1114         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1115         def _get_from_bogus_uri(res):
1116             d1 = getPage(base + "uri/%s?filename=%s"
1117                          % (self.mangle_uri(self.uri), "mydata567"))
1118             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1119                        "410")
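                 # a mangled URI names shares that were never uploaded, so the
                 # web gateway is expected to answer with a 410 (Gone) error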
1120             return d1
1121         d.addCallback(_get_from_bogus_uri)
1122         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1123
1124         # upload a file with PUT
1125         d.addCallback(self.log, "about to try PUT")
1126         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1127                                            "new.txt contents"))
1128         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1129         d.addCallback(self.failUnlessEqual, "new.txt contents")
1130         # and again with something large enough to use multiple segments,
1131         # and hopefully trigger pauseProducing too
1132         def _new_happy_semantics(ign):
1133             for c in self.clients:
1134                 # these parameters get reset somewhere along the way, so set them again here
1135                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
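                     # happy=1 effectively disables the servers-of-happiness
                     # check, so these uploads succeed on this small test grid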
1136         d.addCallback(_new_happy_semantics)
1137         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1138                                            "big" * 500000)) # 1.5MB
1139         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1140         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1141
1142         # can we replace files in place?
1143         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1144                                            "NEWER contents"))
1145         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1146         d.addCallback(self.failUnlessEqual, "NEWER contents")
1147
1148         # test unlinked POST
1149         d.addCallback(lambda res: self.POST("uri", t="upload",
1150                                             file=("new.txt", "data" * 10000)))
1151         # and again using the helper, which exercises different upload-status
1152         # display code
1153         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1154                                             file=("foo.txt", "data2" * 10000)))
1155
1156         # check that the status page exists
1157         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1158         def _got_status(res):
1159             # find an interesting upload and download to look at. LIT files
1160             # are not interesting.
1161             h = self.clients[0].get_history()
1162             for ds in h.list_all_download_statuses():
1163                 if ds.get_size() > 200:
1164                     self._down_status = ds.get_counter()
1165             for us in h.list_all_upload_statuses():
1166                 if us.get_size() > 200:
1167                     self._up_status = us.get_counter()
1168             rs = list(h.list_all_retrieve_statuses())[0]
1169             self._retrieve_status = rs.get_counter()
1170             ps = list(h.list_all_publish_statuses())[0]
1171             self._publish_status = ps.get_counter()
1172             us = list(h.list_all_mapupdate_statuses())[0]
1173             self._update_status = us.get_counter()
1174
1175             # and that there are some upload- and download-status pages
1176             return self.GET("status/up-%d" % self._up_status)
1177         d.addCallback(_got_status)
1178         def _got_up(res):
1179             return self.GET("status/down-%d" % self._down_status)
1180         d.addCallback(_got_up)
1181         def _got_down(res):
1182             return self.GET("status/mapupdate-%d" % self._update_status)
1183         d.addCallback(_got_down)
1184         def _got_update(res):
1185             return self.GET("status/publish-%d" % self._publish_status)
1186         d.addCallback(_got_update)
1187         def _got_publish(res):
1188             return self.GET("status/retrieve-%d" % self._retrieve_status)
1189         d.addCallback(_got_publish)
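             # fetching each status/<kind>-<counter> page is enough to exercise
             # its rendering code; the page contents are not inspected here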
1190
1191         # check that the helper status page exists
1192         d.addCallback(lambda res:
1193                       self.GET("helper_status", followRedirect=True))
1194         def _got_helper_status(res):
1195             self.failUnless("Bytes Fetched:" in res)
1196             # touch a couple of files in the helper's working directory to
1197             # exercise more code paths
1198             workdir = os.path.join(self.getdir("client0"), "helper")
1199             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1200             f = open(incfile, "wb")
1201             f.write("small file")
1202             f.close()
1203             then = time.time() - 86400*3
1204             now = time.time()
1205             os.utime(incfile, (now, then))
1206             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1207             f = open(encfile, "wb")
1208             f.write("less small file")
1209             f.close()
1210             os.utime(encfile, (now, then))
1211         d.addCallback(_got_helper_status)
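             # the two spurious files created above should now show up in the
             # helper's JSON statistics: "small file" is 10 bytes in
             # CHK_incoming, "less small file" is 15 bytes in CHK_encoding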
1212         # and that the json form exists
1213         d.addCallback(lambda res:
1214                       self.GET("helper_status?t=json", followRedirect=True))
1215         def _got_helper_status_json(res):
1216             data = simplejson.loads(res)
1217             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1218                                  1)
1219             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1220             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1221             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1222                                  10)
1223             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1224             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1225             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1226                                  15)
1227         d.addCallback(_got_helper_status_json)
1228
1229         # and check that client[3] (which uses a helper but does not run one
1230         # itself) doesn't explode when you ask for its status
1231         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1232         def _got_non_helper_status(res):
1233             self.failUnless("Upload and Download Status" in res)
1234         d.addCallback(_got_non_helper_status)
1235
1236         # or for helper status with t=json
1237         d.addCallback(lambda res:
1238                       getPage(self.helper_webish_url + "helper_status?t=json"))
1239         def _got_non_helper_status_json(res):
1240             data = simplejson.loads(res)
1241             self.failUnlessEqual(data, {})
1242         d.addCallback(_got_non_helper_status_json)
1243
1244         # see if the statistics page exists
1245         d.addCallback(lambda res: self.GET("statistics"))
1246         def _got_stats(res):
1247             self.failUnless("Node Statistics" in res)
1248             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1249         d.addCallback(_got_stats)
1250         d.addCallback(lambda res: self.GET("statistics?t=json"))
1251         def _got_stats_json(res):
1252             data = simplejson.loads(res)
1253             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1254             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1255         d.addCallback(_got_stats_json)
1256
1257         # TODO: mangle the second segment of a file, to test errors that
1258         # occur after we've already sent some good data, which uses a
1259         # different error path.
1260
1261         # TODO: download a URI with a form
1262         # TODO: create a directory by using a form
1263         # TODO: upload by using a form on the directory page
1264         #    url = base + "somedir/subdir1/freeform_post!!upload"
1265         # TODO: delete a file by using a button on the directory page
1266
1267         return d
1268
1269     def _test_runner(self, res):
1270         # exercise some of the diagnostic tools in runner.py
1271
1272         # find a share
1273         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1274             if "storage" not in dirpath:
1275                 continue
1276             if not filenames:
1277                 continue
1278             pieces = dirpath.split(os.sep)
1279             if (len(pieces) >= 4
1280                 and pieces[-4] == "storage"
1281                 and pieces[-3] == "shares"):
1282                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1283                 # are sharefiles here
1284                 filename = os.path.join(dirpath, filenames[0])
1285                 # peek at the magic to see if it is a chk share
1286                 magic = open(filename, "rb").read(4)
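                     # immutable (CHK) share files start with a 4-byte
                     # big-endian container-version field (currently 1);
                     # mutable share files use a different, textual magic,
                     # so this check skips them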
1287                 if magic == '\x00\x00\x00\x01':
1288                     break
1289         else:
1290             self.fail("unable to find any CHK share files in %r"
1291                       % self.basedir)
1292         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1293
1294         out,err = StringIO(), StringIO()
1295         rc = runner.runner(["debug", "dump-share", "--offsets",
1296                             filename],
1297                            stdout=out, stderr=err)
1298         output = out.getvalue()
1299         self.failUnlessEqual(rc, 0)
1300
1301         # we only upload a single file, so we can assert some things about
1302         # its size and shares.
1303         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1304         self.failUnlessIn("size: %d\n" % len(self.data), output)
1305         self.failUnlessIn("num_segments: 1\n", output)
1306         # segment_size is always a multiple of needed_shares
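             # (mathutil.next_multiple(n, k) rounds n up to the nearest
             # multiple of k, e.g. next_multiple(100, 3) == 102)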
1307         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1308         self.failUnlessIn("total_shares: 10\n", output)
1309         # keys which are supposed to be present
1310         for key in ("size", "num_segments", "segment_size",
1311                     "needed_shares", "total_shares",
1312                     "codec_name", "codec_params", "tail_codec_params",
1313                     #"plaintext_hash", "plaintext_root_hash",
1314                     "crypttext_hash", "crypttext_root_hash",
1315                     "share_root_hash", "UEB_hash"):
1316             self.failUnlessIn("%s: " % key, output)
1317         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1318
1319         # now use its storage index to find the other shares using the
1320         # 'find-shares' tool
1321         sharedir, shnum = os.path.split(filename)
1322         storagedir, storage_index_s = os.path.split(sharedir)
1323         out,err = StringIO(), StringIO()
1324         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1325         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1326         rc = runner.runner(cmd, stdout=out, stderr=err)
1327         self.failUnlessEqual(rc, 0)
1328         out.seek(0)
1329         sharefiles = [sfn.strip() for sfn in out.readlines()]
1330         self.failUnlessEqual(len(sharefiles), 10)
1331
1332         # also exercise the 'catalog-shares' tool
1333         out,err = StringIO(), StringIO()
1334         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1335         cmd = ["debug", "catalog-shares"] + nodedirs
1336         rc = runner.runner(cmd, stdout=out, stderr=err)
1337         self.failUnlessEqual(rc, 0)
1338         out.seek(0)
1339         descriptions = [sfn.strip() for sfn in out.readlines()]
1340         self.failUnlessEqual(len(descriptions), 30)
1341         matching = [line
1342                     for line in descriptions
1343                     if line.startswith("CHK %s " % storage_index_s)]
1344         self.failUnlessEqual(len(matching), 10)
1345
1346     def _test_control(self, res):
1347         # exercise the remote-control-the-client foolscap interfaces in
1348         # allmydata.control (mostly used for performance tests)
1349         c0 = self.clients[0]
1350         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1351         control_furl = open(control_furl_file, "r").read().strip()
1352         # it doesn't really matter which Tub we use to connect to the client,
1353         # so let's just use our IntroducerNode's
1354         d = self.introducer.tub.getReference(control_furl)
1355         d.addCallback(self._test_control2, control_furl_file)
1356         return d
1357     def _test_control2(self, rref, filename):
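             # note that 'filename' is the control.furl file itself: any small
             # existing local file will do as an upload source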
1358         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1359         downfile = os.path.join(self.basedir, "control.downfile")
1360         d.addCallback(lambda uri:
1361                       rref.callRemote("download_from_uri_to_file",
1362                                       uri, downfile))
1363         def _check(res):
1364             self.failUnlessEqual(res, downfile)
1365             data = open(downfile, "r").read()
1366             expected_data = open(filename, "r").read()
1367             self.failUnlessEqual(data, expected_data)
1368         d.addCallback(_check)
1369         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1370         if sys.platform == "linux2":
1371             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1372         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1373         return d
1374
1375     def _test_cli(self, res):
1376         # run various CLI commands (in a thread, since they use blocking
1377         # network calls)
1378
1379         private_uri = self._private_node.get_uri()
1380         client0_basedir = self.getdir("client0")
1381
1382         nodeargs = [
1383             "--node-directory", client0_basedir,
1384             ]
1385
1386         d = defer.succeed(None)
1387
1388         # for compatibility with earlier versions, private/root_dir.cap is
1389         # supposed to be treated as an alias named "tahoe:". Start by making
1390         # sure that works, before we add other aliases.
1391
1392         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1393         f = open(root_file, "w")
1394         f.write(private_uri)
1395         f.close()
1396
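             # run() is a small convenience wrapper: it prepends the verb and
             # the --node-directory argument, then executes the CLI in a
             # thread via _run_cli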
1397         def run(ignored, verb, *args, **kwargs):
1398             stdin = kwargs.get("stdin", "")
1399             newargs = [verb] + nodeargs + list(args)
1400             return self._run_cli(newargs, stdin=stdin)
1401
1402         def _check_ls((out,err), expected_children, unexpected_children=[]):
1403             self.failUnlessEqual(err, "")
1404             for s in expected_children:
1405                 self.failUnless(s in out, (s,out))
1406             for s in unexpected_children:
1407                 self.failIf(s in out, (s,out))
1408
1409         def _check_ls_root((out,err)):
1410             self.failUnless("personal" in out)
1411             self.failUnless("s2-ro" in out)
1412             self.failUnless("s2-rw" in out)
1413             self.failUnlessEqual(err, "")
1414
1415         # this should reference private_uri
1416         d.addCallback(run, "ls")
1417         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1418
1419         d.addCallback(run, "list-aliases")
1420         def _check_aliases_1((out,err)):
1421             self.failUnlessEqual(err, "")
1422             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1423         d.addCallback(_check_aliases_1)
1424
1425         # now that that's out of the way, remove root_dir.cap and work with
1426         # new files
1427         d.addCallback(lambda res: os.unlink(root_file))
1428         d.addCallback(run, "list-aliases")
1429         def _check_aliases_2((out,err)):
1430             self.failUnlessEqual(err, "")
1431             self.failUnlessEqual(out, "")
1432         d.addCallback(_check_aliases_2)
1433
1434         d.addCallback(run, "mkdir")
1435         def _got_dir((out,err)):
1436             self.failUnless(uri.from_string_dirnode(out.strip()))
1437             return out.strip()
1438         d.addCallback(_got_dir)
1439         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1440
1441         d.addCallback(run, "list-aliases")
1442         def _check_aliases_3((out,err)):
1443             self.failUnlessEqual(err, "")
1444             self.failUnless("tahoe: " in out)
1445         d.addCallback(_check_aliases_3)
1446
1447         def _check_empty_dir((out,err)):
1448             self.failUnlessEqual(out, "")
1449             self.failUnlessEqual(err, "")
1450         d.addCallback(run, "ls")
1451         d.addCallback(_check_empty_dir)
1452
1453         def _check_missing_dir((out,err)):
1454             # TODO: check that rc==2
1455             self.failUnlessEqual(out, "")
1456             self.failUnlessEqual(err, "No such file or directory\n")
1457         d.addCallback(run, "ls", "bogus")
1458         d.addCallback(_check_missing_dir)
1459
1460         files = []
1461         datas = []
1462         for i in range(10):
1463             fn = os.path.join(self.basedir, "file%d" % i)
1464             files.append(fn)
1465             data = "data to be uploaded: file%d\n" % i
1466             datas.append(data)
1467             open(fn,"wb").write(data)
1468
1469         def _check_stdout_against((out,err), filenum=None, data=None):
1470             self.failUnlessEqual(err, "")
1471             if filenum is not None:
1472                 self.failUnlessEqual(out, datas[filenum])
1473             if data is not None:
1474                 self.failUnlessEqual(out, data)
1475
1476         # test both forms of put: from a file, and from stdin
1477         #  tahoe put bar FOO
1478         d.addCallback(run, "put", files[0], "tahoe-file0")
1479         def _put_out((out,err)):
1480             self.failUnless("URI:LIT:" in out, out)
1481             self.failUnless("201 Created" in err, err)
1482             uri0 = out.strip()
1483             return run(None, "get", uri0)
1484         d.addCallback(_put_out)
1485         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1486
1487         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1488         #  tahoe put bar tahoe:FOO
1489         d.addCallback(run, "put", files[2], "tahoe:file2")
1490         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1491         def _check_put_mutable((out,err)):
1492             self._mutable_file3_uri = out.strip()
1493         d.addCallback(_check_put_mutable)
1494         d.addCallback(run, "get", "tahoe:file3")
1495         d.addCallback(_check_stdout_against, 3)
1496
1497         #  tahoe put FOO
1498         STDIN_DATA = "This is the file to upload from stdin."
1499         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1500         #  tahoe put tahoe:FOO
1501         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1502                       stdin="Other file from stdin.")
1503
1504         d.addCallback(run, "ls")
1505         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1506                                   "tahoe-file-stdin", "from-stdin"])
1507         d.addCallback(run, "ls", "subdir")
1508         d.addCallback(_check_ls, ["tahoe-file1"])
1509
1510         # tahoe mkdir FOO
1511         d.addCallback(run, "mkdir", "subdir2")
1512         d.addCallback(run, "ls")
1513         # TODO: extract the URI, set an alias with it
1514         d.addCallback(_check_ls, ["subdir2"])
1515
1516         # tahoe get: (to stdin and to a file)
1517         d.addCallback(run, "get", "tahoe-file0")
1518         d.addCallback(_check_stdout_against, 0)
1519         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1520         d.addCallback(_check_stdout_against, 1)
1521         outfile0 = os.path.join(self.basedir, "outfile0")
1522         d.addCallback(run, "get", "file2", outfile0)
1523         def _check_outfile0((out,err)):
1524             data = open(outfile0,"rb").read()
1525             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1526         d.addCallback(_check_outfile0)
1527         outfile1 = os.path.join(self.basedir, "outfile1")
1528         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1529         def _check_outfile1((out,err)):
1530             data = open(outfile1,"rb").read()
1531             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1532         d.addCallback(_check_outfile1)
1533
1534         d.addCallback(run, "rm", "tahoe-file0")
1535         d.addCallback(run, "rm", "tahoe:file2")
1536         d.addCallback(run, "ls")
1537         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1538
1539         d.addCallback(run, "ls", "-l")
1540         def _check_ls_l((out,err)):
1541             lines = out.split("\n")
1542             for l in lines:
1543                 if "tahoe-file-stdin" in l:
1544                     self.failUnless(l.startswith("-r-- "), l)
1545                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1546                 if "file3" in l:
1547                     self.failUnless(l.startswith("-rw- "), l) # mutable
1548         d.addCallback(_check_ls_l)
1549
1550         d.addCallback(run, "ls", "--uri")
1551         def _check_ls_uri((out,err)):
1552             lines = out.split("\n")
1553             for l in lines:
1554                 if "file3" in l:
1555                     self.failUnless(self._mutable_file3_uri in l)
1556         d.addCallback(_check_ls_uri)
1557
1558         d.addCallback(run, "ls", "--readonly-uri")
1559         def _check_ls_rouri((out,err)):
1560             lines = out.split("\n")
1561             for l in lines:
1562                 if "file3" in l:
1563                     rw_uri = self._mutable_file3_uri
1564                     u = uri.from_string_mutable_filenode(rw_uri)
1565                     ro_uri = u.get_readonly().to_string()
1566                     self.failUnless(ro_uri in l)
1567         d.addCallback(_check_ls_rouri)
1568
1569
1570         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1571         d.addCallback(run, "ls")
1572         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1573
1574         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1575         d.addCallback(run, "ls")
1576         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1577
1578         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1579         d.addCallback(run, "ls")
1580         d.addCallback(_check_ls, ["file3", "file3-copy"])
1581         d.addCallback(run, "get", "tahoe:file3-copy")
1582         d.addCallback(_check_stdout_against, 3)
1583
1584         # copy from disk into tahoe
1585         d.addCallback(run, "cp", files[4], "tahoe:file4")
1586         d.addCallback(run, "ls")
1587         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1588         d.addCallback(run, "get", "tahoe:file4")
1589         d.addCallback(_check_stdout_against, 4)
1590
1591         # copy from tahoe into disk
1592         target_filename = os.path.join(self.basedir, "file-out")
1593         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1594         def _check_cp_out((out,err)):
1595             self.failUnless(os.path.exists(target_filename))
1596             got = open(target_filename,"rb").read()
1597             self.failUnlessEqual(got, datas[4])
1598         d.addCallback(_check_cp_out)
1599
1600         # copy from disk to disk (silly case)
1601         target2_filename = os.path.join(self.basedir, "file-out-copy")
1602         d.addCallback(run, "cp", target_filename, target2_filename)
1603         def _check_cp_out2((out,err)):
1604             self.failUnless(os.path.exists(target2_filename))
1605             got = open(target2_filename,"rb").read()
1606             self.failUnlessEqual(got, datas[4])
1607         d.addCallback(_check_cp_out2)
1608
1609         # copy from tahoe into disk, overwriting an existing file
1610         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1611         def _check_cp_out3((out,err)):
1612             self.failUnless(os.path.exists(target_filename))
1613             got = open(target_filename,"rb").read()
1614             self.failUnlessEqual(got, datas[3])
1615         d.addCallback(_check_cp_out3)
1616
1617         # copy from disk into tahoe, overwriting an existing immutable file
1618         d.addCallback(run, "cp", files[5], "tahoe:file4")
1619         d.addCallback(run, "ls")
1620         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1621         d.addCallback(run, "get", "tahoe:file4")
1622         d.addCallback(_check_stdout_against, 5)
1623
1624         # copy from disk into tahoe, overwriting an existing mutable file
1625         d.addCallback(run, "cp", files[5], "tahoe:file3")
1626         d.addCallback(run, "ls")
1627         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1628         d.addCallback(run, "get", "tahoe:file3")
1629         d.addCallback(_check_stdout_against, 5)
1630
1631         # recursive copy: setup
1632         dn = os.path.join(self.basedir, "dir1")
1633         os.makedirs(dn)
1634         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1635         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1636         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1637         sdn2 = os.path.join(dn, "subdir2")
1638         os.makedirs(sdn2)
1639         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1640         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
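             # the local tree created above:
             #   dir1/rfile1, dir1/rfile2, dir1/rfile3
             #   dir1/subdir2/rfile4, dir1/subdir2/rfile5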
1641
1642         # from disk into tahoe
1643         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1644         d.addCallback(run, "ls")
1645         d.addCallback(_check_ls, ["dir1"])
1646         d.addCallback(run, "ls", "dir1")
1647         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1648                       ["rfile4", "rfile5"])
1649         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1650         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1651                       ["rfile1", "rfile2", "rfile3"])
1652         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1653         d.addCallback(_check_stdout_against, data="rfile4")
1654
1655         # and back out again
1656         dn_copy = os.path.join(self.basedir, "dir1-copy")
1657         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1658         def _check_cp_r_out((out,err)):
1659             def _cmp(name):
1660                 old = open(os.path.join(dn, name), "rb").read()
1661                 newfn = os.path.join(dn_copy, name)
1662                 self.failUnless(os.path.exists(newfn))
1663                 new = open(newfn, "rb").read()
1664                 self.failUnlessEqual(old, new)
1665             _cmp("rfile1")
1666             _cmp("rfile2")
1667             _cmp("rfile3")
1668             _cmp(os.path.join("subdir2", "rfile4"))
1669             _cmp(os.path.join("subdir2", "rfile5"))
1670         d.addCallback(_check_cp_r_out)
1671
1672         # and copy it a second time, which ought to overwrite the same files
1673         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1674
1675         # and again, only writing filecaps
1676         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1677         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1678         def _check_capsonly((out,err)):
1679             # these should all be LITs
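                 # --caps-only writes each file's cap instead of its contents;
                 # a LIT cap embeds the data in the URI itself, so parsing the
                 # stored cap recovers the original bytes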
1680             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1681             y = uri.from_string_filenode(x)
1682             self.failUnlessEqual(y.data, "rfile4")
1683         d.addCallback(_check_capsonly)
1684
1685         # and tahoe-to-tahoe
1686         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1687         d.addCallback(run, "ls")
1688         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1689         d.addCallback(run, "ls", "dir1-copy")
1690         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1691                       ["rfile4", "rfile5"])
1692         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1693         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1694                       ["rfile1", "rfile2", "rfile3"])
1695         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1696         d.addCallback(_check_stdout_against, data="rfile4")
1697
1698         # and copy it a second time, which ought to overwrite the same files
1699         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1700
1701         # tahoe_ls doesn't currently handle the error correctly: it tries to
1702         # JSON-parse a traceback.
1703 ##         def _ls_missing(res):
1704 ##             argv = ["ls"] + nodeargs + ["bogus"]
1705 ##             return self._run_cli(argv)
1706 ##         d.addCallback(_ls_missing)
1707 ##         def _check_ls_missing((out,err)):
1708 ##             print "OUT", out
1709 ##             print "ERR", err
1710 ##             self.failUnlessEqual(err, "")
1711 ##         d.addCallback(_check_ls_missing)
1712
1713         return d
1714
1715     def _run_cli(self, argv, stdin=""):
1716         #print "CLI:", argv
1717         stdout, stderr = StringIO(), StringIO()
1718         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1719                                   stdin=StringIO(stdin),
1720                                   stdout=stdout, stderr=stderr)
1721         def _done(res):
1722             return stdout.getvalue(), stderr.getvalue()
1723         d.addCallback(_done)
1724         return d
1725
1726     def _test_checker(self, res):
1727         ut = upload.Data("too big to be literal" * 200, convergence=None)
1728         d = self._personal_node.add_file(u"big file", ut)
1729
1730         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1731         def _check_dirnode_results(r):
1732             self.failUnless(r.is_healthy())
1733         d.addCallback(_check_dirnode_results)
1734         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1735         d.addCallback(_check_dirnode_results)
1736
1737         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1738         def _got_chk_filenode(n):
1739             self.failUnless(isinstance(n, ImmutableFileNode))
1740             d = n.check(Monitor())
1741             def _check_filenode_results(r):
1742                 self.failUnless(r.is_healthy())
1743             d.addCallback(_check_filenode_results)
1744             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1745             d.addCallback(_check_filenode_results)
1746             return d
1747         d.addCallback(_got_chk_filenode)
1748
1749         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1750         def _got_lit_filenode(n):
1751             self.failUnless(isinstance(n, LiteralFileNode))
1752             d = n.check(Monitor())
1753             def _check_lit_filenode_results(r):
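                     # literal files carry their data inside the URI and have
                     # no shares to check, so check() has nothing to report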
1754                 self.failUnlessEqual(r, None)
1755             d.addCallback(_check_lit_filenode_results)
1756             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1757             d.addCallback(_check_lit_filenode_results)
1758             return d
1759         d.addCallback(_got_lit_filenode)
1760         return d
1761