
from base64 import b32encode
import os, re, sys, time, simplejson
from cStringIO import StringIO

from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.verlib import NormalizedVersion
from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData

import foolscap
from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
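
# Illustrative use of CountingDataUploadable, mirroring _upload_resumable
# below (the bounce callback here is a hypothetical stand-in):
#
#   u = CountingDataUploadable(DATA, convergence=None)
#   u.interrupt_after = 5000               # fire after ~5kB has been read
#   u.interrupt_after_d = defer.Deferred()
#   u.interrupt_after_d.addCallback(lambda _: bounce_the_helper())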

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code so extensively that we're no longer sure it works, we can
    # reinstate this test until we're confident it does again.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # The 1024 actually gets rounded up to 1025 to be a multiple of
            # the number of required shares (since we use 25 out of 100 FEC).
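            # (checking that arithmetic: with k=25 required shares, 1024
            # rounds up to the next multiple of 25, i.e.
            # 25 * ceil(1024/25) == 25 * 41 == 1025)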
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do
            # all of the encoding work, and only get to save on the upload
            # part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        def _has_helper():
            uploader = self.extra_node.getServiceNamed("uploader")
            furl, connected = uploader.get_helper_info()
            return connected
        d.addCallback(lambda ign: self.poll(_has_helper))

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        d.addCallback(fireEventually)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            bounced_d = defer.Deferred()
            def _do_bounce(res):
                d = self.bounce_client(0)
                d.addBoth(bounced_d.callback)
            u1.interrupt_after_d.addCallback(_do_bounce)

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            upload_d = self.extra_node.upload(u1)
            # The upload will start, and bounce_client() will be called after
            # about 5kB. bounced_d will fire after bounce_client() finishes
            # shutting down and restarting the node.
            d = bounced_d
            def _bounced(ign):
                # By this point, the upload should have failed because of the
                # interruption. upload_d will fire in a moment
                def _should_not_finish(res):
                    self.fail("interrupted upload should have failed, not"
                              " finished with result %s" % (res,))
                def _interrupted(f):
                    f.trap(DeadReferenceError)
                    # make sure we actually interrupted it before finishing
                    # the file
                    self.failUnless(u1.bytes_read < len(DATA),
                                    "read %d out of %d total" %
                                    (u1.bytes_read, len(DATA)))
                upload_d.addCallbacks(_should_not_finish, _interrupted)
                return upload_d
            d.addCallback(_bounced)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            d.addCallback(lambda res:
                          log.msg("wait_for_helper", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            # then we need to wait for the extra node to reestablish its
            # connection to the helper.
            d.addCallback(lambda ign: self.poll(_has_helper))

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_all_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
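                # (illustrative, with made-up values: a dirpath ending in
                #  .../client0/storage/shares/aa/aa... containing a sharefile
                #  named "7" would yield client_num=0,
                #  storage_index=si_a2b("aa..."), shnum=7)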
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things we could corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may or may not be detected, depending upon
                # which server we happen to query first (see the note above).
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # uses the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
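        # (for example, flip_bit("abc") == "abb", since ord("c") ^ 0x01 ==
        # ord("b"); only the low bit of the last byte is flipped)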

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
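                # (each histogram entry is a (min, max, count) bucket: the
                #  23-byte literal file lands in (11, 31) and the 112-byte
                #  immutable file in (101, 316))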
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d
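
    # shouldFail2 in use (taken from _test_bad_read above):
    #   self.shouldFail2(NoSharesError, "'download bad node'", None,
    #                    bad_n.read, MemoryConsumer(), offset=2)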

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)
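
    # The multipart body that POST builds looks roughly like this, assuming a
    # single field t="mkdir" (each line CRLF-terminated):
    #
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   mkdir
    #   --boogabooga--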
1086
1087     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1088               use_helper=False):
1089         if use_helper:
1090             url = self.helper_webish_url + urlpath
1091         else:
1092             url = self.webish_url + urlpath
1093         return getPage(url, method="POST", postdata=body, headers=headers,
1094                        followRedirect=followRedirect)
1095
1096     def _test_web(self, res):
1097         base = self.webish_url
1098         public = "uri/" + self._root_directory_uri
1099         d = getPage(base)
1100         def _got_welcome(page):
1101             html = page.replace('\n', ' ')
1102             connected_re = "Connected to <span>%d</span>[ ]*of <span>%d</span> known storage servers" % (self.numclients, self.numclients)
1103             self.failUnless(re.search(connected_re, html),
1104                             "I didn't see the right '%s' message in:\n%s" % (connected_re, page))
1105             nodeid_re = "<th>Node ID:</th>[ ]*<td>%s</td>" % (re.escape(b32encode(self.clients[0].nodeid).lower()),)
1106             self.failUnless(re.search(nodeid_re, html),
1107                             "I didn't see the right '%s' message in:\n%s" % (nodeid_re, page))
1108             self.failUnless("Helper: 0 active uploads" in page)
1109         d.addCallback(_got_welcome)
1110         d.addCallback(self.log, "done with _got_welcome")
1111
1112         # get the welcome page from the node that uses the helper too
1113         d.addCallback(lambda res: getPage(self.helper_webish_url))
1114         def _got_welcome_helper(page):
1115             html = page.replace('\n', ' ')
1116             self.failUnless(re.search('<div class="status-indicator connected-yes"></div>[ ]*<div>Helper</div>', html), page)
1117             self.failUnlessIn("Not running helper", page)
1118         d.addCallback(_got_welcome_helper)
1119
1120         d.addCallback(lambda res: getPage(base + public))
1121         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1122         def _got_subdir1(page):
1123             # there ought to be an href for our file
1124             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1125             self.failUnless(">mydata567</a>" in page)
1126         d.addCallback(_got_subdir1)
1127         d.addCallback(self.log, "done with _got_subdir1")
1128         d.addCallback(lambda res:
1129                       getPage(base + public + "/subdir1/mydata567"))
1130         def _got_data(page):
1131             self.failUnlessEqual(page, self.data)
1132         d.addCallback(_got_data)
1133
1134         # download from a URI embedded in a URL
1135         d.addCallback(self.log, "_get_from_uri")
1136         def _get_from_uri(res):
1137             return getPage(base + "uri/%s?filename=%s"
1138                            % (self.uri, "mydata567"))
1139         d.addCallback(_get_from_uri)
1140         def _got_from_uri(page):
1141             self.failUnlessEqual(page, self.data)
1142         d.addCallback(_got_from_uri)
1143
1144         # download from a URI embedded in a URL, second form
1145         d.addCallback(self.log, "_get_from_uri2")
1146         def _get_from_uri2(res):
1147             return getPage(base + "uri?uri=%s" % (self.uri,))
1148         d.addCallback(_get_from_uri2)
1149         d.addCallback(_got_from_uri)
1150
1151         # download from a bogus URI, make sure we get a reasonable error
1152         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1153         def _get_from_bogus_uri(res):
1154             d1 = getPage(base + "uri/%s?filename=%s"
1155                          % (self.mangle_uri(self.uri), "mydata567"))
1156             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1157                        "410")
1158             return d1
1159         d.addCallback(_get_from_bogus_uri)
1160         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1161
1162         # upload a file with PUT
1163         d.addCallback(self.log, "about to try PUT")
1164         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1165                                            "new.txt contents"))
1166         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1167         d.addCallback(self.failUnlessEqual, "new.txt contents")
1168         # and again with something large enough to use multiple segments,
1169         # and hopefully trigger pauseProducing too
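        # (assuming the default 128 KiB max segment size -- not asserted
        # here -- a 1.5 MB file spans about 12 segments)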
1170         def _new_happy_semantics(ign):
1171             for c in self.clients:
1172                 # these seem to get reset somewhere, so set them again to be safe
1173                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1174         d.addCallback(_new_happy_semantics)
1175         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1176                                            "big" * 500000)) # 1.5MB
1177         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1178         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1179
1180         # can we replace files in place?
1181         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1182                                            "NEWER contents"))
1183         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1184         d.addCallback(self.failUnlessEqual, "NEWER contents")
1185
1186         # test unlinked POST
1187         d.addCallback(lambda res: self.POST("uri", t="upload",
1188                                             file=("new.txt", "data" * 10000)))
1189         # and again using the helper, which exercises different upload-status
1190         # display code
1191         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1192                                             file=("foo.txt", "data2" * 10000)))
1193
1194         # check that the status page exists
1195         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1196         def _got_status(res):
1197             # find an interesting upload and download to look at. LIT files
1198             # are not interesting.
1199             h = self.clients[0].get_history()
1200             for ds in h.list_all_download_statuses():
1201                 if ds.get_size() > 200:
1202                     self._down_status = ds.get_counter()
1203             for us in h.list_all_upload_statuses():
1204                 if us.get_size() > 200:
1205                     self._up_status = us.get_counter()
1206             rs = list(h.list_all_retrieve_statuses())[0]
1207             self._retrieve_status = rs.get_counter()
1208             ps = list(h.list_all_publish_statuses())[0]
1209             self._publish_status = ps.get_counter()
1210             us = list(h.list_all_mapupdate_statuses())[0]
1211             self._update_status = us.get_counter()
1212
1213             # and that there are some upload- and download- status pages
1214             return self.GET("status/up-%d" % self._up_status)
1215         d.addCallback(_got_status)
1216         def _got_up(res):
1217             return self.GET("status/down-%d" % self._down_status)
1218         d.addCallback(_got_up)
1219         def _got_down(res):
1220             return self.GET("status/mapupdate-%d" % self._update_status)
1221         d.addCallback(_got_down)
1222         def _got_update(res):
1223             return self.GET("status/publish-%d" % self._publish_status)
1224         d.addCallback(_got_update)
1225         def _got_publish(res):
1226             self.failUnlessIn("Publish Results", res)
1227             return self.GET("status/retrieve-%d" % self._retrieve_status)
1228         d.addCallback(_got_publish)
1229         def _got_retrieve(res):
1230             self.failUnlessIn("Retrieve Results", res)
1231         d.addCallback(_got_retrieve)
1232
1233         # check that the helper status page exists
1234         d.addCallback(lambda res:
1235                       self.GET("helper_status", followRedirect=True))
1236         def _got_helper_status(res):
1237             self.failUnless("Bytes Fetched:" in res)
1238             # touch a couple of files in the helper's working directory to
1239             # exercise more code paths
1240             workdir = os.path.join(self.getdir("client0"), "helper")
1241             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1242             f = open(incfile, "wb")
1243             f.write("small file")
1244             f.close()
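            # backdate the mtime by three days so this file lands in the
            # helper's "_old" size buckets (asserted via t=json below)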
1245             then = time.time() - 86400*3
1246             now = time.time()
1247             os.utime(incfile, (now, then))
1248             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1249             f = open(encfile, "wb")
1250             f.write("less small file")
1251             f.close()
1252             os.utime(encfile, (now, then))
1253         d.addCallback(_got_helper_status)
1254         # and that the json form exists
1255         d.addCallback(lambda res:
1256                       self.GET("helper_status?t=json", followRedirect=True))
1257         def _got_helper_status_json(res):
1258             data = simplejson.loads(res)
1259             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1260                                  1)
1261             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1262             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1263             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1264                                  10)
1265             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1266             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1267             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1268                                  15)
1269         d.addCallback(_got_helper_status_json)
1270
1271         # and check that client[3] (which uses a helper but does not run one
1272         # itself) doesn't explode when you ask for its status
1273         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1274         def _got_non_helper_status(res):
1275             self.failUnlessIn("Recent and Active Operations", res)
1276         d.addCallback(_got_non_helper_status)
1277
1278         # or for helper status with t=json
1279         d.addCallback(lambda res:
1280                       getPage(self.helper_webish_url + "helper_status?t=json"))
1281         def _got_non_helper_status_json(res):
1282             data = simplejson.loads(res)
1283             self.failUnlessEqual(data, {})
1284         d.addCallback(_got_non_helper_status_json)
1285
1286         # see if the statistics page exists
1287         d.addCallback(lambda res: self.GET("statistics"))
1288         def _got_stats(res):
1289             self.failUnlessIn("Operational Statistics", res)
1290             self.failUnlessIn("  'downloader.files_downloaded': 5,", res)
1291         d.addCallback(_got_stats)
1292         d.addCallback(lambda res: self.GET("statistics?t=json"))
1293         def _got_stats_json(res):
1294             data = simplejson.loads(res)
1295             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1296             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1297         d.addCallback(_got_stats_json)
1298
1299         # TODO: mangle the second segment of a file, to test errors that
1300         # occur after we've already sent some good data, which uses a
1301         # different error path.
1302
1303         # TODO: download a URI with a form
1304         # TODO: create a directory by using a form
1305         # TODO: upload by using a form on the directory page
1306         #    url = base + "somedir/subdir1/freeform_post!!upload"
1307         # TODO: delete a file by using a button on the directory page
1308
1309         return d
1310
1311     def _test_runner(self, res):
1312         # exercise some of the diagnostic tools in runner.py
1313
1314         # find a share
1315         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1316             if "storage" not in dirpath:
1317                 continue
1318             if not filenames:
1319                 continue
1320             pieces = dirpath.split(os.sep)
1321             if (len(pieces) >= 4
1322                 and pieces[-4] == "storage"
1323                 and pieces[-3] == "shares"):
1324                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1325                 # are sharefiles here
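                # an illustrative path shape (not asserted verbatim):
                #   <basedir>/clientN/storage/shares/<prefix>/<sindex>/<shnum>
                # where <prefix> is the first two characters of the base32
                # storage index and <shnum> is a small integer like 0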
1326                 filename = os.path.join(dirpath, filenames[0])
1327                 # peek at the magic to see if it is a chk share
1328                 magic = open(filename, "rb").read(4)
1329                 if magic == '\x00\x00\x00\x01':
1330                     break
1331         else:
1332             self.fail("unable to find any uri_extension files in %r"
1333                       % self.basedir)
1334         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1335
1336         out,err = StringIO(), StringIO()
1337         rc = runner.runner(["debug", "dump-share", "--offsets",
1338                             unicode_to_argv(filename)],
1339                            stdout=out, stderr=err)
1340         output = out.getvalue()
1341         self.failUnlessEqual(rc, 0)
1342
1343         # we only upload a single file, so we can assert some things about
1344         # its size and shares.
1345         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1346         self.failUnlessIn("size: %d\n" % len(self.data), output)
1347         self.failUnlessIn("num_segments: 1\n", output)
1348         # segment_size is always a multiple of needed_shares
1349         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1350         self.failUnlessIn("total_shares: 10\n", output)
1351         # keys which are supposed to be present
1352         for key in ("size", "num_segments", "segment_size",
1353                     "needed_shares", "total_shares",
1354                     "codec_name", "codec_params", "tail_codec_params",
1355                     #"plaintext_hash", "plaintext_root_hash",
1356                     "crypttext_hash", "crypttext_root_hash",
1357                     "share_root_hash", "UEB_hash"):
1358             self.failUnlessIn("%s: " % key, output)
1359         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1360
1361         # now use its storage index to find the other shares using the
1362         # 'find-shares' tool
1363         sharedir, shnum = os.path.split(filename)
1364         storagedir, storage_index_s = os.path.split(sharedir)
1365         storage_index_s = str(storage_index_s)
1366         out,err = StringIO(), StringIO()
1367         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1368         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1369         rc = runner.runner(cmd, stdout=out, stderr=err)
1370         self.failUnlessEqual(rc, 0)
1371         out.seek(0)
1372         sharefiles = [sfn.strip() for sfn in out.readlines()]
1373         self.failUnlessEqual(len(sharefiles), 10)
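        # each line of find-shares output is the path of one sharefile,
        # shaped like <nodedir>/storage/shares/<prefix>/<sindex>/<shnum>
        # (illustrative; only the count is asserted here)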
1374
1375         # also exercise the 'catalog-shares' tool
1376         out,err = StringIO(), StringIO()
1377         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1378         cmd = ["debug", "catalog-shares"] + nodedirs
1379         rc = runner.runner(cmd, stdout=out, stderr=err)
1380         self.failUnlessEqual(rc, 0)
1381         out.seek(0)
1382         descriptions = [sfn.strip() for sfn in out.readlines()]
1383         self.failUnlessEqual(len(descriptions), 30)
1384         matching = [line
1385                     for line in descriptions
1386                     if line.startswith("CHK %s " % storage_index_s)]
1387         self.failUnlessEqual(len(matching), 10)
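        # catalog-shares emits one line per share; CHK share lines begin
        # with "CHK <storage-index> " followed by further fields whose
        # exact format is not asserted here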
1388
1389     def _test_control(self, res):
1390         # exercise the remote-control-the-client foolscap interfaces in
1391         # allmydata.control (mostly used for performance tests)
1392         c0 = self.clients[0]
1393         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1394         control_furl = open(control_furl_file, "r").read().strip()
1395         # it doesn't really matter which Tub we use to connect to the client,
1396         # so let's just use our IntroducerNode's
1397         d = self.introducer.tub.getReference(control_furl)
1398         d.addCallback(self._test_control2, control_furl_file)
1399         return d
1400     def _test_control2(self, rref, filename):
1401         d = rref.callRemote("upload_from_file_to_uri",
1402                             filename.encode(get_filesystem_encoding()), convergence=None)
1403         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1404         d.addCallback(lambda uri:
1405                       rref.callRemote("download_from_uri_to_file",
1406                                       uri, downfile))
1407         def _check(res):
1408             self.failUnlessEqual(res, downfile)
1409             data = open(downfile, "r").read()
1410             expected_data = open(filename, "r").read()
1411             self.failUnlessEqual(data, expected_data)
1412         d.addCallback(_check)
1413         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1414         if sys.platform in ("linux2", "linux3"):
1415             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1416         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1417         return d
1418
1419     def _test_cli(self, res):
1420         # run various CLI commands (in a thread, since they use blocking
1421         # network calls)
1422
1423         private_uri = self._private_node.get_uri()
1424         client0_basedir = self.getdir("client0")
1425
1426         nodeargs = [
1427             "--node-directory", client0_basedir,
1428             ]
1429
1430         d = defer.succeed(None)
1431
1432         # for compatibility with earlier versions, private/root_dir.cap is
1433         # supposed to be treated as an alias named "tahoe:". Start by making
1434         # sure that works, before we add other aliases.
1435
1436         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1437         f = open(root_file, "w")
1438         f.write(private_uri)
1439         f.close()
1440
1441         def run(ignored, verb, *args, **kwargs):
1442             stdin = kwargs.get("stdin", "")
1443             newargs = nodeargs + [verb] + list(args)
1444             return self._run_cli(newargs, stdin=stdin)
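        # e.g. run(None, "get", "tahoe:file3") invokes the CLI as
        # `tahoe --node-directory <client0> get tahoe:file3` and fires
        # with an (out, err) pair of strings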
1445
1446         def _check_ls((out,err), expected_children, unexpected_children=[]):
1447             self.failUnlessEqual(err, "")
1448             for s in expected_children:
1449                 self.failUnless(s in out, (s,out))
1450             for s in unexpected_children:
1451                 self.failIf(s in out, (s,out))
1452
1453         def _check_ls_root((out,err)):
1454             self.failUnless("personal" in out)
1455             self.failUnless("s2-ro" in out)
1456             self.failUnless("s2-rw" in out)
1457             self.failUnlessEqual(err, "")
1458
1459         # this should reference private_uri
1460         d.addCallback(run, "ls")
1461         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1462
1463         d.addCallback(run, "list-aliases")
1464         def _check_aliases_1((out,err)):
1465             self.failUnlessEqual(err, "")
1466             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1467         d.addCallback(_check_aliases_1)
1468
1469         # now that that's out of the way, remove root_dir.cap and work with
1470         # new files
1471         d.addCallback(lambda res: os.unlink(root_file))
1472         d.addCallback(run, "list-aliases")
1473         def _check_aliases_2((out,err)):
1474             self.failUnlessEqual(err, "")
1475             self.failUnlessEqual(out, "")
1476         d.addCallback(_check_aliases_2)
1477
1478         d.addCallback(run, "mkdir")
1479         def _got_dir( (out,err) ):
1480             self.failUnless(uri.from_string_dirnode(out.strip()))
1481             return out.strip()
1482         d.addCallback(_got_dir)
1483         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1484
1485         d.addCallback(run, "list-aliases")
1486         def _check_aliases_3((out,err)):
1487             self.failUnlessEqual(err, "")
1488             self.failUnless("tahoe: " in out)
1489         d.addCallback(_check_aliases_3)
1490
1491         def _check_empty_dir((out,err)):
1492             self.failUnlessEqual(out, "")
1493             self.failUnlessEqual(err, "")
1494         d.addCallback(run, "ls")
1495         d.addCallback(_check_empty_dir)
1496
1497         def _check_missing_dir((out,err)):
1498             # TODO: check that rc==2
1499             self.failUnlessEqual(out, "")
1500             self.failUnlessEqual(err, "No such file or directory\n")
1501         d.addCallback(run, "ls", "bogus")
1502         d.addCallback(_check_missing_dir)
1503
1504         files = []
1505         datas = []
1506         for i in range(10):
1507             fn = os.path.join(self.basedir, "file%d" % i)
1508             files.append(fn)
1509             data = "data to be uploaded: file%d\n" % i
1510             datas.append(data)
1511             open(fn,"wb").write(data)
1512
1513         def _check_stdout_against((out,err), filenum=None, data=None):
1514             self.failUnlessEqual(err, "")
1515             if filenum is not None:
1516                 self.failUnlessEqual(out, datas[filenum])
1517             if data is not None:
1518                 self.failUnlessEqual(out, data)
1519
1520         # test both forms of put: from a file, and from stdin
1521         #  tahoe put bar FOO
1522         d.addCallback(run, "put", files[0], "tahoe-file0")
1523         def _put_out((out,err)):
1524             self.failUnless("URI:LIT:" in out, out)
1525             self.failUnless("201 Created" in err, err)
1526             uri0 = out.strip()
1527             return run(None, "get", uri0)
1528         d.addCallback(_put_out)
1529         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1530
1531         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1532         #  tahoe put bar tahoe:FOO
1533         d.addCallback(run, "put", files[2], "tahoe:file2")
1534         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1535         def _check_put_mutable((out,err)):
1536             self._mutable_file3_uri = out.strip()
1537         d.addCallback(_check_put_mutable)
1538         d.addCallback(run, "get", "tahoe:file3")
1539         d.addCallback(_check_stdout_against, 3)
1540
1541         #  tahoe put FOO
1542         STDIN_DATA = "This is the file to upload from stdin."
1543         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1544         #  tahoe put tahoe:FOO
1545         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1546                       stdin="Other file from stdin.")
1547
1548         d.addCallback(run, "ls")
1549         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1550                                   "tahoe-file-stdin", "from-stdin"])
1551         d.addCallback(run, "ls", "subdir")
1552         d.addCallback(_check_ls, ["tahoe-file1"])
1553
1554         # tahoe mkdir FOO
1555         d.addCallback(run, "mkdir", "subdir2")
1556         d.addCallback(run, "ls")
1557         # TODO: extract the URI, set an alias with it
1558         d.addCallback(_check_ls, ["subdir2"])
1559
1560         # tahoe get: (to stdout and to a file)
1561         d.addCallback(run, "get", "tahoe-file0")
1562         d.addCallback(_check_stdout_against, 0)
1563         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1564         d.addCallback(_check_stdout_against, 1)
1565         outfile0 = os.path.join(self.basedir, "outfile0")
1566         d.addCallback(run, "get", "file2", outfile0)
1567         def _check_outfile0((out,err)):
1568             data = open(outfile0,"rb").read()
1569             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1570         d.addCallback(_check_outfile0)
1571         outfile1 = os.path.join(self.basedir, "outfile0")
1572         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1573         def _check_outfile1((out,err)):
1574             data = open(outfile1,"rb").read()
1575             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1576         d.addCallback(_check_outfile1)
1577
1578         d.addCallback(run, "rm", "tahoe-file0")
1579         d.addCallback(run, "rm", "tahoe:file2")
1580         d.addCallback(run, "ls")
1581         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1582
1583         d.addCallback(run, "ls", "-l")
1584         def _check_ls_l((out,err)):
1585             lines = out.split("\n")
1586             for l in lines:
1587                 if "tahoe-file-stdin" in l:
1588                     self.failUnless(l.startswith("-r-- "), l)
1589                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1590                 if "file3" in l:
1591                     self.failUnless(l.startswith("-rw- "), l) # mutable
1592         d.addCallback(_check_ls_l)
1593
1594         d.addCallback(run, "ls", "--uri")
1595         def _check_ls_uri((out,err)):
1596             lines = out.split("\n")
1597             for l in lines:
1598                 if "file3" in l:
1599                     self.failUnless(self._mutable_file3_uri in l)
1600         d.addCallback(_check_ls_uri)
1601
1602         d.addCallback(run, "ls", "--readonly-uri")
1603         def _check_ls_rouri((out,err)):
1604             lines = out.split("\n")
1605             for l in lines:
1606                 if "file3" in l:
1607                     rw_uri = self._mutable_file3_uri
1608                     u = uri.from_string_mutable_filenode(rw_uri)
1609                     ro_uri = u.get_readonly().to_string()
1610                     self.failUnless(ro_uri in l)
1611         d.addCallback(_check_ls_rouri)
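        # (diminishing a mutable writecap: "URI:SSK:..." becomes a readcap
        # of the form "URI:SSK-RO:..." -- prefixes shown for illustration)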
1612
1613
1614         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1615         d.addCallback(run, "ls")
1616         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1617
1618         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1619         d.addCallback(run, "ls")
1620         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1621
1622         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1623         d.addCallback(run, "ls")
1624         d.addCallback(_check_ls, ["file3", "file3-copy"])
1625         d.addCallback(run, "get", "tahoe:file3-copy")
1626         d.addCallback(_check_stdout_against, 3)
1627
1628         # copy from disk into tahoe
1629         d.addCallback(run, "cp", files[4], "tahoe:file4")
1630         d.addCallback(run, "ls")
1631         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1632         d.addCallback(run, "get", "tahoe:file4")
1633         d.addCallback(_check_stdout_against, 4)
1634
1635         # copy from tahoe into disk
1636         target_filename = os.path.join(self.basedir, "file-out")
1637         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1638         def _check_cp_out((out,err)):
1639             self.failUnless(os.path.exists(target_filename))
1640             got = open(target_filename,"rb").read()
1641             self.failUnlessEqual(got, datas[4])
1642         d.addCallback(_check_cp_out)
1643
1644         # copy from disk to disk (silly case)
1645         target2_filename = os.path.join(self.basedir, "file-out-copy")
1646         d.addCallback(run, "cp", target_filename, target2_filename)
1647         def _check_cp_out2((out,err)):
1648             self.failUnless(os.path.exists(target2_filename))
1649             got = open(target2_filename,"rb").read()
1650             self.failUnlessEqual(got, datas[4])
1651         d.addCallback(_check_cp_out2)
1652
1653         # copy from tahoe into disk, overwriting an existing file
1654         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1655         def _check_cp_out3((out,err)):
1656             self.failUnless(os.path.exists(target_filename))
1657             got = open(target_filename,"rb").read()
1658             self.failUnlessEqual(got, datas[3])
1659         d.addCallback(_check_cp_out3)
1660
1661         # copy from disk into tahoe, overwriting an existing immutable file
1662         d.addCallback(run, "cp", files[5], "tahoe:file4")
1663         d.addCallback(run, "ls")
1664         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1665         d.addCallback(run, "get", "tahoe:file4")
1666         d.addCallback(_check_stdout_against, 5)
1667
1668         # copy from disk into tahoe, overwriting an existing mutable file
1669         d.addCallback(run, "cp", files[5], "tahoe:file3")
1670         d.addCallback(run, "ls")
1671         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1672         d.addCallback(run, "get", "tahoe:file3")
1673         d.addCallback(_check_stdout_against, 5)
1674
1675         # recursive copy: setup
1676         dn = os.path.join(self.basedir, "dir1")
1677         os.makedirs(dn)
1678         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1679         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1680         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1681         sdn2 = os.path.join(dn, "subdir2")
1682         os.makedirs(sdn2)
1683         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1684         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1685
1686         # from disk into tahoe
1687         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1688         d.addCallback(run, "ls")
1689         d.addCallback(_check_ls, ["dir1"])
1690         d.addCallback(run, "ls", "dir1")
1691         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1692                       ["rfile4", "rfile5"])
1693         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1694         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1695                       ["rfile1", "rfile2", "rfile3"])
1696         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1697         d.addCallback(_check_stdout_against, data="rfile4")
1698
1699         # and back out again
1700         dn_copy = os.path.join(self.basedir, "dir1-copy")
1701         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1702         def _check_cp_r_out((out,err)):
1703             def _cmp(name):
1704                 old = open(os.path.join(dn, name), "rb").read()
1705                 newfn = os.path.join(dn_copy, name)
1706                 self.failUnless(os.path.exists(newfn))
1707                 new = open(newfn, "rb").read()
1708                 self.failUnlessEqual(old, new)
1709             _cmp("rfile1")
1710             _cmp("rfile2")
1711             _cmp("rfile3")
1712             _cmp(os.path.join("subdir2", "rfile4"))
1713             _cmp(os.path.join("subdir2", "rfile5"))
1714         d.addCallback(_check_cp_r_out)
1715
1716         # and copy it a second time, which ought to overwrite the same files
1717         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1718
1719         # and again, only writing filecaps
1720         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1721         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1722         def _check_capsonly((out,err)):
1723             # these should all be LITs
1724             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1725             y = uri.from_string_filenode(x)
1726             self.failUnlessEqual(y.data, "rfile4")
1727         d.addCallback(_check_capsonly)
1728
1729         # and tahoe-to-tahoe
1730         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1731         d.addCallback(run, "ls")
1732         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1733         d.addCallback(run, "ls", "dir1-copy")
1734         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1735                       ["rfile4", "rfile5"])
1736         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1737         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1738                       ["rfile1", "rfile2", "rfile3"])
1739         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1740         d.addCallback(_check_stdout_against, data="rfile4")
1741
1742         # and copy it a second time, which ought to overwrite the same files
1743         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1744
1745         # tahoe_ls doesn't currently handle the error correctly: it tries to
1746         # JSON-parse a traceback.
1747 ##         def _ls_missing(res):
1748 ##             argv = nodeargs + ["ls", "bogus"]
1749 ##             return self._run_cli(argv)
1750 ##         d.addCallback(_ls_missing)
1751 ##         def _check_ls_missing((out,err)):
1752 ##             print "OUT", out
1753 ##             print "ERR", err
1754 ##             self.failUnlessEqual(err, "")
1755 ##         d.addCallback(_check_ls_missing)
1756
1757         return d
1758
1759     def test_filesystem_with_cli_in_subprocess(self):
1760         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1761
1762         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1763         d = self.set_up_nodes()
1764         def _new_happy_semantics(ign):
1765             for c in self.clients:
1766                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1767         d.addCallback(_new_happy_semantics)
1768
1769         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1770             stdin = kwargs.get("stdin")
1771             env = kwargs.get("env")
1772             newargs = ["--node-directory", self.getdir("client0"), verb] + list(args)
1773             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1774
1775         def _check_succeeded(res, check_stderr=True):
1776             out, err, rc_or_sig = res
1777             self.failUnlessEqual(rc_or_sig, 0, str(res))
1778             if check_stderr:
1779                 self.failUnlessEqual(err, "")
1780
1781         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1782         d.addCallback(_check_succeeded)
1783
1784         STDIN_DATA = "This is the file to upload from stdin."
1785         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1786         d.addCallback(_check_succeeded, check_stderr=False)
1787
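        # the CLI talks directly to the node's local web API, so an
        # unreachable http_proxy in the environment should be ignored;
        # note that the assignment below mutates os.environ for the whole
        # process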
1788         def _mv_with_http_proxy(ign):
1789             env = os.environ
1790             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1791             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1792         d.addCallback(_mv_with_http_proxy)
1793         d.addCallback(_check_succeeded)
1794
1795         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1796         def _check_ls(res):
1797             out, err, rc_or_sig = res
1798             self.failUnlessEqual(rc_or_sig, 0, str(res))
1799             self.failUnlessEqual(err, "", str(res))
1800             self.failUnlessIn("tahoe-moved", out)
1801             self.failIfIn("tahoe-file", out)
1802         d.addCallback(_check_ls)
1803         return d
1804
1805     def test_debug_trial(self):
1806         def _check_for_line(lines, result, test):
1807             for l in lines:
1808                 if result in l and test in l:
1809                     return
1810             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1811                       % (result, test, "\n## ".join(lines)))
1812
1813         def _check_for_outcome(lines, out, outcome):
1814             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1815                                             % (outcome, "\n## ".join(lines)))
1816
1817         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1818                                'allmydata.test.trialtest'])
1819         def _check_failure( (out, err, rc) ):
1820             self.failUnlessEqual(rc, 1)
1821             lines = out.split('\n')
1822             _check_for_line(lines, "[SKIPPED]", "test_skip")
1823             _check_for_line(lines, "[TODO]",    "test_todo")
1824             _check_for_line(lines, "[FAIL]",    "test_fail")
1825             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1826             _check_for_line(lines, "[ERROR]",   "test_error")
1827             _check_for_outcome(lines, out, "FAILED")
1828         d.addCallback(_check_failure)
1829
1830         # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1831         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1832                                                      'allmydata.test.trialtest.Success']))
1833         def _check_success( (out, err, rc) ):
1834             self.failUnlessEqual(rc, 0)
1835             lines = out.split('\n')
1836             _check_for_line(lines, "[SKIPPED]", "test_skip")
1837             _check_for_line(lines, "[TODO]",    "test_todo")
1838             _check_for_outcome(lines, out, "PASSED")
1839         d.addCallback(_check_success)
1840         return d
1841
1842     def _run_cli(self, argv, stdin=""):
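        # example (a sketch): self._run_cli(["--node-directory", basedir,
        # "ls"]) returns a Deferred firing with (stdout, stderr) strings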
1843         #print "CLI:", argv
1844         stdout, stderr = StringIO(), StringIO()
1845         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1846                                   stdin=StringIO(stdin),
1847                                   stdout=stdout, stderr=stderr)
1848         def _done(res):
1849             return stdout.getvalue(), stderr.getvalue()
1850         d.addCallback(_done)
1851         return d
1852
1853     def _test_checker(self, res):
1854         ut = upload.Data("too big to be literal" * 200, convergence=None)
1855         d = self._personal_node.add_file(u"big file", ut)
1856
1857         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1858         def _check_dirnode_results(r):
1859             self.failUnless(r.is_healthy())
1860         d.addCallback(_check_dirnode_results)
1861         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1862         d.addCallback(_check_dirnode_results)
1863
1864         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1865         def _got_chk_filenode(n):
1866             self.failUnless(isinstance(n, ImmutableFileNode))
1867             d = n.check(Monitor())
1868             def _check_filenode_results(r):
1869                 self.failUnless(r.is_healthy())
1870             d.addCallback(_check_filenode_results)
1871             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1872             d.addCallback(_check_filenode_results)
1873             return d
1874         d.addCallback(_got_chk_filenode)
1875
1876         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1877         def _got_lit_filenode(n):
1878             self.failUnless(isinstance(n, LiteralFileNode))
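            # a LIT cap embeds the entire file contents in the URI itself,
            # so there are no shares to check and check() reports None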
1879             d = n.check(Monitor())
1880             def _check_lit_filenode_results(r):
1881                 self.failUnlessEqual(r, None)
1882             d.addCallback(_check_lit_filenode_results)
1883             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1884             d.addCallback(_check_lit_filenode_results)
1885             return d
1886         d.addCallback(_got_lit_filenode)
1887         return d
1888
1889
1890 class Connections(SystemTestMixin, unittest.TestCase):
1891     def test_rref(self):
1892         if NormalizedVersion(foolscap.__version__) < NormalizedVersion('0.6.4'):
1893             raise unittest.SkipTest("skipped due to http://foolscap.lothar.com/trac/ticket/196 "
1894                                     "(which does not affect normal usage of Tahoe-LAFS)")
1895
1896         self.basedir = "system/Connections/rref"
1897         d = self.set_up_nodes(2)
1898         def _start(ign):
1899             self.c0 = self.clients[0]
1900             nonclients = [s for s in self.c0.storage_broker.get_connected_servers()
1901                           if s.get_serverid() != self.c0.nodeid]
1902             self.failUnlessEqual(len(nonclients), 1)
1903
1904             self.s1 = nonclients[0]  # s1 is the server, not c0
1905             self.s1_rref = self.s1.get_rref()
1906             self.failIfEqual(self.s1_rref, None)
1907             self.failUnless(self.s1.is_connected())
1908         d.addCallback(_start)
1909
1910         # now shut down the server
1911         d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1912         # and wait for the client to notice
1913         def _poll():
1914             return len(self.c0.storage_broker.get_connected_servers()) < 2
1915         d.addCallback(lambda ign: self.poll(_poll))
1916
1917         def _down(ign):
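            # even though the server is gone, the broker keeps handing back
            # the same (now dead) RemoteReference rather than None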
1918             self.failIf(self.s1.is_connected())
1919             rref = self.s1.get_rref()
1920             self.failUnless(rref)
1921             self.failUnlessIdentical(rref, self.s1_rref)
1922         d.addCallback(_down)
1923         return d