
import os, re, sys, time, simplejson
from cStringIO import StringIO

from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.verlib import NormalizedVersion
from allmydata.util.encodingutil import quote_output, unicode_to_argv
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData

import foolscap
from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

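# Note: Tahoe packs very small files (55 bytes or less) directly into a LIT
# URI instead of uploading shares, so this blob must be longer than that.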
LARGE_DATA = """
This is some data to publish to the remote grid, which needs to be large
enough to not fit inside a LIT uri.
"""

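# An Uploadable that counts the bytes read from it and can fire a Deferred
# once the count passes a threshold; used below to interrupt an upload
# partway through.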
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
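                # 'happy' is the servers-of-happiness parameter: the upload
                # must place shares on at least this many distinct servers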
                c.encoding_params['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.encoding_params['happy'] = 5
        d.addCallback(_added)

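        # poll until the extra node's uploader reports an established
        # connection to the helper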
        def _has_helper():
            uploader = self.extra_node.getServiceNamed("uploader")
            furl, connected = uploader.get_helper_info()
            return connected
        d.addCallback(lambda ign: self.poll(_has_helper))

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        d.addCallback(fireEventually)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            bounced_d = defer.Deferred()
            def _do_bounce(res):
                d = self.bounce_client(0)
                d.addBoth(bounced_d.callback)
            u1.interrupt_after_d.addCallback(_do_bounce)

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            upload_d = self.extra_node.upload(u1)
            # The upload will start, and bounce_client() will be called after
            # about 5kB. bounced_d will fire after bounce_client() finishes
            # shutting down and restarting the node.
            d = bounced_d
            def _bounced(ign):
                # By this point, the upload should have failed because of the
                # interruption. upload_d will fire in a moment
                def _should_not_finish(res):
                    self.fail("interrupted upload should have failed, not"
                              " finished with result %s" % (res,))
                def _interrupted(f):
                    f.trap(DeadReferenceError)
                    # make sure we actually interrupted it before finishing
                    # the file
                    self.failUnless(u1.bytes_read < len(DATA),
                                    "read %d out of %d total" %
                                    (u1.bytes_read, len(DATA)))
                upload_d.addCallbacks(_should_not_finish, _interrupted)
                return upload_d
            d.addCallback(_bounced)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            d.addCallback(lambda res:
                          log.msg("wait_for_helper", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            # then we need to wait for the extra node to reestablish its
            # connection to the helper.
            d.addCallback(lambda ign: self.poll(_has_helper))

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

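    # walk basedir looking for .../storage/shares/$START/$SINDEX/$SHNUM share
    # files, returning a list of (client_num, storage_index, filename, shnum)
    # tuples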
    def _find_all_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

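    # read an SDMF share off disk, unpack it, mutate the one field named by
    # 'which', then repack it and write it back in place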
    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling will only be noticed if shnum 0 happens to
                # be the share we pull the pubkey from (see the
                # initial_query_count note above).
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

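        # the key generator service pre-creates a pool of RSA keys; each
        # mutable-file or dirnode creation should consume exactly one key
        # from that pool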
        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

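    # return a copy of 'good' with the low bit of its last byte flipped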
    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.encoding_params['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

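    # build a second, private tree "P" rooted at an unlinked directory, with
    # links back into the public subdir2 (one read-write, one read-only)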
    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

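    # like shouldFail, but shouldFail2 invokes the callable itself and
    # attaches the failure check to the resulting Deferred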
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

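    # hand-build a multipart/form-data body for getPage; each keyword becomes
    # one form field, and a tuple value is treated as (filename, contents)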
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)

    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
              use_helper=False):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)

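    # exercise the WUI: welcome pages, directory listings, URI-addressed
    # downloads, and uploads/replacements via PUT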
1093     def _test_web(self, res):
1094         base = self.webish_url
1095         public = "uri/" + self._root_directory_uri
1096         d = getPage(base)
1097         def _got_welcome(page):
1098             html = page.replace('\n', ' ')
1099             connected_re = r'Connected to <span>%d</span>\s*of <span>%d</span> known storage servers' % (self.numclients, self.numclients)
1100             self.failUnless(re.search(connected_re, html),
1101                             "I didn't see the right '%s' message in:\n%s" % (connected_re, page))
1102             # nodeids/tubids don't have any regexp-special characters
1103             nodeid_re = r'<th>Node ID:</th>\s*<td title="TubID: %s">%s</td>' % (
1104                 self.clients[0].get_long_tubid(), self.clients[0].get_long_nodeid())
1105             self.failUnless(re.search(nodeid_re, html),
1106                             "I didn't see the right '%s' message in:\n%s" % (nodeid_re, page))
1107             self.failUnless("Helper: 0 active uploads" in page)
1108         d.addCallback(_got_welcome)
1109         d.addCallback(self.log, "done with _got_welcome")
1110
1111         # get the welcome page from the node that uses the helper too
1112         d.addCallback(lambda res: getPage(self.helper_webish_url))
1113         def _got_welcome_helper(page):
1114             html = page.replace('\n', ' ')
1115             self.failUnless(re.search('<img (src="img/connected-yes.png" |alt="Connected" ){2}/>', html), page)
1116             self.failUnlessIn("Not running helper", page)
1117         d.addCallback(_got_welcome_helper)
1118
1119         d.addCallback(lambda res: getPage(base + public))
1120         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1121         def _got_subdir1(page):
1122             # there ought to be an href for our file
1123             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1124             self.failUnless(">mydata567</a>" in page)
1125         d.addCallback(_got_subdir1)
1126         d.addCallback(self.log, "done with _got_subdir1")
1127         d.addCallback(lambda res:
1128                       getPage(base + public + "/subdir1/mydata567"))
1129         def _got_data(page):
1130             self.failUnlessEqual(page, self.data)
1131         d.addCallback(_got_data)
1132
1133         # download from a URI embedded in a URL
1134         d.addCallback(self.log, "_get_from_uri")
1135         def _get_from_uri(res):
1136             return getPage(base + "uri/%s?filename=%s"
1137                            % (self.uri, "mydata567"))
1138         d.addCallback(_get_from_uri)
1139         def _got_from_uri(page):
1140             self.failUnlessEqual(page, self.data)
1141         d.addCallback(_got_from_uri)
1142
1143         # download from a URI embedded in a URL, second form
1144         d.addCallback(self.log, "_get_from_uri2")
1145         def _get_from_uri2(res):
1146             return getPage(base + "uri?uri=%s" % (self.uri,))
1147         d.addCallback(_get_from_uri2)
1148         d.addCallback(_got_from_uri)
1149
1150         # download from a bogus URI, make sure we get a reasonable error
1151         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1152         def _get_from_bogus_uri(res):
1153             d1 = getPage(base + "uri/%s?filename=%s"
1154                          % (self.mangle_uri(self.uri), "mydata567"))
1155             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1156                        "410")
1157             return d1
1158         d.addCallback(_get_from_bogus_uri)
1159         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1160
1161         # upload a file with PUT
1162         d.addCallback(self.log, "about to try PUT")
1163         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1164                                            "new.txt contents"))
1165         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1166         d.addCallback(self.failUnlessEqual, "new.txt contents")
1167         # and again with something large enough to use multiple segments,
1168         # and hopefully trigger pauseProducing too
1169         def _new_happy_semantics(ign):
1170             for c in self.clients:
1171                 # these apparently get reset elsewhere, so lower the
1172                 c.encoding_params['happy'] = 1
1173         d.addCallback(_new_happy_semantics)
1174         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1175                                            "big" * 500000)) # 1.5MB
1176         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1177         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1178
1179         # can we replace files in place?
1180         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1181                                            "NEWER contents"))
1182         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1183         d.addCallback(self.failUnlessEqual, "NEWER contents")
1184
1185         # test unlinked POST
1186         d.addCallback(lambda res: self.POST("uri", t="upload",
1187                                             file=("new.txt", "data" * 10000)))
1188         # and again using the helper, which exercises different upload-status
1189         # display code
1190         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1191                                             file=("foo.txt", "data2" * 10000)))
1192
1193         # check that the status page exists
1194         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1195         def _got_status(res):
1196             # find an interesting upload and download to look at. LIT files
1197             # are not interesting.
1198             h = self.clients[0].get_history()
1199             for ds in h.list_all_download_statuses():
1200                 if ds.get_size() > 200:
1201                     self._down_status = ds.get_counter()
1202             for us in h.list_all_upload_statuses():
1203                 if us.get_size() > 200:
1204                     self._up_status = us.get_counter()
1205             rs = list(h.list_all_retrieve_statuses())[0]
1206             self._retrieve_status = rs.get_counter()
1207             ps = list(h.list_all_publish_statuses())[0]
1208             self._publish_status = ps.get_counter()
1209             us = list(h.list_all_mapupdate_statuses())[0]
1210             self._update_status = us.get_counter()
1211
1212             # and that there are some upload- and download-status pages
1213             return self.GET("status/up-%d" % self._up_status)
1214         d.addCallback(_got_status)
1215         def _got_up(res):
1216             return self.GET("status/down-%d" % self._down_status)
1217         d.addCallback(_got_up)
1218         def _got_down(res):
1219             return self.GET("status/mapupdate-%d" % self._update_status)
1220         d.addCallback(_got_down)
1221         def _got_update(res):
1222             return self.GET("status/publish-%d" % self._publish_status)
1223         d.addCallback(_got_update)
1224         def _got_publish(res):
1225             self.failUnlessIn("Publish Results", res)
1226             return self.GET("status/retrieve-%d" % self._retrieve_status)
1227         d.addCallback(_got_publish)
1228         def _got_retrieve(res):
1229             self.failUnlessIn("Retrieve Results", res)
1230         d.addCallback(_got_retrieve)
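             # (the per-operation status pages fetched above live at
             # status/up-N, status/down-N, status/mapupdate-N,
             # status/publish-N, and status/retrieve-N, where N is the
             # counter captured from the history list)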
1231
1232         # check that the helper status page exists
1233         d.addCallback(lambda res:
1234                       self.GET("helper_status", followRedirect=True))
1235         def _got_helper_status(res):
1236             self.failUnless("Bytes Fetched:" in res)
1237             # touch a couple of files in the helper's working directory to
1238             # exercise more code paths
1239             workdir = os.path.join(self.getdir("client0"), "helper")
1240             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1241             f = open(incfile, "wb")
1242             f.write("small file")
1243             f.close()
1244             then = time.time() - 86400*3
1245             now = time.time()
1246             os.utime(incfile, (now, then))
1247             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1248             f = open(encfile, "wb")
1249             f.write("less small file")
1250             f.close()
1251             os.utime(encfile, (now, then))
1252         d.addCallback(_got_helper_status)
1253         # and that the json form exists
1254         d.addCallback(lambda res:
1255                       self.GET("helper_status?t=json", followRedirect=True))
1256         def _got_helper_status_json(res):
1257             data = simplejson.loads(res)
1258             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1259                                  1)
1260             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1261             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1262             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1263                                  10)
1264             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1265             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1266             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1267                                  15)
1268         d.addCallback(_got_helper_status_json)
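             # i.e. the helper's t=json status is a single flat JSON object
             # of dotted counter names; a sketch of the document asserted
             # above (10 == len("small file"), 15 == len("less small file")):
             #
             #   {"chk_upload_helper.upload_need_upload": 1,
             #    "chk_upload_helper.incoming_count": 1,
             #    "chk_upload_helper.incoming_size": 10,
             #    "chk_upload_helper.incoming_size_old": 10,
             #    "chk_upload_helper.encoding_count": 1,
             #    "chk_upload_helper.encoding_size": 15,
             #    "chk_upload_helper.encoding_size_old": 15}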
1269
1270         # and check that client[3] (which uses a helper but does not run one
1271         # itself) doesn't explode when you ask for its status
1272         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1273         def _got_non_helper_status(res):
1274             self.failUnlessIn("Recent and Active Operations", res)
1275         d.addCallback(_got_non_helper_status)
1276
1277         # or for helper status with t=json
1278         d.addCallback(lambda res:
1279                       getPage(self.helper_webish_url + "helper_status?t=json"))
1280         def _got_non_helper_status_json(res):
1281             data = simplejson.loads(res)
1282             self.failUnlessEqual(data, {})
1283         d.addCallback(_got_non_helper_status_json)
1284
1285         # see if the statistics page exists
1286         d.addCallback(lambda res: self.GET("statistics"))
1287         def _got_stats(res):
1288             self.failUnlessIn("Operational Statistics", res)
1289             self.failUnlessIn("  'downloader.files_downloaded': 5,", res)
1290         d.addCallback(_got_stats)
1291         d.addCallback(lambda res: self.GET("statistics?t=json"))
1292         def _got_stats_json(res):
1293             data = simplejson.loads(res)
1294             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1295             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1296         d.addCallback(_got_stats_json)
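             # i.e. the statistics JSON carries (at least) "counters" and
             # "stats" maps keyed by dotted names; a sketch of the slice
             # asserted above:
             #
             #   {"counters": {"uploader.files_uploaded": 5, ...},
             #    "stats":    {"chk_upload_helper.upload_need_upload": 1, ...}}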
1297
1298         # TODO: mangle the second segment of a file, to test errors that
1299         # occur after we've already sent some good data, which uses a
1300         # different error path.
1301
1302         # TODO: download a URI with a form
1303         # TODO: create a directory by using a form
1304         # TODO: upload by using a form on the directory page
1305         #    url = base + "somedir/subdir1/freeform_post!!upload"
1306         # TODO: delete a file by using a button on the directory page
1307
1308         return d
1309
1310     def _test_runner(self, res):
1311         # exercise some of the diagnostic tools in runner.py
1312
1313         # find a share
1314         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1315             if "storage" not in dirpath:
1316                 continue
1317             if not filenames:
1318                 continue
1319             pieces = dirpath.split(os.sep)
1320             if (len(pieces) >= 4
1321                 and pieces[-4] == "storage"
1322                 and pieces[-3] == "shares"):
1323                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1324                 # are sharefiles here
1325                 filename = os.path.join(dirpath, filenames[0])
1326                 # peek at the magic to see if it is a chk share
1327                 magic = open(filename, "rb").read(4)
1328                 if magic == '\x00\x00\x00\x01':
1329                     break
1330         else:
1331             self.fail("unable to find any CHK share files in %r"
1332                       % self.basedir)
1333         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1334
1335         out,err = StringIO(), StringIO()
1336         rc = runner.runner(["debug", "dump-share", "--offsets",
1337                             unicode_to_argv(filename)],
1338                            stdout=out, stderr=err)
1339         output = out.getvalue()
1340         self.failUnlessEqual(rc, 0)
1341
1342         # we only upload a single file, so we can assert some things about
1343         # its size and shares.
1344         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1345         self.failUnlessIn("size: %d\n" % len(self.data), output)
1346         self.failUnlessIn("num_segments: 1\n", output)
1347         # segment_size is always a multiple of needed_shares
1348         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1349         self.failUnlessIn("total_shares: 10\n", output)
1350         # keys which are supposed to be present
1351         for key in ("size", "num_segments", "segment_size",
1352                     "needed_shares", "total_shares",
1353                     "codec_name", "codec_params", "tail_codec_params",
1354                     #"plaintext_hash", "plaintext_root_hash",
1355                     "crypttext_hash", "crypttext_root_hash",
1356                     "share_root_hash", "UEB_hash"):
1357             self.failUnlessIn("%s: " % key, output)
1358         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1359
1360         # now use its storage index to find the other shares using the
1361         # 'find-shares' tool
1362         sharedir, shnum = os.path.split(filename)
1363         storagedir, storage_index_s = os.path.split(sharedir)
1364         storage_index_s = str(storage_index_s)
1365         out,err = StringIO(), StringIO()
1366         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1367         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1368         rc = runner.runner(cmd, stdout=out, stderr=err)
1369         self.failUnlessEqual(rc, 0)
1370         out.seek(0)
1371         sharefiles = [sfn.strip() for sfn in out.readlines()]
1372         self.failUnlessEqual(len(sharefiles), 10)
1373
1374         # also exercise the 'catalog-shares' tool
1375         out,err = StringIO(), StringIO()
1376         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1377         cmd = ["debug", "catalog-shares"] + nodedirs
1378         rc = runner.runner(cmd, stdout=out, stderr=err)
1379         self.failUnlessEqual(rc, 0)
1380         out.seek(0)
1381         descriptions = [sfn.strip() for sfn in out.readlines()]
1382         self.failUnlessEqual(len(descriptions), 30)
1383         matching = [line
1384                     for line in descriptions
1385                     if line.startswith("CHK %s " % storage_index_s)]
1386         self.failUnlessEqual(len(matching), 10)
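             # (for reference: each catalog-shares line begins with the share
             # type and base32 storage index, e.g. "CHK <storage-index> ...",
             # which is what the startswith() filter above keys on, while
             # find-shares prints one share path per line, which is where the
             # ten sharefiles counted earlier came from)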
1387
1388     def _test_control(self, res):
1389         # exercise the remote-control-the-client foolscap interfaces in
1390         # allmydata.control (mostly used for performance tests)
1391         c0 = self.clients[0]
1392         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1393         control_furl = open(control_furl_file, "r").read().strip()
1394         # it doesn't really matter which Tub we use to connect to the client,
1395         # so let's just use our IntroducerNode's
1396         d = self.introducer.tub.getReference(control_furl)
1397         d.addCallback(self._test_control2, control_furl_file)
1398         return d
1399     def _test_control2(self, rref, filename):
1400         d = defer.succeed(None)
1401         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1402         if sys.platform in ("linux2", "linux3"):
1403             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1404         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1405         return d
1406
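         # A minimal standalone sketch of the same control-channel pattern
         # (illustrative, not used by the tests): any foolscap Tub can
         # connect to the control FURL and invoke the remote methods
         # exercised above.
         #
         #   from foolscap.api import Tub
         #   tub = Tub()
         #   tub.startService()
         #   d = tub.getReference(control_furl)
         #   d.addCallback(lambda rref:
         #                 rref.callRemote("speed_test", 1, 200, False))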
1407     def _test_cli(self, res):
1408         # run various CLI commands (in a thread, since they use blocking
1409         # network calls)
1410
1411         private_uri = self._private_node.get_uri()
1412         client0_basedir = self.getdir("client0")
1413
1414         nodeargs = [
1415             "--node-directory", client0_basedir,
1416             ]
1417
1418         d = defer.succeed(None)
1419
1420         # for compatibility with earlier versions, private/root_dir.cap is
1421         # supposed to be treated as an alias named "tahoe:". Start by making
1422         # sure that works, before we add other aliases.
1423
1424         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1425         f = open(root_file, "w")
1426         f.write(private_uri)
1427         f.close()
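             # (an alias maps a short name like "tahoe:" onto a directory
             # cap, and the CLI resolves "tahoe:PATH" arguments through that
             # mapping, so writing private/root_dir.cap is enough to give the
             # commands below a root directory to operate on)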
1428
1429         def run(ignored, verb, *args, **kwargs):
1430             stdin = kwargs.get("stdin", "")
1431             newargs = nodeargs + [verb] + list(args)
1432             return self._run_cli(newargs, stdin=stdin)
1433
1434         def _check_ls((out,err), expected_children, unexpected_children=[]):
1435             self.failUnlessEqual(err, "")
1436             for s in expected_children:
1437                 self.failUnless(s in out, (s,out))
1438             for s in unexpected_children:
1439                 self.failIf(s in out, (s,out))
1440
1441         def _check_ls_root((out,err)):
1442             self.failUnless("personal" in out)
1443             self.failUnless("s2-ro" in out)
1444             self.failUnless("s2-rw" in out)
1445             self.failUnlessEqual(err, "")
1446
1447         # this should reference private_uri
1448         d.addCallback(run, "ls")
1449         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1450
1451         d.addCallback(run, "list-aliases")
1452         def _check_aliases_1((out,err)):
1453             self.failUnlessEqual(err, "")
1454             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1455         d.addCallback(_check_aliases_1)
1456
1457         # now that that's out of the way, remove root_dir.cap and work with
1458         # new files
1459         d.addCallback(lambda res: os.unlink(root_file))
1460         d.addCallback(run, "list-aliases")
1461         def _check_aliases_2((out,err)):
1462             self.failUnlessEqual(err, "")
1463             self.failUnlessEqual(out, "")
1464         d.addCallback(_check_aliases_2)
1465
1466         d.addCallback(run, "mkdir")
1467         def _got_dir( (out,err) ):
1468             self.failUnless(uri.from_string_dirnode(out.strip()))
1469             return out.strip()
1470         d.addCallback(_got_dir)
1471         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1472
1473         d.addCallback(run, "list-aliases")
1474         def _check_aliases_3((out,err)):
1475             self.failUnlessEqual(err, "")
1476             self.failUnless("tahoe: " in out)
1477         d.addCallback(_check_aliases_3)
1478
1479         def _check_empty_dir((out,err)):
1480             self.failUnlessEqual(out, "")
1481             self.failUnlessEqual(err, "")
1482         d.addCallback(run, "ls")
1483         d.addCallback(_check_empty_dir)
1484
1485         def _check_missing_dir((out,err)):
1486             # TODO: check that rc==2
1487             self.failUnlessEqual(out, "")
1488             self.failUnlessEqual(err, "No such file or directory\n")
1489         d.addCallback(run, "ls", "bogus")
1490         d.addCallback(_check_missing_dir)
1491
1492         files = []
1493         datas = []
1494         for i in range(10):
1495             fn = os.path.join(self.basedir, "file%d" % i)
1496             files.append(fn)
1497             data = "data to be uploaded: file%d\n" % i
1498             datas.append(data)
1499             open(fn,"wb").write(data)
1500
1501         def _check_stdout_against((out,err), filenum=None, data=None):
1502             self.failUnlessEqual(err, "")
1503             if filenum is not None:
1504                 self.failUnlessEqual(out, datas[filenum])
1505             if data is not None:
1506                 self.failUnlessEqual(out, data)
1507
1508         # test both forms of put: from a file, and from stdin
1509         #  tahoe put bar FOO
1510         d.addCallback(run, "put", files[0], "tahoe-file0")
1511         def _put_out((out,err)):
1512             self.failUnless("URI:LIT:" in out, out)
1513             self.failUnless("201 Created" in err, err)
1514             uri0 = out.strip()
1515             return run(None, "get", uri0)
1516         d.addCallback(_put_out)
1517         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1518
1519         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1520         #  tahoe put bar tahoe:FOO
1521         d.addCallback(run, "put", files[2], "tahoe:file2")
1522         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1523         def _check_put_mutable((out,err)):
1524             self._mutable_file3_uri = out.strip()
1525         d.addCallback(_check_put_mutable)
1526         d.addCallback(run, "get", "tahoe:file3")
1527         d.addCallback(_check_stdout_against, 3)
1528
1529         #  tahoe put FOO
1530         STDIN_DATA = "This is the file to upload from stdin."
1531         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1532         #  tahoe put tahoe:FOO
1533         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1534                       stdin="Other file from stdin.")
1535
1536         d.addCallback(run, "ls")
1537         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1538                                   "tahoe-file-stdin", "from-stdin"])
1539         d.addCallback(run, "ls", "subdir")
1540         d.addCallback(_check_ls, ["tahoe-file1"])
1541
1542         # tahoe mkdir FOO
1543         d.addCallback(run, "mkdir", "subdir2")
1544         d.addCallback(run, "ls")
1545         # TODO: extract the URI, set an alias with it
1546         d.addCallback(_check_ls, ["subdir2"])
1547
1548         # tahoe get: (to stdin and to a file)
1549         d.addCallback(run, "get", "tahoe-file0")
1550         d.addCallback(_check_stdout_against, 0)
1551         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1552         d.addCallback(_check_stdout_against, 1)
1553         outfile0 = os.path.join(self.basedir, "outfile0")
1554         d.addCallback(run, "get", "file2", outfile0)
1555         def _check_outfile0((out,err)):
1556             data = open(outfile0,"rb").read()
1557             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1558         d.addCallback(_check_outfile0)
1559         outfile1 = os.path.join(self.basedir, "outfile1")
1560         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1561         def _check_outfile1((out,err)):
1562             data = open(outfile1,"rb").read()
1563             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1564         d.addCallback(_check_outfile1)
1565
1566         d.addCallback(run, "rm", "tahoe-file0")
1567         d.addCallback(run, "rm", "tahoe:file2")
1568         d.addCallback(run, "ls")
1569         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1570
1571         d.addCallback(run, "ls", "-l")
1572         def _check_ls_l((out,err)):
1573             lines = out.split("\n")
1574             for l in lines:
1575                 if "tahoe-file-stdin" in l:
1576                     self.failUnless(l.startswith("-r-- "), l)
1577                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1578                 if "file3" in l:
1579                     self.failUnless(l.startswith("-rw- "), l) # mutable
1580         d.addCallback(_check_ls_l)
1581
1582         d.addCallback(run, "ls", "--uri")
1583         def _check_ls_uri((out,err)):
1584             lines = out.split("\n")
1585             for l in lines:
1586                 if "file3" in l:
1587                     self.failUnless(self._mutable_file3_uri in l)
1588         d.addCallback(_check_ls_uri)
1589
1590         d.addCallback(run, "ls", "--readonly-uri")
1591         def _check_ls_rouri((out,err)):
1592             lines = out.split("\n")
1593             for l in lines:
1594                 if "file3" in l:
1595                     rw_uri = self._mutable_file3_uri
1596                     u = uri.from_string_mutable_filenode(rw_uri)
1597                     ro_uri = u.get_readonly().to_string()
1598                     self.failUnless(ro_uri in l)
1599         d.addCallback(_check_ls_rouri)
1600
1601
1602         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1603         d.addCallback(run, "ls")
1604         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1605
1606         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1607         d.addCallback(run, "ls")
1608         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1609
1610         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1611         d.addCallback(run, "ls")
1612         d.addCallback(_check_ls, ["file3", "file3-copy"])
1613         d.addCallback(run, "get", "tahoe:file3-copy")
1614         d.addCallback(_check_stdout_against, 3)
1615
1616         # copy from disk into tahoe
1617         d.addCallback(run, "cp", files[4], "tahoe:file4")
1618         d.addCallback(run, "ls")
1619         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1620         d.addCallback(run, "get", "tahoe:file4")
1621         d.addCallback(_check_stdout_against, 4)
1622
1623         # copy from tahoe into disk
1624         target_filename = os.path.join(self.basedir, "file-out")
1625         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1626         def _check_cp_out((out,err)):
1627             self.failUnless(os.path.exists(target_filename))
1628             got = open(target_filename,"rb").read()
1629             self.failUnlessEqual(got, datas[4])
1630         d.addCallback(_check_cp_out)
1631
1632         # copy from disk to disk (silly case)
1633         target2_filename = os.path.join(self.basedir, "file-out-copy")
1634         d.addCallback(run, "cp", target_filename, target2_filename)
1635         def _check_cp_out2((out,err)):
1636             self.failUnless(os.path.exists(target2_filename))
1637             got = open(target2_filename,"rb").read()
1638             self.failUnlessEqual(got, datas[4])
1639         d.addCallback(_check_cp_out2)
1640
1641         # copy from tahoe into disk, overwriting an existing file
1642         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1643         def _check_cp_out3((out,err)):
1644             self.failUnless(os.path.exists(target_filename))
1645             got = open(target_filename,"rb").read()
1646             self.failUnlessEqual(got, datas[3])
1647         d.addCallback(_check_cp_out3)
1648
1649         # copy from disk into tahoe, overwriting an existing immutable file
1650         d.addCallback(run, "cp", files[5], "tahoe:file4")
1651         d.addCallback(run, "ls")
1652         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1653         d.addCallback(run, "get", "tahoe:file4")
1654         d.addCallback(_check_stdout_against, 5)
1655
1656         # copy from disk into tahoe, overwriting an existing mutable file
1657         d.addCallback(run, "cp", files[5], "tahoe:file3")
1658         d.addCallback(run, "ls")
1659         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1660         d.addCallback(run, "get", "tahoe:file3")
1661         d.addCallback(_check_stdout_against, 5)
1662
1663         # recursive copy: setup
1664         dn = os.path.join(self.basedir, "dir1")
1665         os.makedirs(dn)
1666         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1667         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1668         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1669         sdn2 = os.path.join(dn, "subdir2")
1670         os.makedirs(sdn2)
1671         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1672         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1673
1674         # from disk into tahoe
1675         d.addCallback(run, "cp", "-r", dn, "tahoe:")
1676         d.addCallback(run, "ls")
1677         d.addCallback(_check_ls, ["dir1"])
1678         d.addCallback(run, "ls", "dir1")
1679         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1680                       ["rfile4", "rfile5"])
1681         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1682         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1683                       ["rfile1", "rfile2", "rfile3"])
1684         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1685         d.addCallback(_check_stdout_against, data="rfile4")
1686
1687         # and back out again
1688         dn_copy = os.path.join(self.basedir, "dir1-copy")
1689         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1690         def _check_cp_r_out((out,err)):
1691             def _cmp(name):
1692                 old = open(os.path.join(dn, name), "rb").read()
1693                 newfn = os.path.join(dn_copy, "dir1", name)
1694                 self.failUnless(os.path.exists(newfn))
1695                 new = open(newfn, "rb").read()
1696                 self.failUnlessEqual(old, new)
1697             _cmp("rfile1")
1698             _cmp("rfile2")
1699             _cmp("rfile3")
1700             _cmp(os.path.join("subdir2", "rfile4"))
1701             _cmp(os.path.join("subdir2", "rfile5"))
1702         d.addCallback(_check_cp_r_out)
1703
1704         # and copy it a second time, which ought to overwrite the same files
1705         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1706
1707         # and again, only writing filecaps
1708         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1709         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1710         def _check_capsonly((out,err)):
1711             # these should all be LITs
1712             x = open(os.path.join(dn_copy2, "dir1", "subdir2", "rfile4")).read()
1713             y = uri.from_string_filenode(x)
1714             self.failUnlessEqual(y.data, "rfile4")
1715         d.addCallback(_check_capsonly)
1716
1717         # and tahoe-to-tahoe
1718         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1719         d.addCallback(run, "ls")
1720         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1721         d.addCallback(run, "ls", "dir1-copy/dir1")
1722         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1723                       ["rfile4", "rfile5"])
1724         d.addCallback(run, "ls", "tahoe:dir1-copy/dir1/subdir2")
1725         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1726                       ["rfile1", "rfile2", "rfile3"])
1727         d.addCallback(run, "get", "dir1-copy/dir1/subdir2/rfile4")
1728         d.addCallback(_check_stdout_against, data="rfile4")
1729
1730         # and copy it a second time, which ought to overwrite the same files
1731         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1732
1733         # tahoe_ls doesn't currently handle the error correctly: it tries to
1734         # JSON-parse a traceback.
1735 ##         def _ls_missing(res):
1736 ##             argv = nodeargs + ["ls", "bogus"]
1737 ##             return self._run_cli(argv)
1738 ##         d.addCallback(_ls_missing)
1739 ##         def _check_ls_missing((out,err)):
1740 ##             print "OUT", out
1741 ##             print "ERR", err
1742 ##             self.failUnlessEqual(err, "")
1743 ##         d.addCallback(_check_ls_missing)
1744
1745         return d
1746
1747     def test_filesystem_with_cli_in_subprocess(self):
1748         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1749
1750         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1751         d = self.set_up_nodes()
1752         def _new_happy_semantics(ign):
1753             for c in self.clients:
1754                 c.encoding_params['happy'] = 1
1755         d.addCallback(_new_happy_semantics)
1756
1757         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1758             stdin = kwargs.get("stdin")
1759             env = kwargs.get("env")
1760             newargs = ["--node-directory", self.getdir("client0"), verb] + list(args)
1761             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1762
1763         def _check_succeeded(res, check_stderr=True):
1764             out, err, rc_or_sig = res
1765             self.failUnlessEqual(rc_or_sig, 0, str(res))
1766             if check_stderr:
1767                 self.failUnlessEqual(err, "")
1768
1769         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1770         d.addCallback(_check_succeeded)
1771
1772         STDIN_DATA = "This is the file to upload from stdin."
1773         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1774         d.addCallback(_check_succeeded, check_stderr=False)
1775
1776         def _mv_with_http_proxy(ign):
1777             env = os.environ.copy() # don't pollute the parent process's environment
1778             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1779             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1780         d.addCallback(_mv_with_http_proxy)
1781         d.addCallback(_check_succeeded)
1782
1783         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1784         def _check_ls(res):
1785             out, err, rc_or_sig = res
1786             self.failUnlessEqual(rc_or_sig, 0, str(res))
1787             self.failUnlessEqual(err, "", str(res))
1788             self.failUnlessIn("tahoe-moved", out)
1789             self.failIfIn("tahoe-file", out)
1790         d.addCallback(_check_ls)
1791         return d
1792
1793     def test_debug_trial(self):
1794         def _check_for_line(lines, result, test):
1795             for l in lines:
1796                 if result in l and test in l:
1797                     return
1798             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1799                       % (result, test, "\n## ".join(lines)))
1800
1801         def _check_for_outcome(lines, out, outcome):
1802             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1803                                             % (outcome, "\n## ".join(lines)))
1804
1805         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1806                                'allmydata.test.trialtest'])
1807         def _check_failure( (out, err, rc) ):
1808             self.failUnlessEqual(rc, 1)
1809             lines = out.split('\n')
1810             _check_for_line(lines, "[SKIPPED]", "test_skip")
1811             _check_for_line(lines, "[TODO]",    "test_todo")
1812             _check_for_line(lines, "[FAIL]",    "test_fail")
1813             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1814             _check_for_line(lines, "[ERROR]",   "test_error")
1815             _check_for_outcome(lines, out, "FAILED")
1816         d.addCallback(_check_failure)
1817
1818         # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1819         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1820                                                      'allmydata.test.trialtest.Success']))
1821         def _check_success( (out, err, rc) ):
1822             self.failUnlessEqual(rc, 0)
1823             lines = out.split('\n')
1824             _check_for_line(lines, "[SKIPPED]", "test_skip")
1825             _check_for_line(lines, "[TODO]",    "test_todo")
1826             _check_for_outcome(lines, out, "PASSED")
1827         d.addCallback(_check_success)
1828         return d
1829
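         # _run_cli drives the same runner.runner entry point as the
         # debug-tool tests above, but in a worker thread, because the CLI
         # makes blocking network calls that must not stall the reactor.
         # Usage sketch (argv is illustrative):
         #
         #   d = self._run_cli(["--node-directory", basedir, "ls"])
         #   d.addCallback(lambda (out, err): ...)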
1830     def _run_cli(self, argv, stdin=""):
1831         #print "CLI:", argv
1832         stdout, stderr = StringIO(), StringIO()
1833         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1834                                   stdin=StringIO(stdin),
1835                                   stdout=stdout, stderr=stderr)
1836         def _done(res):
1837             return stdout.getvalue(), stderr.getvalue()
1838         d.addCallback(_done)
1839         return d
1840
1841     def _test_checker(self, res):
1842         ut = upload.Data("too big to be literal" * 200, convergence=None)
1843         d = self._personal_node.add_file(u"big file", ut)
1844
1845         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1846         def _check_dirnode_results(r):
1847             self.failUnless(r.is_healthy())
1848         d.addCallback(_check_dirnode_results)
1849         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1850         d.addCallback(_check_dirnode_results)
1851
1852         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1853         def _got_chk_filenode(n):
1854             self.failUnless(isinstance(n, ImmutableFileNode))
1855             d = n.check(Monitor())
1856             def _check_filenode_results(r):
1857                 self.failUnless(r.is_healthy())
1858             d.addCallback(_check_filenode_results)
1859             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1860             d.addCallback(_check_filenode_results)
1861             return d
1862         d.addCallback(_got_chk_filenode)
1863
1864         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1865         def _got_lit_filenode(n):
1866             self.failUnless(isinstance(n, LiteralFileNode))
1867             d = n.check(Monitor())
1868             def _check_lit_filenode_results(r):
1869                 self.failUnlessEqual(r, None)
1870             d.addCallback(_check_lit_filenode_results)
1871             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1872             d.addCallback(_check_lit_filenode_results)
1873             return d
1874         d.addCallback(_got_lit_filenode)
1875         return d
1876
1877
1878 class Connections(SystemTestMixin, unittest.TestCase):
1879     def test_rref(self):
1880         if NormalizedVersion(foolscap.__version__) < NormalizedVersion('0.6.4'):
1881             raise unittest.SkipTest("skipped due to http://foolscap.lothar.com/trac/ticket/196 "
1882                                     "(which does not affect normal usage of Tahoe-LAFS)")
1883
1884         self.basedir = "system/Connections/rref"
1885         d = self.set_up_nodes(2)
1886         def _start(ign):
1887             self.c0 = self.clients[0]
1888             nonclients = [s for s in self.c0.storage_broker.get_connected_servers()
1889                           if s.get_serverid() != self.c0.nodeid]
1890             self.failUnlessEqual(len(nonclients), 1)
1891
1892             self.s1 = nonclients[0]  # s1 is the server, not c0
1893             self.s1_rref = self.s1.get_rref()
1894             self.failIfEqual(self.s1_rref, None)
1895             self.failUnless(self.s1.is_connected())
1896         d.addCallback(_start)
1897
1898         # now shut down the server
1899         d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1900         # and wait for the client to notice
1901         def _poll():
1902             return len(self.c0.storage_broker.get_connected_servers()) < 2
1903         d.addCallback(lambda ign: self.poll(_poll))
1904
1905         def _down(ign):
1906             self.failIf(self.s1.is_connected())
1907             rref = self.s1.get_rref()
1908             self.failUnless(rref)
1909             self.failUnlessIdentical(rref, self.s1_rref)
1910         d.addCallback(_down)
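             # (the identity check matters: after a disconnect, the broker's
             # server entry must keep handing back the same cached
             # RemoteReference, not None and not a fresh object)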
1911         return d