from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.verlib import NormalizedVersion
from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData

import foolscap
from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

LARGE_DATA = """
This is some data to publish to the remote grid, which needs to be large
enough to not fit inside a LIT uri.
"""

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
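# A minimal wiring sketch for the hooks above (this mirrors how
# _upload_resumable uses them later in this file; bounce_the_helper is a
# hypothetical stand-in for self.bounce_client(0)):
#
#   u = CountingDataUploadable(data, convergence=None)
#   u.interrupt_after = 5000             # fire once ~5kB has been read
#   u.interrupt_after_d = defer.Deferred()
#   u.interrupt_after_d.addCallback(lambda _: bounce_the_helper())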

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # The 1024 actually gets rounded up to 1025 to be a multiple of
            # the number of required shares (since we use 25 out of 100 FEC).
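            # (arithmetic sketch of that rounding: with k=25 required
            # shares, 1024 rounds up to the next multiple of 25, i.e.
            # mathutil.next_multiple(1024, 25) == 1025, and the 4000-byte
            # DATA then encodes as three 1025-byte segments plus a 925-byte
            # tail. mathutil is already imported at the top of this file.)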
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        def _has_helper():
            uploader = self.extra_node.getServiceNamed("uploader")
            furl, connected = uploader.get_helper_info()
            return connected
        d.addCallback(lambda ign: self.poll(_has_helper))

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        d.addCallback(fireEventually)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            bounced_d = defer.Deferred()
            def _do_bounce(res):
                d = self.bounce_client(0)
                d.addBoth(bounced_d.callback)
            u1.interrupt_after_d.addCallback(_do_bounce)

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            upload_d = self.extra_node.upload(u1)
            # The upload will start, and bounce_client() will be called after
            # about 5kB. bounced_d will fire after bounce_client() finishes
            # shutting down and restarting the node.
            d = bounced_d
            def _bounced(ign):
                # By this point, the upload should have failed because of the
                # interruption. upload_d will fire in a moment
                def _should_not_finish(res):
                    self.fail("interrupted upload should have failed, not"
                              " finished with result %s" % (res,))
                def _interrupted(f):
                    f.trap(DeadReferenceError)
                    # make sure we actually interrupted it before finishing
                    # the file
                    self.failUnless(u1.bytes_read < len(DATA),
                                    "read %d out of %d total" %
                                    (u1.bytes_read, len(DATA)))
                upload_d.addCallbacks(_should_not_finish, _interrupted)
                return upload_d
            d.addCallback(_bounced)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            d.addCallback(lambda res:
                          log.msg("wait_for_helper", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            # then we need to wait for the extra node to reestablish its
            # connection to the helper.
            d.addCallback(lambda ign: self.poll(_has_helper))

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_all_shares(self, basedir):
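        # walk basedir looking for share files laid out as
        #   clientN/storage/shares/$START/$SINDEX/$SHNUM
        # (names illustrative) and return a list of
        # (client_num, storage_index, filename, shnum) tuples.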
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling, per the note above, is not guaranteed to
                # be noticed at all.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
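        # (accounting sketch: size_delta counts keypairs drawn from the
        #  remote key generator, so each create_mutable_file() or
        #  create_dirnode() below should lower the pool by exactly one,
        #  assuming the pool has not refilled between checks.)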

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
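    # (worked example: flip_bit("abc") == "abb" -- only the low bit of the
    #  last byte changes, which is enough to invalidate any hash or key
    #  derived from the string.)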

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()
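    # (why this yields NoSharesError rather than a corrupt download: the
    #  storage index is derived from the key, so a flipped key bit produces
    #  a URI whose storage index matches no shares on any server, which is
    #  what _download_nonexistent_uri above relies on.)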

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
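                # (the histogram above says: the 23-byte literal file falls
                #  in the 11-31 bucket and the 112-byte immutable file in
                #  the 101-316 bucket, one file in each.)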
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)
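    # (for reference, POST("uri", t="mkdir") builds a body like:
    #    --boogabooga\r\n
    #    Content-Disposition: form-data; name="_charset"\r\n\r\nUTF-8\r\n
    #    --boogabooga\r\n
    #    Content-Disposition: form-data; name="t"\r\n\r\nmkdir\r\n
    #    --boogabooga--\r\n
    #  sent with content-type "multipart/form-data; boundary=boogabooga".)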
1084
1085     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1086               use_helper=False):
1087         if use_helper:
1088             url = self.helper_webish_url + urlpath
1089         else:
1090             url = self.webish_url + urlpath
1091         return getPage(url, method="POST", postdata=body, headers=headers,
1092                        followRedirect=followRedirect)
1093
1094     def _test_web(self, res):
1095         base = self.webish_url
1096         public = "uri/" + self._root_directory_uri
1097         d = getPage(base)
1098         def _got_welcome(page):
1099             # XXX This test is oversensitive to formatting
1100             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1101             self.failUnless(expected in page,
1102                             "I didn't see the right 'connected storage servers'"
1103                             " message in: %s" % page
1104                             )
1105             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1106             self.failUnless(expected in page,
1107                             "I didn't see the right 'My nodeid' message "
1108                             "in: %s" % page)
1109             self.failUnless("Helper: 0 active uploads" in page)
1110         d.addCallback(_got_welcome)
1111         d.addCallback(self.log, "done with _got_welcome")
1112
1113         # get the welcome page from the node that uses the helper too
1114         d.addCallback(lambda res: getPage(self.helper_webish_url))
1115         def _got_welcome_helper(page):
1116             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1117                             page)
1118             self.failUnless("Not running helper" in page)
1119         d.addCallback(_got_welcome_helper)
1120
1121         d.addCallback(lambda res: getPage(base + public))
1122         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1123         def _got_subdir1(page):
1124             # there ought to be an href for our file
1125             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1126             self.failUnless(">mydata567</a>" in page)
1127         d.addCallback(_got_subdir1)
1128         d.addCallback(self.log, "done with _got_subdir1")
1129         d.addCallback(lambda res:
1130                       getPage(base + public + "/subdir1/mydata567"))
1131         def _got_data(page):
1132             self.failUnlessEqual(page, self.data)
1133         d.addCallback(_got_data)
1134
1135         # download from a URI embedded in a URL
1136         d.addCallback(self.log, "_get_from_uri")
1137         def _get_from_uri(res):
1138             return getPage(base + "uri/%s?filename=%s"
1139                            % (self.uri, "mydata567"))
1140         d.addCallback(_get_from_uri)
1141         def _got_from_uri(page):
1142             self.failUnlessEqual(page, self.data)
1143         d.addCallback(_got_from_uri)
1144
1145         # download from a URI embedded in a URL, second form
1146         d.addCallback(self.log, "_get_from_uri2")
1147         def _get_from_uri2(res):
1148             return getPage(base + "uri?uri=%s" % (self.uri,))
1149         d.addCallback(_get_from_uri2)
1150         d.addCallback(_got_from_uri)
1151
1152         # download from a bogus URI, make sure we get a reasonable error
1153         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1154         def _get_from_bogus_uri(res):
1155             d1 = getPage(base + "uri/%s?filename=%s"
1156                          % (self.mangle_uri(self.uri), "mydata567"))
1157             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1158                        "410")
1159             return d1
1160         d.addCallback(_get_from_bogus_uri)
1161         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1162
1163         # upload a file with PUT
1164         d.addCallback(self.log, "about to try PUT")
1165         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1166                                            "new.txt contents"))
1167         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1168         d.addCallback(self.failUnlessEqual, "new.txt contents")
1169         # and again with something large enough to use multiple segments,
1170         # and hopefully trigger pauseProducing too
1171         def _new_happy_semantics(ign):
1172             for c in self.clients:
1173                 # these get reset somewhere along the way, so set them again here
1174                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1175         d.addCallback(_new_happy_semantics)
1176         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1177                                            "big" * 500000)) # 1.5MB
1178         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1179         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1180
1181         # can we replace files in place?
1182         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1183                                            "NEWER contents"))
1184         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1185         d.addCallback(self.failUnlessEqual, "NEWER contents")
1186
1187         # test unlinked POST
1188         d.addCallback(lambda res: self.POST("uri", t="upload",
1189                                             file=("new.txt", "data" * 10000)))
1190         # and again using the helper, which exercises different upload-status
1191         # display code
1192         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1193                                             file=("foo.txt", "data2" * 10000)))
1194
1195         # check that the status page exists
1196         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1197         def _got_status(res):
1198             # find an interesting upload and download to look at. LIT files
1199             # are not interesting.
1200             h = self.clients[0].get_history()
1201             for ds in h.list_all_download_statuses():
1202                 if ds.get_size() > 200:
1203                     self._down_status = ds.get_counter()
1204             for us in h.list_all_upload_statuses():
1205                 if us.get_size() > 200:
1206                     self._up_status = us.get_counter()
1207             rs = list(h.list_all_retrieve_statuses())[0]
1208             self._retrieve_status = rs.get_counter()
1209             ps = list(h.list_all_publish_statuses())[0]
1210             self._publish_status = ps.get_counter()
1211             us = list(h.list_all_mapupdate_statuses())[0]
1212             self._update_status = us.get_counter()
1213
1214             # and that there are some upload- and download- status pages
1215             return self.GET("status/up-%d" % self._up_status)
1216         d.addCallback(_got_status)
1217         def _got_up(res):
1218             return self.GET("status/down-%d" % self._down_status)
1219         d.addCallback(_got_up)
1220         def _got_down(res):
1221             return self.GET("status/mapupdate-%d" % self._update_status)
1222         d.addCallback(_got_down)
1223         def _got_update(res):
1224             return self.GET("status/publish-%d" % self._publish_status)
1225         d.addCallback(_got_update)
1226         def _got_publish(res):
1227             self.failUnlessIn("Publish Results", res)
1228             return self.GET("status/retrieve-%d" % self._retrieve_status)
1229         d.addCallback(_got_publish)
1230         def _got_retrieve(res):
1231             self.failUnlessIn("Retrieve Results", res)
1232         d.addCallback(_got_retrieve)
1233
1234         # check that the helper status page exists
1235         d.addCallback(lambda res:
1236                       self.GET("helper_status", followRedirect=True))
1237         def _got_helper_status(res):
1238             self.failUnless("Bytes Fetched:" in res)
1239             # touch a couple of files in the helper's working directory to
1240             # exercise more code paths
1241             workdir = os.path.join(self.getdir("client0"), "helper")
1242             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1243             f = open(incfile, "wb")
1244             f.write("small file")
1245             f.close()
1246             then = time.time() - 86400*3
1247             now = time.time()
1248             os.utime(incfile, (now, then))
1249             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1250             f = open(encfile, "wb")
1251             f.write("less small file")
1252             f.close()
1253             os.utime(encfile, (now, then))
1254         d.addCallback(_got_helper_status)
1255         # and that the json form exists
1256         d.addCallback(lambda res:
1257                       self.GET("helper_status?t=json", followRedirect=True))
1258         def _got_helper_status_json(res):
1259             data = simplejson.loads(res)
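             # incoming_size 10 == len("small file") and encoding_size 15 ==
             # len("less small file"), matching the spurious files written above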
1260             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1261                                  1)
1262             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1263             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1264             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1265                                  10)
1266             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1267             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1268             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1269                                  15)
1270         d.addCallback(_got_helper_status_json)
1271
1272         # and check that client[3] (which uses a helper but does not run one
1273         # itself) doesn't explode when you ask for its status
1274         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1275         def _got_non_helper_status(res):
1276             self.failUnless("Upload and Download Status" in res)
1277         d.addCallback(_got_non_helper_status)
1278
1279         # or for helper status with t=json
1280         d.addCallback(lambda res:
1281                       getPage(self.helper_webish_url + "helper_status?t=json"))
1282         def _got_non_helper_status_json(res):
1283             data = simplejson.loads(res)
1284             self.failUnlessEqual(data, {})
1285         d.addCallback(_got_non_helper_status_json)
1286
1287         # see if the statistics page exists
1288         d.addCallback(lambda res: self.GET("statistics"))
1289         def _got_stats(res):
1290             self.failUnless("Node Statistics" in res)
1291             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1292         d.addCallback(_got_stats)
1293         d.addCallback(lambda res: self.GET("statistics?t=json"))
1294         def _got_stats_json(res):
1295             data = simplejson.loads(res)
1296             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1297             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1298         d.addCallback(_got_stats_json)
1299
1300         # TODO: mangle the second segment of a file, to test errors that
1301         # occur after we've already sent some good data, which uses a
1302         # different error path.
1303
1304         # TODO: download a URI with a form
1305         # TODO: create a directory by using a form
1306         # TODO: upload by using a form on the directory page
1307         #    url = base + "somedir/subdir1/freeform_post!!upload"
1308         # TODO: delete a file by using a button on the directory page
1309
1310         return d
1311
1312     def _test_runner(self, res):
1313         # exercise some of the diagnostic tools in runner.py
1314
1315         # find a share
1316         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1317             if "storage" not in dirpath:
1318                 continue
1319             if not filenames:
1320                 continue
1321             pieces = dirpath.split(os.sep)
1322             if (len(pieces) >= 4
1323                 and pieces[-4] == "storage"
1324                 and pieces[-3] == "shares"):
1325                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1326                 # are sharefiles here
1327                 filename = os.path.join(dirpath, filenames[0])
1328                 # peek at the magic to see if it is a chk share
1329                 magic = open(filename, "rb").read(4)
1330                 if magic == '\x00\x00\x00\x01':
1331                     break
1332         else:
1333             self.fail("unable to find any uri_extension files in %r"
1334                       % self.basedir)
1335         log.msg("test_system.SystemTest._test_runner using %r" % filename)
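         # A sketch (not exercised by this test) of the magic-byte probe used
         # above, assuming immutable share files begin with a big-endian
         # version-1 header:
         #
         #   import struct
         #   def looks_like_immutable_share(path):
         #       with open(path, "rb") as f:
         #           (version,) = struct.unpack(">L", f.read(4))
         #       return version == 1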
1336
1337         out,err = StringIO(), StringIO()
1338         rc = runner.runner(["debug", "dump-share", "--offsets",
1339                             unicode_to_argv(filename)],
1340                            stdout=out, stderr=err)
1341         output = out.getvalue()
1342         self.failUnlessEqual(rc, 0)
1343
1344         # we only upload a single file, so we can assert some things about
1345         # its size and shares.
1346         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1347         self.failUnlessIn("size: %d\n" % len(self.data), output)
1348         self.failUnlessIn("num_segments: 1\n", output)
1349         # segment_size is always a multiple of needed_shares
1350         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
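         # e.g. a hypothetical 220-byte file with needed_shares=3 would show
         # segment_size: 222, since next_multiple(220, 3) == 222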
1351         self.failUnlessIn("total_shares: 10\n", output)
1352         # keys which are supposed to be present
1353         for key in ("size", "num_segments", "segment_size",
1354                     "needed_shares", "total_shares",
1355                     "codec_name", "codec_params", "tail_codec_params",
1356                     #"plaintext_hash", "plaintext_root_hash",
1357                     "crypttext_hash", "crypttext_root_hash",
1358                     "share_root_hash", "UEB_hash"):
1359             self.failUnlessIn("%s: " % key, output)
1360         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1361
1362         # now use its storage index to find the other shares using the
1363         # 'find-shares' tool
1364         sharedir, shnum = os.path.split(filename)
1365         storagedir, storage_index_s = os.path.split(sharedir)
1366         storage_index_s = str(storage_index_s)
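         # filename is .../storage/shares/$START/$SINDEX/$SHNUM (per the walk
         # above), so sharedir's basename is the base32 storage index and the
         # share file's name is the share number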
1367         out,err = StringIO(), StringIO()
1368         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1369         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1370         rc = runner.runner(cmd, stdout=out, stderr=err)
1371         self.failUnlessEqual(rc, 0)
1372         out.seek(0)
1373         sharefiles = [sfn.strip() for sfn in out.readlines()]
1374         self.failUnlessEqual(len(sharefiles), 10)
1375
1376         # also exercise the 'catalog-shares' tool
1377         out,err = StringIO(), StringIO()
1378         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1379         cmd = ["debug", "catalog-shares"] + nodedirs
1380         rc = runner.runner(cmd, stdout=out, stderr=err)
1381         self.failUnlessEqual(rc, 0)
1382         out.seek(0)
1383         descriptions = [sfn.strip() for sfn in out.readlines()]
1384         self.failUnlessEqual(len(descriptions), 30)
1385         matching = [line
1386                     for line in descriptions
1387                     if line.startswith("CHK %s " % storage_index_s)]
1388         self.failUnlessEqual(len(matching), 10)
1389
1390     def _test_control(self, res):
1391         # exercise the remote-control-the-client foolscap interfaces in
1392         # allmydata.control (mostly used for performance tests)
1393         c0 = self.clients[0]
1394         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1395         control_furl = open(control_furl_file, "r").read().strip()
1396         # it doesn't really matter which Tub we use to connect to the client,
1397         # so let's just use our IntroducerNode's
1398         d = self.introducer.tub.getReference(control_furl)
1399         d.addCallback(self._test_control2, control_furl_file)
1400         return d
1401     def _test_control2(self, rref, filename):
1402         d = rref.callRemote("upload_from_file_to_uri",
1403                             filename.encode(get_filesystem_encoding()), convergence=None)
1404         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1405         d.addCallback(lambda uri:
1406                       rref.callRemote("download_from_uri_to_file",
1407                                       uri, downfile))
1408         def _check(res):
1409             self.failUnlessEqual(res, downfile)
1410             data = open(downfile, "r").read()
1411             expected_data = open(filename, "r").read()
1412             self.failUnlessEqual(data, expected_data)
1413         d.addCallback(_check)
1414         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1415         if sys.platform in ("linux2", "linux3"):
1416             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1417         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1418         return d
1419
1420     def _test_cli(self, res):
1421         # run various CLI commands (in a thread, since they use blocking
1422         # network calls)
1423
1424         private_uri = self._private_node.get_uri()
1425         client0_basedir = self.getdir("client0")
1426
1427         nodeargs = [
1428             "--node-directory", client0_basedir,
1429             ]
1430
1431         d = defer.succeed(None)
1432
1433         # for compatibility with earlier versions, private/root_dir.cap is
1434         # supposed to be treated as an alias named "tahoe:". Start by making
1435         # sure that works, before we add other aliases.
1436
1437         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1438         f = open(root_file, "w")
1439         f.write(private_uri)
1440         f.close()
1441
1442         def run(ignored, verb, *args, **kwargs):
1443             stdin = kwargs.get("stdin", "")
1444             newargs = [verb] + nodeargs + list(args)
1445             return self._run_cli(newargs, stdin=stdin)
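         # run() is designed to be chained as a Deferred callback: the first
         # (ignored) argument is the previous callback's result, e.g.
         # d.addCallback(run, "ls", "subdir")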
1446
1447         def _check_ls((out,err), expected_children, unexpected_children=[]):
1448             self.failUnlessEqual(err, "")
1449             for s in expected_children:
1450                 self.failUnless(s in out, (s,out))
1451             for s in unexpected_children:
1452                 self.failIf(s in out, (s,out))
1453
1454         def _check_ls_root((out,err)):
1455             self.failUnless("personal" in out)
1456             self.failUnless("s2-ro" in out)
1457             self.failUnless("s2-rw" in out)
1458             self.failUnlessEqual(err, "")
1459
1460         # this should reference private_uri
1461         d.addCallback(run, "ls")
1462         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1463
1464         d.addCallback(run, "list-aliases")
1465         def _check_aliases_1((out,err)):
1466             self.failUnlessEqual(err, "")
1467             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1468         d.addCallback(_check_aliases_1)
1469
1470         # now that that's out of the way, remove root_dir.cap and work with
1471         # new files
1472         d.addCallback(lambda res: os.unlink(root_file))
1473         d.addCallback(run, "list-aliases")
1474         def _check_aliases_2((out,err)):
1475             self.failUnlessEqual(err, "")
1476             self.failUnlessEqual(out, "")
1477         d.addCallback(_check_aliases_2)
1478
1479         d.addCallback(run, "mkdir")
1480         def _got_dir( (out,err) ):
1481             self.failUnless(uri.from_string_dirnode(out.strip()))
1482             return out.strip()
1483         d.addCallback(_got_dir)
1484         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1485
1486         d.addCallback(run, "list-aliases")
1487         def _check_aliases_3((out,err)):
1488             self.failUnlessEqual(err, "")
1489             self.failUnless("tahoe: " in out)
1490         d.addCallback(_check_aliases_3)
1491
1492         def _check_empty_dir((out,err)):
1493             self.failUnlessEqual(out, "")
1494             self.failUnlessEqual(err, "")
1495         d.addCallback(run, "ls")
1496         d.addCallback(_check_empty_dir)
1497
1498         def _check_missing_dir((out,err)):
1499             # TODO: check that rc==2
1500             self.failUnlessEqual(out, "")
1501             self.failUnlessEqual(err, "No such file or directory\n")
1502         d.addCallback(run, "ls", "bogus")
1503         d.addCallback(_check_missing_dir)
1504
1505         files = []
1506         datas = []
1507         for i in range(10):
1508             fn = os.path.join(self.basedir, "file%d" % i)
1509             files.append(fn)
1510             data = "data to be uploaded: file%d\n" % i
1511             datas.append(data)
1512             open(fn,"wb").write(data)
1513
1514         def _check_stdout_against((out,err), filenum=None, data=None):
1515             self.failUnlessEqual(err, "")
1516             if filenum is not None:
1517                 self.failUnlessEqual(out, datas[filenum])
1518             if data is not None:
1519                 self.failUnlessEqual(out, data)
1520
1521         # test both forms of put: from a file, and from stdin
1522         #  tahoe put bar FOO
1523         d.addCallback(run, "put", files[0], "tahoe-file0")
1524         def _put_out((out,err)):
1525             self.failUnless("URI:LIT:" in out, out)
1526             self.failUnless("201 Created" in err, err)
1527             uri0 = out.strip()
1528             return run(None, "get", uri0)
1529         d.addCallback(_put_out)
1530         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1531
1532         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1533         #  tahoe put bar tahoe:FOO
1534         d.addCallback(run, "put", files[2], "tahoe:file2")
1535         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1536         def _check_put_mutable((out,err)):
1537             self._mutable_file3_uri = out.strip()
1538         d.addCallback(_check_put_mutable)
1539         d.addCallback(run, "get", "tahoe:file3")
1540         d.addCallback(_check_stdout_against, 3)
1541
1542         #  tahoe put FOO
1543         STDIN_DATA = "This is the file to upload from stdin."
1544         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1545         #  tahoe put tahoe:FOO
1546         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1547                       stdin="Other file from stdin.")
1548
1549         d.addCallback(run, "ls")
1550         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1551                                   "tahoe-file-stdin", "from-stdin"])
1552         d.addCallback(run, "ls", "subdir")
1553         d.addCallback(_check_ls, ["tahoe-file1"])
1554
1555         # tahoe mkdir FOO
1556         d.addCallback(run, "mkdir", "subdir2")
1557         d.addCallback(run, "ls")
1558         # TODO: extract the URI, set an alias with it
1559         d.addCallback(_check_ls, ["subdir2"])
1560
1561         # tahoe get: (to stdout and to a file)
1562         d.addCallback(run, "get", "tahoe-file0")
1563         d.addCallback(_check_stdout_against, 0)
1564         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1565         d.addCallback(_check_stdout_against, 1)
1566         outfile0 = os.path.join(self.basedir, "outfile0")
1567         d.addCallback(run, "get", "file2", outfile0)
1568         def _check_outfile0((out,err)):
1569             data = open(outfile0,"rb").read()
1570             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1571         d.addCallback(_check_outfile0)
1572         outfile1 = os.path.join(self.basedir, "outfile1")
1573         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1574         def _check_outfile1((out,err)):
1575             data = open(outfile1,"rb").read()
1576             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1577         d.addCallback(_check_outfile1)
1578
1579         d.addCallback(run, "rm", "tahoe-file0")
1580         d.addCallback(run, "rm", "tahoe:file2")
1581         d.addCallback(run, "ls")
1582         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1583
1584         d.addCallback(run, "ls", "-l")
1585         def _check_ls_l((out,err)):
1586             lines = out.split("\n")
1587             for l in lines:
1588                 if "tahoe-file-stdin" in l:
1589                     self.failUnless(l.startswith("-r-- "), l)
1590                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1591                 if "file3" in l:
1592                     self.failUnless(l.startswith("-rw- "), l) # mutable
1593         d.addCallback(_check_ls_l)
1594
1595         d.addCallback(run, "ls", "--uri")
1596         def _check_ls_uri((out,err)):
1597             lines = out.split("\n")
1598             for l in lines:
1599                 if "file3" in l:
1600                     self.failUnless(self._mutable_file3_uri in l)
1601         d.addCallback(_check_ls_uri)
1602
1603         d.addCallback(run, "ls", "--readonly-uri")
1604         def _check_ls_rouri((out,err)):
1605             lines = out.split("\n")
1606             for l in lines:
1607                 if "file3" in l:
1608                     rw_uri = self._mutable_file3_uri
1609                     u = uri.from_string_mutable_filenode(rw_uri)
1610                     ro_uri = u.get_readonly().to_string()
1611                     self.failUnless(ro_uri in l)
1612         d.addCallback(_check_ls_rouri)
1613
1614
1615         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1616         d.addCallback(run, "ls")
1617         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1618
1619         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1620         d.addCallback(run, "ls")
1621         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1622
1623         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1624         d.addCallback(run, "ls")
1625         d.addCallback(_check_ls, ["file3", "file3-copy"])
1626         d.addCallback(run, "get", "tahoe:file3-copy")
1627         d.addCallback(_check_stdout_against, 3)
1628
1629         # copy from disk into tahoe
1630         d.addCallback(run, "cp", files[4], "tahoe:file4")
1631         d.addCallback(run, "ls")
1632         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1633         d.addCallback(run, "get", "tahoe:file4")
1634         d.addCallback(_check_stdout_against, 4)
1635
1636         # copy from tahoe into disk
1637         target_filename = os.path.join(self.basedir, "file-out")
1638         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1639         def _check_cp_out((out,err)):
1640             self.failUnless(os.path.exists(target_filename))
1641             got = open(target_filename,"rb").read()
1642             self.failUnlessEqual(got, datas[4])
1643         d.addCallback(_check_cp_out)
1644
1645         # copy from disk to disk (silly case)
1646         target2_filename = os.path.join(self.basedir, "file-out-copy")
1647         d.addCallback(run, "cp", target_filename, target2_filename)
1648         def _check_cp_out2((out,err)):
1649             self.failUnless(os.path.exists(target2_filename))
1650             got = open(target2_filename,"rb").read()
1651             self.failUnlessEqual(got, datas[4])
1652         d.addCallback(_check_cp_out2)
1653
1654         # copy from tahoe into disk, overwriting an existing file
1655         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1656         def _check_cp_out3((out,err)):
1657             self.failUnless(os.path.exists(target_filename))
1658             got = open(target_filename,"rb").read()
1659             self.failUnlessEqual(got, datas[3])
1660         d.addCallback(_check_cp_out3)
1661
1662         # copy from disk into tahoe, overwriting an existing immutable file
1663         d.addCallback(run, "cp", files[5], "tahoe:file4")
1664         d.addCallback(run, "ls")
1665         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1666         d.addCallback(run, "get", "tahoe:file4")
1667         d.addCallback(_check_stdout_against, 5)
1668
1669         # copy from disk into tahoe, overwriting an existing mutable file
1670         d.addCallback(run, "cp", files[5], "tahoe:file3")
1671         d.addCallback(run, "ls")
1672         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1673         d.addCallback(run, "get", "tahoe:file3")
1674         d.addCallback(_check_stdout_against, 5)
1675
1676         # recursive copy: setup
1677         dn = os.path.join(self.basedir, "dir1")
1678         os.makedirs(dn)
1679         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1680         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1681         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1682         sdn2 = os.path.join(dn, "subdir2")
1683         os.makedirs(sdn2)
1684         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1685         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1686
1687         # from disk into tahoe
1688         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1689         d.addCallback(run, "ls")
1690         d.addCallback(_check_ls, ["dir1"])
1691         d.addCallback(run, "ls", "dir1")
1692         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1693                       ["rfile4", "rfile5"])
1694         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1695         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1696                       ["rfile1", "rfile2", "rfile3"])
1697         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1698         d.addCallback(_check_stdout_against, data="rfile4")
1699
1700         # and back out again
1701         dn_copy = os.path.join(self.basedir, "dir1-copy")
1702         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1703         def _check_cp_r_out((out,err)):
1704             def _cmp(name):
1705                 old = open(os.path.join(dn, name), "rb").read()
1706                 newfn = os.path.join(dn_copy, name)
1707                 self.failUnless(os.path.exists(newfn))
1708                 new = open(newfn, "rb").read()
1709                 self.failUnlessEqual(old, new)
1710             _cmp("rfile1")
1711             _cmp("rfile2")
1712             _cmp("rfile3")
1713             _cmp(os.path.join("subdir2", "rfile4"))
1714             _cmp(os.path.join("subdir2", "rfile5"))
1715         d.addCallback(_check_cp_r_out)
1716
1717         # and copy it a second time, which ought to overwrite the same files
1718         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1719
1720         # and again, only writing filecaps
1721         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1722         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1723         def _check_capsonly((out,err)):
1724             # these should all be LITs
1725             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1726             y = uri.from_string_filenode(x)
1727             self.failUnlessEqual(y.data, "rfile4")
1728         d.addCallback(_check_capsonly)
1729
1730         # and tahoe-to-tahoe
1731         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1732         d.addCallback(run, "ls")
1733         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1734         d.addCallback(run, "ls", "dir1-copy")
1735         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1736                       ["rfile4", "rfile5"])
1737         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1738         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1739                       ["rfile1", "rfile2", "rfile3"])
1740         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1741         d.addCallback(_check_stdout_against, data="rfile4")
1742
1743         # and copy it a second time, which ought to overwrite the same files
1744         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1745
1746         # tahoe_ls doesn't currently handle the error correctly: it tries to
1747         # JSON-parse a traceback.
1748 ##         def _ls_missing(res):
1749 ##             argv = ["ls"] + nodeargs + ["bogus"]
1750 ##             return self._run_cli(argv)
1751 ##         d.addCallback(_ls_missing)
1752 ##         def _check_ls_missing((out,err)):
1753 ##             print "OUT", out
1754 ##             print "ERR", err
1755 ##             self.failUnlessEqual(err, "")
1756 ##         d.addCallback(_check_ls_missing)
1757
1758         return d
1759
1760     def test_filesystem_with_cli_in_subprocess(self):
1761         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1762
1763         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1764         d = self.set_up_nodes()
1765         def _new_happy_semantics(ign):
1766             for c in self.clients:
1767                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1768         d.addCallback(_new_happy_semantics)
1769
1770         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1771             stdin = kwargs.get("stdin")
1772             env = kwargs.get("env")
1773             newargs = [verb, "--node-directory", self.getdir("client0")] + list(args)
1774             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1775
1776         def _check_succeeded(res, check_stderr=True):
1777             out, err, rc_or_sig = res
1778             self.failUnlessEqual(rc_or_sig, 0, str(res))
1779             if check_stderr:
1780                 self.failUnlessEqual(err, "")
1781
1782         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1783         d.addCallback(_check_succeeded)
1784
1785         STDIN_DATA = "This is the file to upload from stdin."
1786         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1787         d.addCallback(_check_succeeded, check_stderr=False)
1788
1789         def _mv_with_http_proxy(ign):
1790             env = dict(os.environ) # copy, so we don't modify our own environment
1791             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1792             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1793         d.addCallback(_mv_with_http_proxy)
1794         d.addCallback(_check_succeeded)
1795
1796         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1797         def _check_ls(res):
1798             out, err, rc_or_sig = res
1799             self.failUnlessEqual(rc_or_sig, 0, str(res))
1800             self.failUnlessEqual(err, "", str(res))
1801             self.failUnlessIn("tahoe-moved", out)
1802             self.failIfIn("tahoe-file", out)
1803         d.addCallback(_check_ls)
1804         return d
1805
1806     def test_debug_trial(self):
1807         def _check_for_line(lines, result, test):
1808             for l in lines:
1809                 if result in l and test in l:
1810                     return
1811             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1812                       % (result, test, "\n## ".join(lines)))
1813
1814         def _check_for_outcome(lines, out, outcome):
1815             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1816                                             % (outcome, "\n## ".join(lines)))
1817
1818         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1819                                'allmydata.test.trialtest'])
1820         def _check_failure( (out, err, rc) ):
1821             self.failUnlessEqual(rc, 1)
1822             lines = out.split('\n')
1823             _check_for_line(lines, "[SKIPPED]", "test_skip")
1824             _check_for_line(lines, "[TODO]",    "test_todo")
1825             _check_for_line(lines, "[FAIL]",    "test_fail")
1826             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1827             _check_for_line(lines, "[ERROR]",   "test_error")
1828             _check_for_outcome(lines, out, "FAILED")
1829         d.addCallback(_check_failure)
1830
1831         # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1832         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1833                                                      'allmydata.test.trialtest.Success']))
1834         def _check_success( (out, err, rc) ):
1835             self.failUnlessEqual(rc, 0)
1836             lines = out.split('\n')
1837             _check_for_line(lines, "[SKIPPED]", "test_skip")
1838             _check_for_line(lines, "[TODO]",    "test_todo")
1839             _check_for_outcome(lines, out, "PASSED")
1840         d.addCallback(_check_success)
1841         return d
1842
1843     def _run_cli(self, argv, stdin=""):
1844         #print "CLI:", argv
1845         stdout, stderr = StringIO(), StringIO()
1846         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1847                                   stdin=StringIO(stdin),
1848                                   stdout=stdout, stderr=stderr)
1849         def _done(res):
1850             return stdout.getvalue(), stderr.getvalue()
1851         d.addCallback(_done)
1852         return d
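     # a usage sketch (hypothetical argv): _run_cli(["ls"]) returns a Deferred
     # that fires with the (stdout, stderr) pair once runner.runner has
     # finished in its thread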
1853
1854     def _test_checker(self, res):
1855         ut = upload.Data("too big to be literal" * 200, convergence=None)
1856         d = self._personal_node.add_file(u"big file", ut)
1857
1858         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1859         def _check_dirnode_results(r):
1860             self.failUnless(r.is_healthy())
1861         d.addCallback(_check_dirnode_results)
1862         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1863         d.addCallback(_check_dirnode_results)
1864
1865         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1866         def _got_chk_filenode(n):
1867             self.failUnless(isinstance(n, ImmutableFileNode))
1868             d = n.check(Monitor())
1869             def _check_filenode_results(r):
1870                 self.failUnless(r.is_healthy())
1871             d.addCallback(_check_filenode_results)
1872             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1873             d.addCallback(_check_filenode_results)
1874             return d
1875         d.addCallback(_got_chk_filenode)
1876
1877         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1878         def _got_lit_filenode(n):
1879             self.failUnless(isinstance(n, LiteralFileNode))
1880             d = n.check(Monitor())
1881             def _check_lit_filenode_results(r):
1882                 self.failUnlessEqual(r, None)
1883             d.addCallback(_check_lit_filenode_results)
1884             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1885             d.addCallback(_check_lit_filenode_results)
1886             return d
1887         d.addCallback(_got_lit_filenode)
1888         return d
1889
1890 class Connections(SystemTestMixin, unittest.TestCase):
1891     def test_rref(self):
1892         if NormalizedVersion(foolscap.__version__) < NormalizedVersion('0.6.4'):
1893             raise unittest.SkipTest("skipped due to http://foolscap.lothar.com/trac/ticket/196 "
1894                                     "(which does not affect normal usage of Tahoe-LAFS)")
1895
1896         self.basedir = "system/Connections/rref"
1897         d = self.set_up_nodes(2)
1898         def _start(ign):
1899             self.c0 = self.clients[0]
1900             for s in self.c0.storage_broker.get_connected_servers():
1901                 if "pub-"+s.get_longname() != self.c0.node_key_s:
1902                     break
1903             self.s1 = s # s1 is the server, not c0
1904             self.s1_rref = s.get_rref()
1905             self.failIfEqual(self.s1_rref, None)
1906             self.failUnless(self.s1.is_connected())
1907         d.addCallback(_start)
1908
1909         # now shut down the server
1910         d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1911         # and wait for the client to notice
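         # (self.poll re-invokes _poll until it returns a true value)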
1912         def _poll():
1913             return len(self.c0.storage_broker.get_connected_servers()) < 2
1914         d.addCallback(lambda ign: self.poll(_poll))
1915
1916         def _down(ign):
1917             self.failIf(self.s1.is_connected())
1918             rref = self.s1.get_rref()
1919             self.failUnless(rref)
1920             self.failUnlessIdentical(rref, self.s1_rref)
1921         d.addCallback(_down)
1922         return d