from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData
from foolscap.api import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

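# NOTE: test_filesystem publishes this string verbatim, and the deep-stats
# checks in _check_publish_private assert its exact size (112 bytes, via
# "size-immutable-files"), so don't edit or re-wrap it: its length matters.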
LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""

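# An Uploadable that counts how many bytes the uploader has read from it.
# Once bytes_read exceeds interrupt_after, it fires interrupt_after_d; the
# resumable-upload test uses that Deferred to bounce the helper mid-upload.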
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
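                # get_servers_for_psi returns the servers in the permuted
                # order used for share placement; "a" is an arbitrary peer
                # selection index, since we only care about the count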
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024 bytes for the duration of
            # this test, so we can exercise multiple segments. It is
            # important that the data length is not a multiple of the segment
            # size, so that the tail segment is not the same length as the
            # others. The segsize actually gets rounded up to 1025 to be a
            # multiple of the number of required shares (since we use 25 out
            # of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
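            # with segsize rounded up to 1025, the 4000-byte file splits
            # into three full segments plus a 925-byte tail (4000 - 3*1025),
            # giving us the odd-sized tail segment described above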
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do
            # all of the encoding work, and only get to save on the upload
            # part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_all_shares(self, basedir):
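        # Walk the entire test basedir looking for share files, which live
        # at <basedir>/clientN/storage/shares/$START/$SINDEX/$SHNUM. From
        # each path we recover (client_num, storage_index, filename, shnum).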
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
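        # Read the whole share, unpack its SDMF fields, corrupt the one
        # named by `which` (flip a bit in it, or bump it by 15 for the
        # integer fields), then repack the share and write it back in place.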
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

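            # run the "tahoe debug dump-share" CLI in-process, capturing its
            # stdout/stderr so we can assert on the dump's contents below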
            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things we could corrupt here: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling on shnum 0 may or may not be noticed,
                # depending on query order, as described above.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
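            # each mutable file or dirnode created through clients[3]
            # consumes one pre-generated key, so until the generator
            # replenishes its pool, len(keypool) == pool_size + size_delta
            # (with size_delta <= 0)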
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # uses the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

    def flip_bit(self, good):
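        # XOR the low bit of the last byte: the smallest corruption that
        # still guarantees the returned string differs from the input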
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
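        # designed to hang off a Deferred via addBoth: on the expected path
        # `res` is a Failure wrapping expected_failure; anything else means
        # the operation wrongly succeeded, so the test fails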
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

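    # thin HTTP helpers around twisted.web.client.getPage, aimed at the
    # grid's webapi (self.webish_url, or self.helper_webish_url when
    # use_helper=True)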
    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
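        # hand-roll a multipart/form-data body with a fixed boundary; the
        # leading "_charset" field presumably tells the webapi that the
        # remaining field values are UTF-8 encoded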
1053         sepbase = "boogabooga"
1054         sep = "--" + sepbase
1055         form = []
1056         form.append(sep)
1057         form.append('Content-Disposition: form-data; name="_charset"')
1058         form.append('')
1059         form.append('UTF-8')
1060         form.append(sep)
1061         for name, value in fields.iteritems():
1062             if isinstance(value, tuple):
1063                 filename, value = value
1064                 form.append('Content-Disposition: form-data; name="%s"; '
1065                             'filename="%s"' % (name, filename.encode("utf-8")))
1066             else:
1067                 form.append('Content-Disposition: form-data; name="%s"' % name)
1068             form.append('')
1069             form.append(str(value))
1070             form.append(sep)
1071         form[-1] += "--"
1072         body = ""
1073         headers = {}
1074         if fields:
1075             body = "\r\n".join(form) + "\r\n"
1076             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1077         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1078
1079     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1080               use_helper=False):
1081         if use_helper:
1082             url = self.helper_webish_url + urlpath
1083         else:
1084             url = self.webish_url + urlpath
1085         return getPage(url, method="POST", postdata=body, headers=headers,
1086                        followRedirect=followRedirect)
1087
1088     def _test_web(self, res):
1089         base = self.webish_url
1090         public = "uri/" + self._root_directory_uri
1091         d = getPage(base)
1092         def _got_welcome(page):
1093             # XXX This test is oversensitive to formatting
1094             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1095             self.failUnless(expected in page,
1096                             "I didn't see the right 'connected storage servers'"
1097                             " message in: %s" % page
1098                             )
1099             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1100             self.failUnless(expected in page,
1101                             "I didn't see the right 'My nodeid' message "
1102                             "in: %s" % page)
1103             self.failUnless("Helper: 0 active uploads" in page)
1104         d.addCallback(_got_welcome)
1105         d.addCallback(self.log, "done with _got_welcome")
1106
1107         # get the welcome page from the node that uses the helper too
1108         d.addCallback(lambda res: getPage(self.helper_webish_url))
1109         def _got_welcome_helper(page):
1110             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1111                             page)
1112             self.failUnless("Not running helper" in page)
1113         d.addCallback(_got_welcome_helper)
1114
1115         d.addCallback(lambda res: getPage(base + public))
1116         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1117         def _got_subdir1(page):
1118             # there ought to be an href for our file
1119             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1120             self.failUnless(">mydata567</a>" in page)
1121         d.addCallback(_got_subdir1)
1122         d.addCallback(self.log, "done with _got_subdir1")
1123         d.addCallback(lambda res:
1124                       getPage(base + public + "/subdir1/mydata567"))
1125         def _got_data(page):
1126             self.failUnlessEqual(page, self.data)
1127         d.addCallback(_got_data)
1128
1129         # download from a URI embedded in a URL
1130         d.addCallback(self.log, "_get_from_uri")
1131         def _get_from_uri(res):
1132             return getPage(base + "uri/%s?filename=%s"
1133                            % (self.uri, "mydata567"))
1134         d.addCallback(_get_from_uri)
1135         def _got_from_uri(page):
1136             self.failUnlessEqual(page, self.data)
1137         d.addCallback(_got_from_uri)
1138
1139         # download from a URI embedded in a URL, second form
1140         d.addCallback(self.log, "_get_from_uri2")
1141         def _get_from_uri2(res):
1142             return getPage(base + "uri?uri=%s" % (self.uri,))
1143         d.addCallback(_get_from_uri2)
1144         d.addCallback(_got_from_uri)
1145
1146         # download from a bogus URI, make sure we get a reasonable error
1147         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1148         def _get_from_bogus_uri(res):
1149             d1 = getPage(base + "uri/%s?filename=%s"
1150                          % (self.mangle_uri(self.uri), "mydata567"))
1151             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1152                        "410")
1153             return d1
1154         d.addCallback(_get_from_bogus_uri)
1155         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
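             # (mangle_uri, defined elsewhere in this class, corrupts the URI
             # so that it names a storage index for which no server holds
             # shares; the webapi is expected to answer "410 Gone" for it)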
1156
1157         # upload a file with PUT
1158         d.addCallback(self.log, "about to try PUT")
1159         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1160                                            "new.txt contents"))
1161         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1162         d.addCallback(self.failUnlessEqual, "new.txt contents")
1163         # and again with something large enough to use multiple segments,
1164         # and hopefully trigger pauseProducing too
1165         def _new_happy_semantics(ign):
1166             for c in self.clients:
1167                 # these apparently get reset somewhere, so pin happy=1 again here
1168                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1169         d.addCallback(_new_happy_semantics)
1170         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1171                                            "big" * 500000)) # 1.5MB
1172         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1173         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
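             # (rough sizing note: with Tahoe's default maximum segment size
             # of 128 KiB, this 1.5 MB file should encode into about a dozen
             # segments, enough to exercise pauseProducing on the way down)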
1174
1175         # can we replace files in place?
1176         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1177                                            "NEWER contents"))
1178         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1179         d.addCallback(self.failUnlessEqual, "NEWER contents")
1180
1181         # test unlinked POST
1182         d.addCallback(lambda res: self.POST("uri", t="upload",
1183                                             file=("new.txt", "data" * 10000)))
1184         # and again using the helper, which exercises different upload-status
1185         # display code
1186         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1187                                             file=("foo.txt", "data2" * 10000)))
1188
1189         # check that the status page exists
1190         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1191         def _got_status(res):
1192             # find an interesting upload and download to look at. LIT files
1193             # are not interesting.
1194             h = self.clients[0].get_history()
1195             for ds in h.list_all_download_statuses():
1196                 if ds.get_size() > 200:
1197                     self._down_status = ds.get_counter()
1198             for us in h.list_all_upload_statuses():
1199                 if us.get_size() > 200:
1200                     self._up_status = us.get_counter()
1201             rs = list(h.list_all_retrieve_statuses())[0]
1202             self._retrieve_status = rs.get_counter()
1203             ps = list(h.list_all_publish_statuses())[0]
1204             self._publish_status = ps.get_counter()
1205             us = list(h.list_all_mapupdate_statuses())[0]
1206             self._update_status = us.get_counter()
1207
1208             # and that there are some upload- and download- status pages
1209             return self.GET("status/up-%d" % self._up_status)
1210         d.addCallback(_got_status)
1211         def _got_up(res):
1212             return self.GET("status/down-%d" % self._down_status)
1213         d.addCallback(_got_up)
1214         def _got_down(res):
1215             return self.GET("status/mapupdate-%d" % self._update_status)
1216         d.addCallback(_got_down)
1217         def _got_update(res):
1218             return self.GET("status/publish-%d" % self._publish_status)
1219         d.addCallback(_got_update)
1220         def _got_publish(res):
1221             self.failUnlessIn("Publish Results", res)
1222             return self.GET("status/retrieve-%d" % self._retrieve_status)
1223         d.addCallback(_got_publish)
1224         def _got_retrieve(res):
1225             self.failUnlessIn("Retrieve Results", res)
1226         d.addCallback(_got_retrieve)
1227
1228         # check that the helper status page exists
1229         d.addCallback(lambda res:
1230                       self.GET("helper_status", followRedirect=True))
1231         def _got_helper_status(res):
1232             self.failUnless("Bytes Fetched:" in res)
1233             # touch a couple of files in the helper's working directory to
1234             # exercise more code paths
1235             workdir = os.path.join(self.getdir("client0"), "helper")
1236             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1237             f = open(incfile, "wb")
1238             f.write("small file")
1239             f.close()
1240             then = time.time() - 86400*3
1241             now = time.time()
1242             os.utime(incfile, (now, then))
1243             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1244             f = open(encfile, "wb")
1245             f.write("less small file")
1246             f.close()
1247             os.utime(encfile, (now, then))
1248         d.addCallback(_got_helper_status)
1249         # and that the json form exists
1250         d.addCallback(lambda res:
1251                       self.GET("helper_status?t=json", followRedirect=True))
1252         def _got_helper_status_json(res):
1253             data = simplejson.loads(res)
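                 # the 10- and 15-byte sizes below are the lengths of the
                 # "small file" and "less small file" spurious files written
                 # into CHK_incoming and CHK_encoding by _got_helper_status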
1254             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1255                                  1)
1256             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1257             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1258             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1259                                  10)
1260             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1261             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1262             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1263                                  15)
1264         d.addCallback(_got_helper_status_json)
1265
1266         # and check that client[3] (which uses a helper but does not run one
1267         # itself) doesn't explode when you ask for its status
1268         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1269         def _got_non_helper_status(res):
1270             self.failUnless("Upload and Download Status" in res)
1271         d.addCallback(_got_non_helper_status)
1272
1273         # or for helper status with t=json
1274         d.addCallback(lambda res:
1275                       getPage(self.helper_webish_url + "helper_status?t=json"))
1276         def _got_non_helper_status_json(res):
1277             data = simplejson.loads(res)
1278             self.failUnlessEqual(data, {})
1279         d.addCallback(_got_non_helper_status_json)
1280
1281         # see if the statistics page exists
1282         d.addCallback(lambda res: self.GET("statistics"))
1283         def _got_stats(res):
1284             self.failUnless("Node Statistics" in res)
1285             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1286         d.addCallback(_got_stats)
1287         d.addCallback(lambda res: self.GET("statistics?t=json"))
1288         def _got_stats_json(res):
1289             data = simplejson.loads(res)
1290             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1291             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1292         d.addCallback(_got_stats_json)
1293
1294         # TODO: mangle the second segment of a file, to test errors that
1295         # occur after we've already sent some good data, which uses a
1296         # different error path.
1297
1298         # TODO: download a URI with a form
1299         # TODO: create a directory by using a form
1300         # TODO: upload by using a form on the directory page
1301         #    url = base + "somedir/subdir1/freeform_post!!upload"
1302         # TODO: delete a file by using a button on the directory page
1303
1304         return d
1305
1306     def _test_runner(self, res):
1307         # exercise some of the diagnostic tools in runner.py
1308
1309         # find a share
1310         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1311             if "storage" not in dirpath:
1312                 continue
1313             if not filenames:
1314                 continue
1315             pieces = dirpath.split(os.sep)
1316             if (len(pieces) >= 4
1317                 and pieces[-4] == "storage"
1318                 and pieces[-3] == "shares"):
1319                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1320                 # are sharefiles here
1321                 filename = os.path.join(dirpath, filenames[0])
1322                 # peek at the magic to see if it is a CHK (immutable) share
1323                 magic = open(filename, "rb").read(4)
1324                 if magic == '\x00\x00\x00\x01':
1325                     break
1326         else:
1327             self.fail("unable to find any immutable share files in %r"
1328                       % self.basedir)
1329         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1330
1331         out,err = StringIO(), StringIO()
1332         rc = runner.runner(["debug", "dump-share", "--offsets",
1333                             unicode_to_argv(filename)],
1334                            stdout=out, stderr=err)
1335         output = out.getvalue()
1336         self.failUnlessEqual(rc, 0)
1337
1338         # we only upload a single file, so we can assert some things about
1339         # its size and shares.
1340         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1341         self.failUnlessIn("size: %d\n" % len(self.data), output)
1342         self.failUnlessIn("num_segments: 1\n", output)
1343         # segment_size is always a multiple of needed_shares
1344         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
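         # (worked example: with needed_shares=3, a 100-byte file would show
         # segment_size: 102, since mathutil.next_multiple(100, 3) == 102)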
1345         self.failUnlessIn("total_shares: 10\n", output)
1346         # keys which are supposed to be present
1347         for key in ("size", "num_segments", "segment_size",
1348                     "needed_shares", "total_shares",
1349                     "codec_name", "codec_params", "tail_codec_params",
1350                     #"plaintext_hash", "plaintext_root_hash",
1351                     "crypttext_hash", "crypttext_root_hash",
1352                     "share_root_hash", "UEB_hash"):
1353             self.failUnlessIn("%s: " % key, output)
1354         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1355
1356         # now use its storage index to find the other shares using the
1357         # 'find-shares' tool
1358         sharedir, shnum = os.path.split(filename)
1359         storagedir, storage_index_s = os.path.split(sharedir)
1360         storage_index_s = str(storage_index_s)
1361         out,err = StringIO(), StringIO()
1362         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1363         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1364         rc = runner.runner(cmd, stdout=out, stderr=err)
1365         self.failUnlessEqual(rc, 0)
1366         out.seek(0)
1367         sharefiles = [sfn.strip() for sfn in out.readlines()]
1368         self.failUnlessEqual(len(sharefiles), 10)
1369
1370         # also exercise the 'catalog-shares' tool
1371         out,err = StringIO(), StringIO()
1372         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1373         cmd = ["debug", "catalog-shares"] + nodedirs
1374         rc = runner.runner(cmd, stdout=out, stderr=err)
1375         self.failUnlessEqual(rc, 0)
1376         out.seek(0)
1377         descriptions = [sfn.strip() for sfn in out.readlines()]
1378         self.failUnlessEqual(len(descriptions), 30)
1379         matching = [line
1380                     for line in descriptions
1381                     if line.startswith("CHK %s " % storage_index_s)]
1382         self.failUnlessEqual(len(matching), 10)
1383
1384     def _test_control(self, res):
1385         # exercise the remote-control-the-client foolscap interfaces in
1386         # allmydata.control (mostly used for performance tests)
1387         c0 = self.clients[0]
1388         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1389         control_furl = open(control_furl_file, "r").read().strip()
1390         # it doesn't really matter which Tub we use to connect to the client,
1391         # so let's just use our IntroducerNode's
1392         d = self.introducer.tub.getReference(control_furl)
1393         d.addCallback(self._test_control2, control_furl_file)
1394         return d
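     #  A minimal sketch (hypothetical; not executed by this test) of how an
     #  external tool could drive the same control port via foolscap:
     #
     #      from foolscap.api import Tub
     #      tub = Tub()
     #      tub.startService()
     #      furl = open("private/control.furl").read().strip()
     #      d = tub.getReference(furl)
     #      d.addCallback(lambda rref: rref.callRemote("get_memory_usage"))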
1395     def _test_control2(self, rref, filename):
1396         d = rref.callRemote("upload_from_file_to_uri",
1397                             filename.encode(get_filesystem_encoding()), convergence=None)
1398         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1399         d.addCallback(lambda uri:
1400                       rref.callRemote("download_from_uri_to_file",
1401                                       uri, downfile))
1402         def _check(res):
1403             self.failUnlessEqual(res, downfile)
1404             data = open(downfile, "r").read()
1405             expected_data = open(filename, "r").read()
1406             self.failUnlessEqual(data, expected_data)
1407         d.addCallback(_check)
1408         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1409         if sys.platform in ("linux2", "linux3"):
1410             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1411         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1412         return d
1413
1414     def _test_cli(self, res):
1415         # run various CLI commands (in a thread, since they use blocking
1416         # network calls)
1417
1418         private_uri = self._private_node.get_uri()
1419         client0_basedir = self.getdir("client0")
1420
1421         nodeargs = [
1422             "--node-directory", client0_basedir,
1423             ]
1424
1425         d = defer.succeed(None)
1426
1427         # for compatibility with earlier versions, private/root_dir.cap is
1428         # supposed to be treated as an alias named "tahoe:". Start by making
1429         # sure that works, before we add other aliases.
1430
1431         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1432         f = open(root_file, "w")
1433         f.write(private_uri)
1434         f.close()
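         # (for reference: the newer mechanism is private/aliases, a text
         # file of "name: dircap" lines; the "add-alias" command used below
         # appends entries of that form)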
1435
1436         def run(ignored, verb, *args, **kwargs):
1437             stdin = kwargs.get("stdin", "")
1438             newargs = [verb] + nodeargs + list(args)
1439             return self._run_cli(newargs, stdin=stdin)
1440
1441         def _check_ls((out,err), expected_children, unexpected_children=[]):
1442             self.failUnlessEqual(err, "")
1443             for s in expected_children:
1444                 self.failUnless(s in out, (s,out))
1445             for s in unexpected_children:
1446                 self.failIf(s in out, (s,out))
1447
1448         def _check_ls_root((out,err)):
1449             self.failUnless("personal" in out)
1450             self.failUnless("s2-ro" in out)
1451             self.failUnless("s2-rw" in out)
1452             self.failUnlessEqual(err, "")
1453
1454         # this should reference private_uri
1455         d.addCallback(run, "ls")
1456         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1457
1458         d.addCallback(run, "list-aliases")
1459         def _check_aliases_1((out,err)):
1460             self.failUnlessEqual(err, "")
1461             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1462         d.addCallback(_check_aliases_1)
1463
1464         # now that that's out of the way, remove root_dir.cap and work with
1465         # new files
1466         d.addCallback(lambda res: os.unlink(root_file))
1467         d.addCallback(run, "list-aliases")
1468         def _check_aliases_2((out,err)):
1469             self.failUnlessEqual(err, "")
1470             self.failUnlessEqual(out, "")
1471         d.addCallback(_check_aliases_2)
1472
1473         d.addCallback(run, "mkdir")
1474         def _got_dir( (out,err) ):
1475             self.failUnless(uri.from_string_dirnode(out.strip()))
1476             return out.strip()
1477         d.addCallback(_got_dir)
1478         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1479
1480         d.addCallback(run, "list-aliases")
1481         def _check_aliases_3((out,err)):
1482             self.failUnlessEqual(err, "")
1483             self.failUnless("tahoe: " in out)
1484         d.addCallback(_check_aliases_3)
1485
1486         def _check_empty_dir((out,err)):
1487             self.failUnlessEqual(out, "")
1488             self.failUnlessEqual(err, "")
1489         d.addCallback(run, "ls")
1490         d.addCallback(_check_empty_dir)
1491
1492         def _check_missing_dir((out,err)):
1493             # TODO: check that rc==2
1494             self.failUnlessEqual(out, "")
1495             self.failUnlessEqual(err, "No such file or directory\n")
1496         d.addCallback(run, "ls", "bogus")
1497         d.addCallback(_check_missing_dir)
1498
1499         files = []
1500         datas = []
1501         for i in range(10):
1502             fn = os.path.join(self.basedir, "file%d" % i)
1503             files.append(fn)
1504             data = "data to be uploaded: file%d\n" % i
1505             datas.append(data)
1506             open(fn,"wb").write(data)
1507
1508         def _check_stdout_against((out,err), filenum=None, data=None):
1509             self.failUnlessEqual(err, "")
1510             if filenum is not None:
1511                 self.failUnlessEqual(out, datas[filenum])
1512             if data is not None:
1513                 self.failUnlessEqual(out, data)
1514
1515         # test both forms of put: from a file, and from stdin
1516         #  tahoe put bar FOO
1517         d.addCallback(run, "put", files[0], "tahoe-file0")
1518         def _put_out((out,err)):
1519             self.failUnless("URI:LIT:" in out, out)
1520             self.failUnless("201 Created" in err, err)
1521             uri0 = out.strip()
1522             return run(None, "get", uri0)
1523         d.addCallback(_put_out)
1524         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1525
1526         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1527         #  tahoe put bar tahoe:FOO
1528         d.addCallback(run, "put", files[2], "tahoe:file2")
1529         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1530         def _check_put_mutable((out,err)):
1531             self._mutable_file3_uri = out.strip()
1532         d.addCallback(_check_put_mutable)
1533         d.addCallback(run, "get", "tahoe:file3")
1534         d.addCallback(_check_stdout_against, 3)
1535
1536         #  tahoe put FOO
1537         STDIN_DATA = "This is the file to upload from stdin."
1538         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1539         #  tahoe put tahoe:FOO
1540         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1541                       stdin="Other file from stdin.")
1542
1543         d.addCallback(run, "ls")
1544         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1545                                   "tahoe-file-stdin", "from-stdin"])
1546         d.addCallback(run, "ls", "subdir")
1547         d.addCallback(_check_ls, ["tahoe-file1"])
1548
1549         # tahoe mkdir FOO
1550         d.addCallback(run, "mkdir", "subdir2")
1551         d.addCallback(run, "ls")
1552         # TODO: extract the URI, set an alias with it
1553         d.addCallback(_check_ls, ["subdir2"])
1554
1555         # tahoe get: (to stdin and to a file)
1556         d.addCallback(run, "get", "tahoe-file0")
1557         d.addCallback(_check_stdout_against, 0)
1558         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1559         d.addCallback(_check_stdout_against, 1)
1560         outfile0 = os.path.join(self.basedir, "outfile0")
1561         d.addCallback(run, "get", "file2", outfile0)
1562         def _check_outfile0((out,err)):
1563             data = open(outfile0,"rb").read()
1564             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1565         d.addCallback(_check_outfile0)
1566         outfile1 = os.path.join(self.basedir, "outfile1")
1567         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1568         def _check_outfile1((out,err)):
1569             data = open(outfile1,"rb").read()
1570             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1571         d.addCallback(_check_outfile1)
1572
1573         d.addCallback(run, "rm", "tahoe-file0")
1574         d.addCallback(run, "rm", "tahoe:file2")
1575         d.addCallback(run, "ls")
1576         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1577
1578         d.addCallback(run, "ls", "-l")
1579         def _check_ls_l((out,err)):
1580             lines = out.split("\n")
1581             for l in lines:
1582                 if "tahoe-file-stdin" in l:
1583                     self.failUnless(l.startswith("-r-- "), l)
1584                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1585                 if "file3" in l:
1586                     self.failUnless(l.startswith("-rw- "), l) # mutable
1587         d.addCallback(_check_ls_l)
1588
1589         d.addCallback(run, "ls", "--uri")
1590         def _check_ls_uri((out,err)):
1591             lines = out.split("\n")
1592             for l in lines:
1593                 if "file3" in l:
1594                     self.failUnless(self._mutable_file3_uri in l)
1595         d.addCallback(_check_ls_uri)
1596
1597         d.addCallback(run, "ls", "--readonly-uri")
1598         def _check_ls_rouri((out,err)):
1599             lines = out.split("\n")
1600             for l in lines:
1601                 if "file3" in l:
1602                     rw_uri = self._mutable_file3_uri
1603                     u = uri.from_string_mutable_filenode(rw_uri)
1604                     ro_uri = u.get_readonly().to_string()
1605                     self.failUnless(ro_uri in l)
1606         d.addCallback(_check_ls_rouri)
1607
1608
1609         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1610         d.addCallback(run, "ls")
1611         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1612
1613         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1614         d.addCallback(run, "ls")
1615         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1616
1617         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1618         d.addCallback(run, "ls")
1619         d.addCallback(_check_ls, ["file3", "file3-copy"])
1620         d.addCallback(run, "get", "tahoe:file3-copy")
1621         d.addCallback(_check_stdout_against, 3)
1622
1623         # copy from disk into tahoe
1624         d.addCallback(run, "cp", files[4], "tahoe:file4")
1625         d.addCallback(run, "ls")
1626         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1627         d.addCallback(run, "get", "tahoe:file4")
1628         d.addCallback(_check_stdout_against, 4)
1629
1630         # copy from tahoe into disk
1631         target_filename = os.path.join(self.basedir, "file-out")
1632         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1633         def _check_cp_out((out,err)):
1634             self.failUnless(os.path.exists(target_filename))
1635             got = open(target_filename,"rb").read()
1636             self.failUnlessEqual(got, datas[4])
1637         d.addCallback(_check_cp_out)
1638
1639         # copy from disk to disk (silly case)
1640         target2_filename = os.path.join(self.basedir, "file-out-copy")
1641         d.addCallback(run, "cp", target_filename, target2_filename)
1642         def _check_cp_out2((out,err)):
1643             self.failUnless(os.path.exists(target2_filename))
1644             got = open(target2_filename,"rb").read()
1645             self.failUnlessEqual(got, datas[4])
1646         d.addCallback(_check_cp_out2)
1647
1648         # copy from tahoe into disk, overwriting an existing file
1649         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1650         def _check_cp_out3((out,err)):
1651             self.failUnless(os.path.exists(target_filename))
1652             got = open(target_filename,"rb").read()
1653             self.failUnlessEqual(got, datas[3])
1654         d.addCallback(_check_cp_out3)
1655
1656         # copy from disk into tahoe, overwriting an existing immutable file
1657         d.addCallback(run, "cp", files[5], "tahoe:file4")
1658         d.addCallback(run, "ls")
1659         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1660         d.addCallback(run, "get", "tahoe:file4")
1661         d.addCallback(_check_stdout_against, 5)
1662
1663         # copy from disk into tahoe, overwriting an existing mutable file
1664         d.addCallback(run, "cp", files[5], "tahoe:file3")
1665         d.addCallback(run, "ls")
1666         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1667         d.addCallback(run, "get", "tahoe:file3")
1668         d.addCallback(_check_stdout_against, 5)
1669
1670         # recursive copy: setup
1671         dn = os.path.join(self.basedir, "dir1")
1672         os.makedirs(dn)
1673         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1674         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1675         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1676         sdn2 = os.path.join(dn, "subdir2")
1677         os.makedirs(sdn2)
1678         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1679         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1680
1681         # from disk into tahoe
1682         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1683         d.addCallback(run, "ls")
1684         d.addCallback(_check_ls, ["dir1"])
1685         d.addCallback(run, "ls", "dir1")
1686         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1687                       ["rfile4", "rfile5"])
1688         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1689         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1690                       ["rfile1", "rfile2", "rfile3"])
1691         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1692         d.addCallback(_check_stdout_against, data="rfile4")
1693
1694         # and back out again
1695         dn_copy = os.path.join(self.basedir, "dir1-copy")
1696         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1697         def _check_cp_r_out((out,err)):
1698             def _cmp(name):
1699                 old = open(os.path.join(dn, name), "rb").read()
1700                 newfn = os.path.join(dn_copy, name)
1701                 self.failUnless(os.path.exists(newfn))
1702                 new = open(newfn, "rb").read()
1703                 self.failUnlessEqual(old, new)
1704             _cmp("rfile1")
1705             _cmp("rfile2")
1706             _cmp("rfile3")
1707             _cmp(os.path.join("subdir2", "rfile4"))
1708             _cmp(os.path.join("subdir2", "rfile5"))
1709         d.addCallback(_check_cp_r_out)
1710
1711         # and copy it a second time, which ought to overwrite the same files
1712         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1713
1714         # and again, only writing filecaps
1715         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1716         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1717         def _check_capsonly((out,err)):
1718             # these should all be LITs
1719             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1720             y = uri.from_string_filenode(x)
1721             self.failUnlessEqual(y.data, "rfile4")
1722         d.addCallback(_check_capsonly)
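         # (--caps-only stores each file's URI instead of its contents;
         # files this small get LIT caps, which embed the whole file
         # base32-encoded in the cap string, so from_string_filenode()
         # recovers the original bytes without touching the grid)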
1723
1724         # and tahoe-to-tahoe
1725         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1726         d.addCallback(run, "ls")
1727         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1728         d.addCallback(run, "ls", "dir1-copy")
1729         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1730                       ["rfile4", "rfile5"])
1731         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1732         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1733                       ["rfile1", "rfile2", "rfile3"])
1734         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1735         d.addCallback(_check_stdout_against, data="rfile4")
1736
1737         # and copy it a second time, which ought to overwrite the same files
1738         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1739
1740         # tahoe_ls doesn't currently handle the error correctly: it tries to
1741         # JSON-parse a traceback.
1742 ##         def _ls_missing(res):
1743 ##             argv = ["ls"] + nodeargs + ["bogus"]
1744 ##             return self._run_cli(argv)
1745 ##         d.addCallback(_ls_missing)
1746 ##         def _check_ls_missing((out,err)):
1747 ##             print "OUT", out
1748 ##             print "ERR", err
1749 ##             self.failUnlessEqual(err, "")
1750 ##         d.addCallback(_check_ls_missing)
1751
1752         return d
1753
1754     def test_filesystem_with_cli_in_subprocess(self):
1755         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1756
1757         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1758         d = self.set_up_nodes()
1759         def _new_happy_semantics(ign):
1760             for c in self.clients:
1761                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1762         d.addCallback(_new_happy_semantics)
1763
1764         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1765             stdin = kwargs.get("stdin")
1766             env = kwargs.get("env")
1767             newargs = [verb, "--node-directory", self.getdir("client0")] + list(args)
1768             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1769
1770         def _check_succeeded(res, check_stderr=True):
1771             out, err, rc_or_sig = res
1772             self.failUnlessEqual(rc_or_sig, 0, str(res))
1773             if check_stderr:
1774                 self.failUnlessEqual(err, "")
1775
1776         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1777         d.addCallback(_check_succeeded)
1778
1779         STDIN_DATA = "This is the file to upload from stdin."
1780         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1781         d.addCallback(_check_succeeded, check_stderr=False)
1782
1783         def _mv_with_http_proxy(ign):
1784             env = os.environ.copy()  # don't leak the proxy setting into later tests
1785             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1786             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1787         d.addCallback(_mv_with_http_proxy)
1788         d.addCallback(_check_succeeded)
1789
1790         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1791         def _check_ls(res):
1792             out, err, rc_or_sig = res
1793             self.failUnlessEqual(rc_or_sig, 0, str(res))
1794             self.failUnlessEqual(err, "", str(res))
1795             self.failUnlessIn("tahoe-moved", out)
1796             self.failIfIn("tahoe-file", out)
1797         d.addCallback(_check_ls)
1798         return d
1799
1800     def test_debug_trial(self):
1801         def _check_for_line(lines, result, test):
1802             for l in lines:
1803                 if result in l and test in l:
1804                     return
1805             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1806                       % (result, test, "\n## ".join(lines)))
1807
1808         def _check_for_outcome(lines, out, outcome):
1809             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1810                                             % (outcome, "\n## ".join(lines)))
1811
1812         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1813                                'allmydata.test.trialtest'])
1814         def _check_failure( (out, err, rc) ):
1815             self.failUnlessEqual(rc, 1)
1816             lines = out.split('\n')
1817             _check_for_line(lines, "[SKIPPED]", "test_skip")
1818             _check_for_line(lines, "[TODO]",    "test_todo")
1819             _check_for_line(lines, "[FAIL]",    "test_fail")
1820             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1821             _check_for_line(lines, "[ERROR]",   "test_error")
1822             _check_for_outcome(lines, out, "FAILED")
1823         d.addCallback(_check_failure)
1824
1825         # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1826         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1827                                                      'allmydata.test.trialtest.Success']))
1828         def _check_success( (out, err, rc) ):
1829             self.failUnlessEqual(rc, 0)
1830             lines = out.split('\n')
1831             _check_for_line(lines, "[SKIPPED]", "test_skip")
1832             _check_for_line(lines, "[TODO]",    "test_todo")
1833             _check_for_outcome(lines, out, "PASSED")
1834         d.addCallback(_check_success)
1835         return d
1836
1837     def _run_cli(self, argv, stdin=""):
1838         #print "CLI:", argv
1839         stdout, stderr = StringIO(), StringIO()
1840         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1841                                   stdin=StringIO(stdin),
1842                                   stdout=stdout, stderr=stderr)
1843         def _done(res):
1844             return stdout.getvalue(), stderr.getvalue()
1845         d.addCallback(_done)
1846         return d
1847
1848     def _test_checker(self, res):
1849         ut = upload.Data("too big to be literal" * 200, convergence=None)
1850         d = self._personal_node.add_file(u"big file", ut)
1851
1852         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1853         def _check_dirnode_results(r):
1854             self.failUnless(r.is_healthy())
1855         d.addCallback(_check_dirnode_results)
1856         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1857         d.addCallback(_check_dirnode_results)
1858
1859         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1860         def _got_chk_filenode(n):
1861             self.failUnless(isinstance(n, ImmutableFileNode))
1862             d = n.check(Monitor())
1863             def _check_filenode_results(r):
1864                 self.failUnless(r.is_healthy())
1865             d.addCallback(_check_filenode_results)
1866             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1867             d.addCallback(_check_filenode_results)
1868             return d
1869         d.addCallback(_got_chk_filenode)
1870
1871         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1872         def _got_lit_filenode(n):
1873             self.failUnless(isinstance(n, LiteralFileNode))
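                 # LIT files carry their data inside the cap itself, so there
                 # are no shares to verify; check() is expected to return None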
1874             d = n.check(Monitor())
1875             def _check_lit_filenode_results(r):
1876                 self.failUnlessEqual(r, None)
1877             d.addCallback(_check_lit_filenode_results)
1878             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1879             d.addCallback(_check_lit_filenode_results)
1880             return d
1881         d.addCallback(_got_lit_filenode)
1882         return d