
import os, re, sys, time, simplejson
from cStringIO import StringIO

from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.verlib import NormalizedVersion
from allmydata.util.encodingutil import quote_output, unicode_to_argv
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData

import foolscap
from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
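
    # Typical use (see _upload_resumable below): set interrupt_after to a
    # byte count and interrupt_after_d to a Deferred; once read() has seen
    # that many bytes, the Deferred fires, letting the test bounce a node
    # while the upload is still in flight.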

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
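    # (deleting the method object below keeps trial from collecting the test)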
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.encoding_params['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
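            # (arithmetic check: the next multiple of 25 at or above 1024 is
            # 41 * 25 == 1025, which is where the 1025 above comes from)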
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do all
            # of the encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.encoding_params['happy'] = 5
        d.addCallback(_added)

        def _has_helper():
            uploader = self.extra_node.getServiceNamed("uploader")
            furl, connected = uploader.get_helper_info()
            return connected
        d.addCallback(lambda ign: self.poll(_has_helper))

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        d.addCallback(fireEventually)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            bounced_d = defer.Deferred()
            def _do_bounce(res):
                d = self.bounce_client(0)
                d.addBoth(bounced_d.callback)
            u1.interrupt_after_d.addCallback(_do_bounce)

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
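            # (with interrupt_after=5000 and 1000-byte chunks, the interrupt
            # lands around the fifth chunk, as intended)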

            upload_d = self.extra_node.upload(u1)
            # The upload will start, and bounce_client() will be called after
            # about 5kB. bounced_d will fire after bounce_client() finishes
            # shutting down and restarting the node.
            d = bounced_d
            def _bounced(ign):
                # By this point, the upload should have failed because of the
                # interruption. upload_d will fire in a moment.
                def _should_not_finish(res):
                    self.fail("interrupted upload should have failed, not"
                              " finished with result %s" % (res,))
                def _interrupted(f):
                    f.trap(DeadReferenceError)
                    # make sure we actually interrupted it before finishing
                    # the file
                    self.failUnless(u1.bytes_read < len(DATA),
                                    "read %d out of %d total" %
                                    (u1.bytes_read, len(DATA)))
                upload_d.addCallbacks(_should_not_finish, _interrupted)
                return upload_d
            d.addCallback(_bounced)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            d.addCallback(lambda res:
                          log.msg("wait_for_helper", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            # then we need to wait for the extra node to reestablish its
            # connection to the helper.
            d.addCallback(lambda ign: self.poll(_has_helper))

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_all_shares(self, basedir):
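        # share files live at paths shaped like, for example:
        #   <basedir>/client0/storage/shares/<prefix>/<storage-index>/<shnum>
        # where the final component is the decimal share number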
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are share files here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things we could corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may or may not be noticed, for the
                # query-ordering reasons described above.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses
        # the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

    def flip_bit(self, good):
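        # flip the low bit of the last byte, yielding a string that differs
        # from 'good' in exactly one bit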
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.encoding_params['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
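                # (e.g. "1.9.0-r1234" and "1.9.0.post1234" name the same
                # revision; the code below accepts either spelling)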
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
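        # build a multipart/form-data body by hand: each field is framed by
        # a "--boogabooga" boundary line, and the final boundary is closed
        # with a trailing "--", as the multipart grammar requires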
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)

    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
              use_helper=False):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)

    def _test_web(self, res):
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            html = page.replace('\n', ' ')
            connected_re = r'Connected to <span>%d</span>\s*of <span>%d</span> known storage servers' % (self.numclients, self.numclients)
            self.failUnless(re.search(connected_re, html),
                            "I didn't see the right '%s' message in:\n%s" % (connected_re, page))
            # nodeids/tubids don't have any regexp-special characters
            nodeid_re = r'<th>Node ID:</th>\s*<td title="TubID: %s">%s</td>' % (
                self.clients[0].get_long_tubid(), self.clients[0].get_long_nodeid())
            self.failUnless(re.search(nodeid_re, html),
                            "I didn't see the right '%s' message in:\n%s" % (nodeid_re, page))
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            html = page.replace('\n', ' ')
            self.failUnless(re.search('<img src="img/connected-yes.png" alt="Connected" />', html), page)
            self.failUnlessIn("Not running helper", page)
1119         d.addCallback(_got_welcome_helper)
1120
1121         d.addCallback(lambda res: getPage(base + public))
1122         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1123         def _got_subdir1(page):
1124             # there ought to be an href for our file
1125             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1126             self.failUnless(">mydata567</a>" in page)
1127         d.addCallback(_got_subdir1)
1128         d.addCallback(self.log, "done with _got_subdir1")
1129         d.addCallback(lambda res:
1130                       getPage(base + public + "/subdir1/mydata567"))
1131         def _got_data(page):
1132             self.failUnlessEqual(page, self.data)
1133         d.addCallback(_got_data)
1134
1135         # download from a URI embedded in a URL
1136         d.addCallback(self.log, "_get_from_uri")
1137         def _get_from_uri(res):
1138             return getPage(base + "uri/%s?filename=%s"
1139                            % (self.uri, "mydata567"))
1140         d.addCallback(_get_from_uri)
1141         def _got_from_uri(page):
1142             self.failUnlessEqual(page, self.data)
1143         d.addCallback(_got_from_uri)
1144
1145         # download from a URI embedded in a URL, second form
1146         d.addCallback(self.log, "_get_from_uri2")
1147         def _get_from_uri2(res):
1148             return getPage(base + "uri?uri=%s" % (self.uri,))
1149         d.addCallback(_get_from_uri2)
1150         d.addCallback(_got_from_uri)
1151
1152         # download from a bogus URI, make sure we get a reasonable error
1153         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1154         def _get_from_bogus_uri(res):
1155             d1 = getPage(base + "uri/%s?filename=%s"
1156                          % (self.mangle_uri(self.uri), "mydata567"))
1157             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1158                        "410")
1159             return d1
1160         d.addCallback(_get_from_bogus_uri)
1161         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
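        # (no server holds shares for the mangled URI, so the web gateway is
        # expected to answer with a clean HTTP 410 rather than a traceback)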
1162
1163         # upload a file with PUT
1164         d.addCallback(self.log, "about to try PUT")
1165         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1166                                            "new.txt contents"))
1167         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1168         d.addCallback(self.failUnlessEqual, "new.txt contents")
1169         # and again with something large enough to use multiple segments,
1170         # and hopefully trigger pauseProducing too
1171         def _new_happy_semantics(ign):
1172             for c in self.clients:
1173                 # 'happy' (servers-of-happiness) can get reset elsewhere, so re-apply it here
1174                 c.encoding_params['happy'] = 1
1175         d.addCallback(_new_happy_semantics)
1176         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1177                                            "big" * 500000)) # 1.5MB
1178         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1179         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1180
1181         # can we replace files in place?
1182         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1183                                            "NEWER contents"))
1184         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1185         d.addCallback(self.failUnlessEqual, "NEWER contents")
1186
1187         # test unlinked POST
1188         d.addCallback(lambda res: self.POST("uri", t="upload",
1189                                             file=("new.txt", "data" * 10000)))
1190         # and again using the helper, which exercises different upload-status
1191         # display code
1192         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1193                                             file=("foo.txt", "data2" * 10000)))
1194
1195         # check that the status page exists
1196         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1197         def _got_status(res):
1198             # find an interesting upload and download to look at. LIT files
1199             # are not interesting.
1200             h = self.clients[0].get_history()
1201             for ds in h.list_all_download_statuses():
1202                 if ds.get_size() > 200:
1203                     self._down_status = ds.get_counter()
1204             for us in h.list_all_upload_statuses():
1205                 if us.get_size() > 200:
1206                     self._up_status = us.get_counter()
1207             rs = list(h.list_all_retrieve_statuses())[0]
1208             self._retrieve_status = rs.get_counter()
1209             ps = list(h.list_all_publish_statuses())[0]
1210             self._publish_status = ps.get_counter()
1211             mus = list(h.list_all_mapupdate_statuses())[0]
1212             self._update_status = mus.get_counter()
1213
1214             # and that there are some upload- and download- status pages
1215             return self.GET("status/up-%d" % self._up_status)
1216         d.addCallback(_got_status)
1217         def _got_up(res):
1218             return self.GET("status/down-%d" % self._down_status)
1219         d.addCallback(_got_up)
1220         def _got_down(res):
1221             return self.GET("status/mapupdate-%d" % self._update_status)
1222         d.addCallback(_got_down)
1223         def _got_update(res):
1224             return self.GET("status/publish-%d" % self._publish_status)
1225         d.addCallback(_got_update)
1226         def _got_publish(res):
1227             self.failUnlessIn("Publish Results", res)
1228             return self.GET("status/retrieve-%d" % self._retrieve_status)
1229         d.addCallback(_got_publish)
1230         def _got_retrieve(res):
1231             self.failUnlessIn("Retrieve Results", res)
1232         d.addCallback(_got_retrieve)
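        # Each status object exposes get_counter(), and the per-operation
        # status pages are addressed by that counter: status/up-N,
        # status/down-N, status/mapupdate-N, status/publish-N and
        # status/retrieve-N, as exercised above.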
1233
1234         # check that the helper status page exists
1235         d.addCallback(lambda res:
1236                       self.GET("helper_status", followRedirect=True))
1237         def _got_helper_status(res):
1238             self.failUnless("Bytes Fetched:" in res)
1239             # touch a couple of files in the helper's working directory to
1240             # exercise more code paths
1241             workdir = os.path.join(self.getdir("client0"), "helper")
1242             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1243             f = open(incfile, "wb")
1244             f.write("small file")
1245             f.close()
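            # backdate the spurious file: os.utime takes an (atime, mtime)
            # tuple, so this keeps atime at 'now' but pushes mtime three
            # days into the past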
1246             then = time.time() - 86400*3
1247             now = time.time()
1248             os.utime(incfile, (now, then))
1249             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1250             f = open(encfile, "wb")
1251             f.write("less small file")
1252             f.close()
1253             os.utime(encfile, (now, then))
1254         d.addCallback(_got_helper_status)
1255         # and that the json form exists
1256         d.addCallback(lambda res:
1257                       self.GET("helper_status?t=json", followRedirect=True))
1258         def _got_helper_status_json(res):
1259             data = simplejson.loads(res)
1260             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1261                                  1)
1262             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1263             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1264             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1265                                  10)
1266             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1267             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1268             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1269                                  15)
1270         d.addCallback(_got_helper_status_json)
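        # For reference, an illustrative (not captured) fragment of that
        # JSON payload, matching the assertions above:
        #   {"chk_upload_helper.upload_need_upload": 1,
        #    "chk_upload_helper.incoming_count": 1,
        #    "chk_upload_helper.incoming_size": 10, ...}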
1271
1272         # and check that client[3] (which uses a helper but does not run one
1273         # itself) doesn't explode when you ask for its status
1274         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1275         def _got_non_helper_status(res):
1276             self.failUnlessIn("Recent and Active Operations", res)
1277         d.addCallback(_got_non_helper_status)
1278
1279         # or for helper status with t=json
1280         d.addCallback(lambda res:
1281                       getPage(self.helper_webish_url + "helper_status?t=json"))
1282         def _got_non_helper_status_json(res):
1283             data = simplejson.loads(res)
1284             self.failUnlessEqual(data, {})
1285         d.addCallback(_got_non_helper_status_json)
1286
1287         # see if the statistics page exists
1288         d.addCallback(lambda res: self.GET("statistics"))
1289         def _got_stats(res):
1290             self.failUnlessIn("Operational Statistics", res)
1291             self.failUnlessIn("  'downloader.files_downloaded': 5,", res)
1292         d.addCallback(_got_stats)
1293         d.addCallback(lambda res: self.GET("statistics?t=json"))
1294         def _got_stats_json(res):
1295             data = simplejson.loads(res)
1296             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1297             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1298         d.addCallback(_got_stats_json)
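        # (the JSON form splits the same numbers into "counters" and "stats"
        # dictionaries, as asserted above)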
1299
1300         # TODO: mangle the second segment of a file, to test errors that
1301         # occur after we've already sent some good data, which uses a
1302         # different error path.
1303
1304         # TODO: download a URI with a form
1305         # TODO: create a directory by using a form
1306         # TODO: upload by using a form on the directory page
1307         #    url = base + "somedir/subdir1/freeform_post!!upload"
1308         # TODO: delete a file by using a button on the directory page
1309
1310         return d
1311
1312     def _test_runner(self, res):
1313         # exercise some of the diagnostic tools in runner.py
1314
1315         # find a share
1316         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1317             if "storage" not in dirpath:
1318                 continue
1319             if not filenames:
1320                 continue
1321             pieces = dirpath.split(os.sep)
1322             if (len(pieces) >= 4
1323                 and pieces[-4] == "storage"
1324                 and pieces[-3] == "shares"):
1325                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1326                 # are sharefiles here
1327                 filename = os.path.join(dirpath, filenames[0])
1328                 # peek at the magic to see if it is a chk share
1329                 magic = open(filename, "rb").read(4)
1330                 if magic == '\x00\x00\x00\x01':
1331                     break
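        # (note the for/else: the else clause below runs only if the walk
        # finished without hitting 'break', i.e. no CHK share was found)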
1332         else:
1333             self.fail("unable to find any uri_extension files in %r"
1334                       % self.basedir)
1335         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1336
1337         out,err = StringIO(), StringIO()
1338         rc = runner.runner(["debug", "dump-share", "--offsets",
1339                             unicode_to_argv(filename)],
1340                            stdout=out, stderr=err)
1341         output = out.getvalue()
1342         self.failUnlessEqual(rc, 0)
1343
1344         # we only upload a single file, so we can assert some things about
1345         # its size and shares.
1346         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1347         self.failUnlessIn("size: %d\n" % len(self.data), output)
1348         self.failUnlessIn("num_segments: 1\n", output)
1349         # segment_size is always a multiple of needed_shares
1350         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1351         self.failUnlessIn("total_shares: 10\n", output)
1352         # keys which are supposed to be present
1353         for key in ("size", "num_segments", "segment_size",
1354                     "needed_shares", "total_shares",
1355                     "codec_name", "codec_params", "tail_codec_params",
1356                     #"plaintext_hash", "plaintext_root_hash",
1357                     "crypttext_hash", "crypttext_root_hash",
1358                     "share_root_hash", "UEB_hash"):
1359             self.failUnlessIn("%s: " % key, output)
1360         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
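        # The equivalent shell invocation (share path illustrative) is:
        #   tahoe debug dump-share --offsets <path/to/sharefile>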
1361
1362         # now use its storage index to find the other shares using the
1363         # 'find-shares' tool
1364         sharedir, shnum = os.path.split(filename)
1365         storagedir, storage_index_s = os.path.split(sharedir)
1366         storage_index_s = str(storage_index_s)
1367         out,err = StringIO(), StringIO()
1368         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1369         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1370         rc = runner.runner(cmd, stdout=out, stderr=err)
1371         self.failUnlessEqual(rc, 0)
1372         out.seek(0)
1373         sharefiles = [sfn.strip() for sfn in out.readlines()]
1374         self.failUnlessEqual(len(sharefiles), 10)
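        # Shell equivalent (arguments illustrative):
        #   tahoe debug find-shares <storage_index> <nodedir> [<nodedir>..]
        # It prints one share-file path per line, hence ten lines for the
        # ten shares of our single file.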
1375
1376         # also exercise the 'catalog-shares' tool
1377         out,err = StringIO(), StringIO()
1378         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1379         cmd = ["debug", "catalog-shares"] + nodedirs
1380         rc = runner.runner(cmd, stdout=out, stderr=err)
1381         self.failUnlessEqual(rc, 0)
1382         out.seek(0)
1383         descriptions = [sfn.strip() for sfn in out.readlines()]
1384         self.failUnlessEqual(len(descriptions), 30)
1385         matching = [line
1386                     for line in descriptions
1387                     if line.startswith("CHK %s " % storage_index_s)]
1388         self.failUnlessEqual(len(matching), 10)
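        # catalog-shares prints one description line per share found under
        # the given node directories; CHK lines begin with
        # "CHK <storage_index> ", which is what the match above relies on.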
1389
1390     def _test_control(self, res):
1391         # exercise the remote-control-the-client foolscap interfaces in
1392         # allmydata.control (mostly used for performance tests)
1393         c0 = self.clients[0]
1394         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1395         control_furl = open(control_furl_file, "r").read().strip()
1396         # it doesn't really matter which Tub we use to connect to the client,
1397         # so let's just use our IntroducerNode's
1398         d = self.introducer.tub.getReference(control_furl)
1399         d.addCallback(self._test_control2, control_furl_file)
1400         return d
1401     def _test_control2(self, rref, filename):
1402         d = defer.succeed(None)
1403         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1404         if sys.platform in ("linux2", "linux3"):
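            # (get_memory_usage is only exercised on Linux, presumably
            # because it reads platform-specific /proc data)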
1405             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1406         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1407         return d
1408
1409     def _test_cli(self, res):
1410         # run various CLI commands (in a thread, since they use blocking
1411         # network calls)
1412
1413         private_uri = self._private_node.get_uri()
1414         client0_basedir = self.getdir("client0")
1415
1416         nodeargs = [
1417             "--node-directory", client0_basedir,
1418             ]
1419
1420         d = defer.succeed(None)
1421
1422         # for compatibility with earlier versions, private/root_dir.cap is
1423         # supposed to be treated as an alias named "tahoe:". Start by making
1424         # sure that works, before we add other aliases.
1425
1426         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1427         f = open(root_file, "w")
1428         f.write(private_uri)
1429         f.close()
1430
1431         def run(ignored, verb, *args, **kwargs):
1432             stdin = kwargs.get("stdin", "")
1433             newargs = nodeargs + [verb] + list(args)
1434             return self._run_cli(newargs, stdin=stdin)
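        # so d.addCallback(run, "ls") runs "tahoe --node-directory <basedir>
        # ls" in-process, and the Deferred fires with an (out, err) pair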
1435
1436         def _check_ls((out,err), expected_children, unexpected_children=[]):
1437             self.failUnlessEqual(err, "")
1438             for s in expected_children:
1439                 self.failUnless(s in out, (s,out))
1440             for s in unexpected_children:
1441                 self.failIf(s in out, (s,out))
1442
1443         def _check_ls_root((out,err)):
1444             self.failUnless("personal" in out)
1445             self.failUnless("s2-ro" in out)
1446             self.failUnless("s2-rw" in out)
1447             self.failUnlessEqual(err, "")
1448
1449         # this should reference private_uri
1450         d.addCallback(run, "ls")
1451         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1452
1453         d.addCallback(run, "list-aliases")
1454         def _check_aliases_1((out,err)):
1455             self.failUnlessEqual(err, "")
1456             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1457         d.addCallback(_check_aliases_1)
1458
1459         # now that that's out of the way, remove root_dir.cap and work with
1460         # new files
1461         d.addCallback(lambda res: os.unlink(root_file))
1462         d.addCallback(run, "list-aliases")
1463         def _check_aliases_2((out,err)):
1464             self.failUnlessEqual(err, "")
1465             self.failUnlessEqual(out, "")
1466         d.addCallback(_check_aliases_2)
1467
1468         d.addCallback(run, "mkdir")
1469         def _got_dir( (out,err) ):
1470             self.failUnless(uri.from_string_dirnode(out.strip()))
1471             return out.strip()
1472         d.addCallback(_got_dir)
1473         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1474
1475         d.addCallback(run, "list-aliases")
1476         def _check_aliases_3((out,err)):
1477             self.failUnlessEqual(err, "")
1478             self.failUnless("tahoe: " in out)
1479         d.addCallback(_check_aliases_3)
1480
1481         def _check_empty_dir((out,err)):
1482             self.failUnlessEqual(out, "")
1483             self.failUnlessEqual(err, "")
1484         d.addCallback(run, "ls")
1485         d.addCallback(_check_empty_dir)
1486
1487         def _check_missing_dir((out,err)):
1488             # TODO: check that rc==2
1489             self.failUnlessEqual(out, "")
1490             self.failUnlessEqual(err, "No such file or directory\n")
1491         d.addCallback(run, "ls", "bogus")
1492         d.addCallback(_check_missing_dir)
1493
1494         files = []
1495         datas = []
1496         for i in range(10):
1497             fn = os.path.join(self.basedir, "file%d" % i)
1498             files.append(fn)
1499             data = "data to be uploaded: file%d\n" % i
1500             datas.append(data)
1501             open(fn,"wb").write(data)
1502
1503         def _check_stdout_against((out,err), filenum=None, data=None):
1504             self.failUnlessEqual(err, "")
1505             if filenum is not None:
1506                 self.failUnlessEqual(out, datas[filenum])
1507             if data is not None:
1508                 self.failUnlessEqual(out, data)
1509
1510         # test both forms of put: from a file, and from stdin
1511         #  tahoe put bar FOO
1512         d.addCallback(run, "put", files[0], "tahoe-file0")
1513         def _put_out((out,err)):
1514             self.failUnless("URI:LIT:" in out, out)
1515             self.failUnless("201 Created" in err, err)
1516             uri0 = out.strip()
1517             return run(None, "get", uri0)
1518         d.addCallback(_put_out)
1519         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1520
1521         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1522         #  tahoe put bar tahoe:FOO
1523         d.addCallback(run, "put", files[2], "tahoe:file2")
1524         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1525         def _check_put_mutable((out,err)):
1526             self._mutable_file3_uri = out.strip()
1527         d.addCallback(_check_put_mutable)
1528         d.addCallback(run, "get", "tahoe:file3")
1529         d.addCallback(_check_stdout_against, 3)
1530
1531         #  tahoe put FOO
1532         STDIN_DATA = "This is the file to upload from stdin."
1533         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1534         #  tahoe put tahoe:FOO
1535         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1536                       stdin="Other file from stdin.")
1537
1538         d.addCallback(run, "ls")
1539         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1540                                   "tahoe-file-stdin", "from-stdin"])
1541         d.addCallback(run, "ls", "subdir")
1542         d.addCallback(_check_ls, ["tahoe-file1"])
1543
1544         # tahoe mkdir FOO
1545         d.addCallback(run, "mkdir", "subdir2")
1546         d.addCallback(run, "ls")
1547         # TODO: extract the URI, set an alias with it
1548         d.addCallback(_check_ls, ["subdir2"])
1549
1550         # tahoe get: (to stdin and to a file)
1551         d.addCallback(run, "get", "tahoe-file0")
1552         d.addCallback(_check_stdout_against, 0)
1553         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1554         d.addCallback(_check_stdout_against, 1)
1555         outfile0 = os.path.join(self.basedir, "outfile0")
1556         d.addCallback(run, "get", "file2", outfile0)
1557         def _check_outfile0((out,err)):
1558             data = open(outfile0,"rb").read()
1559             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1560         d.addCallback(_check_outfile0)
1561         outfile1 = os.path.join(self.basedir, "outfile1")
1562         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1563         def _check_outfile1((out,err)):
1564             data = open(outfile1,"rb").read()
1565             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1566         d.addCallback(_check_outfile1)
1567
1568         d.addCallback(run, "rm", "tahoe-file0")
1569         d.addCallback(run, "rm", "tahoe:file2")
1570         d.addCallback(run, "ls")
1571         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1572
1573         d.addCallback(run, "ls", "-l")
1574         def _check_ls_l((out,err)):
1575             lines = out.split("\n")
1576             for l in lines:
1577                 if "tahoe-file-stdin" in l:
1578                     self.failUnless(l.startswith("-r-- "), l)
1579                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1580                 if "file3" in l:
1581                     self.failUnless(l.startswith("-rw- "), l) # mutable
1582         d.addCallback(_check_ls_l)
1583
1584         d.addCallback(run, "ls", "--uri")
1585         def _check_ls_uri((out,err)):
1586             lines = out.split("\n")
1587             for l in lines:
1588                 if "file3" in l:
1589                     self.failUnless(self._mutable_file3_uri in l)
1590         d.addCallback(_check_ls_uri)
1591
1592         d.addCallback(run, "ls", "--readonly-uri")
1593         def _check_ls_rouri((out,err)):
1594             lines = out.split("\n")
1595             for l in lines:
1596                 if "file3" in l:
1597                     rw_uri = self._mutable_file3_uri
1598                     u = uri.from_string_mutable_filenode(rw_uri)
1599                     ro_uri = u.get_readonly().to_string()
1600                     self.failUnless(ro_uri in l)
1601         d.addCallback(_check_ls_rouri)
1602
1603
1604         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1605         d.addCallback(run, "ls")
1606         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1607
1608         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1609         d.addCallback(run, "ls")
1610         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1611
1612         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1613         d.addCallback(run, "ls")
1614         d.addCallback(_check_ls, ["file3", "file3-copy"])
1615         d.addCallback(run, "get", "tahoe:file3-copy")
1616         d.addCallback(_check_stdout_against, 3)
1617
1618         # copy from disk into tahoe
1619         d.addCallback(run, "cp", files[4], "tahoe:file4")
1620         d.addCallback(run, "ls")
1621         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1622         d.addCallback(run, "get", "tahoe:file4")
1623         d.addCallback(_check_stdout_against, 4)
1624
1625         # copy from tahoe into disk
1626         target_filename = os.path.join(self.basedir, "file-out")
1627         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1628         def _check_cp_out((out,err)):
1629             self.failUnless(os.path.exists(target_filename))
1630             got = open(target_filename,"rb").read()
1631             self.failUnlessEqual(got, datas[4])
1632         d.addCallback(_check_cp_out)
1633
1634         # copy from disk to disk (silly case)
1635         target2_filename = os.path.join(self.basedir, "file-out-copy")
1636         d.addCallback(run, "cp", target_filename, target2_filename)
1637         def _check_cp_out2((out,err)):
1638             self.failUnless(os.path.exists(target2_filename))
1639             got = open(target2_filename,"rb").read()
1640             self.failUnlessEqual(got, datas[4])
1641         d.addCallback(_check_cp_out2)
1642
1643         # copy from tahoe into disk, overwriting an existing file
1644         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1645         def _check_cp_out3((out,err)):
1646             self.failUnless(os.path.exists(target_filename))
1647             got = open(target_filename,"rb").read()
1648             self.failUnlessEqual(got, datas[3])
1649         d.addCallback(_check_cp_out3)
1650
1651         # copy from disk into tahoe, overwriting an existing immutable file
1652         d.addCallback(run, "cp", files[5], "tahoe:file4")
1653         d.addCallback(run, "ls")
1654         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1655         d.addCallback(run, "get", "tahoe:file4")
1656         d.addCallback(_check_stdout_against, 5)
1657
1658         # copy from disk into tahoe, overwriting an existing mutable file
1659         d.addCallback(run, "cp", files[5], "tahoe:file3")
1660         d.addCallback(run, "ls")
1661         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1662         d.addCallback(run, "get", "tahoe:file3")
1663         d.addCallback(_check_stdout_against, 5)
1664
1665         # recursive copy: setup
1666         dn = os.path.join(self.basedir, "dir1")
1667         os.makedirs(dn)
1668         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1669         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1670         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1671         sdn2 = os.path.join(dn, "subdir2")
1672         os.makedirs(sdn2)
1673         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1674         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1675
1676         # from disk into tahoe
1677         d.addCallback(run, "cp", "-r", dn, "tahoe:")
1678         d.addCallback(run, "ls")
1679         d.addCallback(_check_ls, ["dir1"])
1680         d.addCallback(run, "ls", "dir1")
1681         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1682                       ["rfile4", "rfile5"])
1683         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1684         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1685                       ["rfile1", "rfile2", "rfile3"])
1686         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1687         d.addCallback(_check_stdout_against, data="rfile4")
1688
1689         # and back out again
1690         dn_copy = os.path.join(self.basedir, "dir1-copy")
1691         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1692         def _check_cp_r_out((out,err)):
1693             def _cmp(name):
1694                 old = open(os.path.join(dn, name), "rb").read()
1695                 newfn = os.path.join(dn_copy, "dir1", name)
1696                 self.failUnless(os.path.exists(newfn))
1697                 new = open(newfn, "rb").read()
1698                 self.failUnlessEqual(old, new)
1699             _cmp("rfile1")
1700             _cmp("rfile2")
1701             _cmp("rfile3")
1702             _cmp(os.path.join("subdir2", "rfile4"))
1703             _cmp(os.path.join("subdir2", "rfile5"))
1704         d.addCallback(_check_cp_r_out)
1705
1706         # and copy it a second time, which ought to overwrite the same files
1707         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1708
1709         # and again, only writing filecaps
1710         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1711         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1712         def _check_capsonly((out,err)):
1713             # these should all be LITs
1714             x = open(os.path.join(dn_copy2, "dir1", "subdir2", "rfile4")).read()
1715             y = uri.from_string_filenode(x)
1716             self.failUnlessEqual(y.data, "rfile4")
1717         d.addCallback(_check_capsonly)
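        # (--caps-only writes each child's filecap in place of its contents;
        # files this small get LIT caps, which embed the data itself and can
        # be decoded locally as above)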
1718
1719         # and tahoe-to-tahoe
1720         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1721         d.addCallback(run, "ls")
1722         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1723         d.addCallback(run, "ls", "dir1-copy/dir1")
1724         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1725                       ["rfile4", "rfile5"])
1726         d.addCallback(run, "ls", "tahoe:dir1-copy/dir1/subdir2")
1727         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1728                       ["rfile1", "rfile2", "rfile3"])
1729         d.addCallback(run, "get", "dir1-copy/dir1/subdir2/rfile4")
1730         d.addCallback(_check_stdout_against, data="rfile4")
1731
1732         # and copy it a second time, which ought to overwrite the same files
1733         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1734
1735         # tahoe_ls doesn't currently handle the error correctly: it tries to
1736         # JSON-parse a traceback.
1737 ##         def _ls_missing(res):
1738 ##             argv = nodeargs + ["ls", "bogus"]
1739 ##             return self._run_cli(argv)
1740 ##         d.addCallback(_ls_missing)
1741 ##         def _check_ls_missing((out,err)):
1742 ##             print "OUT", out
1743 ##             print "ERR", err
1744 ##             self.failUnlessEqual(err, "")
1745 ##         d.addCallback(_check_ls_missing)
1746
1747         return d
1748
1749     def test_filesystem_with_cli_in_subprocess(self):
1750         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1751
1752         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1753         d = self.set_up_nodes()
1754         def _new_happy_semantics(ign):
1755             for c in self.clients:
1756                 c.encoding_params['happy'] = 1
1757         d.addCallback(_new_happy_semantics)
1758
1759         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1760             stdin = kwargs.get("stdin")
1761             env = kwargs.get("env")
1762             newargs = ["--node-directory", self.getdir("client0"), verb] + list(args)
1763             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1764
1765         def _check_succeeded(res, check_stderr=True):
1766             out, err, rc_or_sig = res
1767             self.failUnlessEqual(rc_or_sig, 0, str(res))
1768             if check_stderr:
1769                 self.failUnlessEqual(err, "")
1770
1771         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1772         d.addCallback(_check_succeeded)
1773
1774         STDIN_DATA = "This is the file to upload from stdin."
1775         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1776         d.addCallback(_check_succeeded, check_stderr=False)
1777
1778         def _mv_with_http_proxy(ign):
1779             env = os.environ.copy() # copy, so we don't pollute the test process's environment
1780             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1781             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1782         d.addCallback(_mv_with_http_proxy)
1783         d.addCallback(_check_succeeded)
1784
1785         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1786         def _check_ls(res):
1787             out, err, rc_or_sig = res
1788             self.failUnlessEqual(rc_or_sig, 0, str(res))
1789             self.failUnlessEqual(err, "", str(res))
1790             self.failUnlessIn("tahoe-moved", out)
1791             self.failIfIn("tahoe-file", out)
1792         d.addCallback(_check_ls)
1793         return d
1794
1795     def test_debug_trial(self):
1796         def _check_for_line(lines, result, test):
1797             for l in lines:
1798                 if result in l and test in l:
1799                     return
1800             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1801                       % (result, test, "\n## ".join(lines)))
1802
1803         def _check_for_outcome(lines, out, outcome):
1804             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1805                                             % (outcome, "\n## ".join(lines)))
1806
1807         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1808                                'allmydata.test.trialtest'])
1809         def _check_failure( (out, err, rc) ):
1810             self.failUnlessEqual(rc, 1)
1811             lines = out.split('\n')
1812             _check_for_line(lines, "[SKIPPED]", "test_skip")
1813             _check_for_line(lines, "[TODO]",    "test_todo")
1814             _check_for_line(lines, "[FAIL]",    "test_fail")
1815             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1816             _check_for_line(lines, "[ERROR]",   "test_error")
1817             _check_for_outcome(lines, out, "FAILED")
1818         d.addCallback(_check_failure)
1819
1820         # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1821         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1822                                                      'allmydata.test.trialtest.Success']))
1823         def _check_success( (out, err, rc) ):
1824             self.failUnlessEqual(rc, 0)
1825             lines = out.split('\n')
1826             _check_for_line(lines, "[SKIPPED]", "test_skip")
1827             _check_for_line(lines, "[TODO]",    "test_todo")
1828             _check_for_outcome(lines, out, "PASSED")
1829         d.addCallback(_check_success)
1830         return d
1831
1832     def _run_cli(self, argv, stdin=""):
1833         #print "CLI:", argv
1834         stdout, stderr = StringIO(), StringIO()
1835         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1836                                   stdin=StringIO(stdin),
1837                                   stdout=stdout, stderr=stderr)
1838         def _done(res):
1839             return stdout.getvalue(), stderr.getvalue()
1840         d.addCallback(_done)
1841         return d
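    # Usage sketch (arguments illustrative):
    #   d = self._run_cli(["--node-directory", basedir, "ls"])
    #   d.addCallback(lambda (out, err): ...)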
1842
1843     def _test_checker(self, res):
1844         ut = upload.Data("too big to be literal" * 200, convergence=None)
1845         d = self._personal_node.add_file(u"big file", ut)
1846
1847         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1848         def _check_dirnode_results(r):
1849             self.failUnless(r.is_healthy())
1850         d.addCallback(_check_dirnode_results)
1851         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1852         d.addCallback(_check_dirnode_results)
1853
1854         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1855         def _got_chk_filenode(n):
1856             self.failUnless(isinstance(n, ImmutableFileNode))
1857             d = n.check(Monitor())
1858             def _check_filenode_results(r):
1859                 self.failUnless(r.is_healthy())
1860             d.addCallback(_check_filenode_results)
1861             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1862             d.addCallback(_check_filenode_results)
1863             return d
1864         d.addCallback(_got_chk_filenode)
1865
1866         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1867         def _got_lit_filenode(n):
1868             self.failUnless(isinstance(n, LiteralFileNode))
1869             d = n.check(Monitor())
1870             def _check_lit_filenode_results(r):
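                # a LiteralFileNode stores its data inside the URI itself,
                # so there are no shares to examine and check() is expected
                # to return None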
1871                 self.failUnlessEqual(r, None)
1872             d.addCallback(_check_lit_filenode_results)
1873             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1874             d.addCallback(_check_lit_filenode_results)
1875             return d
1876         d.addCallback(_got_lit_filenode)
1877         return d
1878
1879
1880 class Connections(SystemTestMixin, unittest.TestCase):
1881     def test_rref(self):
1882         if NormalizedVersion(foolscap.__version__) < NormalizedVersion('0.6.4'):
1883             raise unittest.SkipTest("skipped due to http://foolscap.lothar.com/trac/ticket/196 "
1884                                     "(which does not affect normal usage of Tahoe-LAFS)")
1885
1886         self.basedir = "system/Connections/rref"
1887         d = self.set_up_nodes(2)
1888         def _start(ign):
1889             self.c0 = self.clients[0]
1890             nonclients = [s for s in self.c0.storage_broker.get_connected_servers()
1891                           if s.get_serverid() != self.c0.nodeid]
1892             self.failUnlessEqual(len(nonclients), 1)
1893
1894             self.s1 = nonclients[0]  # s1 is the server, not c0
1895             self.s1_rref = self.s1.get_rref()
1896             self.failIfEqual(self.s1_rref, None)
1897             self.failUnless(self.s1.is_connected())
1898         d.addCallback(_start)
1899
1900         # now shut down the server
1901         d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1902         # and wait for the client to notice
1903         def _poll():
1904             return len(self.c0.storage_broker.get_connected_servers()) < 2
1905         d.addCallback(lambda ign: self.poll(_poll))
1906
1907         def _down(ign):
1908             self.failIf(self.s1.is_connected())
1909             rref = self.s1.get_rref()
1910             self.failUnless(rref)
1911             self.failUnlessIdentical(rref, self.s1_rref)
1912         d.addCallback(_down)
1913         return d