1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 from twisted.internet import utils
8
9 import allmydata
10 from allmydata import uri
11 from allmydata.storage.mutable import MutableShareFile
12 from allmydata.storage.server import si_a2b
13 from allmydata.immutable import offloaded, upload
14 from allmydata.immutable.literal import LiteralFileNode
15 from allmydata.immutable.filenode import ImmutableFileNode
16 from allmydata.util import idlib, mathutil
17 from allmydata.util import log, base32
18 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
19 from allmydata.util.fileutil import abspath_expanduser_unicode
20 from allmydata.util.consumer import MemoryConsumer, download_to_data
21 from allmydata.scripts import runner
22 from allmydata.interfaces import IDirectoryNode, IFileNode, \
23      NoSuchChildError, NoSharesError
24 from allmydata.monitor import Monitor
25 from allmydata.mutable.common import NotWriteableError
26 from allmydata.mutable import layout as mutable_layout
27 from foolscap.api import DeadReferenceError
28 from twisted.python.failure import Failure
29 from twisted.web.client import getPage
30 from twisted.web.error import Error
31
32 from allmydata.test.common import SystemTestMixin
33
34 # TODO: move these to common or common_util
35 from allmydata.test.test_runner import bintahoe, SkipMixin
36
37 LARGE_DATA = """
38 This is some data to publish to the remote grid, which needs to be large
39 enough to not fit inside a LIT uri.
40 """
41
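# An Uploadable that counts the bytes read out of it and, once the count
# passes interrupt_after, fires interrupt_after_d; the resumable-upload test
# below uses that Deferred to restart the helper node mid-transfer.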
42 class CountingDataUploadable(upload.Data):
43     bytes_read = 0
44     interrupt_after = None
45     interrupt_after_d = None
46
47     def read(self, length):
48         self.bytes_read += length
49         if self.interrupt_after is not None:
50             if self.bytes_read > self.interrupt_after:
51                 self.interrupt_after = None
52                 self.interrupt_after_d.callback(self)
53         return upload.Data.read(self, length)
54
55 class SystemTest(SystemTestMixin, SkipMixin, unittest.TestCase):
56     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
57
58     def test_connections(self):
59         self.basedir = "system/SystemTest/test_connections"
60         d = self.set_up_nodes()
61         self.extra_node = None
62         d.addCallback(lambda res: self.add_extra_node(self.numclients))
63         def _check(extra_node):
64             self.extra_node = extra_node
65             for c in self.clients:
66                 all_peerids = c.get_storage_broker().get_all_serverids()
67                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
68                 sb = c.storage_broker
69                 permuted_peers = sb.get_servers_for_index("a")
70                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
71
72         d.addCallback(_check)
73         def _shutdown_extra_node(res):
74             if self.extra_node:
75                 return self.extra_node.stopService()
76             return res
77         d.addBoth(_shutdown_extra_node)
78         return d
79     # test_connections is subsumed by test_upload_and_download, and takes
80     # quite a while to run on a slow machine (because of all the TLS
81     # connections that must be established). If we ever rework the introducer
82     # code to such an extent that we're not sure if it works anymore, we can
83     # reinstate this test until it does.
84     del test_connections
85
86     def test_upload_and_download_random_key(self):
87         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
88         return self._test_upload_and_download(convergence=None)
89
90     def test_upload_and_download_convergent(self):
91         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
92         return self._test_upload_and_download(convergence="some convergence string")
93
94     def _test_upload_and_download(self, convergence):
95         # we use 4000 bytes of data, which will result in about 400k written
96         # to disk among all our simulated nodes
97         DATA = "Some data to upload\n" * 200
98         d = self.set_up_nodes()
99         def _check_connections(res):
100             for c in self.clients:
101                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
102                 all_peerids = c.get_storage_broker().get_all_serverids()
103                 self.failUnlessEqual(len(all_peerids), self.numclients)
104                 sb = c.storage_broker
105                 permuted_peers = sb.get_servers_for_index("a")
106                 self.failUnlessEqual(len(permuted_peers), self.numclients)
107         d.addCallback(_check_connections)
108
109         def _do_upload(res):
110             log.msg("UPLOADING")
111             u = self.clients[0].getServiceNamed("uploader")
112             self.uploader = u
113             # we crank the max segsize down to 1024b for the duration of this
114             # test, so we can exercise multiple segments. It is important
115             # that the data length is not a multiple of the segment size, so
116             # that the tail segment is not the same length as the others. The
117             # 1024b segsize actually gets rounded up to 1025 to be a multiple
118             # of the number of required shares (since we use 25 out of 100 FEC).
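            # (e.g. mathutil.next_multiple(1024, 25) == 1025, assuming that is
            # the rounding helper responsible)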
119             up = upload.Data(DATA, convergence=convergence)
120             up.max_segment_size = 1024
121             d1 = u.upload(up)
122             return d1
123         d.addCallback(_do_upload)
124         def _upload_done(results):
125             theuri = results.uri
126             log.msg("upload finished: uri is %s" % (theuri,))
127             self.uri = theuri
128             assert isinstance(self.uri, str), self.uri
129             self.cap = uri.from_string(self.uri)
130             self.n = self.clients[1].create_node_from_uri(self.uri)
131         d.addCallback(_upload_done)
132
133         def _upload_again(res):
134             # Upload again. If using convergent encryption then this ought to be
135             # short-circuited, however with the way we currently generate URIs
136             # (i.e. because they include the roothash), we have to do all of the
137             # encoding work, and only get to save on the upload part.
138             log.msg("UPLOADING AGAIN")
139             up = upload.Data(DATA, convergence=convergence)
140             up.max_segment_size = 1024
141             return self.uploader.upload(up)
142         d.addCallback(_upload_again)
143
144         def _download_to_data(res):
145             log.msg("DOWNLOADING")
146             return download_to_data(self.n)
147         d.addCallback(_download_to_data)
148         def _download_to_data_done(data):
149             log.msg("download finished")
150             self.failUnlessEqual(data, DATA)
151         d.addCallback(_download_to_data_done)
152
153         def _test_read(res):
154             n = self.clients[1].create_node_from_uri(self.uri)
155             d = download_to_data(n)
156             def _read_done(data):
157                 self.failUnlessEqual(data, DATA)
158             d.addCallback(_read_done)
159             d.addCallback(lambda ign:
160                           n.read(MemoryConsumer(), offset=1, size=4))
161             def _read_portion_done(mc):
162                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
163             d.addCallback(_read_portion_done)
164             d.addCallback(lambda ign:
165                           n.read(MemoryConsumer(), offset=2, size=None))
166             def _read_tail_done(mc):
167                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
168             d.addCallback(_read_tail_done)
169             d.addCallback(lambda ign:
170                           n.read(MemoryConsumer(), size=len(DATA)+1000))
171             def _read_too_much(mc):
172                 self.failUnlessEqual("".join(mc.chunks), DATA)
173             d.addCallback(_read_too_much)
174
175             return d
176         d.addCallback(_test_read)
177
178         def _test_bad_read(res):
179             bad_u = uri.from_string_filenode(self.uri)
180             bad_u.key = self.flip_bit(bad_u.key)
181             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
182             # this should cause an error during download
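            # (flipping a key bit changes the derived storage index, so the
            # download queries servers for a file nobody holds shares of --
            # hence NoSharesError rather than an integrity failure)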
183
184             d = self.shouldFail2(NoSharesError, "download bad node",
185                                  None,
186                                  bad_n.read, MemoryConsumer(), offset=2)
187             return d
188         d.addCallback(_test_bad_read)
189
190         def _download_nonexistent_uri(res):
191             baduri = self.mangle_uri(self.uri)
192             badnode = self.clients[1].create_node_from_uri(baduri)
193             log.msg("about to download non-existent URI", level=log.UNUSUAL,
194                     facility="tahoe.tests")
195             d1 = download_to_data(badnode)
196             def _baduri_should_fail(res):
197                 log.msg("finished downloading non-existent URI",
198                         level=log.UNUSUAL, facility="tahoe.tests")
199                 self.failUnless(isinstance(res, Failure))
200                 self.failUnless(res.check(NoSharesError),
201                                 "expected NoSharesError, got %s" % res)
202             d1.addBoth(_baduri_should_fail)
203             return d1
204         d.addCallback(_download_nonexistent_uri)
205
206         # add a new node, which doesn't accept shares, and only uses the
207         # helper for upload.
208         d.addCallback(lambda res: self.add_extra_node(self.numclients,
209                                                       self.helper_furl,
210                                                       add_to_sparent=True))
211         def _added(extra_node):
212             self.extra_node = extra_node
213             self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
214         d.addCallback(_added)
215
216         HELPER_DATA = "Data that needs help to upload" * 1000
217         def _upload_with_helper(res):
218             u = upload.Data(HELPER_DATA, convergence=convergence)
219             d = self.extra_node.upload(u)
220             def _uploaded(results):
221                 n = self.clients[1].create_node_from_uri(results.uri)
222                 return download_to_data(n)
223             d.addCallback(_uploaded)
224             def _check(newdata):
225                 self.failUnlessEqual(newdata, HELPER_DATA)
226             d.addCallback(_check)
227             return d
228         d.addCallback(_upload_with_helper)
229
230         def _upload_duplicate_with_helper(res):
231             u = upload.Data(HELPER_DATA, convergence=convergence)
232             u.debug_stash_RemoteEncryptedUploadable = True
233             d = self.extra_node.upload(u)
234             def _uploaded(results):
235                 n = self.clients[1].create_node_from_uri(results.uri)
236                 return download_to_data(n)
237             d.addCallback(_uploaded)
238             def _check(newdata):
239                 self.failUnlessEqual(newdata, HELPER_DATA)
240                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
241                             "uploadable started uploading, should have been avoided")
242             d.addCallback(_check)
243             return d
244         if convergence is not None:
245             d.addCallback(_upload_duplicate_with_helper)
246
247         def _upload_resumable(res):
248             DATA = "Data that needs help to upload and gets interrupted" * 1000
249             u1 = CountingDataUploadable(DATA, convergence=convergence)
250             u2 = CountingDataUploadable(DATA, convergence=convergence)
251
252             # we interrupt the connection after about 5kB by shutting down
253             # the helper, then restarting it.
254             u1.interrupt_after = 5000
255             u1.interrupt_after_d = defer.Deferred()
256             u1.interrupt_after_d.addCallback(lambda res:
257                                              self.bounce_client(0))
258
259             # sneak into the helper and reduce its chunk size, so that our
260             # interrupt_after hook will sever the connection on about the fifth
261             # chunk fetched. This makes sure that we've started to write the
262             # new shares before we abandon them, which exercises the
263             # abort/delete-partial-share code. TODO: find a cleaner way to do
264             # this. I know that this will affect later uses of the helper in
265             # this same test run, but I'm not currently worried about it.
266             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
267
268             d = self.extra_node.upload(u1)
269
270             def _should_not_finish(res):
271                 self.fail("interrupted upload should have failed, not finished"
272                           " with result %s" % (res,))
273             def _interrupted(f):
274                 f.trap(DeadReferenceError)
275
276                 # make sure we actually interrupted it before finishing the
277                 # file
278                 self.failUnless(u1.bytes_read < len(DATA),
279                                 "read %d out of %d total" % (u1.bytes_read,
280                                                              len(DATA)))
281
282                 log.msg("waiting for reconnect", level=log.NOISY,
283                         facility="tahoe.test.test_system")
284                 # now, we need to give the nodes a chance to notice that this
285                 # connection has gone away. When this happens, the storage
286                 # servers will be told to abort their uploads, removing the
287                 # partial shares. Unfortunately this involves TCP messages
288                 # going through the loopback interface, and we can't easily
289                 # predict how long that will take. If it were all local, we
290                 # could use fireEventually() to stall. Since we don't have
291                 # the right introduction hooks, the best we can do is use a
292                 # fixed delay. TODO: this is fragile.
293                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
294                 return u1.interrupt_after_d
295             d.addCallbacks(_should_not_finish, _interrupted)
296
297             def _disconnected(res):
298                 # check to make sure the storage servers aren't still hanging
299                 # on to the partial share: their incoming/ directories should
300                 # now be empty.
301                 log.msg("disconnected", level=log.NOISY,
302                         facility="tahoe.test.test_system")
303                 for i in range(self.numclients):
304                     incdir = os.path.join(self.getdir("client%d" % i),
305                                           "storage", "shares", "incoming")
306                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
307             d.addCallback(_disconnected)
308
309             # then we need to give the reconnector a chance to
310             # reestablish the connection to the helper.
311             d.addCallback(lambda res:
312                           log.msg("wait_for_connections", level=log.NOISY,
313                                   facility="tahoe.test.test_system"))
314             d.addCallback(lambda res: self.wait_for_connections())
315
316
317             d.addCallback(lambda res:
318                           log.msg("uploading again", level=log.NOISY,
319                                   facility="tahoe.test.test_system"))
320             d.addCallback(lambda res: self.extra_node.upload(u2))
321
322             def _uploaded(results):
323                 cap = results.uri
324                 log.msg("Second upload complete", level=log.NOISY,
325                         facility="tahoe.test.test_system")
326
327                 # this is really bytes received rather than sent, but it's
328                 # convenient and basically measures the same thing
329                 bytes_sent = results.ciphertext_fetched
330                 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
331
332                 # We currently don't support resumption of upload if the data is
333                 # encrypted with a random key.  (Because that would require us
334                 # to store the key locally and re-use it on the next upload of
335                 # this file, which isn't a bad thing to do, but we currently
336                 # don't do it.)
337                 if convergence is not None:
338                     # Make sure we did not have to read the whole file the
339                     # second time around.
340                     self.failUnless(bytes_sent < len(DATA),
341                                     "resumption didn't save us any work:"
342                                     " read %r bytes out of %r total" %
343                                     (bytes_sent, len(DATA)))
344                 else:
345                     # Make sure we did have to read the whole file the second
346                     # time around -- because the one that we partially uploaded
347                     # earlier was encrypted with a different random key.
348                     self.failIf(bytes_sent < len(DATA),
349                                 "resumption saved us some work even though we were using random keys:"
350                                 " read %r bytes out of %r total" %
351                                 (bytes_sent, len(DATA)))
352                 n = self.clients[1].create_node_from_uri(cap)
353                 return download_to_data(n)
354             d.addCallback(_uploaded)
355
356             def _check(newdata):
357                 self.failUnlessEqual(newdata, DATA)
358                 # If using convergent encryption, then also check that the
359                 # helper has removed the temp file from its directories.
360                 if convergence is not None:
361                     basedir = os.path.join(self.getdir("client0"), "helper")
362                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
363                     self.failUnlessEqual(files, [])
364                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
365                     self.failUnlessEqual(files, [])
366             d.addCallback(_check)
367             return d
368         d.addCallback(_upload_resumable)
369
370         def _grab_stats(ignored):
371             # the StatsProvider doesn't normally publish a FURL:
372             # instead it passes a live reference to the StatsGatherer
373             # (if and when it connects). To exercise the remote stats
374             # interface, we manually publish client0's StatsProvider
375             # and use client1 to query it.
376             sp = self.clients[0].stats_provider
377             sp_furl = self.clients[0].tub.registerReference(sp)
378             d = self.clients[1].tub.getReference(sp_furl)
379             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
380             def _got_stats(stats):
381                 #print "STATS"
382                 #from pprint import pprint
383                 #pprint(stats)
384                 s = stats["stats"]
385                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
386                 c = stats["counters"]
387                 self.failUnless("storage_server.allocate" in c)
388             d.addCallback(_got_stats)
389             return d
390         d.addCallback(_grab_stats)
391
392         return d
393
394     def _find_all_shares(self, basedir):
395         shares = []
396         for (dirpath, dirnames, filenames) in os.walk(basedir):
397             if "storage" not in dirpath:
398                 continue
399             if not filenames:
400                 continue
401             pieces = dirpath.split(os.sep)
402             if (len(pieces) >= 5
403                 and pieces[-4] == "storage"
404                 and pieces[-3] == "shares"):
405                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
406                 # are sharefiles here
407                 assert pieces[-5].startswith("client")
408                 client_num = int(pieces[-5][-1])
409                 storage_index_s = pieces[-1]
410                 storage_index = si_a2b(storage_index_s)
411                 for sharename in filenames:
412                     shnum = int(sharename)
413                     filename = os.path.join(dirpath, sharename)
414                     data = (client_num, storage_index, filename, shnum)
415                     shares.append(data)
416         if not shares:
417             self.fail("unable to find any share files in %s" % basedir)
418         return shares
419
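    # Read an SDMF share off disk, unpack it, perturb the single field named
    # by `which`, re-pack it, and write it back in place, so that exactly one
    # integrity check should fail at retrieval time.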
420     def _corrupt_mutable_share(self, filename, which):
421         msf = MutableShareFile(filename)
422         datav = msf.readv([ (0, 1000000) ])
423         final_share = datav[0]
424         assert len(final_share) < 1000000 # ought to be truncated
425         pieces = mutable_layout.unpack_share(final_share)
426         (seqnum, root_hash, IV, k, N, segsize, datalen,
427          verification_key, signature, share_hash_chain, block_hash_tree,
428          share_data, enc_privkey) = pieces
429
430         if which == "seqnum":
431             seqnum = seqnum + 15
432         elif which == "R":
433             root_hash = self.flip_bit(root_hash)
434         elif which == "IV":
435             IV = self.flip_bit(IV)
436         elif which == "segsize":
437             segsize = segsize + 15
438         elif which == "pubkey":
439             verification_key = self.flip_bit(verification_key)
440         elif which == "signature":
441             signature = self.flip_bit(signature)
442         elif which == "share_hash_chain":
443             nodenum = share_hash_chain.keys()[0]
444             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
445         elif which == "block_hash_tree":
446             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
447         elif which == "share_data":
448             share_data = self.flip_bit(share_data)
449         elif which == "encprivkey":
450             enc_privkey = self.flip_bit(enc_privkey)
451
452         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
453                                             segsize, datalen)
454         final_share = mutable_layout.pack_share(prefix,
455                                                 verification_key,
456                                                 signature,
457                                                 share_hash_chain,
458                                                 block_hash_tree,
459                                                 share_data,
460                                                 enc_privkey)
461         msf.writev([(0, final_share)], None)
462
463
464     def test_mutable(self):
465         self.basedir = "system/SystemTest/test_mutable"
466         DATA = "initial contents go here."  # 25 bytes % 3 != 0
467         NEWDATA = "new contents yay"
468         NEWERDATA = "this is getting old"
469
470         d = self.set_up_nodes(use_key_generator=True)
471
472         def _create_mutable(res):
473             c = self.clients[0]
474             log.msg("starting create_mutable_file")
475             d1 = c.create_mutable_file(DATA)
476             def _done(res):
477                 log.msg("DONE: %s" % (res,))
478                 self._mutable_node_1 = res
479             d1.addCallback(_done)
480             return d1
481         d.addCallback(_create_mutable)
482
483         def _test_debug(res):
484             # find a share. It is important to run this while there is only
485             # one slot in the grid.
486             shares = self._find_all_shares(self.basedir)
487             (client_num, storage_index, filename, shnum) = shares[0]
488             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
489                     % filename)
490             log.msg(" for clients[%d]" % client_num)
491
492             out,err = StringIO(), StringIO()
493             rc = runner.runner(["debug", "dump-share", "--offsets",
494                                 filename],
495                                stdout=out, stderr=err)
496             output = out.getvalue()
497             self.failUnlessEqual(rc, 0)
498             try:
499                 self.failUnless("Mutable slot found:\n" in output)
500                 self.failUnless("share_type: SDMF\n" in output)
501                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
502                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
503                 self.failUnless(" num_extra_leases: 0\n" in output)
504                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
505                                 in output)
506                 self.failUnless(" SDMF contents:\n" in output)
507                 self.failUnless("  seqnum: 1\n" in output)
508                 self.failUnless("  required_shares: 3\n" in output)
509                 self.failUnless("  total_shares: 10\n" in output)
510                 self.failUnless("  segsize: 27\n" in output, (output, filename))
511                 self.failUnless("  datalen: 25\n" in output)
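                # (segsize 27 is presumably the 25-byte datalen rounded up to
                # a multiple of required_shares, like the immutable segsize
                # rounding noted earlier)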
512                 # the exact share_hash_chain nodes depends upon the sharenum,
513                 # and is more of a hassle to compute than I want to deal with
514                 # now
515                 self.failUnless("  share_hash_chain: " in output)
516                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
517                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
518                             base32.b2a(storage_index))
519                 self.failUnless(expected in output)
520             except unittest.FailTest:
521                 print
522                 print "dump-share output was:"
523                 print output
524                 raise
525         d.addCallback(_test_debug)
526
527         # test retrieval
528
529         # first, let's see if we can use the existing node to retrieve the
530         # contents. This allows it to use the cached pubkey and maybe the
531         # latest-known sharemap.
532
533         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
534         def _check_download_1(res):
535             self.failUnlessEqual(res, DATA)
536             # now we see if we can retrieve the data from a new node,
537             # constructed using the URI of the original one. We do this test
538             # on the same client that uploaded the data.
539             uri = self._mutable_node_1.get_uri()
540             log.msg("starting retrieve1")
541             newnode = self.clients[0].create_node_from_uri(uri)
542             newnode_2 = self.clients[0].create_node_from_uri(uri)
543             self.failUnlessIdentical(newnode, newnode_2)
544             return newnode.download_best_version()
545         d.addCallback(_check_download_1)
546
547         def _check_download_2(res):
548             self.failUnlessEqual(res, DATA)
549             # same thing, but with a different client
550             uri = self._mutable_node_1.get_uri()
551             newnode = self.clients[1].create_node_from_uri(uri)
552             log.msg("starting retrieve2")
553             d1 = newnode.download_best_version()
554             d1.addCallback(lambda res: (res, newnode))
555             return d1
556         d.addCallback(_check_download_2)
557
558         def _check_download_3((res, newnode)):
559             self.failUnlessEqual(res, DATA)
560             # replace the data
561             log.msg("starting replace1")
562             d1 = newnode.overwrite(NEWDATA)
563             d1.addCallback(lambda res: newnode.download_best_version())
564             return d1
565         d.addCallback(_check_download_3)
566
567         def _check_download_4(res):
568             self.failUnlessEqual(res, NEWDATA)
569             # now create an even newer node and replace the data on it. This
570             # new node has never been used for download before.
571             uri = self._mutable_node_1.get_uri()
572             newnode1 = self.clients[2].create_node_from_uri(uri)
573             newnode2 = self.clients[3].create_node_from_uri(uri)
574             self._newnode3 = self.clients[3].create_node_from_uri(uri)
575             log.msg("starting replace2")
576             d1 = newnode1.overwrite(NEWERDATA)
577             d1.addCallback(lambda res: newnode2.download_best_version())
578             return d1
579         d.addCallback(_check_download_4)
580
581         def _check_download_5(res):
582             log.msg("finished replace2")
583             self.failUnlessEqual(res, NEWERDATA)
584         d.addCallback(_check_download_5)
585
586         def _corrupt_shares(res):
587             # run around and flip bits in all but k of the shares, to test
588             # the hash checks
589             shares = self._find_all_shares(self.basedir)
590             ## sort by share number
591             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
592             where = dict([ (shnum, filename)
593                            for (client_num, storage_index, filename, shnum)
594                            in shares ])
595             assert len(where) == 10 # this test is designed for 3-of-10
596             for shnum, filename in where.items():
597                 # shares 7,8,9 are left alone. read will check
598                 # (share_hash_chain, block_hash_tree, share_data). New
599                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
600                 # segsize, signature).
601                 if shnum == 0:
602                     # read: this will trigger "pubkey doesn't match
603                     # fingerprint".
604                     self._corrupt_mutable_share(filename, "pubkey")
605                     self._corrupt_mutable_share(filename, "encprivkey")
606                 elif shnum == 1:
607                     # triggers "signature is invalid"
608                     self._corrupt_mutable_share(filename, "seqnum")
609                 elif shnum == 2:
610                     # triggers "signature is invalid"
611                     self._corrupt_mutable_share(filename, "R")
612                 elif shnum == 3:
613                     # triggers "signature is invalid"
614                     self._corrupt_mutable_share(filename, "segsize")
615                 elif shnum == 4:
616                     self._corrupt_mutable_share(filename, "share_hash_chain")
617                 elif shnum == 5:
618                     self._corrupt_mutable_share(filename, "block_hash_tree")
619                 elif shnum == 6:
620                     self._corrupt_mutable_share(filename, "share_data")
621                 # other things to corrupt: IV, signature
622                 # 7,8,9 are left alone
623
624                 # note that initial_query_count=5 means that we'll hit the
625                 # first 5 servers in effectively random order (based upon
626                 # response time), so we won't necessarily ever get a "pubkey
627                 # doesn't match fingerprint" error (if we hit shnum>=1 before
628                 # shnum=0, we pull the pubkey from there). To get repeatable
629                 # specific failures, we need to set initial_query_count=1,
630                 # but of course that will change the sequencing behavior of
631                 # the retrieval process. TODO: find a reasonable way to make
632                 # this a parameter, probably when we expand this test to test
633                 # for one failure mode at a time.
634
635                 # when we retrieve this, we should get three signature
636                 # failures (where we've mangled seqnum, R, and segsize). The
637                 # pubkey mangling triggers "pubkey doesn't match fingerprint".
638         d.addCallback(_corrupt_shares)
639
640         d.addCallback(lambda res: self._newnode3.download_best_version())
641         d.addCallback(_check_download_5)
642
643         def _check_empty_file(res):
644             # make sure we can create empty files, this usually screws up the
645             # segsize math
646             d1 = self.clients[2].create_mutable_file("")
647             d1.addCallback(lambda newnode: newnode.download_best_version())
648             d1.addCallback(lambda res: self.failUnlessEqual("", res))
649             return d1
650         d.addCallback(_check_empty_file)
651
652         d.addCallback(lambda res: self.clients[0].create_dirnode())
653         def _created_dirnode(dnode):
654             log.msg("_created_dirnode(%s)" % (dnode,))
655             d1 = dnode.list()
656             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
657             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
658             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
659             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
660             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
661             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
662             d1.addCallback(lambda res: dnode.build_manifest().when_done())
663             d1.addCallback(lambda res:
664                            self.failUnlessEqual(len(res["manifest"]), 1))
665             return d1
666         d.addCallback(_created_dirnode)
667
668         def wait_for_c3_kg_conn():
669             return self.clients[3]._key_generator is not None
670         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
671
672         def check_kg_poolsize(junk, size_delta):
673             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
674                                  self.key_generator_svc.key_generator.pool_size + size_delta)
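        # (each mutable-file or dirnode creation should draw one key from the
        # generator's pool, so the pool shrinks by one per creation until the
        # generator refills it)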
675
676         d.addCallback(check_kg_poolsize, 0)
677         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
678         d.addCallback(check_kg_poolsize, -1)
679         d.addCallback(lambda junk: self.clients[3].create_dirnode())
680         d.addCallback(check_kg_poolsize, -2)
681         # use_helper induces use of clients[3], which is the client that uses the key generator
682         d.addCallback(lambda junk:
683                       self.POST("uri?t=mkdir&name=george", use_helper=True))
684         d.addCallback(check_kg_poolsize, -3)
685
686         return d
687
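    # XOR the low bit of the last byte: e.g. flip_bit("abc") == "abb".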
688     def flip_bit(self, good):
689         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
690
691     def mangle_uri(self, gooduri):
692         # change the key, which changes the storage index, which means we'll
693         # be asking about the wrong file, so nobody will have any shares
694         u = uri.from_string(gooduri)
695         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
696                             uri_extension_hash=u.uri_extension_hash,
697                             needed_shares=u.needed_shares,
698                             total_shares=u.total_shares,
699                             size=u.size)
700         return u2.to_string()
701
702     # TODO: add a test which mangles the uri_extension_hash instead, and
703     # should fail due to not being able to get a valid uri_extension block.
704     # Also a test which sneakily mangles the uri_extension block to change
705     # some of the validation data, so it will fail in the post-download phase
706     # when the file's crypttext integrity check fails. Do the same thing for
707     # the key, which should cause the download to fail the post-download
708     # plaintext_hash check.
709
710     def test_filesystem(self):
711         self.basedir = "system/SystemTest/test_filesystem"
712         self.data = LARGE_DATA
713         d = self.set_up_nodes(use_stats_gatherer=True)
714         def _new_happy_semantics(ign):
715             for c in self.clients:
716                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
717         d.addCallback(_new_happy_semantics)
718         d.addCallback(self._test_introweb)
719         d.addCallback(self.log, "starting publish")
720         d.addCallback(self._do_publish1)
721         d.addCallback(self._test_runner)
722         d.addCallback(self._do_publish2)
723         # at this point, we have the following filesystem (where "R" denotes
724         # self._root_directory_uri):
725         # R
726         # R/subdir1
727         # R/subdir1/mydata567
728         # R/subdir1/subdir2/
729         # R/subdir1/subdir2/mydata992
730
731         d.addCallback(lambda res: self.bounce_client(0))
732         d.addCallback(self.log, "bounced client0")
733
734         d.addCallback(self._check_publish1)
735         d.addCallback(self.log, "did _check_publish1")
736         d.addCallback(self._check_publish2)
737         d.addCallback(self.log, "did _check_publish2")
738         d.addCallback(self._do_publish_private)
739         d.addCallback(self.log, "did _do_publish_private")
740         # now we also have (where "P" denotes a new dir):
741         #  P/personal/sekrit data
742         #  P/s2-rw -> /subdir1/subdir2/
743         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
744         d.addCallback(self._check_publish_private)
745         d.addCallback(self.log, "did _check_publish_private")
746         d.addCallback(self._test_web)
747         d.addCallback(self._test_control)
748         d.addCallback(self._test_cli)
749         # P now has four top-level children:
750         # P/personal/sekrit data
751         # P/s2-ro/
752         # P/s2-rw/
753         # P/test_put/  (empty)
754         d.addCallback(self._test_checker)
755         return d
756
757     def _test_introweb(self, res):
758         d = getPage(self.introweb_url, method="GET", followRedirect=True)
759         def _check(res):
760             try:
761                 self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
762                 verstr = str(allmydata.__version__)
763
764                 # The Python "rational version numbering" convention
765                 # disallows "-r$REV" but allows ".post$REV"
766                 # instead. Eventually we'll probably move to
767                 # that. When we do, this test won't go red:
768                 ix = verstr.rfind('-r')
769                 if ix != -1:
770                     altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
771                 else:
772                     ix = verstr.rfind('.post')
773                     if ix != -1:
774                         altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
775                     else:
776                         altverstr = verstr
777
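                # (e.g. a "1.8.0-r4699" verstr would also be matched in its
                # "1.8.0.post4699" spelling; version numbers illustrative)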
778                 appverstr = "%s: %s" % (allmydata.__appname__, verstr)
779                 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
780
781                 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
782                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
783                 self.failUnless("Subscription Summary: storage: 5" in res)
784             except unittest.FailTest:
785                 print
786                 print "GET %s output was:" % self.introweb_url
787                 print res
788                 raise
789         d.addCallback(_check)
790         d.addCallback(lambda res:
791                       getPage(self.introweb_url + "?t=json",
792                               method="GET", followRedirect=True))
793         def _check_json(res):
794             data = simplejson.loads(res)
795             try:
796                 self.failUnlessEqual(data["subscription_summary"],
797                                      {"storage": 5})
798                 self.failUnlessEqual(data["announcement_summary"],
799                                      {"storage": 5, "stub_client": 5})
800                 self.failUnlessEqual(data["announcement_distinct_hosts"],
801                                      {"storage": 1, "stub_client": 1})
802             except unittest.FailTest:
803                 print
804                 print "GET %s?t=json output was:" % self.introweb_url
805                 print res
806                 raise
807         d.addCallback(_check_json)
808         return d
809
810     def _do_publish1(self, res):
811         ut = upload.Data(self.data, convergence=None)
812         c0 = self.clients[0]
813         d = c0.create_dirnode()
814         def _made_root(new_dirnode):
815             self._root_directory_uri = new_dirnode.get_uri()
816             return c0.create_node_from_uri(self._root_directory_uri)
817         d.addCallback(_made_root)
818         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
819         def _made_subdir1(subdir1_node):
820             self._subdir1_node = subdir1_node
821             d1 = subdir1_node.add_file(u"mydata567", ut)
822             d1.addCallback(self.log, "publish finished")
823             def _stash_uri(filenode):
824                 self.uri = filenode.get_uri()
825                 assert isinstance(self.uri, str), (self.uri, filenode)
826             d1.addCallback(_stash_uri)
827             return d1
828         d.addCallback(_made_subdir1)
829         return d
830
831     def _do_publish2(self, res):
832         ut = upload.Data(self.data, convergence=None)
833         d = self._subdir1_node.create_subdirectory(u"subdir2")
834         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
835         return d
836
837     def log(self, res, *args, **kwargs):
838         # print "MSG: %s  RES: %s" % (msg, args)
839         log.msg(*args, **kwargs)
840         return res
841
842     def _do_publish_private(self, res):
843         self.smalldata = "sssh, very secret stuff"
844         ut = upload.Data(self.smalldata, convergence=None)
845         d = self.clients[0].create_dirnode()
846         d.addCallback(self.log, "GOT private directory")
847         def _got_new_dir(privnode):
848             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
849             d1 = privnode.create_subdirectory(u"personal")
850             d1.addCallback(self.log, "made P/personal")
851             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
852             d1.addCallback(self.log, "made P/personal/sekrit data")
853             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
854             def _got_s2(s2node):
855                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
856                                       s2node.get_readonly_uri())
857                 d2.addCallback(lambda node:
858                                privnode.set_uri(u"s2-ro",
859                                                 s2node.get_readonly_uri(),
860                                                 s2node.get_readonly_uri()))
861                 return d2
862             d1.addCallback(_got_s2)
863             d1.addCallback(lambda res: privnode)
864             return d1
865         d.addCallback(_got_new_dir)
866         return d
867
868     def _check_publish1(self, res):
869         # this one uses the iterative API
870         c1 = self.clients[1]
871         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
872         d.addCallback(self.log, "check_publish1 got /")
873         d.addCallback(lambda root: root.get(u"subdir1"))
874         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
875         d.addCallback(lambda filenode: download_to_data(filenode))
876         d.addCallback(self.log, "get finished")
877         def _get_done(data):
878             self.failUnlessEqual(data, self.data)
879         d.addCallback(_get_done)
880         return d
881
882     def _check_publish2(self, res):
883         # this one uses the path-based API
884         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
885         d = rootnode.get_child_at_path(u"subdir1")
886         d.addCallback(lambda dirnode:
887                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
888         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
889         d.addCallback(lambda filenode: download_to_data(filenode))
890         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
891
892         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
893         def _got_filenode(filenode):
894             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
895             assert fnode == filenode
896         d.addCallback(_got_filenode)
897         return d
898
899     def _check_publish_private(self, resnode):
900         # this one uses the path-based API
901         self._private_node = resnode
902
903         d = self._private_node.get_child_at_path(u"personal")
904         def _got_personal(personal):
905             self._personal_node = personal
906             return personal
907         d.addCallback(_got_personal)
908
909         d.addCallback(lambda dirnode:
910                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
911         def get_path(path):
912             return self._private_node.get_child_at_path(path)
913
914         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
915         d.addCallback(lambda filenode: download_to_data(filenode))
916         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
917         d.addCallback(lambda res: get_path(u"s2-rw"))
918         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
919         d.addCallback(lambda res: get_path(u"s2-ro"))
920         def _got_s2ro(dirnode):
921             self.failUnless(dirnode.is_mutable(), dirnode)
922             self.failUnless(dirnode.is_readonly(), dirnode)
923             d1 = defer.succeed(None)
924             d1.addCallback(lambda res: dirnode.list())
925             d1.addCallback(self.log, "dirnode.list")
926
927             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
928
929             d1.addCallback(self.log, "doing add_file(ro)")
930             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
931             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
932
933             d1.addCallback(self.log, "doing get(ro)")
934             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
935             d1.addCallback(lambda filenode:
936                            self.failUnless(IFileNode.providedBy(filenode)))
937
938             d1.addCallback(self.log, "doing delete(ro)")
939             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
940
941             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
942
943             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
944
945             personal = self._personal_node
946             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
947
948             d1.addCallback(self.log, "doing move_child_to(ro)2")
949             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
950
951             d1.addCallback(self.log, "finished with _got_s2ro")
952             return d1
953         d.addCallback(_got_s2ro)
954         def _got_home(dummy):
955             home = self._private_node
956             personal = self._personal_node
957             d1 = defer.succeed(None)
958             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
959             d1.addCallback(lambda res:
960                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
961
962             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
963             d1.addCallback(lambda res:
964                            home.move_child_to(u"sekrit", home, u"sekrit data"))
965
966             d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
967             d1.addCallback(lambda res:
968                            home.move_child_to(u"sekrit data", personal))
969
970             d1.addCallback(lambda res: home.build_manifest().when_done())
971             d1.addCallback(self.log, "manifest")
972             #  five items:
973             # P/
974             # P/personal/
975             # P/personal/sekrit data
976             # P/s2-rw  (same as P/s2-ro)
977             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
978             d1.addCallback(lambda res:
979                            self.failUnlessEqual(len(res["manifest"]), 5))
980             d1.addCallback(lambda res: home.start_deep_stats().when_done())
981             def _check_stats(stats):
982                 expected = {"count-immutable-files": 1,
983                             "count-mutable-files": 0,
984                             "count-literal-files": 1,
985                             "count-files": 2,
986                             "count-directories": 3,
987                             "size-immutable-files": 112,
988                             "size-literal-files": 23,
989                             #"size-directories": 616, # varies
990                             #"largest-directory": 616,
991                             "largest-directory-children": 3,
992                             "largest-immutable-file": 112,
993                             }
994                 for k,v in expected.iteritems():
995                     self.failUnlessEqual(stats[k], v,
996                                          "stats[%s] was %s, not %s" %
997                                          (k, stats[k], v))
998                 self.failUnless(stats["size-directories"] > 1300,
999                                 stats["size-directories"])
1000                 self.failUnless(stats["largest-directory"] > 800,
1001                                 stats["largest-directory"])
1002                 self.failUnlessEqual(stats["size-files-histogram"],
1003                                      [ (11, 31, 1), (101, 316, 1) ])
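                # (the (11, 31, 1) bucket is the 23-byte literal file; the
                # (101, 316, 1) bucket is the 112-byte immutable file)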
1004             d1.addCallback(_check_stats)
1005             return d1
1006         d.addCallback(_got_home)
1007         return d
1008
1009     def shouldFail(self, res, expected_failure, which, substring=None):
1010         if isinstance(res, Failure):
1011             res.trap(expected_failure)
1012             if substring:
1013                 self.failUnless(substring in str(res),
1014                                 "substring '%s' not in '%s'"
1015                                 % (substring, str(res)))
1016         else:
1017             self.fail("%s was supposed to raise %s, not get '%s'" %
1018                       (which, expected_failure, res))
1019
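    # Invoke `callable` and expect it to fail with expected_failure: the
    # returned Deferred succeeds only if the expected exception shows up
    # (optionally with `substring` in its message); `which` merely labels
    # the assertion in failure output.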
1020     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1021         assert substring is None or isinstance(substring, str)
1022         d = defer.maybeDeferred(callable, *args, **kwargs)
1023         def done(res):
1024             if isinstance(res, Failure):
1025                 res.trap(expected_failure)
1026                 if substring:
1027                     self.failUnless(substring in str(res),
1028                                     "substring '%s' not in '%s'"
1029                                     % (substring, str(res)))
1030             else:
1031                 self.fail("%s was supposed to raise %s, not get '%s'" %
1032                           (which, expected_failure, res))
1033         d.addBoth(done)
1034         return d
1035
1036     def PUT(self, urlpath, data):
1037         url = self.webish_url + urlpath
1038         return getPage(url, method="PUT", postdata=data)
1039
1040     def GET(self, urlpath, followRedirect=False):
1041         url = self.webish_url + urlpath
1042         return getPage(url, method="GET", followRedirect=followRedirect)
1043
1044     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1045         sepbase = "boogabooga"
1046         sep = "--" + sepbase
1047         form = []
1048         form.append(sep)
1049         form.append('Content-Disposition: form-data; name="_charset"')
1050         form.append('')
1051         form.append('UTF-8')
1052         form.append(sep)
1053         for name, value in fields.iteritems():
1054             if isinstance(value, tuple):
1055                 filename, value = value
1056                 form.append('Content-Disposition: form-data; name="%s"; '
1057                             'filename="%s"' % (name, filename.encode("utf-8")))
1058             else:
1059                 form.append('Content-Disposition: form-data; name="%s"' % name)
1060             form.append('')
1061             form.append(str(value))
1062             form.append(sep)
1063         form[-1] += "--"
1064         body = ""
1065         headers = {}
1066         if fields:
1067             body = "\r\n".join(form) + "\r\n"
1068             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1069         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
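    # A body built this way looks roughly like (for a single field t=upload):
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   upload
    #   --boogabooga--
    # with CRLF line endings throughout.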
1070
1071     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1072               use_helper=False):
1073         if use_helper:
1074             url = self.helper_webish_url + urlpath
1075         else:
1076             url = self.webish_url + urlpath
1077         return getPage(url, method="POST", postdata=body, headers=headers,
1078                        followRedirect=followRedirect)
1079
1080     def _test_web(self, res):
1081         base = self.webish_url
1082         public = "uri/" + self._root_directory_uri
1083         d = getPage(base)
1084         def _got_welcome(page):
1085             # XXX This test is oversensitive to formatting
1086             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1087             self.failUnless(expected in page,
1088                             "I didn't see the right 'connected storage servers'"
1089                             " message in: %s" % page
1090                             )
1091             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1092             self.failUnless(expected in page,
1093                             "I didn't see the right 'My nodeid' message "
1094                             "in: %s" % page)
1095             self.failUnless("Helper: 0 active uploads" in page)
1096         d.addCallback(_got_welcome)
1097         d.addCallback(self.log, "done with _got_welcome")
1098
1099         # get the welcome page from the node that uses the helper too
1100         d.addCallback(lambda res: getPage(self.helper_webish_url))
1101         def _got_welcome_helper(page):
1102             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1103                             page)
1104             self.failUnless("Not running helper" in page)
1105         d.addCallback(_got_welcome_helper)
1106
1107         d.addCallback(lambda res: getPage(base + public))
1108         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1109         def _got_subdir1(page):
1110             # there ought to be an href for our file
1111             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1112             self.failUnless(">mydata567</a>" in page)
1113         d.addCallback(_got_subdir1)
1114         d.addCallback(self.log, "done with _got_subdir1")
1115         d.addCallback(lambda res:
1116                       getPage(base + public + "/subdir1/mydata567"))
1117         def _got_data(page):
1118             self.failUnlessEqual(page, self.data)
1119         d.addCallback(_got_data)
1120
1121         # download from a URI embedded in a URL
1122         d.addCallback(self.log, "_get_from_uri")
1123         def _get_from_uri(res):
1124             return getPage(base + "uri/%s?filename=%s"
1125                            % (self.uri, "mydata567"))
1126         d.addCallback(_get_from_uri)
1127         def _got_from_uri(page):
1128             self.failUnlessEqual(page, self.data)
1129         d.addCallback(_got_from_uri)
1130
1131         # download from a URI embedded in a URL, second form
1132         d.addCallback(self.log, "_get_from_uri2")
1133         def _get_from_uri2(res):
1134             return getPage(base + "uri?uri=%s" % (self.uri,))
1135         d.addCallback(_get_from_uri2)
1136         d.addCallback(_got_from_uri)
1137
1138         # download from a bogus URI, make sure we get a reasonable error (a 410, per the assertion below)
1139         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1140         def _get_from_bogus_uri(res):
1141             d1 = getPage(base + "uri/%s?filename=%s"
1142                          % (self.mangle_uri(self.uri), "mydata567"))
1143             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1144                        "410")
1145             return d1
1146         d.addCallback(_get_from_bogus_uri)
1147         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1148
1149         # upload a file with PUT
1150         d.addCallback(self.log, "about to try PUT")
1151         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1152                                            "new.txt contents"))
1153         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1154         d.addCallback(self.failUnlessEqual, "new.txt contents")
1155         # and again with something large enough to use multiple segments,
1156         # and hopefully trigger pauseProducing too
1157         def _new_happy_semantics(ign):
1158             for c in self.clients:
1159             # encoding params may have been reset; lower 'happy' so uploads succeed on this small grid
1160                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1161         d.addCallback(_new_happy_semantics)
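        # ('happy' is the servers-of-happiness threshold: with happy=1 the
        # upload succeeds as long as shares end up on at least one server)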
1162         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1163                                            "big" * 500000)) # 1.5MB
1164         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1165         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1166
1167         # can we replace files in place?
1168         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1169                                            "NEWER contents"))
1170         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1171         d.addCallback(self.failUnlessEqual, "NEWER contents")
1172
1173         # test unlinked POST
1174         d.addCallback(lambda res: self.POST("uri", t="upload",
1175                                             file=("new.txt", "data" * 10000)))
1176         # and again using the helper, which exercises different upload-status
1177         # display code
1178         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1179                                             file=("foo.txt", "data2" * 10000)))
1180
1181         # check that the status page exists
1182         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1183         def _got_status(res):
1184             # find an interesting upload and download to look at. LIT
1185             # files embed their data in the URI itself, so they are not interesting.
1186             h = self.clients[0].get_history()
1187             for ds in h.list_all_download_statuses():
1188                 if ds.get_size() > 200:
1189                     self._down_status = ds.get_counter()
1190             for us in h.list_all_upload_statuses():
1191                 if us.get_size() > 200:
1192                     self._up_status = us.get_counter()
1193             rs = list(h.list_all_retrieve_statuses())[0]
1194             self._retrieve_status = rs.get_counter()
1195             ps = list(h.list_all_publish_statuses())[0]
1196             self._publish_status = ps.get_counter()
1197             us = list(h.list_all_mapupdate_statuses())[0]
1198             self._update_status = us.get_counter()
1199
1200             # and that there are upload- and download-status pages
1201             return self.GET("status/up-%d" % self._up_status)
1202         d.addCallback(_got_status)
1203         def _got_up(res):
1204             return self.GET("status/down-%d" % self._down_status)
1205         d.addCallback(_got_up)
1206         def _got_down(res):
1207             return self.GET("status/mapupdate-%d" % self._update_status)
1208         d.addCallback(_got_down)
1209         def _got_update(res):
1210             return self.GET("status/publish-%d" % self._publish_status)
1211         d.addCallback(_got_update)
1212         def _got_publish(res):
1213             return self.GET("status/retrieve-%d" % self._retrieve_status)
1214         d.addCallback(_got_publish)
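        # (so the per-operation pages live at status/up-N, status/down-N,
        # status/mapupdate-N, status/publish-N and status/retrieve-N, where
        # N is the counter recorded from each history entry above)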
1215
1216         # check that the helper status page exists
1217         d.addCallback(lambda res:
1218                       self.GET("helper_status", followRedirect=True))
1219         def _got_helper_status(res):
1220             self.failUnless("Bytes Fetched:" in res)
1221             # touch a couple of files in the helper's working directory to
1222             # exercise more code paths
1223             workdir = os.path.join(self.getdir("client0"), "helper")
1224             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1225             f = open(incfile, "wb")
1226             f.write("small file")
1227             f.close()
1228             then = time.time() - 86400*3
1229             now = time.time()
1230             os.utime(incfile, (now, then))
1231             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1232             f = open(encfile, "wb")
1233             f.write("less small file")
1234             f.close()
1235             os.utime(encfile, (now, then))
1236         d.addCallback(_got_helper_status)
1237         # and that the json form exists
1238         d.addCallback(lambda res:
1239                       self.GET("helper_status?t=json", followRedirect=True))
1240         def _got_helper_status_json(res):
1241             data = simplejson.loads(res)
1242             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1243                                  1)
1244             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1245             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1246             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1247                                  10)
1248             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1249             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1250             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1251                                  15)
1252         d.addCallback(_got_helper_status_json)
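        # roughly, the helper's JSON status is a flat dict of counters like
        # (values taken from the assertions above):
        ##   {"chk_upload_helper.upload_need_upload": 1,
        ##    "chk_upload_helper.incoming_count": 1,
        ##    "chk_upload_helper.incoming_size": 10, ...}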
1253
1254         # and check that client[3] (which uses a helper but does not run one
1255         # itself) doesn't explode when you ask for its status
1256         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1257         def _got_non_helper_status(res):
1258             self.failUnless("Upload and Download Status" in res)
1259         d.addCallback(_got_non_helper_status)
1260
1261         # or for helper status with t=json
1262         d.addCallback(lambda res:
1263                       getPage(self.helper_webish_url + "helper_status?t=json"))
1264         def _got_non_helper_status_json(res):
1265             data = simplejson.loads(res)
1266             self.failUnlessEqual(data, {})
1267         d.addCallback(_got_non_helper_status_json)
1268
1269         # see if the statistics page exists
1270         d.addCallback(lambda res: self.GET("statistics"))
1271         def _got_stats(res):
1272             self.failUnless("Node Statistics" in res)
1273             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1274         d.addCallback(_got_stats)
1275         d.addCallback(lambda res: self.GET("statistics?t=json"))
1276         def _got_stats_json(res):
1277             data = simplejson.loads(res)
1278             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1279             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1280         d.addCallback(_got_stats_json)
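        # per the two assertions above, the statistics JSON looks roughly
        # like:
        ##   {"counters": {"uploader.files_uploaded": 5, ...},
        ##    "stats": {"chk_upload_helper.upload_need_upload": 1, ...}}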
1281
1282         # TODO: mangle the second segment of a file, to test errors that
1283         # occur after we've already sent some good data, which uses a
1284         # different error path.
1285
1286         # TODO: download a URI with a form
1287         # TODO: create a directory by using a form
1288         # TODO: upload by using a form on the directory page
1289         #    url = base + "somedir/subdir1/freeform_post!!upload"
1290         # TODO: delete a file by using a button on the directory page
1291
1292         return d
1293
1294     def _test_runner(self, res):
1295         # exercise some of the diagnostic tools in runner.py
1296
1297         # find a share
1298         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1299             if "storage" not in dirpath:
1300                 continue
1301             if not filenames:
1302                 continue
1303             pieces = dirpath.split(os.sep)
1304             if (len(pieces) >= 4
1305                 and pieces[-4] == "storage"
1306                 and pieces[-3] == "shares"):
1307                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1308                 # are sharefiles here
1309                 filename = os.path.join(dirpath, filenames[0])
1310                 # peek at the magic number to see if it is a CHK share
1311                 magic = open(filename, "rb").read(4)
1312                 if magic == '\x00\x00\x00\x01':
1313                     break
1314         else:
1315             self.fail("unable to find any uri_extension files in %r"
1316                       % self.basedir)
1317         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1318
1319         out,err = StringIO(), StringIO()
1320         rc = runner.runner(["debug", "dump-share", "--offsets",
1321                             unicode_to_argv(filename)],
1322                            stdout=out, stderr=err)
1323         output = out.getvalue()
1324         self.failUnlessEqual(rc, 0)
1325
1326         # we only upload a single file, so we can assert some things about
1327         # its size and shares.
1328         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1329         self.failUnlessIn("size: %d\n" % len(self.data), output)
1330         self.failUnlessIn("num_segments: 1\n", output)
1331         # segment_size is always a multiple of needed_shares
1332         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
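        # (illustrative: next_multiple(100, 3) == 102, i.e. round the size up
        # to a multiple of needed_shares=3)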
1333         self.failUnlessIn("total_shares: 10\n", output)
1334         # keys which are supposed to be present
1335         for key in ("size", "num_segments", "segment_size",
1336                     "needed_shares", "total_shares",
1337                     "codec_name", "codec_params", "tail_codec_params",
1338                     #"plaintext_hash", "plaintext_root_hash",
1339                     "crypttext_hash", "crypttext_root_hash",
1340                     "share_root_hash", "UEB_hash"):
1341             self.failUnlessIn("%s: " % key, output)
1342         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
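        # for reference, the dump-share output matched above looks roughly
        # like this (illustrative values):
        ##   share filename: /.../storage/shares/<start>/<sindex>/0
        ##   size: 112
        ##   num_segments: 1
        ##   segment_size: 114
        ##   needed_shares: 3
        ##   total_shares: 10
        ##     verify-cap: URI:CHK-Verifier:...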
1343
1344         # now use its storage index to find the other shares using the
1345         # 'find-shares' tool
1346         sharedir, shnum = os.path.split(filename)
1347         storagedir, storage_index_s = os.path.split(sharedir)
1348         storage_index_s = str(storage_index_s)
1349         out,err = StringIO(), StringIO()
1350         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1351         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1352         rc = runner.runner(cmd, stdout=out, stderr=err)
1353         self.failUnlessEqual(rc, 0)
1354         out.seek(0)
1355         sharefiles = [sfn.strip() for sfn in out.readlines()]
1356         self.failUnlessEqual(len(sharefiles), 10)
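        # the equivalent command line is (sketch):
        ##   tahoe debug find-shares <storage-index> NODEDIR1 NODEDIR2 ...
        # and it prints one share-file path per line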
1357
1358         # also exercise the 'catalog-shares' tool
1359         out,err = StringIO(), StringIO()
1360         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1361         cmd = ["debug", "catalog-shares"] + nodedirs
1362         rc = runner.runner(cmd, stdout=out, stderr=err)
1363         self.failUnlessEqual(rc, 0)
1364         out.seek(0)
1365         descriptions = [sfn.strip() for sfn in out.readlines()]
1366         self.failUnlessEqual(len(descriptions), 30)
1367         matching = [line
1368                     for line in descriptions
1369                     if line.startswith("CHK %s " % storage_index_s)]
1370         self.failUnlessEqual(len(matching), 10)
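        # catalog-shares prints one description line per share; CHK share
        # lines begin with "CHK <storage-index> ", which is what the filter
        # above matches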
1371
1372     def _test_control(self, res):
1373         # exercise the remote-control-the-client foolscap interfaces in
1374         # allmydata.control (mostly used for performance tests)
1375         c0 = self.clients[0]
1376         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1377         control_furl = open(control_furl_file, "r").read().strip()
1378         # it doesn't really matter which Tub we use to connect to the client,
1379         # so let's just use our IntroducerNode's
1380         d = self.introducer.tub.getReference(control_furl)
1381         d.addCallback(self._test_control2, control_furl_file)
1382         return d
1383     def _test_control2(self, rref, filename):
1384         d = rref.callRemote("upload_from_file_to_uri",
1385                             filename.encode(get_filesystem_encoding()), convergence=None)
1386         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1387         d.addCallback(lambda uri:
1388                       rref.callRemote("download_from_uri_to_file",
1389                                       uri, downfile))
1390         def _check(res):
1391             self.failUnlessEqual(res, downfile)
1392             data = open(downfile, "r").read()
1393             expected_data = open(filename, "r").read()
1394             self.failUnlessEqual(data, expected_data)
1395         d.addCallback(_check)
1396         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1397         if sys.platform == "linux2":
1398             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1399         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1400         return d
1401
1402     def _test_cli(self, res):
1403         # run various CLI commands (in a thread, since they use blocking
1404         # network calls)
1405
1406         private_uri = self._private_node.get_uri()
1407         client0_basedir = self.getdir("client0")
1408
1409         nodeargs = [
1410             "--node-directory", client0_basedir,
1411             ]
1412
1413         d = defer.succeed(None)
1414
1415         # for compatibility with earlier versions, private/root_dir.cap is
1416         # supposed to be treated as an alias named "tahoe:". Start by making
1417         # sure that works, before we add other aliases.
1418
1419         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1420         f = open(root_file, "w")
1421         f.write(private_uri)
1422         f.close()
1423
1424         def run(ignored, verb, *args, **kwargs):
1425             stdin = kwargs.get("stdin", "")
1426             newargs = [verb] + nodeargs + list(args)
1427             return self._run_cli(newargs, stdin=stdin)
1428
1429         def _check_ls((out,err), expected_children, unexpected_children=[]):
1430             self.failUnlessEqual(err, "")
1431             for s in expected_children:
1432                 self.failUnless(s in out, (s,out))
1433             for s in unexpected_children:
1434                 self.failIf(s in out, (s,out))
1435
1436         def _check_ls_root((out,err)):
1437             self.failUnless("personal" in out)
1438             self.failUnless("s2-ro" in out)
1439             self.failUnless("s2-rw" in out)
1440             self.failUnlessEqual(err, "")
1441
1442         # this should reference private_uri
1443         d.addCallback(run, "ls")
1444         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1445
1446         d.addCallback(run, "list-aliases")
1447         def _check_aliases_1((out,err)):
1448             self.failUnlessEqual(err, "")
1449             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1450         d.addCallback(_check_aliases_1)
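        # equivalent shell session (sketch; the cap shown is illustrative):
        ##   $ tahoe --node-directory BASEDIR list-aliases
        ##   tahoe: URI:DIR2:...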
1451
1452         # now that that's out of the way, remove root_dir.cap and work with
1453         # new files
1454         d.addCallback(lambda res: os.unlink(root_file))
1455         d.addCallback(run, "list-aliases")
1456         def _check_aliases_2((out,err)):
1457             self.failUnlessEqual(err, "")
1458             self.failUnlessEqual(out, "")
1459         d.addCallback(_check_aliases_2)
1460
1461         d.addCallback(run, "mkdir")
1462         def _got_dir((out,err)):
1463             self.failUnless(uri.from_string_dirnode(out.strip()))
1464             return out.strip()
1465         d.addCallback(_got_dir)
1466         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1467
1468         d.addCallback(run, "list-aliases")
1469         def _check_aliases_3((out,err)):
1470             self.failUnlessEqual(err, "")
1471             self.failUnless("tahoe: " in out)
1472         d.addCallback(_check_aliases_3)
1473
1474         def _check_empty_dir((out,err)):
1475             self.failUnlessEqual(out, "")
1476             self.failUnlessEqual(err, "")
1477         d.addCallback(run, "ls")
1478         d.addCallback(_check_empty_dir)
1479
1480         def _check_missing_dir((out,err)):
1481             # TODO: check that rc==2
1482             self.failUnlessEqual(out, "")
1483             self.failUnlessEqual(err, "No such file or directory\n")
1484         d.addCallback(run, "ls", "bogus")
1485         d.addCallback(_check_missing_dir)
1486
1487         files = []
1488         datas = []
1489         for i in range(10):
1490             fn = os.path.join(self.basedir, "file%d" % i)
1491             files.append(fn)
1492             data = "data to be uploaded: file%d\n" % i
1493             datas.append(data)
1494             open(fn,"wb").write(data)
1495
1496         def _check_stdout_against((out,err), filenum=None, data=None):
1497             self.failUnlessEqual(err, "")
1498             if filenum is not None:
1499                 self.failUnlessEqual(out, datas[filenum])
1500             if data is not None:
1501                 self.failUnlessEqual(out, data)
1502
1503         # test both forms of put: from a file, and from stdin
1504         #  tahoe put bar FOO
1505         d.addCallback(run, "put", files[0], "tahoe-file0")
1506         def _put_out((out,err)):
1507             self.failUnless("URI:LIT:" in out, out)
1508             self.failUnless("201 Created" in err, err)
1509             uri0 = out.strip()
1510             return run(None, "get", uri0)
1511         d.addCallback(_put_out)
1512         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1513
1514         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1515         #  tahoe put bar tahoe:FOO
1516         d.addCallback(run, "put", files[2], "tahoe:file2")
1517         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1518         def _check_put_mutable((out,err)):
1519             self._mutable_file3_uri = out.strip()
1520         d.addCallback(_check_put_mutable)
1521         d.addCallback(run, "get", "tahoe:file3")
1522         d.addCallback(_check_stdout_against, 3)
1523
1524         #  tahoe put FOO
1525         STDIN_DATA = "This is the file to upload from stdin."
1526         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1527         #  tahoe put tahoe:FOO
1528         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1529                       stdin="Other file from stdin.")
1530
1531         d.addCallback(run, "ls")
1532         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1533                                   "tahoe-file-stdin", "from-stdin"])
1534         d.addCallback(run, "ls", "subdir")
1535         d.addCallback(_check_ls, ["tahoe-file1"])
1536
1537         # tahoe mkdir FOO
1538         d.addCallback(run, "mkdir", "subdir2")
1539         d.addCallback(run, "ls")
1540         # TODO: extract the URI, set an alias with it
1541         d.addCallback(_check_ls, ["subdir2"])
1542
1543         # tahoe get: (to stdout and to a file)
1544         d.addCallback(run, "get", "tahoe-file0")
1545         d.addCallback(_check_stdout_against, 0)
1546         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1547         d.addCallback(_check_stdout_against, 1)
1548         outfile0 = os.path.join(self.basedir, "outfile0")
1549         d.addCallback(run, "get", "file2", outfile0)
1550         def _check_outfile0((out,err)):
1551             data = open(outfile0,"rb").read()
1552             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1553         d.addCallback(_check_outfile0)
1554         outfile1 = os.path.join(self.basedir, "outfile1")
1555         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1556         def _check_outfile1((out,err)):
1557             data = open(outfile1,"rb").read()
1558             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1559         d.addCallback(_check_outfile1)
1560
1561         d.addCallback(run, "rm", "tahoe-file0")
1562         d.addCallback(run, "rm", "tahoe:file2")
1563         d.addCallback(run, "ls")
1564         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1565
1566         d.addCallback(run, "ls", "-l")
1567         def _check_ls_l((out,err)):
1568             lines = out.split("\n")
1569             for l in lines:
1570                 if "tahoe-file-stdin" in l:
1571                     self.failUnless(l.startswith("-r-- "), l)
1572                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1573                 if "file3" in l:
1574                     self.failUnless(l.startswith("-rw- "), l) # mutable
1575         d.addCallback(_check_ls_l)
1576
1577         d.addCallback(run, "ls", "--uri")
1578         def _check_ls_uri((out,err)):
1579             lines = out.split("\n")
1580             for l in lines:
1581                 if "file3" in l:
1582                     self.failUnless(self._mutable_file3_uri in l)
1583         d.addCallback(_check_ls_uri)
1584
1585         d.addCallback(run, "ls", "--readonly-uri")
1586         def _check_ls_rouri((out,err)):
1587             lines = out.split("\n")
1588             for l in lines:
1589                 if "file3" in l:
1590                     rw_uri = self._mutable_file3_uri
1591                     u = uri.from_string_mutable_filenode(rw_uri)
1592                     ro_uri = u.get_readonly().to_string()
1593                     self.failUnless(ro_uri in l)
1594         d.addCallback(_check_ls_rouri)
1595
1596
1597         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved-first-time")
1598         d.addCallback(run, "ls")
1599         d.addCallback(_check_ls, ["tahoe-moved-first-time"], ["tahoe-file-stdin"])
1600
1601         def _mv_with_http_proxy(ign):
1602             env = os.environ.copy() # don't mutate the real environment
1603             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1604             return self._run_cli_in_subprocess(["mv"] + nodeargs + ["tahoe-moved-first-time", "tahoe-moved"], env=env)
1605         d.addCallback(_mv_with_http_proxy)
1606
1607         def _check_mv_with_http_proxy(res):
1608             out, err, rc_or_sig = res
1609             self.failUnlessEqual(rc_or_sig, 0, str(res))
1610         d.addCallback(_check_mv_with_http_proxy)
1611
1612         d.addCallback(run, "ls")
1613         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-moved-first-time"])
1614
1615         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1616         d.addCallback(run, "ls")
1617         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1618
1619         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1620         d.addCallback(run, "ls")
1621         d.addCallback(_check_ls, ["file3", "file3-copy"])
1622         d.addCallback(run, "get", "tahoe:file3-copy")
1623         d.addCallback(_check_stdout_against, 3)
1624
1625         # copy from disk into tahoe
1626         d.addCallback(run, "cp", files[4], "tahoe:file4")
1627         d.addCallback(run, "ls")
1628         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1629         d.addCallback(run, "get", "tahoe:file4")
1630         d.addCallback(_check_stdout_against, 4)
1631
1632         # copy from tahoe into disk
1633         target_filename = os.path.join(self.basedir, "file-out")
1634         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1635         def _check_cp_out((out,err)):
1636             self.failUnless(os.path.exists(target_filename))
1637             got = open(target_filename,"rb").read()
1638             self.failUnlessEqual(got, datas[4])
1639         d.addCallback(_check_cp_out)
1640
1641         # copy from disk to disk (silly case)
1642         target2_filename = os.path.join(self.basedir, "file-out-copy")
1643         d.addCallback(run, "cp", target_filename, target2_filename)
1644         def _check_cp_out2((out,err)):
1645             self.failUnless(os.path.exists(target2_filename))
1646             got = open(target2_filename,"rb").read()
1647             self.failUnlessEqual(got, datas[4])
1648         d.addCallback(_check_cp_out2)
1649
1650         # copy from tahoe into disk, overwriting an existing file
1651         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1652         def _check_cp_out3((out,err)):
1653             self.failUnless(os.path.exists(target_filename))
1654             got = open(target_filename,"rb").read()
1655             self.failUnlessEqual(got, datas[3])
1656         d.addCallback(_check_cp_out3)
1657
1658         # copy from disk into tahoe, overwriting an existing immutable file
1659         d.addCallback(run, "cp", files[5], "tahoe:file4")
1660         d.addCallback(run, "ls")
1661         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1662         d.addCallback(run, "get", "tahoe:file4")
1663         d.addCallback(_check_stdout_against, 5)
1664
1665         # copy from disk into tahoe, overwriting an existing mutable file
1666         d.addCallback(run, "cp", files[5], "tahoe:file3")
1667         d.addCallback(run, "ls")
1668         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1669         d.addCallback(run, "get", "tahoe:file3")
1670         d.addCallback(_check_stdout_against, 5)
1671
1672         # recursive copy: setup
1673         dn = os.path.join(self.basedir, "dir1")
1674         os.makedirs(dn)
1675         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1676         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1677         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1678         sdn2 = os.path.join(dn, "subdir2")
1679         os.makedirs(sdn2)
1680         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1681         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1682
1683         # from disk into tahoe
1684         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1685         d.addCallback(run, "ls")
1686         d.addCallback(_check_ls, ["dir1"])
1687         d.addCallback(run, "ls", "dir1")
1688         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1689                       ["rfile4", "rfile5"])
1690         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1691         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1692                       ["rfile1", "rfile2", "rfile3"])
1693         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1694         d.addCallback(_check_stdout_against, data="rfile4")
1695
1696         # and back out again
1697         dn_copy = os.path.join(self.basedir, "dir1-copy")
1698         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1699         def _check_cp_r_out((out,err)):
1700             def _cmp(name):
1701                 old = open(os.path.join(dn, name), "rb").read()
1702                 newfn = os.path.join(dn_copy, name)
1703                 self.failUnless(os.path.exists(newfn))
1704                 new = open(newfn, "rb").read()
1705                 self.failUnlessEqual(old, new)
1706             _cmp("rfile1")
1707             _cmp("rfile2")
1708             _cmp("rfile3")
1709             _cmp(os.path.join("subdir2", "rfile4"))
1710             _cmp(os.path.join("subdir2", "rfile5"))
1711         d.addCallback(_check_cp_r_out)
1712
1713         # and copy it a second time, which ought to overwrite the same files
1714         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1715
1716         # and again, only writing filecaps
1717         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1718         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1719         def _check_capsonly((out,err)):
1720             # these should all be LITs
1721             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1722             y = uri.from_string_filenode(x)
1723             self.failUnlessEqual(y.data, "rfile4")
1724         d.addCallback(_check_capsonly)
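        # (with --caps-only each local file holds just the filecap; these
        # files are small enough to come back as LIT caps, which embed the
        # data and can be decoded locally as above)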
1725
1726         # and tahoe-to-tahoe
1727         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1728         d.addCallback(run, "ls")
1729         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1730         d.addCallback(run, "ls", "dir1-copy")
1731         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1732                       ["rfile4", "rfile5"])
1733         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1734         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1735                       ["rfile1", "rfile2", "rfile3"])
1736         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1737         d.addCallback(_check_stdout_against, data="rfile4")
1738
1739         # and copy it a second time, which ought to overwrite the same files
1740         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1741
1742         # tahoe_ls doesn't currently handle the error correctly: it tries to
1743         # JSON-parse a traceback.
1744 ##         def _ls_missing(res):
1745 ##             argv = ["ls"] + nodeargs + ["bogus"]
1746 ##             return self._run_cli(argv)
1747 ##         d.addCallback(_ls_missing)
1748 ##         def _check_ls_missing((out,err)):
1749 ##             print "OUT", out
1750 ##             print "ERR", err
1751 ##             self.failUnlessEqual(err, "")
1752 ##         d.addCallback(_check_ls_missing)
1753
1754         return d
1755
1756     def _run_cli(self, argv, stdin=""):
1757         #print "CLI:", argv
1758         stdout, stderr = StringIO(), StringIO()
1759         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1760                                   stdin=StringIO(stdin),
1761                                   stdout=stdout, stderr=stderr)
1762         def _done(res):
1763             return stdout.getvalue(), stderr.getvalue()
1764         d.addCallback(_done)
1765         return d
1766
1767     def _run_cli_in_subprocess(self, argv, env=None):
1768         self.skip_if_cannot_run_bintahoe()
1769
1770         if env is None:
1771             env = os.environ
1772         d = utils.getProcessOutputAndValue(sys.executable, args=[bintahoe] + argv,
1773                                            env=env)
1774         return d
1775
1776     def _test_checker(self, res):
1777         ut = upload.Data("too big to be literal" * 200, convergence=None)
1778         d = self._personal_node.add_file(u"big file", ut)
1779
1780         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1781         def _check_dirnode_results(r):
1782             self.failUnless(r.is_healthy())
1783         d.addCallback(_check_dirnode_results)
1784         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1785         d.addCallback(_check_dirnode_results)
1786
1787         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1788         def _got_chk_filenode(n):
1789             self.failUnless(isinstance(n, ImmutableFileNode))
1790             d = n.check(Monitor())
1791             def _check_filenode_results(r):
1792                 self.failUnless(r.is_healthy())
1793             d.addCallback(_check_filenode_results)
1794             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1795             d.addCallback(_check_filenode_results)
1796             return d
1797         d.addCallback(_got_chk_filenode)
1798
1799         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1800         def _got_lit_filenode(n):
1801             self.failUnless(isinstance(n, LiteralFileNode))
1802             d = n.check(Monitor())
1803             def _check_lit_filenode_results(r):
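                # LIT data lives entirely in the URI, so there are no shares
                # to check and check() reports None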
1804                 self.failUnlessEqual(r, None)
1805             d.addCallback(_check_lit_filenode_results)
1806             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1807             d.addCallback(_check_lit_filenode_results)
1808             return d
1809         d.addCallback(_got_lit_filenode)
1810         return d
1811