]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
test: make tests stop relying on pyutil version class accepting the string 'unknown...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 import allmydata
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.literal import LiteralFileNode
13 from allmydata.immutable.filenode import ImmutableFileNode
14 from allmydata.util import idlib, mathutil
15 from allmydata.util import log, base32
16 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
17 from allmydata.util.fileutil import abspath_expanduser_unicode
18 from allmydata.util.consumer import MemoryConsumer, download_to_data
19 from allmydata.scripts import runner
20 from allmydata.interfaces import IDirectoryNode, IFileNode, \
21      NoSuchChildError, NoSharesError
22 from allmydata.monitor import Monitor
23 from allmydata.mutable.common import NotWriteableError
24 from allmydata.mutable import layout as mutable_layout
25 from foolscap.api import DeadReferenceError
26 from twisted.python.failure import Failure
27 from twisted.web.client import getPage
28 from twisted.web.error import Error
29
30 from allmydata.test.common import SystemTestMixin
31
32 LARGE_DATA = """
33 This is some data to publish to the remote grid.., which needs to be large
34 enough to not fit inside a LIT uri.
35 """
36
37 class CountingDataUploadable(upload.Data):
38     bytes_read = 0
39     interrupt_after = None
40     interrupt_after_d = None
41
42     def read(self, length):
43         self.bytes_read += length
44         if self.interrupt_after is not None:
45             if self.bytes_read > self.interrupt_after:
46                 self.interrupt_after = None
47                 self.interrupt_after_d.callback(self)
48         return upload.Data.read(self, length)
49
50 class SystemTest(SystemTestMixin, unittest.TestCase):
51     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
52
53     def test_connections(self):
54         self.basedir = "system/SystemTest/test_connections"
55         d = self.set_up_nodes()
56         self.extra_node = None
57         d.addCallback(lambda res: self.add_extra_node(self.numclients))
58         def _check(extra_node):
59             self.extra_node = extra_node
60             for c in self.clients:
61                 all_peerids = c.get_storage_broker().get_all_serverids()
62                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
63                 sb = c.storage_broker
64                 permuted_peers = sb.get_servers_for_index("a")
65                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
66
67         d.addCallback(_check)
68         def _shutdown_extra_node(res):
69             if self.extra_node:
70                 return self.extra_node.stopService()
71             return res
72         d.addBoth(_shutdown_extra_node)
73         return d
74     # test_connections is subsumed by test_upload_and_download, and takes
75     # quite a while to run on a slow machine (because of all the TLS
76     # connections that must be established). If we ever rework the introducer
77     # code to such an extent that we're not sure if it works anymore, we can
78     # reinstate this test until it does.
79     del test_connections
80
81     def test_upload_and_download_random_key(self):
82         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
83         return self._test_upload_and_download(convergence=None)
84
85     def test_upload_and_download_convergent(self):
86         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
87         return self._test_upload_and_download(convergence="some convergence string")
88
89     def _test_upload_and_download(self, convergence):
90         # we use 4000 bytes of data, which will result in about 400k written
91         # to disk among all our simulated nodes
92         DATA = "Some data to upload\n" * 200
93         d = self.set_up_nodes()
94         def _check_connections(res):
95             for c in self.clients:
96                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
97                 all_peerids = c.get_storage_broker().get_all_serverids()
98                 self.failUnlessEqual(len(all_peerids), self.numclients)
99                 sb = c.storage_broker
100                 permuted_peers = sb.get_servers_for_index("a")
101                 self.failUnlessEqual(len(permuted_peers), self.numclients)
102         d.addCallback(_check_connections)
103
104         def _do_upload(res):
105             log.msg("UPLOADING")
106             u = self.clients[0].getServiceNamed("uploader")
107             self.uploader = u
108             # we crank the max segsize down to 1024b for the duration of this
109             # test, so we can exercise multiple segments. It is important
110             # that this is not a multiple of the segment size, so that the
111             # tail segment is not the same length as the others. This actualy
112             # gets rounded up to 1025 to be a multiple of the number of
113             # required shares (since we use 25 out of 100 FEC).
114             up = upload.Data(DATA, convergence=convergence)
115             up.max_segment_size = 1024
116             d1 = u.upload(up)
117             return d1
118         d.addCallback(_do_upload)
119         def _upload_done(results):
120             theuri = results.uri
121             log.msg("upload finished: uri is %s" % (theuri,))
122             self.uri = theuri
123             assert isinstance(self.uri, str), self.uri
124             self.cap = uri.from_string(self.uri)
125             self.n = self.clients[1].create_node_from_uri(self.uri)
126         d.addCallback(_upload_done)
127
128         def _upload_again(res):
129             # Upload again. If using convergent encryption then this ought to be
130             # short-circuited, however with the way we currently generate URIs
131             # (i.e. because they include the roothash), we have to do all of the
132             # encoding work, and only get to save on the upload part.
133             log.msg("UPLOADING AGAIN")
134             up = upload.Data(DATA, convergence=convergence)
135             up.max_segment_size = 1024
136             return self.uploader.upload(up)
137         d.addCallback(_upload_again)
138
139         def _download_to_data(res):
140             log.msg("DOWNLOADING")
141             return download_to_data(self.n)
142         d.addCallback(_download_to_data)
143         def _download_to_data_done(data):
144             log.msg("download finished")
145             self.failUnlessEqual(data, DATA)
146         d.addCallback(_download_to_data_done)
147
148         def _test_read(res):
149             n = self.clients[1].create_node_from_uri(self.uri)
150             d = download_to_data(n)
151             def _read_done(data):
152                 self.failUnlessEqual(data, DATA)
153             d.addCallback(_read_done)
154             d.addCallback(lambda ign:
155                           n.read(MemoryConsumer(), offset=1, size=4))
156             def _read_portion_done(mc):
157                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
158             d.addCallback(_read_portion_done)
159             d.addCallback(lambda ign:
160                           n.read(MemoryConsumer(), offset=2, size=None))
161             def _read_tail_done(mc):
162                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
163             d.addCallback(_read_tail_done)
164             d.addCallback(lambda ign:
165                           n.read(MemoryConsumer(), size=len(DATA)+1000))
166             def _read_too_much(mc):
167                 self.failUnlessEqual("".join(mc.chunks), DATA)
168             d.addCallback(_read_too_much)
169
170             return d
171         d.addCallback(_test_read)
172
173         def _test_bad_read(res):
174             bad_u = uri.from_string_filenode(self.uri)
175             bad_u.key = self.flip_bit(bad_u.key)
176             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
177             # this should cause an error during download
178
179             d = self.shouldFail2(NoSharesError, "'download bad node'",
180                                  None,
181                                  bad_n.read, MemoryConsumer(), offset=2)
182             return d
183         d.addCallback(_test_bad_read)
184
185         def _download_nonexistent_uri(res):
186             baduri = self.mangle_uri(self.uri)
187             badnode = self.clients[1].create_node_from_uri(baduri)
188             log.msg("about to download non-existent URI", level=log.UNUSUAL,
189                     facility="tahoe.tests")
190             d1 = download_to_data(badnode)
191             def _baduri_should_fail(res):
192                 log.msg("finished downloading non-existend URI",
193                         level=log.UNUSUAL, facility="tahoe.tests")
194                 self.failUnless(isinstance(res, Failure))
195                 self.failUnless(res.check(NoSharesError),
196                                 "expected NoSharesError, got %s" % res)
197             d1.addBoth(_baduri_should_fail)
198             return d1
199         d.addCallback(_download_nonexistent_uri)
200
201         # add a new node, which doesn't accept shares, and only uses the
202         # helper for upload.
203         d.addCallback(lambda res: self.add_extra_node(self.numclients,
204                                                       self.helper_furl,
205                                                       add_to_sparent=True))
206         def _added(extra_node):
207             self.extra_node = extra_node
208             self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
209         d.addCallback(_added)
210
211         HELPER_DATA = "Data that needs help to upload" * 1000
212         def _upload_with_helper(res):
213             u = upload.Data(HELPER_DATA, convergence=convergence)
214             d = self.extra_node.upload(u)
215             def _uploaded(results):
216                 n = self.clients[1].create_node_from_uri(results.uri)
217                 return download_to_data(n)
218             d.addCallback(_uploaded)
219             def _check(newdata):
220                 self.failUnlessEqual(newdata, HELPER_DATA)
221             d.addCallback(_check)
222             return d
223         d.addCallback(_upload_with_helper)
224
225         def _upload_duplicate_with_helper(res):
226             u = upload.Data(HELPER_DATA, convergence=convergence)
227             u.debug_stash_RemoteEncryptedUploadable = True
228             d = self.extra_node.upload(u)
229             def _uploaded(results):
230                 n = self.clients[1].create_node_from_uri(results.uri)
231                 return download_to_data(n)
232             d.addCallback(_uploaded)
233             def _check(newdata):
234                 self.failUnlessEqual(newdata, HELPER_DATA)
235                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
236                             "uploadable started uploading, should have been avoided")
237             d.addCallback(_check)
238             return d
239         if convergence is not None:
240             d.addCallback(_upload_duplicate_with_helper)
241
242         def _upload_resumable(res):
243             DATA = "Data that needs help to upload and gets interrupted" * 1000
244             u1 = CountingDataUploadable(DATA, convergence=convergence)
245             u2 = CountingDataUploadable(DATA, convergence=convergence)
246
247             # we interrupt the connection after about 5kB by shutting down
248             # the helper, then restartingit.
249             u1.interrupt_after = 5000
250             u1.interrupt_after_d = defer.Deferred()
251             u1.interrupt_after_d.addCallback(lambda res:
252                                              self.bounce_client(0))
253
254             # sneak into the helper and reduce its chunk size, so that our
255             # debug_interrupt will sever the connection on about the fifth
256             # chunk fetched. This makes sure that we've started to write the
257             # new shares before we abandon them, which exercises the
258             # abort/delete-partial-share code. TODO: find a cleaner way to do
259             # this. I know that this will affect later uses of the helper in
260             # this same test run, but I'm not currently worried about it.
261             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
262
263             d = self.extra_node.upload(u1)
264
265             def _should_not_finish(res):
266                 self.fail("interrupted upload should have failed, not finished"
267                           " with result %s" % (res,))
268             def _interrupted(f):
269                 f.trap(DeadReferenceError)
270
271                 # make sure we actually interrupted it before finishing the
272                 # file
273                 self.failUnless(u1.bytes_read < len(DATA),
274                                 "read %d out of %d total" % (u1.bytes_read,
275                                                              len(DATA)))
276
277                 log.msg("waiting for reconnect", level=log.NOISY,
278                         facility="tahoe.test.test_system")
279                 # now, we need to give the nodes a chance to notice that this
280                 # connection has gone away. When this happens, the storage
281                 # servers will be told to abort their uploads, removing the
282                 # partial shares. Unfortunately this involves TCP messages
283                 # going through the loopback interface, and we can't easily
284                 # predict how long that will take. If it were all local, we
285                 # could use fireEventually() to stall. Since we don't have
286                 # the right introduction hooks, the best we can do is use a
287                 # fixed delay. TODO: this is fragile.
288                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
289                 return u1.interrupt_after_d
290             d.addCallbacks(_should_not_finish, _interrupted)
291
292             def _disconnected(res):
293                 # check to make sure the storage servers aren't still hanging
294                 # on to the partial share: their incoming/ directories should
295                 # now be empty.
296                 log.msg("disconnected", level=log.NOISY,
297                         facility="tahoe.test.test_system")
298                 for i in range(self.numclients):
299                     incdir = os.path.join(self.getdir("client%d" % i),
300                                           "storage", "shares", "incoming")
301                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
302             d.addCallback(_disconnected)
303
304             # then we need to give the reconnector a chance to
305             # reestablish the connection to the helper.
306             d.addCallback(lambda res:
307                           log.msg("wait_for_connections", level=log.NOISY,
308                                   facility="tahoe.test.test_system"))
309             d.addCallback(lambda res: self.wait_for_connections())
310
311
312             d.addCallback(lambda res:
313                           log.msg("uploading again", level=log.NOISY,
314                                   facility="tahoe.test.test_system"))
315             d.addCallback(lambda res: self.extra_node.upload(u2))
316
317             def _uploaded(results):
318                 cap = results.uri
319                 log.msg("Second upload complete", level=log.NOISY,
320                         facility="tahoe.test.test_system")
321
322                 # this is really bytes received rather than sent, but it's
323                 # convenient and basically measures the same thing
324                 bytes_sent = results.ciphertext_fetched
325                 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
326
327                 # We currently don't support resumption of upload if the data is
328                 # encrypted with a random key.  (Because that would require us
329                 # to store the key locally and re-use it on the next upload of
330                 # this file, which isn't a bad thing to do, but we currently
331                 # don't do it.)
332                 if convergence is not None:
333                     # Make sure we did not have to read the whole file the
334                     # second time around .
335                     self.failUnless(bytes_sent < len(DATA),
336                                     "resumption didn't save us any work:"
337                                     " read %r bytes out of %r total" %
338                                     (bytes_sent, len(DATA)))
339                 else:
340                     # Make sure we did have to read the whole file the second
341                     # time around -- because the one that we partially uploaded
342                     # earlier was encrypted with a different random key.
343                     self.failIf(bytes_sent < len(DATA),
344                                 "resumption saved us some work even though we were using random keys:"
345                                 " read %r bytes out of %r total" %
346                                 (bytes_sent, len(DATA)))
347                 n = self.clients[1].create_node_from_uri(cap)
348                 return download_to_data(n)
349             d.addCallback(_uploaded)
350
351             def _check(newdata):
352                 self.failUnlessEqual(newdata, DATA)
353                 # If using convergent encryption, then also check that the
354                 # helper has removed the temp file from its directories.
355                 if convergence is not None:
356                     basedir = os.path.join(self.getdir("client0"), "helper")
357                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
358                     self.failUnlessEqual(files, [])
359                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
360                     self.failUnlessEqual(files, [])
361             d.addCallback(_check)
362             return d
363         d.addCallback(_upload_resumable)
364
365         def _grab_stats(ignored):
366             # the StatsProvider doesn't normally publish a FURL:
367             # instead it passes a live reference to the StatsGatherer
368             # (if and when it connects). To exercise the remote stats
369             # interface, we manually publish client0's StatsProvider
370             # and use client1 to query it.
371             sp = self.clients[0].stats_provider
372             sp_furl = self.clients[0].tub.registerReference(sp)
373             d = self.clients[1].tub.getReference(sp_furl)
374             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
375             def _got_stats(stats):
376                 #print "STATS"
377                 #from pprint import pprint
378                 #pprint(stats)
379                 s = stats["stats"]
380                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
381                 c = stats["counters"]
382                 self.failUnless("storage_server.allocate" in c)
383             d.addCallback(_got_stats)
384             return d
385         d.addCallback(_grab_stats)
386
387         return d
388
389     def _find_all_shares(self, basedir):
390         shares = []
391         for (dirpath, dirnames, filenames) in os.walk(basedir):
392             if "storage" not in dirpath:
393                 continue
394             if not filenames:
395                 continue
396             pieces = dirpath.split(os.sep)
397             if (len(pieces) >= 5
398                 and pieces[-4] == "storage"
399                 and pieces[-3] == "shares"):
400                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
401                 # are sharefiles here
402                 assert pieces[-5].startswith("client")
403                 client_num = int(pieces[-5][-1])
404                 storage_index_s = pieces[-1]
405                 storage_index = si_a2b(storage_index_s)
406                 for sharename in filenames:
407                     shnum = int(sharename)
408                     filename = os.path.join(dirpath, sharename)
409                     data = (client_num, storage_index, filename, shnum)
410                     shares.append(data)
411         if not shares:
412             self.fail("unable to find any share files in %s" % basedir)
413         return shares
414
415     def _corrupt_mutable_share(self, filename, which):
416         msf = MutableShareFile(filename)
417         datav = msf.readv([ (0, 1000000) ])
418         final_share = datav[0]
419         assert len(final_share) < 1000000 # ought to be truncated
420         pieces = mutable_layout.unpack_share(final_share)
421         (seqnum, root_hash, IV, k, N, segsize, datalen,
422          verification_key, signature, share_hash_chain, block_hash_tree,
423          share_data, enc_privkey) = pieces
424
425         if which == "seqnum":
426             seqnum = seqnum + 15
427         elif which == "R":
428             root_hash = self.flip_bit(root_hash)
429         elif which == "IV":
430             IV = self.flip_bit(IV)
431         elif which == "segsize":
432             segsize = segsize + 15
433         elif which == "pubkey":
434             verification_key = self.flip_bit(verification_key)
435         elif which == "signature":
436             signature = self.flip_bit(signature)
437         elif which == "share_hash_chain":
438             nodenum = share_hash_chain.keys()[0]
439             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
440         elif which == "block_hash_tree":
441             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
442         elif which == "share_data":
443             share_data = self.flip_bit(share_data)
444         elif which == "encprivkey":
445             enc_privkey = self.flip_bit(enc_privkey)
446
447         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
448                                             segsize, datalen)
449         final_share = mutable_layout.pack_share(prefix,
450                                                 verification_key,
451                                                 signature,
452                                                 share_hash_chain,
453                                                 block_hash_tree,
454                                                 share_data,
455                                                 enc_privkey)
456         msf.writev( [(0, final_share)], None)
457
458
459     def test_mutable(self):
460         self.basedir = "system/SystemTest/test_mutable"
461         DATA = "initial contents go here."  # 25 bytes % 3 != 0
462         NEWDATA = "new contents yay"
463         NEWERDATA = "this is getting old"
464
465         d = self.set_up_nodes(use_key_generator=True)
466
467         def _create_mutable(res):
468             c = self.clients[0]
469             log.msg("starting create_mutable_file")
470             d1 = c.create_mutable_file(DATA)
471             def _done(res):
472                 log.msg("DONE: %s" % (res,))
473                 self._mutable_node_1 = res
474             d1.addCallback(_done)
475             return d1
476         d.addCallback(_create_mutable)
477
478         def _test_debug(res):
479             # find a share. It is important to run this while there is only
480             # one slot in the grid.
481             shares = self._find_all_shares(self.basedir)
482             (client_num, storage_index, filename, shnum) = shares[0]
483             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
484                     % filename)
485             log.msg(" for clients[%d]" % client_num)
486
487             out,err = StringIO(), StringIO()
488             rc = runner.runner(["debug", "dump-share", "--offsets",
489                                 filename],
490                                stdout=out, stderr=err)
491             output = out.getvalue()
492             self.failUnlessEqual(rc, 0)
493             try:
494                 self.failUnless("Mutable slot found:\n" in output)
495                 self.failUnless("share_type: SDMF\n" in output)
496                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
497                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
498                 self.failUnless(" num_extra_leases: 0\n" in output)
499                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
500                                 in output)
501                 self.failUnless(" SDMF contents:\n" in output)
502                 self.failUnless("  seqnum: 1\n" in output)
503                 self.failUnless("  required_shares: 3\n" in output)
504                 self.failUnless("  total_shares: 10\n" in output)
505                 self.failUnless("  segsize: 27\n" in output, (output, filename))
506                 self.failUnless("  datalen: 25\n" in output)
507                 # the exact share_hash_chain nodes depends upon the sharenum,
508                 # and is more of a hassle to compute than I want to deal with
509                 # now
510                 self.failUnless("  share_hash_chain: " in output)
511                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
512                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
513                             base32.b2a(storage_index))
514                 self.failUnless(expected in output)
515             except unittest.FailTest:
516                 print
517                 print "dump-share output was:"
518                 print output
519                 raise
520         d.addCallback(_test_debug)
521
522         # test retrieval
523
524         # first, let's see if we can use the existing node to retrieve the
525         # contents. This allows it to use the cached pubkey and maybe the
526         # latest-known sharemap.
527
528         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
529         def _check_download_1(res):
530             self.failUnlessEqual(res, DATA)
531             # now we see if we can retrieve the data from a new node,
532             # constructed using the URI of the original one. We do this test
533             # on the same client that uploaded the data.
534             uri = self._mutable_node_1.get_uri()
535             log.msg("starting retrieve1")
536             newnode = self.clients[0].create_node_from_uri(uri)
537             newnode_2 = self.clients[0].create_node_from_uri(uri)
538             self.failUnlessIdentical(newnode, newnode_2)
539             return newnode.download_best_version()
540         d.addCallback(_check_download_1)
541
542         def _check_download_2(res):
543             self.failUnlessEqual(res, DATA)
544             # same thing, but with a different client
545             uri = self._mutable_node_1.get_uri()
546             newnode = self.clients[1].create_node_from_uri(uri)
547             log.msg("starting retrieve2")
548             d1 = newnode.download_best_version()
549             d1.addCallback(lambda res: (res, newnode))
550             return d1
551         d.addCallback(_check_download_2)
552
553         def _check_download_3((res, newnode)):
554             self.failUnlessEqual(res, DATA)
555             # replace the data
556             log.msg("starting replace1")
557             d1 = newnode.overwrite(NEWDATA)
558             d1.addCallback(lambda res: newnode.download_best_version())
559             return d1
560         d.addCallback(_check_download_3)
561
562         def _check_download_4(res):
563             self.failUnlessEqual(res, NEWDATA)
564             # now create an even newer node and replace the data on it. This
565             # new node has never been used for download before.
566             uri = self._mutable_node_1.get_uri()
567             newnode1 = self.clients[2].create_node_from_uri(uri)
568             newnode2 = self.clients[3].create_node_from_uri(uri)
569             self._newnode3 = self.clients[3].create_node_from_uri(uri)
570             log.msg("starting replace2")
571             d1 = newnode1.overwrite(NEWERDATA)
572             d1.addCallback(lambda res: newnode2.download_best_version())
573             return d1
574         d.addCallback(_check_download_4)
575
576         def _check_download_5(res):
577             log.msg("finished replace2")
578             self.failUnlessEqual(res, NEWERDATA)
579         d.addCallback(_check_download_5)
580
581         def _corrupt_shares(res):
582             # run around and flip bits in all but k of the shares, to test
583             # the hash checks
584             shares = self._find_all_shares(self.basedir)
585             ## sort by share number
586             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
587             where = dict([ (shnum, filename)
588                            for (client_num, storage_index, filename, shnum)
589                            in shares ])
590             assert len(where) == 10 # this test is designed for 3-of-10
591             for shnum, filename in where.items():
592                 # shares 7,8,9 are left alone. read will check
593                 # (share_hash_chain, block_hash_tree, share_data). New
594                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
595                 # segsize, signature).
596                 if shnum == 0:
597                     # read: this will trigger "pubkey doesn't match
598                     # fingerprint".
599                     self._corrupt_mutable_share(filename, "pubkey")
600                     self._corrupt_mutable_share(filename, "encprivkey")
601                 elif shnum == 1:
602                     # triggers "signature is invalid"
603                     self._corrupt_mutable_share(filename, "seqnum")
604                 elif shnum == 2:
605                     # triggers "signature is invalid"
606                     self._corrupt_mutable_share(filename, "R")
607                 elif shnum == 3:
608                     # triggers "signature is invalid"
609                     self._corrupt_mutable_share(filename, "segsize")
610                 elif shnum == 4:
611                     self._corrupt_mutable_share(filename, "share_hash_chain")
612                 elif shnum == 5:
613                     self._corrupt_mutable_share(filename, "block_hash_tree")
614                 elif shnum == 6:
615                     self._corrupt_mutable_share(filename, "share_data")
616                 # other things to correct: IV, signature
617                 # 7,8,9 are left alone
618
619                 # note that initial_query_count=5 means that we'll hit the
620                 # first 5 servers in effectively random order (based upon
621                 # response time), so we won't necessarily ever get a "pubkey
622                 # doesn't match fingerprint" error (if we hit shnum>=1 before
623                 # shnum=0, we pull the pubkey from there). To get repeatable
624                 # specific failures, we need to set initial_query_count=1,
625                 # but of course that will change the sequencing behavior of
626                 # the retrieval process. TODO: find a reasonable way to make
627                 # this a parameter, probably when we expand this test to test
628                 # for one failure mode at a time.
629
630                 # when we retrieve this, we should get three signature
631                 # failures (where we've mangled seqnum, R, and segsize). The
632                 # pubkey mangling
633         d.addCallback(_corrupt_shares)
634
635         d.addCallback(lambda res: self._newnode3.download_best_version())
636         d.addCallback(_check_download_5)
637
638         def _check_empty_file(res):
639             # make sure we can create empty files, this usually screws up the
640             # segsize math
641             d1 = self.clients[2].create_mutable_file("")
642             d1.addCallback(lambda newnode: newnode.download_best_version())
643             d1.addCallback(lambda res: self.failUnlessEqual("", res))
644             return d1
645         d.addCallback(_check_empty_file)
646
647         d.addCallback(lambda res: self.clients[0].create_dirnode())
648         def _created_dirnode(dnode):
649             log.msg("_created_dirnode(%s)" % (dnode,))
650             d1 = dnode.list()
651             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
652             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
653             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
654             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
655             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
656             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
657             d1.addCallback(lambda res: dnode.build_manifest().when_done())
658             d1.addCallback(lambda res:
659                            self.failUnlessEqual(len(res["manifest"]), 1))
660             return d1
661         d.addCallback(_created_dirnode)
662
663         def wait_for_c3_kg_conn():
664             return self.clients[3]._key_generator is not None
665         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
666
667         def check_kg_poolsize(junk, size_delta):
668             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
669                                  self.key_generator_svc.key_generator.pool_size + size_delta)
670
671         d.addCallback(check_kg_poolsize, 0)
672         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
673         d.addCallback(check_kg_poolsize, -1)
674         d.addCallback(lambda junk: self.clients[3].create_dirnode())
675         d.addCallback(check_kg_poolsize, -2)
676         # use_helper induces use of clients[3], which is the using-key_gen client
677         d.addCallback(lambda junk:
678                       self.POST("uri?t=mkdir&name=george", use_helper=True))
679         d.addCallback(check_kg_poolsize, -3)
680
681         return d
682
683     def flip_bit(self, good):
684         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
685
686     def mangle_uri(self, gooduri):
687         # change the key, which changes the storage index, which means we'll
688         # be asking about the wrong file, so nobody will have any shares
689         u = uri.from_string(gooduri)
690         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
691                             uri_extension_hash=u.uri_extension_hash,
692                             needed_shares=u.needed_shares,
693                             total_shares=u.total_shares,
694                             size=u.size)
695         return u2.to_string()
696
697     # TODO: add a test which mangles the uri_extension_hash instead, and
698     # should fail due to not being able to get a valid uri_extension block.
699     # Also a test which sneakily mangles the uri_extension block to change
700     # some of the validation data, so it will fail in the post-download phase
701     # when the file's crypttext integrity check fails. Do the same thing for
702     # the key, which should cause the download to fail the post-download
703     # plaintext_hash check.
704
705     def test_filesystem(self):
706         self.basedir = "system/SystemTest/test_filesystem"
707         self.data = LARGE_DATA
708         d = self.set_up_nodes(use_stats_gatherer=True)
709         def _new_happy_semantics(ign):
710             for c in self.clients:
711                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
712         d.addCallback(_new_happy_semantics)
713         d.addCallback(self._test_introweb)
714         d.addCallback(self.log, "starting publish")
715         d.addCallback(self._do_publish1)
716         d.addCallback(self._test_runner)
717         d.addCallback(self._do_publish2)
718         # at this point, we have the following filesystem (where "R" denotes
719         # self._root_directory_uri):
720         # R
721         # R/subdir1
722         # R/subdir1/mydata567
723         # R/subdir1/subdir2/
724         # R/subdir1/subdir2/mydata992
725
726         d.addCallback(lambda res: self.bounce_client(0))
727         d.addCallback(self.log, "bounced client0")
728
729         d.addCallback(self._check_publish1)
730         d.addCallback(self.log, "did _check_publish1")
731         d.addCallback(self._check_publish2)
732         d.addCallback(self.log, "did _check_publish2")
733         d.addCallback(self._do_publish_private)
734         d.addCallback(self.log, "did _do_publish_private")
735         # now we also have (where "P" denotes a new dir):
736         #  P/personal/sekrit data
737         #  P/s2-rw -> /subdir1/subdir2/
738         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
739         d.addCallback(self._check_publish_private)
740         d.addCallback(self.log, "did _check_publish_private")
741         d.addCallback(self._test_web)
742         d.addCallback(self._test_control)
743         d.addCallback(self._test_cli)
744         # P now has four top-level children:
745         # P/personal/sekrit data
746         # P/s2-ro/
747         # P/s2-rw/
748         # P/test_put/  (empty)
749         d.addCallback(self._test_checker)
750         return d
751
752     def _test_introweb(self, res):
753         d = getPage(self.introweb_url, method="GET", followRedirect=True)
754         def _check(res):
755             try:
756                 self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
757                 verstr = str(allmydata.__version__)
758
759                 # The Python "rational version numbering" convention
760                 # disallows "-r$REV" but allows ".post$REV"
761                 # instead. Eventually we'll probably move to
762                 # that. When we do, this test won't go red:
763                 ix = verstr.rfind('-r')
764                 if ix != -1:
765                     altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
766                 else:
767                     ix = verstr.rfind('.post')
768                     if ix != -1:
769                         altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
770                     else:
771                         altverstr = verstr
772
773                 appverstr = "%s: %s" % (allmydata.__appname__, verstr)
774                 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
775
776                 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
777                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
778                 self.failUnless("Subscription Summary: storage: 5" in res)
779             except unittest.FailTest:
780                 print
781                 print "GET %s output was:" % self.introweb_url
782                 print res
783                 raise
784         d.addCallback(_check)
785         d.addCallback(lambda res:
786                       getPage(self.introweb_url + "?t=json",
787                               method="GET", followRedirect=True))
788         def _check_json(res):
789             data = simplejson.loads(res)
790             try:
791                 self.failUnlessEqual(data["subscription_summary"],
792                                      {"storage": 5})
793                 self.failUnlessEqual(data["announcement_summary"],
794                                      {"storage": 5, "stub_client": 5})
795                 self.failUnlessEqual(data["announcement_distinct_hosts"],
796                                      {"storage": 1, "stub_client": 1})
797             except unittest.FailTest:
798                 print
799                 print "GET %s?t=json output was:" % self.introweb_url
800                 print res
801                 raise
802         d.addCallback(_check_json)
803         return d
804
805     def _do_publish1(self, res):
806         ut = upload.Data(self.data, convergence=None)
807         c0 = self.clients[0]
808         d = c0.create_dirnode()
809         def _made_root(new_dirnode):
810             self._root_directory_uri = new_dirnode.get_uri()
811             return c0.create_node_from_uri(self._root_directory_uri)
812         d.addCallback(_made_root)
813         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
814         def _made_subdir1(subdir1_node):
815             self._subdir1_node = subdir1_node
816             d1 = subdir1_node.add_file(u"mydata567", ut)
817             d1.addCallback(self.log, "publish finished")
818             def _stash_uri(filenode):
819                 self.uri = filenode.get_uri()
820                 assert isinstance(self.uri, str), (self.uri, filenode)
821             d1.addCallback(_stash_uri)
822             return d1
823         d.addCallback(_made_subdir1)
824         return d
825
826     def _do_publish2(self, res):
827         ut = upload.Data(self.data, convergence=None)
828         d = self._subdir1_node.create_subdirectory(u"subdir2")
829         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
830         return d
831
832     def log(self, res, *args, **kwargs):
833         # print "MSG: %s  RES: %s" % (msg, args)
834         log.msg(*args, **kwargs)
835         return res
836
837     def _do_publish_private(self, res):
838         self.smalldata = "sssh, very secret stuff"
839         ut = upload.Data(self.smalldata, convergence=None)
840         d = self.clients[0].create_dirnode()
841         d.addCallback(self.log, "GOT private directory")
842         def _got_new_dir(privnode):
843             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
844             d1 = privnode.create_subdirectory(u"personal")
845             d1.addCallback(self.log, "made P/personal")
846             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
847             d1.addCallback(self.log, "made P/personal/sekrit data")
848             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
849             def _got_s2(s2node):
850                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
851                                       s2node.get_readonly_uri())
852                 d2.addCallback(lambda node:
853                                privnode.set_uri(u"s2-ro",
854                                                 s2node.get_readonly_uri(),
855                                                 s2node.get_readonly_uri()))
856                 return d2
857             d1.addCallback(_got_s2)
858             d1.addCallback(lambda res: privnode)
859             return d1
860         d.addCallback(_got_new_dir)
861         return d
862
863     def _check_publish1(self, res):
864         # this one uses the iterative API
865         c1 = self.clients[1]
866         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
867         d.addCallback(self.log, "check_publish1 got /")
868         d.addCallback(lambda root: root.get(u"subdir1"))
869         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
870         d.addCallback(lambda filenode: download_to_data(filenode))
871         d.addCallback(self.log, "get finished")
872         def _get_done(data):
873             self.failUnlessEqual(data, self.data)
874         d.addCallback(_get_done)
875         return d
876
877     def _check_publish2(self, res):
878         # this one uses the path-based API
879         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
880         d = rootnode.get_child_at_path(u"subdir1")
881         d.addCallback(lambda dirnode:
882                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
883         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
884         d.addCallback(lambda filenode: download_to_data(filenode))
885         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
886
887         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
888         def _got_filenode(filenode):
889             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
890             assert fnode == filenode
891         d.addCallback(_got_filenode)
892         return d
893
894     def _check_publish_private(self, resnode):
895         # this one uses the path-based API
896         self._private_node = resnode
897
898         d = self._private_node.get_child_at_path(u"personal")
899         def _got_personal(personal):
900             self._personal_node = personal
901             return personal
902         d.addCallback(_got_personal)
903
904         d.addCallback(lambda dirnode:
905                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
906         def get_path(path):
907             return self._private_node.get_child_at_path(path)
908
909         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
910         d.addCallback(lambda filenode: download_to_data(filenode))
911         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
912         d.addCallback(lambda res: get_path(u"s2-rw"))
913         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
914         d.addCallback(lambda res: get_path(u"s2-ro"))
915         def _got_s2ro(dirnode):
916             self.failUnless(dirnode.is_mutable(), dirnode)
917             self.failUnless(dirnode.is_readonly(), dirnode)
918             d1 = defer.succeed(None)
919             d1.addCallback(lambda res: dirnode.list())
920             d1.addCallback(self.log, "dirnode.list")
921
922             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
923
924             d1.addCallback(self.log, "doing add_file(ro)")
925             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
926             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
927
928             d1.addCallback(self.log, "doing get(ro)")
929             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
930             d1.addCallback(lambda filenode:
931                            self.failUnless(IFileNode.providedBy(filenode)))
932
933             d1.addCallback(self.log, "doing delete(ro)")
934             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
935
936             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
937
938             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
939
940             personal = self._personal_node
941             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
942
943             d1.addCallback(self.log, "doing move_child_to(ro)2")
944             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
945
946             d1.addCallback(self.log, "finished with _got_s2ro")
947             return d1
948         d.addCallback(_got_s2ro)
949         def _got_home(dummy):
950             home = self._private_node
951             personal = self._personal_node
952             d1 = defer.succeed(None)
953             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
954             d1.addCallback(lambda res:
955                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
956
957             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
958             d1.addCallback(lambda res:
959                            home.move_child_to(u"sekrit", home, u"sekrit data"))
960
961             d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
962             d1.addCallback(lambda res:
963                            home.move_child_to(u"sekrit data", personal))
964
965             d1.addCallback(lambda res: home.build_manifest().when_done())
966             d1.addCallback(self.log, "manifest")
967             #  five items:
968             # P/
969             # P/personal/
970             # P/personal/sekrit data
971             # P/s2-rw  (same as P/s2-ro)
972             # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
973             d1.addCallback(lambda res:
974                            self.failUnlessEqual(len(res["manifest"]), 5))
975             d1.addCallback(lambda res: home.start_deep_stats().when_done())
976             def _check_stats(stats):
977                 expected = {"count-immutable-files": 1,
978                             "count-mutable-files": 0,
979                             "count-literal-files": 1,
980                             "count-files": 2,
981                             "count-directories": 3,
982                             "size-immutable-files": 112,
983                             "size-literal-files": 23,
984                             #"size-directories": 616, # varies
985                             #"largest-directory": 616,
986                             "largest-directory-children": 3,
987                             "largest-immutable-file": 112,
988                             }
989                 for k,v in expected.iteritems():
990                     self.failUnlessEqual(stats[k], v,
991                                          "stats[%s] was %s, not %s" %
992                                          (k, stats[k], v))
993                 self.failUnless(stats["size-directories"] > 1300,
994                                 stats["size-directories"])
995                 self.failUnless(stats["largest-directory"] > 800,
996                                 stats["largest-directory"])
997                 self.failUnlessEqual(stats["size-files-histogram"],
998                                      [ (11, 31, 1), (101, 316, 1) ])
999             d1.addCallback(_check_stats)
1000             return d1
1001         d.addCallback(_got_home)
1002         return d
1003
1004     def shouldFail(self, res, expected_failure, which, substring=None):
1005         if isinstance(res, Failure):
1006             res.trap(expected_failure)
1007             if substring:
1008                 self.failUnless(substring in str(res),
1009                                 "substring '%s' not in '%s'"
1010                                 % (substring, str(res)))
1011         else:
1012             self.fail("%s was supposed to raise %s, not get '%s'" %
1013                       (which, expected_failure, res))
1014
1015     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1016         assert substring is None or isinstance(substring, str)
1017         d = defer.maybeDeferred(callable, *args, **kwargs)
1018         def done(res):
1019             if isinstance(res, Failure):
1020                 res.trap(expected_failure)
1021                 if substring:
1022                     self.failUnless(substring in str(res),
1023                                     "substring '%s' not in '%s'"
1024                                     % (substring, str(res)))
1025             else:
1026                 self.fail("%s was supposed to raise %s, not get '%s'" %
1027                           (which, expected_failure, res))
1028         d.addBoth(done)
1029         return d
1030
1031     def PUT(self, urlpath, data):
1032         url = self.webish_url + urlpath
1033         return getPage(url, method="PUT", postdata=data)
1034
1035     def GET(self, urlpath, followRedirect=False):
1036         url = self.webish_url + urlpath
1037         return getPage(url, method="GET", followRedirect=followRedirect)
1038
1039     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1040         sepbase = "boogabooga"
1041         sep = "--" + sepbase
1042         form = []
1043         form.append(sep)
1044         form.append('Content-Disposition: form-data; name="_charset"')
1045         form.append('')
1046         form.append('UTF-8')
1047         form.append(sep)
1048         for name, value in fields.iteritems():
1049             if isinstance(value, tuple):
1050                 filename, value = value
1051                 form.append('Content-Disposition: form-data; name="%s"; '
1052                             'filename="%s"' % (name, filename.encode("utf-8")))
1053             else:
1054                 form.append('Content-Disposition: form-data; name="%s"' % name)
1055             form.append('')
1056             form.append(str(value))
1057             form.append(sep)
1058         form[-1] += "--"
1059         body = ""
1060         headers = {}
1061         if fields:
1062             body = "\r\n".join(form) + "\r\n"
1063             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1064         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1065
1066     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1067               use_helper=False):
1068         if use_helper:
1069             url = self.helper_webish_url + urlpath
1070         else:
1071             url = self.webish_url + urlpath
1072         return getPage(url, method="POST", postdata=body, headers=headers,
1073                        followRedirect=followRedirect)
1074
1075     def _test_web(self, res):
1076         base = self.webish_url
1077         public = "uri/" + self._root_directory_uri
1078         d = getPage(base)
1079         def _got_welcome(page):
1080             # XXX This test is oversensitive to formatting
1081             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1082             self.failUnless(expected in page,
1083                             "I didn't see the right 'connected storage servers'"
1084                             " message in: %s" % page
1085                             )
1086             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1087             self.failUnless(expected in page,
1088                             "I didn't see the right 'My nodeid' message "
1089                             "in: %s" % page)
1090             self.failUnless("Helper: 0 active uploads" in page)
1091         d.addCallback(_got_welcome)
1092         d.addCallback(self.log, "done with _got_welcome")
1093
1094         # get the welcome page from the node that uses the helper too
1095         d.addCallback(lambda res: getPage(self.helper_webish_url))
1096         def _got_welcome_helper(page):
1097             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1098                             page)
1099             self.failUnless("Not running helper" in page)
1100         d.addCallback(_got_welcome_helper)
1101
1102         d.addCallback(lambda res: getPage(base + public))
1103         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1104         def _got_subdir1(page):
1105             # there ought to be an href for our file
1106             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1107             self.failUnless(">mydata567</a>" in page)
1108         d.addCallback(_got_subdir1)
1109         d.addCallback(self.log, "done with _got_subdir1")
1110         d.addCallback(lambda res:
1111                       getPage(base + public + "/subdir1/mydata567"))
1112         def _got_data(page):
1113             self.failUnlessEqual(page, self.data)
1114         d.addCallback(_got_data)
1115
1116         # download from a URI embedded in a URL
1117         d.addCallback(self.log, "_get_from_uri")
1118         def _get_from_uri(res):
1119             return getPage(base + "uri/%s?filename=%s"
1120                            % (self.uri, "mydata567"))
1121         d.addCallback(_get_from_uri)
1122         def _got_from_uri(page):
1123             self.failUnlessEqual(page, self.data)
1124         d.addCallback(_got_from_uri)
1125
1126         # download from a URI embedded in a URL, second form
1127         d.addCallback(self.log, "_get_from_uri2")
1128         def _get_from_uri2(res):
1129             return getPage(base + "uri?uri=%s" % (self.uri,))
1130         d.addCallback(_get_from_uri2)
1131         d.addCallback(_got_from_uri)
1132
1133         # download from a bogus URI, make sure we get a reasonable error
1134         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1135         def _get_from_bogus_uri(res):
1136             d1 = getPage(base + "uri/%s?filename=%s"
1137                          % (self.mangle_uri(self.uri), "mydata567"))
1138             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1139                        "410")
1140             return d1
1141         d.addCallback(_get_from_bogus_uri)
1142         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1143
1144         # upload a file with PUT
1145         d.addCallback(self.log, "about to try PUT")
1146         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1147                                            "new.txt contents"))
1148         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1149         d.addCallback(self.failUnlessEqual, "new.txt contents")
1150         # and again with something large enough to use multiple segments,
1151         # and hopefully trigger pauseProducing too
1152         def _new_happy_semantics(ign):
1153             for c in self.clients:
1154                 # these get reset somewhere? Whatever.
1155                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1156         d.addCallback(_new_happy_semantics)
1157         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1158                                            "big" * 500000)) # 1.5MB
1159         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1160         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1161
1162         # can we replace files in place?
1163         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1164                                            "NEWER contents"))
1165         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1166         d.addCallback(self.failUnlessEqual, "NEWER contents")
1167
1168         # test unlinked POST
1169         d.addCallback(lambda res: self.POST("uri", t="upload",
1170                                             file=("new.txt", "data" * 10000)))
1171         # and again using the helper, which exercises different upload-status
1172         # display code
1173         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1174                                             file=("foo.txt", "data2" * 10000)))
1175
1176         # check that the status page exists
1177         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1178         def _got_status(res):
1179             # find an interesting upload and download to look at. LIT files
1180             # are not interesting.
1181             h = self.clients[0].get_history()
1182             for ds in h.list_all_download_statuses():
1183                 if ds.get_size() > 200:
1184                     self._down_status = ds.get_counter()
1185             for us in h.list_all_upload_statuses():
1186                 if us.get_size() > 200:
1187                     self._up_status = us.get_counter()
1188             rs = list(h.list_all_retrieve_statuses())[0]
1189             self._retrieve_status = rs.get_counter()
1190             ps = list(h.list_all_publish_statuses())[0]
1191             self._publish_status = ps.get_counter()
1192             us = list(h.list_all_mapupdate_statuses())[0]
1193             self._update_status = us.get_counter()
1194
1195             # and that there are some upload- and download- status pages
1196             return self.GET("status/up-%d" % self._up_status)
1197         d.addCallback(_got_status)
1198         def _got_up(res):
1199             return self.GET("status/down-%d" % self._down_status)
1200         d.addCallback(_got_up)
1201         def _got_down(res):
1202             return self.GET("status/mapupdate-%d" % self._update_status)
1203         d.addCallback(_got_down)
1204         def _got_update(res):
1205             return self.GET("status/publish-%d" % self._publish_status)
1206         d.addCallback(_got_update)
1207         def _got_publish(res):
1208             return self.GET("status/retrieve-%d" % self._retrieve_status)
1209         d.addCallback(_got_publish)
1210
1211         # check that the helper status page exists
1212         d.addCallback(lambda res:
1213                       self.GET("helper_status", followRedirect=True))
1214         def _got_helper_status(res):
1215             self.failUnless("Bytes Fetched:" in res)
1216             # touch a couple of files in the helper's working directory to
1217             # exercise more code paths
1218             workdir = os.path.join(self.getdir("client0"), "helper")
1219             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1220             f = open(incfile, "wb")
1221             f.write("small file")
1222             f.close()
1223             then = time.time() - 86400*3
1224             now = time.time()
1225             os.utime(incfile, (now, then))
1226             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1227             f = open(encfile, "wb")
1228             f.write("less small file")
1229             f.close()
1230             os.utime(encfile, (now, then))
1231         d.addCallback(_got_helper_status)
1232         # and that the json form exists
1233         d.addCallback(lambda res:
1234                       self.GET("helper_status?t=json", followRedirect=True))
1235         def _got_helper_status_json(res):
1236             data = simplejson.loads(res)
1237             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1238                                  1)
1239             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1240             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1241             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1242                                  10)
1243             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1244             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1245             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1246                                  15)
1247         d.addCallback(_got_helper_status_json)
1248
1249         # and check that client[3] (which uses a helper but does not run one
1250         # itself) doesn't explode when you ask for its status
1251         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1252         def _got_non_helper_status(res):
1253             self.failUnless("Upload and Download Status" in res)
1254         d.addCallback(_got_non_helper_status)
1255
1256         # or for helper status with t=json
1257         d.addCallback(lambda res:
1258                       getPage(self.helper_webish_url + "helper_status?t=json"))
1259         def _got_non_helper_status_json(res):
1260             data = simplejson.loads(res)
1261             self.failUnlessEqual(data, {})
1262         d.addCallback(_got_non_helper_status_json)
1263
1264         # see if the statistics page exists
1265         d.addCallback(lambda res: self.GET("statistics"))
1266         def _got_stats(res):
1267             self.failUnless("Node Statistics" in res)
1268             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1269         d.addCallback(_got_stats)
1270         d.addCallback(lambda res: self.GET("statistics?t=json"))
1271         def _got_stats_json(res):
1272             data = simplejson.loads(res)
1273             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1274             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1275         d.addCallback(_got_stats_json)
1276
1277         # TODO: mangle the second segment of a file, to test errors that
1278         # occur after we've already sent some good data, which uses a
1279         # different error path.
1280
1281         # TODO: download a URI with a form
1282         # TODO: create a directory by using a form
1283         # TODO: upload by using a form on the directory page
1284         #    url = base + "somedir/subdir1/freeform_post!!upload"
1285         # TODO: delete a file by using a button on the directory page
1286
1287         return d
1288
1289     def _test_runner(self, res):
1290         # exercise some of the diagnostic tools in runner.py
1291
1292         # find a share
1293         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1294             if "storage" not in dirpath:
1295                 continue
1296             if not filenames:
1297                 continue
1298             pieces = dirpath.split(os.sep)
1299             if (len(pieces) >= 4
1300                 and pieces[-4] == "storage"
1301                 and pieces[-3] == "shares"):
1302                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1303                 # are sharefiles here
1304                 filename = os.path.join(dirpath, filenames[0])
1305                 # peek at the magic to see if it is a chk share
1306                 magic = open(filename, "rb").read(4)
1307                 if magic == '\x00\x00\x00\x01':
1308                     break
1309         else:
1310             self.fail("unable to find any uri_extension files in %r"
1311                       % self.basedir)
1312         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1313
1314         out,err = StringIO(), StringIO()
1315         rc = runner.runner(["debug", "dump-share", "--offsets",
1316                             unicode_to_argv(filename)],
1317                            stdout=out, stderr=err)
1318         output = out.getvalue()
1319         self.failUnlessEqual(rc, 0)
1320
1321         # we only upload a single file, so we can assert some things about
1322         # its size and shares.
1323         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1324         self.failUnlessIn("size: %d\n" % len(self.data), output)
1325         self.failUnlessIn("num_segments: 1\n", output)
1326         # segment_size is always a multiple of needed_shares
1327         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1328         self.failUnlessIn("total_shares: 10\n", output)
1329         # keys which are supposed to be present
1330         for key in ("size", "num_segments", "segment_size",
1331                     "needed_shares", "total_shares",
1332                     "codec_name", "codec_params", "tail_codec_params",
1333                     #"plaintext_hash", "plaintext_root_hash",
1334                     "crypttext_hash", "crypttext_root_hash",
1335                     "share_root_hash", "UEB_hash"):
1336             self.failUnlessIn("%s: " % key, output)
1337         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1338
1339         # now use its storage index to find the other shares using the
1340         # 'find-shares' tool
1341         sharedir, shnum = os.path.split(filename)
1342         storagedir, storage_index_s = os.path.split(sharedir)
1343         storage_index_s = str(storage_index_s)
1344         out,err = StringIO(), StringIO()
1345         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1346         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1347         rc = runner.runner(cmd, stdout=out, stderr=err)
1348         self.failUnlessEqual(rc, 0)
1349         out.seek(0)
1350         sharefiles = [sfn.strip() for sfn in out.readlines()]
1351         self.failUnlessEqual(len(sharefiles), 10)
1352
1353         # also exercise the 'catalog-shares' tool
1354         out,err = StringIO(), StringIO()
1355         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1356         cmd = ["debug", "catalog-shares"] + nodedirs
1357         rc = runner.runner(cmd, stdout=out, stderr=err)
1358         self.failUnlessEqual(rc, 0)
1359         out.seek(0)
1360         descriptions = [sfn.strip() for sfn in out.readlines()]
1361         self.failUnlessEqual(len(descriptions), 30)
1362         matching = [line
1363                     for line in descriptions
1364                     if line.startswith("CHK %s " % storage_index_s)]
1365         self.failUnlessEqual(len(matching), 10)
1366
1367     def _test_control(self, res):
1368         # exercise the remote-control-the-client foolscap interfaces in
1369         # allmydata.control (mostly used for performance tests)
1370         c0 = self.clients[0]
1371         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1372         control_furl = open(control_furl_file, "r").read().strip()
1373         # it doesn't really matter which Tub we use to connect to the client,
1374         # so let's just use our IntroducerNode's
1375         d = self.introducer.tub.getReference(control_furl)
1376         d.addCallback(self._test_control2, control_furl_file)
1377         return d
1378     def _test_control2(self, rref, filename):
1379         d = rref.callRemote("upload_from_file_to_uri",
1380                             filename.encode(get_filesystem_encoding()), convergence=None)
1381         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1382         d.addCallback(lambda uri:
1383                       rref.callRemote("download_from_uri_to_file",
1384                                       uri, downfile))
1385         def _check(res):
1386             self.failUnlessEqual(res, downfile)
1387             data = open(downfile, "r").read()
1388             expected_data = open(filename, "r").read()
1389             self.failUnlessEqual(data, expected_data)
1390         d.addCallback(_check)
1391         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1392         if sys.platform == "linux2":
1393             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1394         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1395         return d
1396
1397     def _test_cli(self, res):
1398         # run various CLI commands (in a thread, since they use blocking
1399         # network calls)
1400
1401         private_uri = self._private_node.get_uri()
1402         client0_basedir = self.getdir("client0")
1403
1404         nodeargs = [
1405             "--node-directory", client0_basedir,
1406             ]
1407
1408         d = defer.succeed(None)
1409
1410         # for compatibility with earlier versions, private/root_dir.cap is
1411         # supposed to be treated as an alias named "tahoe:". Start by making
1412         # sure that works, before we add other aliases.
1413
1414         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1415         f = open(root_file, "w")
1416         f.write(private_uri)
1417         f.close()
1418
1419         def run(ignored, verb, *args, **kwargs):
1420             stdin = kwargs.get("stdin", "")
1421             newargs = [verb] + nodeargs + list(args)
1422             return self._run_cli(newargs, stdin=stdin)
1423
1424         def _check_ls((out,err), expected_children, unexpected_children=[]):
1425             self.failUnlessEqual(err, "")
1426             for s in expected_children:
1427                 self.failUnless(s in out, (s,out))
1428             for s in unexpected_children:
1429                 self.failIf(s in out, (s,out))
1430
1431         def _check_ls_root((out,err)):
1432             self.failUnless("personal" in out)
1433             self.failUnless("s2-ro" in out)
1434             self.failUnless("s2-rw" in out)
1435             self.failUnlessEqual(err, "")
1436
1437         # this should reference private_uri
1438         d.addCallback(run, "ls")
1439         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1440
1441         d.addCallback(run, "list-aliases")
1442         def _check_aliases_1((out,err)):
1443             self.failUnlessEqual(err, "")
1444             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1445         d.addCallback(_check_aliases_1)
1446
1447         # now that that's out of the way, remove root_dir.cap and work with
1448         # new files
1449         d.addCallback(lambda res: os.unlink(root_file))
1450         d.addCallback(run, "list-aliases")
1451         def _check_aliases_2((out,err)):
1452             self.failUnlessEqual(err, "")
1453             self.failUnlessEqual(out, "")
1454         d.addCallback(_check_aliases_2)
1455
1456         d.addCallback(run, "mkdir")
1457         def _got_dir( (out,err) ):
1458             self.failUnless(uri.from_string_dirnode(out.strip()))
1459             return out.strip()
1460         d.addCallback(_got_dir)
1461         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1462
1463         d.addCallback(run, "list-aliases")
1464         def _check_aliases_3((out,err)):
1465             self.failUnlessEqual(err, "")
1466             self.failUnless("tahoe: " in out)
1467         d.addCallback(_check_aliases_3)
1468
1469         def _check_empty_dir((out,err)):
1470             self.failUnlessEqual(out, "")
1471             self.failUnlessEqual(err, "")
1472         d.addCallback(run, "ls")
1473         d.addCallback(_check_empty_dir)
1474
1475         def _check_missing_dir((out,err)):
1476             # TODO: check that rc==2
1477             self.failUnlessEqual(out, "")
1478             self.failUnlessEqual(err, "No such file or directory\n")
1479         d.addCallback(run, "ls", "bogus")
1480         d.addCallback(_check_missing_dir)
1481
1482         files = []
1483         datas = []
1484         for i in range(10):
1485             fn = os.path.join(self.basedir, "file%d" % i)
1486             files.append(fn)
1487             data = "data to be uploaded: file%d\n" % i
1488             datas.append(data)
1489             open(fn,"wb").write(data)
1490
1491         def _check_stdout_against((out,err), filenum=None, data=None):
1492             self.failUnlessEqual(err, "")
1493             if filenum is not None:
1494                 self.failUnlessEqual(out, datas[filenum])
1495             if data is not None:
1496                 self.failUnlessEqual(out, data)
1497
1498         # test all both forms of put: from a file, and from stdin
1499         #  tahoe put bar FOO
1500         d.addCallback(run, "put", files[0], "tahoe-file0")
1501         def _put_out((out,err)):
1502             self.failUnless("URI:LIT:" in out, out)
1503             self.failUnless("201 Created" in err, err)
1504             uri0 = out.strip()
1505             return run(None, "get", uri0)
1506         d.addCallback(_put_out)
1507         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1508
1509         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1510         #  tahoe put bar tahoe:FOO
1511         d.addCallback(run, "put", files[2], "tahoe:file2")
1512         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1513         def _check_put_mutable((out,err)):
1514             self._mutable_file3_uri = out.strip()
1515         d.addCallback(_check_put_mutable)
1516         d.addCallback(run, "get", "tahoe:file3")
1517         d.addCallback(_check_stdout_against, 3)
1518
1519         #  tahoe put FOO
1520         STDIN_DATA = "This is the file to upload from stdin."
1521         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1522         #  tahoe put tahoe:FOO
1523         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1524                       stdin="Other file from stdin.")
1525
1526         d.addCallback(run, "ls")
1527         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1528                                   "tahoe-file-stdin", "from-stdin"])
1529         d.addCallback(run, "ls", "subdir")
1530         d.addCallback(_check_ls, ["tahoe-file1"])
1531
1532         # tahoe mkdir FOO
1533         d.addCallback(run, "mkdir", "subdir2")
1534         d.addCallback(run, "ls")
1535         # TODO: extract the URI, set an alias with it
1536         d.addCallback(_check_ls, ["subdir2"])
1537
1538         # tahoe get: (to stdin and to a file)
1539         d.addCallback(run, "get", "tahoe-file0")
1540         d.addCallback(_check_stdout_against, 0)
1541         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1542         d.addCallback(_check_stdout_against, 1)
1543         outfile0 = os.path.join(self.basedir, "outfile0")
1544         d.addCallback(run, "get", "file2", outfile0)
1545         def _check_outfile0((out,err)):
1546             data = open(outfile0,"rb").read()
1547             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1548         d.addCallback(_check_outfile0)
1549         outfile1 = os.path.join(self.basedir, "outfile0")
1550         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1551         def _check_outfile1((out,err)):
1552             data = open(outfile1,"rb").read()
1553             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1554         d.addCallback(_check_outfile1)
1555
1556         d.addCallback(run, "rm", "tahoe-file0")
1557         d.addCallback(run, "rm", "tahoe:file2")
1558         d.addCallback(run, "ls")
1559         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1560
1561         d.addCallback(run, "ls", "-l")
1562         def _check_ls_l((out,err)):
1563             lines = out.split("\n")
1564             for l in lines:
1565                 if "tahoe-file-stdin" in l:
1566                     self.failUnless(l.startswith("-r-- "), l)
1567                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1568                 if "file3" in l:
1569                     self.failUnless(l.startswith("-rw- "), l) # mutable
1570         d.addCallback(_check_ls_l)
1571
1572         d.addCallback(run, "ls", "--uri")
1573         def _check_ls_uri((out,err)):
1574             lines = out.split("\n")
1575             for l in lines:
1576                 if "file3" in l:
1577                     self.failUnless(self._mutable_file3_uri in l)
1578         d.addCallback(_check_ls_uri)
1579
1580         d.addCallback(run, "ls", "--readonly-uri")
1581         def _check_ls_rouri((out,err)):
1582             lines = out.split("\n")
1583             for l in lines:
1584                 if "file3" in l:
1585                     rw_uri = self._mutable_file3_uri
1586                     u = uri.from_string_mutable_filenode(rw_uri)
1587                     ro_uri = u.get_readonly().to_string()
1588                     self.failUnless(ro_uri in l)
1589         d.addCallback(_check_ls_rouri)
1590
1591
1592         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1593         d.addCallback(run, "ls")
1594         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1595
1596         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1597         d.addCallback(run, "ls")
1598         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1599
1600         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1601         d.addCallback(run, "ls")
1602         d.addCallback(_check_ls, ["file3", "file3-copy"])
1603         d.addCallback(run, "get", "tahoe:file3-copy")
1604         d.addCallback(_check_stdout_against, 3)
1605
1606         # copy from disk into tahoe
1607         d.addCallback(run, "cp", files[4], "tahoe:file4")
1608         d.addCallback(run, "ls")
1609         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1610         d.addCallback(run, "get", "tahoe:file4")
1611         d.addCallback(_check_stdout_against, 4)
1612
1613         # copy from tahoe into disk
1614         target_filename = os.path.join(self.basedir, "file-out")
1615         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1616         def _check_cp_out((out,err)):
1617             self.failUnless(os.path.exists(target_filename))
1618             got = open(target_filename,"rb").read()
1619             self.failUnlessEqual(got, datas[4])
1620         d.addCallback(_check_cp_out)
1621
1622         # copy from disk to disk (silly case)
1623         target2_filename = os.path.join(self.basedir, "file-out-copy")
1624         d.addCallback(run, "cp", target_filename, target2_filename)
1625         def _check_cp_out2((out,err)):
1626             self.failUnless(os.path.exists(target2_filename))
1627             got = open(target2_filename,"rb").read()
1628             self.failUnlessEqual(got, datas[4])
1629         d.addCallback(_check_cp_out2)
1630
1631         # copy from tahoe into disk, overwriting an existing file
1632         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1633         def _check_cp_out3((out,err)):
1634             self.failUnless(os.path.exists(target_filename))
1635             got = open(target_filename,"rb").read()
1636             self.failUnlessEqual(got, datas[3])
1637         d.addCallback(_check_cp_out3)
1638
1639         # copy from disk into tahoe, overwriting an existing immutable file
1640         d.addCallback(run, "cp", files[5], "tahoe:file4")
1641         d.addCallback(run, "ls")
1642         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1643         d.addCallback(run, "get", "tahoe:file4")
1644         d.addCallback(_check_stdout_against, 5)
1645
1646         # copy from disk into tahoe, overwriting an existing mutable file
1647         d.addCallback(run, "cp", files[5], "tahoe:file3")
1648         d.addCallback(run, "ls")
1649         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1650         d.addCallback(run, "get", "tahoe:file3")
1651         d.addCallback(_check_stdout_against, 5)
1652
1653         # recursive copy: setup
1654         dn = os.path.join(self.basedir, "dir1")
1655         os.makedirs(dn)
1656         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1657         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1658         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1659         sdn2 = os.path.join(dn, "subdir2")
1660         os.makedirs(sdn2)
1661         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1662         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1663
1664         # from disk into tahoe
1665         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1666         d.addCallback(run, "ls")
1667         d.addCallback(_check_ls, ["dir1"])
1668         d.addCallback(run, "ls", "dir1")
1669         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1670                       ["rfile4", "rfile5"])
1671         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1672         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1673                       ["rfile1", "rfile2", "rfile3"])
1674         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1675         d.addCallback(_check_stdout_against, data="rfile4")
1676
1677         # and back out again
1678         dn_copy = os.path.join(self.basedir, "dir1-copy")
1679         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1680         def _check_cp_r_out((out,err)):
1681             def _cmp(name):
1682                 old = open(os.path.join(dn, name), "rb").read()
1683                 newfn = os.path.join(dn_copy, name)
1684                 self.failUnless(os.path.exists(newfn))
1685                 new = open(newfn, "rb").read()
1686                 self.failUnlessEqual(old, new)
1687             _cmp("rfile1")
1688             _cmp("rfile2")
1689             _cmp("rfile3")
1690             _cmp(os.path.join("subdir2", "rfile4"))
1691             _cmp(os.path.join("subdir2", "rfile5"))
1692         d.addCallback(_check_cp_r_out)
1693
1694         # and copy it a second time, which ought to overwrite the same files
1695         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1696
1697         # and again, only writing filecaps
1698         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1699         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1700         def _check_capsonly((out,err)):
1701             # these should all be LITs
1702             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1703             y = uri.from_string_filenode(x)
1704             self.failUnlessEqual(y.data, "rfile4")
1705         d.addCallback(_check_capsonly)
1706
1707         # and tahoe-to-tahoe
1708         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1709         d.addCallback(run, "ls")
1710         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1711         d.addCallback(run, "ls", "dir1-copy")
1712         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1713                       ["rfile4", "rfile5"])
1714         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1715         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1716                       ["rfile1", "rfile2", "rfile3"])
1717         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1718         d.addCallback(_check_stdout_against, data="rfile4")
1719
1720         # and copy it a second time, which ought to overwrite the same files
1721         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1722
1723         # tahoe_ls doesn't currently handle the error correctly: it tries to
1724         # JSON-parse a traceback.
1725 ##         def _ls_missing(res):
1726 ##             argv = ["ls"] + nodeargs + ["bogus"]
1727 ##             return self._run_cli(argv)
1728 ##         d.addCallback(_ls_missing)
1729 ##         def _check_ls_missing((out,err)):
1730 ##             print "OUT", out
1731 ##             print "ERR", err
1732 ##             self.failUnlessEqual(err, "")
1733 ##         d.addCallback(_check_ls_missing)
1734
1735         return d
1736
1737     def _run_cli(self, argv, stdin=""):
1738         #print "CLI:", argv
1739         stdout, stderr = StringIO(), StringIO()
1740         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1741                                   stdin=StringIO(stdin),
1742                                   stdout=stdout, stderr=stderr)
1743         def _done(res):
1744             return stdout.getvalue(), stderr.getvalue()
1745         d.addCallback(_done)
1746         return d
1747
1748     def _test_checker(self, res):
1749         ut = upload.Data("too big to be literal" * 200, convergence=None)
1750         d = self._personal_node.add_file(u"big file", ut)
1751
1752         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1753         def _check_dirnode_results(r):
1754             self.failUnless(r.is_healthy())
1755         d.addCallback(_check_dirnode_results)
1756         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1757         d.addCallback(_check_dirnode_results)
1758
1759         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1760         def _got_chk_filenode(n):
1761             self.failUnless(isinstance(n, ImmutableFileNode))
1762             d = n.check(Monitor())
1763             def _check_filenode_results(r):
1764                 self.failUnless(r.is_healthy())
1765             d.addCallback(_check_filenode_results)
1766             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1767             d.addCallback(_check_filenode_results)
1768             return d
1769         d.addCallback(_got_chk_filenode)
1770
1771         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1772         def _got_lit_filenode(n):
1773             self.failUnless(isinstance(n, LiteralFileNode))
1774             d = n.check(Monitor())
1775             def _check_lit_filenode_results(r):
1776                 self.failUnlessEqual(r, None)
1777             d.addCallback(_check_lit_filenode_results)
1778             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1779             d.addCallback(_check_lit_filenode_results)
1780             return d
1781         d.addCallback(_got_lit_filenode)
1782         return d
1783