from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData
from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""
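# A LIT ("literal") URI embeds the file's entire contents inside the URI
# itself; Tahoe only does that for very small files (the cutoff is 55 bytes),
# so the string above is comfortably past the threshold and forces a real
# upload. (Note that several tests below check exact byte counts of this
# string, so its contents must not be changed.)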

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None
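    # read() tallies bytes_read as the uploader pulls data; the first time
    # the running total passes interrupt_after, interrupt_after_d fires
    # (exactly once), giving a test a hook to sever the connection
    # mid-upload. See _upload_resumable below.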

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
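    # (Defining the test method and then del'ing it keeps the code around
    # for reference while preventing trial from collecting and running it.)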
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

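    # The convergence secret determines the encryption key for an immutable
    # upload: with the same secret, identical plaintexts converge on the same
    # key and therefore the same URI, so a duplicate upload can be
    # short-circuited. With convergence=None a fresh random key is used each
    # time and no such deduplication is possible.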
    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
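            # 'happy' is the servers-of-happiness threshold: an upload must
            # be able to place shares on at least this many distinct servers
            # or it fails outright.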
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data size is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # The segment size actually gets rounded up to 1025 to be a
            # multiple of the number of required shares (since we use 25 out
            # of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do
            # all of the encoding work, and only get to save on the upload
            # part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        def _has_helper():
            uploader = self.extra_node.getServiceNamed("uploader")
            furl, connected = uploader.get_helper_info()
            return connected
        d.addCallback(lambda ign: self.poll(_has_helper))

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

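        # fireEventually passes its argument through after a trip around the
        # reactor's eventual-send queue, letting any pending events settle
        # before we start the interrupted-upload exercise below.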
        d.addCallback(fireEventually)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            bounced_d = defer.Deferred()
            def _do_bounce(res):
                d = self.bounce_client(0)
                d.addBoth(bounced_d.callback)
            u1.interrupt_after_d.addCallback(_do_bounce)

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            upload_d = self.extra_node.upload(u1)
            # The upload will start, and bounce_client() will be called after
            # about 5kB. bounced_d will fire after bounce_client() finishes
            # shutting down and restarting the node.
            d = bounced_d
            def _bounced(ign):
                # By this point, the upload should have failed because of the
                # interruption. upload_d will fire in a moment.
                def _should_not_finish(res):
                    self.fail("interrupted upload should have failed, not"
                              " finished with result %s" % (res,))
                def _interrupted(f):
                    f.trap(DeadReferenceError)
                    # make sure we actually interrupted it before finishing
                    # the file
                    self.failUnless(u1.bytes_read < len(DATA),
                                    "read %d out of %d total" %
                                    (u1.bytes_read, len(DATA)))
                upload_d.addCallbacks(_should_not_finish, _interrupted)
                return upload_d
            d.addCallback(_bounced)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            d.addCallback(lambda res:
                          log.msg("wait_for_helper", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            # then we need to wait for the extra node to reestablish its
            # connection to the helper.
            d.addCallback(lambda ign: self.poll(_has_helper))

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data
                # is encrypted with a random key. (Because that would require
                # us to store the key locally and re-use it on the next
                # upload of this file, which isn't a bad thing to do, but we
                # currently don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially
                    # uploaded earlier was encrypted with a different random
                    # key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we"
                                " were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_all_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

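    # _corrupt_mutable_share reads the whole (SDMF) share file, unpacks it
    # into its constituent fields, damages exactly one of them, then re-packs
    # and writes the share back in place. A subsequent read of the damaged
    # share should fail the corresponding integrity check.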
    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev([(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes, and 25 % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling is only detected if share 0 happens to be
                # among the first shares queried, per the note above.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

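        # The key generator keeps a pool of pre-computed RSA keys. Every
        # mutable file or directory created below should consume exactly one
        # key from that pool, so the pool shrinks by one per creation.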
        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # uses the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

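    # flip_bit flips the low bit of the last byte of a string, e.g.
    # flip_bit("abc") == "abb" (ord('c') ^ 0x01 == ord('b')). A single
    # flipped bit is enough to invalidate any key or hash derived from the
    # string.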
    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead; it
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
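                # each histogram entry is (bucket-min, bucket-max, count):
                # the 23-byte literal file lands in the 11-31 bucket, and
                # the 112-byte immutable file in the 101-316 bucket.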
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

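    # Thin HTTP helpers aimed at the node's web API, built on
    # twisted.web.client.getPage. Each returns a Deferred that fires with the
    # response body; getPage errbacks with twisted.web.error.Error for
    # non-success responses, which is what shouldFail checks for in
    # _test_web below.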
    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

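    # POST hand-assembles a multipart/form-data body: each field is preceded
    # by a "--boundary" line and a Content-Disposition header, and the final
    # boundary line is closed with a trailing "--" (hence form[-1] += "--").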
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)

    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
              use_helper=False):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)

1091     def _test_web(self, res):
1092         base = self.webish_url
1093         public = "uri/" + self._root_directory_uri
1094         d = getPage(base)
1095         def _got_welcome(page):
1096             # XXX This test is oversensitive to formatting
1097             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1098             self.failUnless(expected in page,
1099                             "I didn't see the right 'connected storage servers'"
1100                             " message in: %s" % page
1101                             )
1102             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1103             self.failUnless(expected in page,
1104                             "I didn't see the right 'My nodeid' message "
1105                             "in: %s" % page)
1106             self.failUnless("Helper: 0 active uploads" in page)
1107         d.addCallback(_got_welcome)
1108         d.addCallback(self.log, "done with _got_welcome")
1109
1110         # get the welcome page from the node that uses the helper too
1111         d.addCallback(lambda res: getPage(self.helper_webish_url))
1112         def _got_welcome_helper(page):
1113             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1114                             page)
1115             self.failUnless("Not running helper" in page)
1116         d.addCallback(_got_welcome_helper)
1117
1118         d.addCallback(lambda res: getPage(base + public))
1119         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1120         def _got_subdir1(page):
1121             # there ought to be an href for our file
1122             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1123             self.failUnless(">mydata567</a>" in page)
1124         d.addCallback(_got_subdir1)
1125         d.addCallback(self.log, "done with _got_subdir1")
1126         d.addCallback(lambda res:
1127                       getPage(base + public + "/subdir1/mydata567"))
1128         def _got_data(page):
1129             self.failUnlessEqual(page, self.data)
1130         d.addCallback(_got_data)
1131
1132         # download from a URI embedded in a URL
1133         d.addCallback(self.log, "_get_from_uri")
1134         def _get_from_uri(res):
1135             return getPage(base + "uri/%s?filename=%s"
1136                            % (self.uri, "mydata567"))
1137         d.addCallback(_get_from_uri)
1138         def _got_from_uri(page):
1139             self.failUnlessEqual(page, self.data)
1140         d.addCallback(_got_from_uri)
1141
1142         # download from a URI embedded in a URL, second form
1143         d.addCallback(self.log, "_get_from_uri2")
1144         def _get_from_uri2(res):
1145             return getPage(base + "uri?uri=%s" % (self.uri,))
1146         d.addCallback(_get_from_uri2)
1147         d.addCallback(_got_from_uri)
1148
1149         # download from a bogus URI, make sure we get a reasonable error
1150         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1151         def _get_from_bogus_uri(res):
1152             d1 = getPage(base + "uri/%s?filename=%s"
1153                          % (self.mangle_uri(self.uri), "mydata567"))
1154             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1155                        "410")
1156             return d1
1157         d.addCallback(_get_from_bogus_uri)
1158         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1159
1160         # upload a file with PUT
1161         d.addCallback(self.log, "about to try PUT")
1162         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1163                                            "new.txt contents"))
1164         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1165         d.addCallback(self.failUnlessEqual, "new.txt contents")
1166         # and again with something large enough to use multiple segments,
1167         # and hopefully trigger pauseProducing too
1168         def _new_happy_semantics(ign):
1169             for c in self.clients:
1170                 # these get reset somewhere along the way, so set them again here
1171                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1172         d.addCallback(_new_happy_semantics)
1173         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1174                                            "big" * 500000)) # 1.5MB
1175         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1176         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1177
1178         # can we replace files in place?
1179         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1180                                            "NEWER contents"))
1181         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1182         d.addCallback(self.failUnlessEqual, "NEWER contents")
1183
1184         # test unlinked POST
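             # ("unlinked" = the file is not attached to any directory; the
             # webapi replies with the new filecap in the response body)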
1185         d.addCallback(lambda res: self.POST("uri", t="upload",
1186                                             file=("new.txt", "data" * 10000)))
1187         # and again using the helper, which exercises different upload-status
1188         # display code
1189         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1190                                             file=("foo.txt", "data2" * 10000)))
1191
1192         # check that the status page exists
1193         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1194         def _got_status(res):
1195             # find an interesting upload and download to look at. LIT files
1196             # are not interesting.
1197             h = self.clients[0].get_history()
1198             for ds in h.list_all_download_statuses():
1199                 if ds.get_size() > 200:
1200                     self._down_status = ds.get_counter()
1201             for us in h.list_all_upload_statuses():
1202                 if us.get_size() > 200:
1203                     self._up_status = us.get_counter()
1204             rs = list(h.list_all_retrieve_statuses())[0]
1205             self._retrieve_status = rs.get_counter()
1206             ps = list(h.list_all_publish_statuses())[0]
1207             self._publish_status = ps.get_counter()
1208             us = list(h.list_all_mapupdate_statuses())[0]
1209             self._update_status = us.get_counter()
1210
1211             # and that there are some upload- and download-status pages
1212             return self.GET("status/up-%d" % self._up_status)
1213         d.addCallback(_got_status)
1214         def _got_up(res):
1215             return self.GET("status/down-%d" % self._down_status)
1216         d.addCallback(_got_up)
1217         def _got_down(res):
1218             return self.GET("status/mapupdate-%d" % self._update_status)
1219         d.addCallback(_got_down)
1220         def _got_update(res):
1221             return self.GET("status/publish-%d" % self._publish_status)
1222         d.addCallback(_got_update)
1223         def _got_publish(res):
1224             self.failUnlessIn("Publish Results", res)
1225             return self.GET("status/retrieve-%d" % self._retrieve_status)
1226         d.addCallback(_got_publish)
1227         def _got_retrieve(res):
1228             self.failUnlessIn("Retrieve Results", res)
1229         d.addCallback(_got_retrieve)
1230
1231         # check that the helper status page exists
1232         d.addCallback(lambda res:
1233                       self.GET("helper_status", followRedirect=True))
1234         def _got_helper_status(res):
1235             self.failUnless("Bytes Fetched:" in res)
1236             # touch a couple of files in the helper's working directory to
1237             # exercise more code paths
1238             workdir = os.path.join(self.getdir("client0"), "helper")
1239             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1240             f = open(incfile, "wb")
1241             f.write("small file")
1242             f.close()
1243             then = time.time() - 86400*3
1244             now = time.time()
1245             os.utime(incfile, (now, then))
1246             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1247             f = open(encfile, "wb")
1248             f.write("less small file")
1249             f.close()
1250             os.utime(encfile, (now, then))
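                 # (note: "small file" is 10 bytes, "less small file" is 15;
                 # the t=json assertions below depend on these exact sizes,
                 # and the backdated mtimes apparently make the files count
                 # toward the *_size_old totals as well)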
1251         d.addCallback(_got_helper_status)
1252         # and that the json form exists
1253         d.addCallback(lambda res:
1254                       self.GET("helper_status?t=json", followRedirect=True))
1255         def _got_helper_status_json(res):
1256             data = simplejson.loads(res)
1257             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1258                                  1)
1259             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1260             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1261             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1262                                  10)
1263             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1264             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1265             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1266                                  15)
1267         d.addCallback(_got_helper_status_json)
1268
1269         # and check that client[3] (which uses a helper but does not run one
1270         # itself) doesn't explode when you ask for its status
1271         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1272         def _got_non_helper_status(res):
1273             self.failUnless("Upload and Download Status" in res)
1274         d.addCallback(_got_non_helper_status)
1275
1276         # or for helper status with t=json
1277         d.addCallback(lambda res:
1278                       getPage(self.helper_webish_url + "helper_status?t=json"))
1279         def _got_non_helper_status_json(res):
1280             data = simplejson.loads(res)
1281             self.failUnlessEqual(data, {})
1282         d.addCallback(_got_non_helper_status_json)
1283
1284         # see if the statistics page exists
1285         d.addCallback(lambda res: self.GET("statistics"))
1286         def _got_stats(res):
1287             self.failUnless("Node Statistics" in res)
1288             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1289         d.addCallback(_got_stats)
1290         d.addCallback(lambda res: self.GET("statistics?t=json"))
1291         def _got_stats_json(res):
1292             data = simplejson.loads(res)
1293             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1294             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1295         d.addCallback(_got_stats_json)
1296
1297         # TODO: mangle the second segment of a file, to test errors that
1298         # occur after we've already sent some good data, which uses a
1299         # different error path.
1300
1301         # TODO: download a URI with a form
1302         # TODO: create a directory by using a form
1303         # TODO: upload by using a form on the directory page
1304         #    url = base + "somedir/subdir1/freeform_post!!upload"
1305         # TODO: delete a file by using a button on the directory page
1306
1307         return d
1308
1309     def _test_runner(self, res):
1310         # exercise some of the diagnostic tools in runner.py
1311
1312         # find a share
1313         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1314             if "storage" not in dirpath:
1315                 continue
1316             if not filenames:
1317                 continue
1318             pieces = dirpath.split(os.sep)
1319             if (len(pieces) >= 4
1320                 and pieces[-4] == "storage"
1321                 and pieces[-3] == "shares"):
1322                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1323                 # are sharefiles here
1324                 filename = os.path.join(dirpath, filenames[0])
1325                 # peek at the magic to see if it is a chk share
1326                 magic = open(filename, "rb").read(4)
1327                 if magic == '\x00\x00\x00\x01':
1328                     break
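             # (the "else" clause of this "for" loop runs only if the walk
             # finishes without executing "break", i.e. no CHK share was found)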
1329         else:
1330             self.fail("unable to find any uri_extension files in %r"
1331                       % self.basedir)
1332         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1333
1334         out,err = StringIO(), StringIO()
1335         rc = runner.runner(["debug", "dump-share", "--offsets",
1336                             unicode_to_argv(filename)],
1337                            stdout=out, stderr=err)
1338         output = out.getvalue()
1339         self.failUnlessEqual(rc, 0)
1340
1341         # we only upload a single file, so we can assert some things about
1342         # its size and shares.
1343         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1344         self.failUnlessIn("size: %d\n" % len(self.data), output)
1345         self.failUnlessIn("num_segments: 1\n", output)
1346         # segment_size is always a multiple of needed_shares
1347         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
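                 # (next_multiple(n, k) rounds n up to the nearest multiple of
                 # k, e.g. next_multiple(100, 3) == 102, so the single segment
                 # is padded out to a multiple of needed_shares=3 bytes)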
1348         self.failUnlessIn("total_shares: 10\n", output)
1349         # keys which are supposed to be present
1350         for key in ("size", "num_segments", "segment_size",
1351                     "needed_shares", "total_shares",
1352                     "codec_name", "codec_params", "tail_codec_params",
1353                     #"plaintext_hash", "plaintext_root_hash",
1354                     "crypttext_hash", "crypttext_root_hash",
1355                     "share_root_hash", "UEB_hash"):
1356             self.failUnlessIn("%s: " % key, output)
1357         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1358
1359         # now use its storage index to find the other shares using the
1360         # 'find-shares' tool
1361         sharedir, shnum = os.path.split(filename)
1362         storagedir, storage_index_s = os.path.split(sharedir)
1363         storage_index_s = str(storage_index_s)
1364         out,err = StringIO(), StringIO()
1365         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1366         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1367         rc = runner.runner(cmd, stdout=out, stderr=err)
1368         self.failUnlessEqual(rc, 0)
1369         out.seek(0)
1370         sharefiles = [sfn.strip() for sfn in out.readlines()]
1371         self.failUnlessEqual(len(sharefiles), 10)
1372
1373         # also exercise the 'catalog-shares' tool
1374         out,err = StringIO(), StringIO()
1375         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1376         cmd = ["debug", "catalog-shares"] + nodedirs
1377         rc = runner.runner(cmd, stdout=out, stderr=err)
1378         self.failUnlessEqual(rc, 0)
1379         out.seek(0)
1380         descriptions = [sfn.strip() for sfn in out.readlines()]
1381         self.failUnlessEqual(len(descriptions), 30)
1382         matching = [line
1383                     for line in descriptions
1384                     if line.startswith("CHK %s " % storage_index_s)]
1385         self.failUnlessEqual(len(matching), 10)
1386
1387     def _test_control(self, res):
1388         # exercise the remote-control-the-client foolscap interfaces in
1389         # allmydata.control (mostly used for performance tests)
1390         c0 = self.clients[0]
1391         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1392         control_furl = open(control_furl_file, "r").read().strip()
1393         # it doesn't really matter which Tub we use to connect to the client,
1394         # so let's just use our IntroducerNode's
1395         d = self.introducer.tub.getReference(control_furl)
1396         d.addCallback(self._test_control2, control_furl_file)
1397         return d
1398     def _test_control2(self, rref, filename):
1399         d = rref.callRemote("upload_from_file_to_uri",
1400                             filename.encode(get_filesystem_encoding()), convergence=None)
1401         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1402         d.addCallback(lambda uri:
1403                       rref.callRemote("download_from_uri_to_file",
1404                                       uri, downfile))
1405         def _check(res):
1406             self.failUnlessEqual(res, downfile)
1407             data = open(downfile, "r").read()
1408             expected_data = open(filename, "r").read()
1409             self.failUnlessEqual(data, expected_data)
1410         d.addCallback(_check)
1411         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1412         if sys.platform in ("linux2", "linux3"):
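                 # (memory-usage probing is exercised only on Linux,
                 # presumably because it reads /proc)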
1413             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1414         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1415         return d
1416
1417     def _test_cli(self, res):
1418         # run various CLI commands (in a thread, since they use blocking
1419         # network calls)
1420
1421         private_uri = self._private_node.get_uri()
1422         client0_basedir = self.getdir("client0")
1423
1424         nodeargs = [
1425             "--node-directory", client0_basedir,
1426             ]
1427
1428         d = defer.succeed(None)
1429
1430         # for compatibility with earlier versions, private/root_dir.cap is
1431         # supposed to be treated as an alias named "tahoe:". Start by making
1432         # sure that works, before we add other aliases.
1433
1434         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1435         f = open(root_file, "w")
1436         f.write(private_uri)
1437         f.close()
1438
1439         def run(ignored, verb, *args, **kwargs):
1440             stdin = kwargs.get("stdin", "")
1441             newargs = [verb] + nodeargs + list(args)
1442             return self._run_cli(newargs, stdin=stdin)
1443
1444         def _check_ls((out,err), expected_children, unexpected_children=[]):
1445             self.failUnlessEqual(err, "")
1446             for s in expected_children:
1447                 self.failUnless(s in out, (s,out))
1448             for s in unexpected_children:
1449                 self.failIf(s in out, (s,out))
1450
1451         def _check_ls_root((out,err)):
1452             self.failUnless("personal" in out)
1453             self.failUnless("s2-ro" in out)
1454             self.failUnless("s2-rw" in out)
1455             self.failUnlessEqual(err, "")
1456
1457         # this should reference private_uri
1458         d.addCallback(run, "ls")
1459         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1460
1461         d.addCallback(run, "list-aliases")
1462         def _check_aliases_1((out,err)):
1463             self.failUnlessEqual(err, "")
1464             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1465         d.addCallback(_check_aliases_1)
1466
1467         # now that that's out of the way, remove root_dir.cap and work with
1468         # new files
1469         d.addCallback(lambda res: os.unlink(root_file))
1470         d.addCallback(run, "list-aliases")
1471         def _check_aliases_2((out,err)):
1472             self.failUnlessEqual(err, "")
1473             self.failUnlessEqual(out, "")
1474         d.addCallback(_check_aliases_2)
1475
1476         d.addCallback(run, "mkdir")
1477         def _got_dir( (out,err) ):
1478             self.failUnless(uri.from_string_dirnode(out.strip()))
1479             return out.strip()
1480         d.addCallback(_got_dir)
1481         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1482
1483         d.addCallback(run, "list-aliases")
1484         def _check_aliases_3((out,err)):
1485             self.failUnlessEqual(err, "")
1486             self.failUnless("tahoe: " in out)
1487         d.addCallback(_check_aliases_3)
1488
1489         def _check_empty_dir((out,err)):
1490             self.failUnlessEqual(out, "")
1491             self.failUnlessEqual(err, "")
1492         d.addCallback(run, "ls")
1493         d.addCallback(_check_empty_dir)
1494
1495         def _check_missing_dir((out,err)):
1496             # TODO: check that rc==2
1497             self.failUnlessEqual(out, "")
1498             self.failUnlessEqual(err, "No such file or directory\n")
1499         d.addCallback(run, "ls", "bogus")
1500         d.addCallback(_check_missing_dir)
1501
1502         files = []
1503         datas = []
1504         for i in range(10):
1505             fn = os.path.join(self.basedir, "file%d" % i)
1506             files.append(fn)
1507             data = "data to be uploaded: file%d\n" % i
1508             datas.append(data)
1509             open(fn,"wb").write(data)
1510
1511         def _check_stdout_against((out,err), filenum=None, data=None):
1512             self.failUnlessEqual(err, "")
1513             if filenum is not None:
1514                 self.failUnlessEqual(out, datas[filenum])
1515             if data is not None:
1516                 self.failUnlessEqual(out, data)
1517
1518         # test both forms of put: from a file, and from stdin
1519         #  tahoe put bar FOO
1520         d.addCallback(run, "put", files[0], "tahoe-file0")
1521         def _put_out((out,err)):
1522             self.failUnless("URI:LIT:" in out, out)
1523             self.failUnless("201 Created" in err, err)
1524             uri0 = out.strip()
1525             return run(None, "get", uri0)
1526         d.addCallback(_put_out)
1527         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1528
1529         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1530         #  tahoe put bar tahoe:FOO
1531         d.addCallback(run, "put", files[2], "tahoe:file2")
1532         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1533         def _check_put_mutable((out,err)):
1534             self._mutable_file3_uri = out.strip()
1535         d.addCallback(_check_put_mutable)
1536         d.addCallback(run, "get", "tahoe:file3")
1537         d.addCallback(_check_stdout_against, 3)
1538
1539         #  tahoe put FOO
1540         STDIN_DATA = "This is the file to upload from stdin."
1541         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1542         #  tahoe put tahoe:FOO
1543         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1544                       stdin="Other file from stdin.")
1545
1546         d.addCallback(run, "ls")
1547         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1548                                   "tahoe-file-stdin", "from-stdin"])
1549         d.addCallback(run, "ls", "subdir")
1550         d.addCallback(_check_ls, ["tahoe-file1"])
1551
1552         # tahoe mkdir FOO
1553         d.addCallback(run, "mkdir", "subdir2")
1554         d.addCallback(run, "ls")
1555         # TODO: extract the URI, set an alias with it
1556         d.addCallback(_check_ls, ["subdir2"])
1557
1558         # tahoe get: (to stdout and to a file)
1559         d.addCallback(run, "get", "tahoe-file0")
1560         d.addCallback(_check_stdout_against, 0)
1561         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1562         d.addCallback(_check_stdout_against, 1)
1563         outfile0 = os.path.join(self.basedir, "outfile0")
1564         d.addCallback(run, "get", "file2", outfile0)
1565         def _check_outfile0((out,err)):
1566             data = open(outfile0,"rb").read()
1567             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1568         d.addCallback(_check_outfile0)
1569         outfile1 = os.path.join(self.basedir, "outfile1")
1570         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1571         def _check_outfile1((out,err)):
1572             data = open(outfile1,"rb").read()
1573             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1574         d.addCallback(_check_outfile1)
1575
1576         d.addCallback(run, "rm", "tahoe-file0")
1577         d.addCallback(run, "rm", "tahoe:file2")
1578         d.addCallback(run, "ls")
1579         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1580
1581         d.addCallback(run, "ls", "-l")
1582         def _check_ls_l((out,err)):
1583             lines = out.split("\n")
1584             for l in lines:
1585                 if "tahoe-file-stdin" in l:
1586                     self.failUnless(l.startswith("-r-- "), l)
1587                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1588                 if "file3" in l:
1589                     self.failUnless(l.startswith("-rw- "), l) # mutable
1590         d.addCallback(_check_ls_l)
1591
1592         d.addCallback(run, "ls", "--uri")
1593         def _check_ls_uri((out,err)):
1594             lines = out.split("\n")
1595             for l in lines:
1596                 if "file3" in l:
1597                     self.failUnless(self._mutable_file3_uri in l)
1598         d.addCallback(_check_ls_uri)
1599
1600         d.addCallback(run, "ls", "--readonly-uri")
1601         def _check_ls_rouri((out,err)):
1602             lines = out.split("\n")
1603             for l in lines:
1604                 if "file3" in l:
1605                     rw_uri = self._mutable_file3_uri
1606                     u = uri.from_string_mutable_filenode(rw_uri)
1607                     ro_uri = u.get_readonly().to_string()
1608                     self.failUnless(ro_uri in l)
1609         d.addCallback(_check_ls_rouri)
1610
1611
1612         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1613         d.addCallback(run, "ls")
1614         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1615
1616         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1617         d.addCallback(run, "ls")
1618         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1619
1620         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1621         d.addCallback(run, "ls")
1622         d.addCallback(_check_ls, ["file3", "file3-copy"])
1623         d.addCallback(run, "get", "tahoe:file3-copy")
1624         d.addCallback(_check_stdout_against, 3)
1625
1626         # copy from disk into tahoe
1627         d.addCallback(run, "cp", files[4], "tahoe:file4")
1628         d.addCallback(run, "ls")
1629         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1630         d.addCallback(run, "get", "tahoe:file4")
1631         d.addCallback(_check_stdout_against, 4)
1632
1633         # copy from tahoe into disk
1634         target_filename = os.path.join(self.basedir, "file-out")
1635         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1636         def _check_cp_out((out,err)):
1637             self.failUnless(os.path.exists(target_filename))
1638             got = open(target_filename,"rb").read()
1639             self.failUnlessEqual(got, datas[4])
1640         d.addCallback(_check_cp_out)
1641
1642         # copy from disk to disk (silly case)
1643         target2_filename = os.path.join(self.basedir, "file-out-copy")
1644         d.addCallback(run, "cp", target_filename, target2_filename)
1645         def _check_cp_out2((out,err)):
1646             self.failUnless(os.path.exists(target2_filename))
1647             got = open(target2_filename,"rb").read()
1648             self.failUnlessEqual(got, datas[4])
1649         d.addCallback(_check_cp_out2)
1650
1651         # copy from tahoe into disk, overwriting an existing file
1652         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1653         def _check_cp_out3((out,err)):
1654             self.failUnless(os.path.exists(target_filename))
1655             got = open(target_filename,"rb").read()
1656             self.failUnlessEqual(got, datas[3])
1657         d.addCallback(_check_cp_out3)
1658
1659         # copy from disk into tahoe, overwriting an existing immutable file
1660         d.addCallback(run, "cp", files[5], "tahoe:file4")
1661         d.addCallback(run, "ls")
1662         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1663         d.addCallback(run, "get", "tahoe:file4")
1664         d.addCallback(_check_stdout_against, 5)
1665
1666         # copy from disk into tahoe, overwriting an existing mutable file
1667         d.addCallback(run, "cp", files[5], "tahoe:file3")
1668         d.addCallback(run, "ls")
1669         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1670         d.addCallback(run, "get", "tahoe:file3")
1671         d.addCallback(_check_stdout_against, 5)
1672
1673         # recursive copy: setup
1674         dn = os.path.join(self.basedir, "dir1")
1675         os.makedirs(dn)
1676         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1677         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1678         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1679         sdn2 = os.path.join(dn, "subdir2")
1680         os.makedirs(sdn2)
1681         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1682         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1683
1684         # from disk into tahoe
1685         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1686         d.addCallback(run, "ls")
1687         d.addCallback(_check_ls, ["dir1"])
1688         d.addCallback(run, "ls", "dir1")
1689         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1690                       ["rfile4", "rfile5"])
1691         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1692         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1693                       ["rfile1", "rfile2", "rfile3"])
1694         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1695         d.addCallback(_check_stdout_against, data="rfile4")
1696
1697         # and back out again
1698         dn_copy = os.path.join(self.basedir, "dir1-copy")
1699         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1700         def _check_cp_r_out((out,err)):
1701             def _cmp(name):
1702                 old = open(os.path.join(dn, name), "rb").read()
1703                 newfn = os.path.join(dn_copy, name)
1704                 self.failUnless(os.path.exists(newfn))
1705                 new = open(newfn, "rb").read()
1706                 self.failUnlessEqual(old, new)
1707             _cmp("rfile1")
1708             _cmp("rfile2")
1709             _cmp("rfile3")
1710             _cmp(os.path.join("subdir2", "rfile4"))
1711             _cmp(os.path.join("subdir2", "rfile5"))
1712         d.addCallback(_check_cp_r_out)
1713
1714         # and copy it a second time, which ought to overwrite the same files
1715         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1716
1717         # and again, only writing filecaps
1718         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1719         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1720         def _check_capsonly((out,err)):
1721             # these should all be LITs
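                 # (a LIT cap embeds the entire file body in the URI itself,
                 # so parsing the cap recovers the data without touching any
                 # storage server)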
1722             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1723             y = uri.from_string_filenode(x)
1724             self.failUnlessEqual(y.data, "rfile4")
1725         d.addCallback(_check_capsonly)
1726
1727         # and tahoe-to-tahoe
1728         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1729         d.addCallback(run, "ls")
1730         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1731         d.addCallback(run, "ls", "dir1-copy")
1732         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1733                       ["rfile4", "rfile5"])
1734         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1735         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1736                       ["rfile1", "rfile2", "rfile3"])
1737         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1738         d.addCallback(_check_stdout_against, data="rfile4")
1739
1740         # and copy it a second time, which ought to overwrite the same files
1741         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1742
1743         # tahoe_ls doesn't currently handle the error correctly: it tries to
1744         # JSON-parse a traceback.
1745 ##         def _ls_missing(res):
1746 ##             argv = ["ls"] + nodeargs + ["bogus"]
1747 ##             return self._run_cli(argv)
1748 ##         d.addCallback(_ls_missing)
1749 ##         def _check_ls_missing((out,err)):
1750 ##             print "OUT", out
1751 ##             print "ERR", err
1752 ##             self.failUnlessEqual(err, "")
1753 ##         d.addCallback(_check_ls_missing)
1754
1755         return d
1756
1757     def test_filesystem_with_cli_in_subprocess(self):
1758         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1759
1760         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1761         d = self.set_up_nodes()
1762         def _new_happy_semantics(ign):
1763             for c in self.clients:
1764                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1765         d.addCallback(_new_happy_semantics)
1766
1767         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1768             stdin = kwargs.get("stdin")
1769             env = kwargs.get("env")
1770             newargs = [verb, "--node-directory", self.getdir("client0")] + list(args)
1771             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1772
1773         def _check_succeeded(res, check_stderr=True):
1774             out, err, rc_or_sig = res
1775             self.failUnlessEqual(rc_or_sig, 0, str(res))
1776             if check_stderr:
1777                 self.failUnlessEqual(err, "")
1778
1779         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1780         d.addCallback(_check_succeeded)
1781
1782         STDIN_DATA = "This is the file to upload from stdin."
1783         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1784         d.addCallback(_check_succeeded, check_stderr=False)
1785
1786         def _mv_with_http_proxy(ign):
1787             env = os.environ
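                 # (note that "env" here aliases os.environ rather than
                 # copying it, so these proxy settings persist for the rest
                 # of the test process)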
1788             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1789             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1790         d.addCallback(_mv_with_http_proxy)
1791         d.addCallback(_check_succeeded)
1792
1793         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1794         def _check_ls(res):
1795             out, err, rc_or_sig = res
1796             self.failUnlessEqual(rc_or_sig, 0, str(res))
1797             self.failUnlessEqual(err, "", str(res))
1798             self.failUnlessIn("tahoe-moved", out)
1799             self.failIfIn("tahoe-file", out)
1800         d.addCallback(_check_ls)
1801         return d
1802
1803     def test_debug_trial(self):
1804         def _check_for_line(lines, result, test):
1805             for l in lines:
1806                 if result in l and test in l:
1807                     return
1808             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1809                       % (result, test, "\n## ".join(lines)))
1810
1811         def _check_for_outcome(lines, out, outcome):
1812             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1813                                             % (outcome, "\n## ".join(lines)))
1814
1815         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1816                                'allmydata.test.trialtest'])
1817         def _check_failure( (out, err, rc) ):
1818             self.failUnlessEqual(rc, 1)
1819             lines = out.split('\n')
1820             _check_for_line(lines, "[SKIPPED]", "test_skip")
1821             _check_for_line(lines, "[TODO]",    "test_todo")
1822             _check_for_line(lines, "[FAIL]",    "test_fail")
1823             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1824             _check_for_line(lines, "[ERROR]",   "test_error")
1825             _check_for_outcome(lines, out, "FAILED")
1826         d.addCallback(_check_failure)
1827
1828         # the --quiet argument regression-tests an earlier bug in deciding which arguments to pass through to trial
1829         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1830                                                      'allmydata.test.trialtest.Success']))
1831         def _check_success( (out, err, rc) ):
1832             self.failUnlessEqual(rc, 0)
1833             lines = out.split('\n')
1834             _check_for_line(lines, "[SKIPPED]", "test_skip")
1835             _check_for_line(lines, "[TODO]",    "test_todo")
1836             _check_for_outcome(lines, out, "PASSED")
1837         d.addCallback(_check_success)
1838         return d
1839
1840     def _run_cli(self, argv, stdin=""):
1841         #print "CLI:", argv
1842         stdout, stderr = StringIO(), StringIO()
1843         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1844                                   stdin=StringIO(stdin),
1845                                   stdout=stdout, stderr=stderr)
1846         def _done(res):
1847             return stdout.getvalue(), stderr.getvalue()
1848         d.addCallback(_done)
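             # usage sketch (basedir is hypothetical):
             #   d = self._run_cli(["ls", "--node-directory", basedir])
             # fires with the (stdout, stderr) strings once runner.runner
             # finishes in its thread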
1849         return d
1850
1851     def _test_checker(self, res):
1852         ut = upload.Data("too big to be literal" * 200, convergence=None)
1853         d = self._personal_node.add_file(u"big file", ut)
1854
1855         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1856         def _check_dirnode_results(r):
1857             self.failUnless(r.is_healthy())
1858         d.addCallback(_check_dirnode_results)
1859         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1860         d.addCallback(_check_dirnode_results)
1861
1862         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1863         def _got_chk_filenode(n):
1864             self.failUnless(isinstance(n, ImmutableFileNode))
1865             d = n.check(Monitor())
1866             def _check_filenode_results(r):
1867                 self.failUnless(r.is_healthy())
1868             d.addCallback(_check_filenode_results)
1869             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1870             d.addCallback(_check_filenode_results)
1871             return d
1872         d.addCallback(_got_chk_filenode)
1873
1874         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1875         def _got_lit_filenode(n):
1876             self.failUnless(isinstance(n, LiteralFileNode))
1877             d = n.check(Monitor())
1878             def _check_lit_filenode_results(r):
1879                 self.failUnlessEqual(r, None)
1880             d.addCallback(_check_lit_filenode_results)
1881             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1882             d.addCallback(_check_lit_filenode_results)
1883             return d
1884         d.addCallback(_got_lit_filenode)
1885         return d