1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 import allmydata
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18      NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotWriteableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin
28
29 LARGE_DATA = """
30 This is some data to publish to the remote grid, which needs to be large
31 enough to not fit inside a LIT uri.
32 """
33
34 class CountingDataUploadable(upload.Data):
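    # An instrumented Uploadable: read() counts how many bytes have been
    # consumed, and once bytes_read exceeds interrupt_after it fires
    # interrupt_after_d (exactly once). The resumable-upload test below uses
    # that Deferred to bounce the helper partway through an upload.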
35     bytes_read = 0
36     interrupt_after = None
37     interrupt_after_d = None
38
39     def read(self, length):
40         self.bytes_read += length
41         if self.interrupt_after is not None:
42             if self.bytes_read > self.interrupt_after:
43                 self.interrupt_after = None
44                 self.interrupt_after_d.callback(self)
45         return upload.Data.read(self, length)
46
47 class SystemTest(SystemTestMixin, unittest.TestCase):
48     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
49
50     def test_connections(self):
51         self.basedir = "system/SystemTest/test_connections"
52         d = self.set_up_nodes()
53         self.extra_node = None
54         d.addCallback(lambda res: self.add_extra_node(self.numclients))
55         def _check(extra_node):
56             self.extra_node = extra_node
57             for c in self.clients:
58                 all_peerids = c.get_storage_broker().get_all_serverids()
59                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
60                 sb = c.storage_broker
61                 permuted_peers = sb.get_servers_for_index("a")
62                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
63
64         d.addCallback(_check)
65         def _shutdown_extra_node(res):
66             if self.extra_node:
67                 return self.extra_node.stopService()
68             return res
69         d.addBoth(_shutdown_extra_node)
70         return d
71     # test_connections is subsumed by test_upload_and_download, and takes
72     # quite a while to run on a slow machine (because of all the TLS
73     # connections that must be established). If we ever rework the introducer
74     # code to such an extent that we're not sure if it works anymore, we can
75     # reinstate this test until it does.
76     del test_connections
77
78     def test_upload_and_download_random_key(self):
79         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80         return self._test_upload_and_download(convergence=None)
81
82     def test_upload_and_download_convergent(self):
83         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84         return self._test_upload_and_download(convergence="some convergence string")
85
86     def _test_upload_and_download(self, convergence):
87         # we use 4000 bytes of data, which will result in about 400k written
88         # to disk among all our simulated nodes
89         DATA = "Some data to upload\n" * 200
90         d = self.set_up_nodes()
91         def _check_connections(res):
92             for c in self.clients:
93                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
94                 all_peerids = c.get_storage_broker().get_all_serverids()
95                 self.failUnlessEqual(len(all_peerids), self.numclients)
96                 sb = c.storage_broker
97                 permuted_peers = sb.get_servers_for_index("a")
98                 self.failUnlessEqual(len(permuted_peers), self.numclients)
99         d.addCallback(_check_connections)
100
101         def _do_upload(res):
102             log.msg("UPLOADING")
103             u = self.clients[0].getServiceNamed("uploader")
104             self.uploader = u
105             # we crank the max segsize down to 1024b for the duration of this
106             # test, so we can exercise multiple segments. It is important
107             # that the data length is not a multiple of the segment size, so
108             # that the tail segment is not the same length as the others. The
109             # 1024 segsize actually gets rounded up to 1025 to be a multiple
110             # of the number of required shares (since we use 25 out of 100 FEC).
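            # (rough arithmetic check: with k=25 required shares, the next
            # multiple of 25 at or above 1024 is 25*41 = 1025)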
111             up = upload.Data(DATA, convergence=convergence)
112             up.max_segment_size = 1024
113             d1 = u.upload(up)
114             return d1
115         d.addCallback(_do_upload)
116         def _upload_done(results):
117             theuri = results.uri
118             log.msg("upload finished: uri is %s" % (theuri,))
119             self.uri = theuri
120             assert isinstance(self.uri, str), self.uri
121             self.cap = uri.from_string(self.uri)
122             self.n = self.clients[1].create_node_from_uri(self.uri)
123         d.addCallback(_upload_done)
124
125         def _upload_again(res):
126             # Upload again. If using convergent encryption then this ought to be
127             # short-circuited, however with the way we currently generate URIs
128             # (i.e. because they include the roothash), we have to do all of the
129             # encoding work, and only get to save on the upload part.
130             log.msg("UPLOADING AGAIN")
131             up = upload.Data(DATA, convergence=convergence)
132             up.max_segment_size = 1024
133             return self.uploader.upload(up)
134         d.addCallback(_upload_again)
135
136         def _download_to_data(res):
137             log.msg("DOWNLOADING")
138             return download_to_data(self.n)
139         d.addCallback(_download_to_data)
140         def _download_to_data_done(data):
141             log.msg("download finished")
142             self.failUnlessEqual(data, DATA)
143         d.addCallback(_download_to_data_done)
144
145         def _test_read(res):
146             n = self.clients[1].create_node_from_uri(self.uri)
147             d = download_to_data(n)
148             def _read_done(data):
149                 self.failUnlessEqual(data, DATA)
150             d.addCallback(_read_done)
151             d.addCallback(lambda ign:
152                           n.read(MemoryConsumer(), offset=1, size=4))
153             def _read_portion_done(mc):
154                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
155             d.addCallback(_read_portion_done)
156             d.addCallback(lambda ign:
157                           n.read(MemoryConsumer(), offset=2, size=None))
158             def _read_tail_done(mc):
159                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
160             d.addCallback(_read_tail_done)
161             d.addCallback(lambda ign:
162                           n.read(MemoryConsumer(), size=len(DATA)+1000))
163             def _read_too_much(mc):
164                 self.failUnlessEqual("".join(mc.chunks), DATA)
165             d.addCallback(_read_too_much)
166
167             return d
168         d.addCallback(_test_read)
169
170         def _test_bad_read(res):
171             bad_u = uri.from_string_filenode(self.uri)
172             bad_u.key = self.flip_bit(bad_u.key)
173             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
174             # this should cause an error during download
175
176             d = self.shouldFail2(NoSharesError, "'download bad node'",
177                                  None,
178                                  bad_n.read, MemoryConsumer(), offset=2)
179             return d
180         d.addCallback(_test_bad_read)
181
182         def _download_nonexistent_uri(res):
183             baduri = self.mangle_uri(self.uri)
184             badnode = self.clients[1].create_node_from_uri(baduri)
185             log.msg("about to download non-existent URI", level=log.UNUSUAL,
186                     facility="tahoe.tests")
187             d1 = download_to_data(badnode)
188             def _baduri_should_fail(res):
189                 log.msg("finished downloading non-existent URI",
190                         level=log.UNUSUAL, facility="tahoe.tests")
191                 self.failUnless(isinstance(res, Failure))
192                 self.failUnless(res.check(NoSharesError),
193                                 "expected NoSharesError, got %s" % res)
194             d1.addBoth(_baduri_should_fail)
195             return d1
196         d.addCallback(_download_nonexistent_uri)
197
198         # add a new node, which doesn't accept shares, and only uses the
199         # helper for upload.
200         d.addCallback(lambda res: self.add_extra_node(self.numclients,
201                                                       self.helper_furl,
202                                                       add_to_sparent=True))
203         def _added(extra_node):
204             self.extra_node = extra_node
205             self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
206         d.addCallback(_added)
207
208         HELPER_DATA = "Data that needs help to upload" * 1000
209         def _upload_with_helper(res):
210             u = upload.Data(HELPER_DATA, convergence=convergence)
211             d = self.extra_node.upload(u)
212             def _uploaded(results):
213                 n = self.clients[1].create_node_from_uri(results.uri)
214                 return download_to_data(n)
215             d.addCallback(_uploaded)
216             def _check(newdata):
217                 self.failUnlessEqual(newdata, HELPER_DATA)
218             d.addCallback(_check)
219             return d
220         d.addCallback(_upload_with_helper)
221
222         def _upload_duplicate_with_helper(res):
223             u = upload.Data(HELPER_DATA, convergence=convergence)
224             u.debug_stash_RemoteEncryptedUploadable = True
225             d = self.extra_node.upload(u)
226             def _uploaded(results):
227                 n = self.clients[1].create_node_from_uri(results.uri)
228                 return download_to_data(n)
229             d.addCallback(_uploaded)
230             def _check(newdata):
231                 self.failUnlessEqual(newdata, HELPER_DATA)
232                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
233                             "uploadable started uploading, should have been avoided")
234             d.addCallback(_check)
235             return d
236         if convergence is not None:
237             d.addCallback(_upload_duplicate_with_helper)
238
239         def _upload_resumable(res):
240             DATA = "Data that needs help to upload and gets interrupted" * 1000
241             u1 = CountingDataUploadable(DATA, convergence=convergence)
242             u2 = CountingDataUploadable(DATA, convergence=convergence)
243
244             # we interrupt the connection after about 5kB by shutting down
245             # the helper, then restarting it.
246             u1.interrupt_after = 5000
247             u1.interrupt_after_d = defer.Deferred()
248             u1.interrupt_after_d.addCallback(lambda res:
249                                              self.bounce_client(0))
250
251             # sneak into the helper and reduce its chunk size, so that our
252             # debug_interrupt will sever the connection on about the fifth
253             # chunk fetched. This makes sure that we've started to write the
254             # new shares before we abandon them, which exercises the
255             # abort/delete-partial-share code. TODO: find a cleaner way to do
256             # this. I know that this will affect later uses of the helper in
257             # this same test run, but I'm not currently worried about it.
258             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
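            # (5000-byte interrupt threshold / 1000-byte chunks: the bounce
            # should land around the fifth fetch)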
259
260             d = self.extra_node.upload(u1)
261
262             def _should_not_finish(res):
263                 self.fail("interrupted upload should have failed, not finished"
264                           " with result %s" % (res,))
265             def _interrupted(f):
266                 f.trap(DeadReferenceError)
267
268                 # make sure we actually interrupted it before finishing the
269                 # file
270                 self.failUnless(u1.bytes_read < len(DATA),
271                                 "read %d out of %d total" % (u1.bytes_read,
272                                                              len(DATA)))
273
274                 log.msg("waiting for reconnect", level=log.NOISY,
275                         facility="tahoe.test.test_system")
276                 # now, we need to give the nodes a chance to notice that this
277                 # connection has gone away. When this happens, the storage
278                 # servers will be told to abort their uploads, removing the
279                 # partial shares. Unfortunately this involves TCP messages
280                 # going through the loopback interface, and we can't easily
281                 # predict how long that will take. If it were all local, we
282                 # could use fireEventually() to stall. Since we don't have
283                 # the right introduction hooks, the best we can do is use a
284                 # fixed delay. TODO: this is fragile.
285                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
286                 return u1.interrupt_after_d
287             d.addCallbacks(_should_not_finish, _interrupted)
288
289             def _disconnected(res):
290                 # check to make sure the storage servers aren't still hanging
291                 # on to the partial share: their incoming/ directories should
292                 # now be empty.
293                 log.msg("disconnected", level=log.NOISY,
294                         facility="tahoe.test.test_system")
295                 for i in range(self.numclients):
296                     incdir = os.path.join(self.getdir("client%d" % i),
297                                           "storage", "shares", "incoming")
298                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
299             d.addCallback(_disconnected)
300
301             # then we need to give the reconnector a chance to
302             # reestablish the connection to the helper.
303             d.addCallback(lambda res:
304                           log.msg("wait_for_connections", level=log.NOISY,
305                                   facility="tahoe.test.test_system"))
306             d.addCallback(lambda res: self.wait_for_connections())
307
308
309             d.addCallback(lambda res:
310                           log.msg("uploading again", level=log.NOISY,
311                                   facility="tahoe.test.test_system"))
312             d.addCallback(lambda res: self.extra_node.upload(u2))
313
314             def _uploaded(results):
315                 cap = results.uri
316                 log.msg("Second upload complete", level=log.NOISY,
317                         facility="tahoe.test.test_system")
318
319                 # this is really bytes received rather than sent, but it's
320                 # convenient and basically measures the same thing
321                 bytes_sent = results.ciphertext_fetched
322                 self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)
323
324                 # We currently don't support resumption of upload if the data is
325                 # encrypted with a random key.  (Because that would require us
326                 # to store the key locally and re-use it on the next upload of
327                 # this file, which isn't a bad thing to do, but we currently
328                 # don't do it.)
329                 if convergence is not None:
330                     # Make sure we did not have to read the whole file the
331                     # second time around.
332                     self.failUnless(bytes_sent < len(DATA),
333                                     "resumption didn't save us any work:"
334                                     " read %r bytes out of %r total" %
335                                     (bytes_sent, len(DATA)))
336                 else:
337                     # Make sure we did have to read the whole file the second
338                     # time around -- because the one that we partially uploaded
339                     # earlier was encrypted with a different random key.
340                     self.failIf(bytes_sent < len(DATA),
341                                 "resumption saved us some work even though we were using random keys:"
342                                 " read %r bytes out of %r total" %
343                                 (bytes_sent, len(DATA)))
344                 n = self.clients[1].create_node_from_uri(cap)
345                 return download_to_data(n)
346             d.addCallback(_uploaded)
347
348             def _check(newdata):
349                 self.failUnlessEqual(newdata, DATA)
350                 # If using convergent encryption, then also check that the
351                 # helper has removed the temp file from its directories.
352                 if convergence is not None:
353                     basedir = os.path.join(self.getdir("client0"), "helper")
354                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
355                     self.failUnlessEqual(files, [])
356                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
357                     self.failUnlessEqual(files, [])
358             d.addCallback(_check)
359             return d
360         d.addCallback(_upload_resumable)
361
362         def _grab_stats(ignored):
363             # the StatsProvider doesn't normally publish a FURL:
364             # instead it passes a live reference to the StatsGatherer
365             # (if and when it connects). To exercise the remote stats
366             # interface, we manually publish client0's StatsProvider
367             # and use client1 to query it.
368             sp = self.clients[0].stats_provider
369             sp_furl = self.clients[0].tub.registerReference(sp)
370             d = self.clients[1].tub.getReference(sp_furl)
371             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
372             def _got_stats(stats):
373                 #print "STATS"
374                 #from pprint import pprint
375                 #pprint(stats)
376                 s = stats["stats"]
377                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
378                 c = stats["counters"]
379                 self.failUnless("storage_server.allocate" in c)
380             d.addCallback(_got_stats)
381             return d
382         d.addCallback(_grab_stats)
383
384         return d
385
386     def _find_all_shares(self, basedir):
387         shares = []
388         for (dirpath, dirnames, filenames) in os.walk(basedir):
389             if "storage" not in dirpath:
390                 continue
391             if not filenames:
392                 continue
393             pieces = dirpath.split(os.sep)
394             if (len(pieces) >= 5
395                 and pieces[-4] == "storage"
396                 and pieces[-3] == "shares"):
397                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
398                 # are sharefiles here
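                # (hypothetical example: a dirpath of
                # ".../client0/storage/shares/aa/aaaa..." holding a file "3"
                # yields client_num=0, storage_index_s="aaaa...", shnum=3)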
399                 assert pieces[-5].startswith("client")
400                 client_num = int(pieces[-5][-1])
401                 storage_index_s = pieces[-1]
402                 storage_index = si_a2b(storage_index_s)
403                 for sharename in filenames:
404                     shnum = int(sharename)
405                     filename = os.path.join(dirpath, sharename)
406                     data = (client_num, storage_index, filename, shnum)
407                     shares.append(data)
408         if not shares:
409             self.fail("unable to find any share files in %s" % basedir)
410         return shares
411
412     def _corrupt_mutable_share(self, filename, which):
413         msf = MutableShareFile(filename)
414         datav = msf.readv([ (0, 1000000) ])
415         final_share = datav[0]
416         assert len(final_share) < 1000000 # ought to be truncated
417         pieces = mutable_layout.unpack_share(final_share)
418         (seqnum, root_hash, IV, k, N, segsize, datalen,
419          verification_key, signature, share_hash_chain, block_hash_tree,
420          share_data, enc_privkey) = pieces
421
422         if which == "seqnum":
423             seqnum = seqnum + 15
424         elif which == "R":
425             root_hash = self.flip_bit(root_hash)
426         elif which == "IV":
427             IV = self.flip_bit(IV)
428         elif which == "segsize":
429             segsize = segsize + 15
430         elif which == "pubkey":
431             verification_key = self.flip_bit(verification_key)
432         elif which == "signature":
433             signature = self.flip_bit(signature)
434         elif which == "share_hash_chain":
435             nodenum = share_hash_chain.keys()[0]
436             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
437         elif which == "block_hash_tree":
438             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
439         elif which == "share_data":
440             share_data = self.flip_bit(share_data)
441         elif which == "encprivkey":
442             enc_privkey = self.flip_bit(enc_privkey)
443
444         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
445                                             segsize, datalen)
446         final_share = mutable_layout.pack_share(prefix,
447                                                 verification_key,
448                                                 signature,
449                                                 share_hash_chain,
450                                                 block_hash_tree,
451                                                 share_data,
452                                                 enc_privkey)
453         msf.writev( [(0, final_share)], None)
454
455
456     def test_mutable(self):
457         self.basedir = "system/SystemTest/test_mutable"
458         DATA = "initial contents go here."  # 25 bytes % 3 != 0
459         NEWDATA = "new contents yay"
460         NEWERDATA = "this is getting old"
461
462         d = self.set_up_nodes(use_key_generator=True)
463
464         def _create_mutable(res):
465             c = self.clients[0]
466             log.msg("starting create_mutable_file")
467             d1 = c.create_mutable_file(DATA)
468             def _done(res):
469                 log.msg("DONE: %s" % (res,))
470                 self._mutable_node_1 = res
471             d1.addCallback(_done)
472             return d1
473         d.addCallback(_create_mutable)
474
475         def _test_debug(res):
476             # find a share. It is important to run this while there is only
477             # one slot in the grid.
478             shares = self._find_all_shares(self.basedir)
479             (client_num, storage_index, filename, shnum) = shares[0]
480             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
481                     % filename)
482             log.msg(" for clients[%d]" % client_num)
483
484             out,err = StringIO(), StringIO()
485             rc = runner.runner(["debug", "dump-share", "--offsets",
486                                 filename],
487                                stdout=out, stderr=err)
488             output = out.getvalue()
489             self.failUnlessEqual(rc, 0)
490             try:
491                 self.failUnless("Mutable slot found:\n" in output)
492                 self.failUnless("share_type: SDMF\n" in output)
493                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
494                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
495                 self.failUnless(" num_extra_leases: 0\n" in output)
496                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
497                                 in output)
498                 self.failUnless(" SDMF contents:\n" in output)
499                 self.failUnless("  seqnum: 1\n" in output)
500                 self.failUnless("  required_shares: 3\n" in output)
501                 self.failUnless("  total_shares: 10\n" in output)
502                 self.failUnless("  segsize: 27\n" in output, (output, filename))
503                 self.failUnless("  datalen: 25\n" in output)
504                 # the exact share_hash_chain nodes depends upon the sharenum,
505                 # and is more of a hassle to compute than I want to deal with
506                 # now
507                 self.failUnless("  share_hash_chain: " in output)
508                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
509                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
510                             base32.b2a(storage_index))
511                 self.failUnless(expected in output)
512             except unittest.FailTest:
513                 print
514                 print "dump-share output was:"
515                 print output
516                 raise
517         d.addCallback(_test_debug)
518
519         # test retrieval
520
521         # first, let's see if we can use the existing node to retrieve the
522         # contents. This allows it to use the cached pubkey and maybe the
523         # latest-known sharemap.
524
525         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
526         def _check_download_1(res):
527             self.failUnlessEqual(res, DATA)
528             # now we see if we can retrieve the data from a new node,
529             # constructed using the URI of the original one. We do this test
530             # on the same client that uploaded the data.
531             uri = self._mutable_node_1.get_uri()
532             log.msg("starting retrieve1")
533             newnode = self.clients[0].create_node_from_uri(uri)
534             newnode_2 = self.clients[0].create_node_from_uri(uri)
535             self.failUnlessIdentical(newnode, newnode_2)
536             return newnode.download_best_version()
537         d.addCallback(_check_download_1)
538
539         def _check_download_2(res):
540             self.failUnlessEqual(res, DATA)
541             # same thing, but with a different client
542             uri = self._mutable_node_1.get_uri()
543             newnode = self.clients[1].create_node_from_uri(uri)
544             log.msg("starting retrieve2")
545             d1 = newnode.download_best_version()
546             d1.addCallback(lambda res: (res, newnode))
547             return d1
548         d.addCallback(_check_download_2)
549
550         def _check_download_3((res, newnode)):
551             self.failUnlessEqual(res, DATA)
552             # replace the data
553             log.msg("starting replace1")
554             d1 = newnode.overwrite(NEWDATA)
555             d1.addCallback(lambda res: newnode.download_best_version())
556             return d1
557         d.addCallback(_check_download_3)
558
559         def _check_download_4(res):
560             self.failUnlessEqual(res, NEWDATA)
561             # now create an even newer node and replace the data on it. This
562             # new node has never been used for download before.
563             uri = self._mutable_node_1.get_uri()
564             newnode1 = self.clients[2].create_node_from_uri(uri)
565             newnode2 = self.clients[3].create_node_from_uri(uri)
566             self._newnode3 = self.clients[3].create_node_from_uri(uri)
567             log.msg("starting replace2")
568             d1 = newnode1.overwrite(NEWERDATA)
569             d1.addCallback(lambda res: newnode2.download_best_version())
570             return d1
571         d.addCallback(_check_download_4)
572
573         def _check_download_5(res):
574             log.msg("finished replace2")
575             self.failUnlessEqual(res, NEWERDATA)
576         d.addCallback(_check_download_5)
577
578         def _corrupt_shares(res):
579             # run around and flip bits in all but k of the shares, to test
580             # the hash checks
581             shares = self._find_all_shares(self.basedir)
582             ## sort by share number
583             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
584             where = dict([ (shnum, filename)
585                            for (client_num, storage_index, filename, shnum)
586                            in shares ])
587             assert len(where) == 10 # this test is designed for 3-of-10
588             for shnum, filename in where.items():
589                 # shares 7,8,9 are left alone. read will check
590                 # (share_hash_chain, block_hash_tree, share_data). New
591                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
592                 # segsize, signature).
593                 if shnum == 0:
594                     # read: this will trigger "pubkey doesn't match
595                     # fingerprint".
596                     self._corrupt_mutable_share(filename, "pubkey")
597                     self._corrupt_mutable_share(filename, "encprivkey")
598                 elif shnum == 1:
599                     # triggers "signature is invalid"
600                     self._corrupt_mutable_share(filename, "seqnum")
601                 elif shnum == 2:
602                     # triggers "signature is invalid"
603                     self._corrupt_mutable_share(filename, "R")
604                 elif shnum == 3:
605                     # triggers "signature is invalid"
606                     self._corrupt_mutable_share(filename, "segsize")
607                 elif shnum == 4:
608                     self._corrupt_mutable_share(filename, "share_hash_chain")
609                 elif shnum == 5:
610                     self._corrupt_mutable_share(filename, "block_hash_tree")
611                 elif shnum == 6:
612                     self._corrupt_mutable_share(filename, "share_data")
613                 # other things to corrupt: IV, signature
614                 # 7,8,9 are left alone
615
616                 # note that initial_query_count=5 means that we'll hit the
617                 # first 5 servers in effectively random order (based upon
618                 # response time), so we won't necessarily ever get a "pubkey
619                 # doesn't match fingerprint" error (if we hit shnum>=1 before
620                 # shnum=0, we pull the pubkey from there). To get repeatable
621                 # specific failures, we need to set initial_query_count=1,
622                 # but of course that will change the sequencing behavior of
623                 # the retrieval process. TODO: find a reasonable way to make
624                 # this a parameter, probably when we expand this test to test
625                 # for one failure mode at a time.
626
627                 # when we retrieve this, we should get three signature
628                 # failures (where we've mangled seqnum, R, and segsize). The
629                 # pubkey mangling may or may not be noticed (see the initial_query_count note above).
630         d.addCallback(_corrupt_shares)
631
632         d.addCallback(lambda res: self._newnode3.download_best_version())
633         d.addCallback(_check_download_5)
634
635         def _check_empty_file(res):
636             # make sure we can create empty files; this usually screws up the
637             # segsize math
638             d1 = self.clients[2].create_mutable_file("")
639             d1.addCallback(lambda newnode: newnode.download_best_version())
640             d1.addCallback(lambda res: self.failUnlessEqual("", res))
641             return d1
642         d.addCallback(_check_empty_file)
643
644         d.addCallback(lambda res: self.clients[0].create_dirnode())
645         def _created_dirnode(dnode):
646             log.msg("_created_dirnode(%s)" % (dnode,))
647             d1 = dnode.list()
648             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
649             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
650             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
651             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
652             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
653             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
654             d1.addCallback(lambda res: dnode.build_manifest().when_done())
655             d1.addCallback(lambda res:
656                            self.failUnlessEqual(len(res["manifest"]), 1))
657             return d1
658         d.addCallback(_created_dirnode)
659
660         def wait_for_c3_kg_conn():
661             return self.clients[3]._key_generator is not None
662         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
663
664         def check_kg_poolsize(junk, size_delta):
665             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
666                                  self.key_generator_svc.key_generator.pool_size + size_delta)
667
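        # Each mutable-file or dirnode creation on clients[3] should consume
        # exactly one key from the key generator's pool, hence the expected
        # deltas of -1, -2, and -3 below.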
668         d.addCallback(check_kg_poolsize, 0)
669         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
670         d.addCallback(check_kg_poolsize, -1)
671         d.addCallback(lambda junk: self.clients[3].create_dirnode())
672         d.addCallback(check_kg_poolsize, -2)
673         # use_helper induces use of clients[3], which is the client using the key generator
674         d.addCallback(lambda junk:
675                       self.POST("uri?t=mkdir&name=george", use_helper=True))
676         d.addCallback(check_kg_poolsize, -3)
677
678         return d
679
680     def flip_bit(self, good):
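        # flips the low bit of the last byte, e.g. flip_bit("abc") == "abb"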
681         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
682
683     def mangle_uri(self, gooduri):
684         # change the key, which changes the storage index, which means we'll
685         # be asking about the wrong file, so nobody will have any shares
686         u = uri.from_string(gooduri)
687         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
688                             uri_extension_hash=u.uri_extension_hash,
689                             needed_shares=u.needed_shares,
690                             total_shares=u.total_shares,
691                             size=u.size)
692         return u2.to_string()
693
694     # TODO: add a test which mangles the uri_extension_hash instead, and
695     # should fail due to not being able to get a valid uri_extension block.
696     # Also a test which sneakily mangles the uri_extension block to change
697     # some of the validation data, so it will fail in the post-download phase
698     # when the file's crypttext integrity check fails. Do the same thing for
699     # the key, which should cause the download to fail the post-download
700     # plaintext_hash check.
701
702     def test_filesystem(self):
703         self.basedir = "system/SystemTest/test_filesystem"
704         self.data = LARGE_DATA
705         d = self.set_up_nodes(use_stats_gatherer=True)
706         def _new_happy_semantics(ign):
707             for c in self.clients:
708                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
709         d.addCallback(_new_happy_semantics)
710         d.addCallback(self._test_introweb)
711         d.addCallback(self.log, "starting publish")
712         d.addCallback(self._do_publish1)
713         d.addCallback(self._test_runner)
714         d.addCallback(self._do_publish2)
715         # at this point, we have the following filesystem (where "R" denotes
716         # self._root_directory_uri):
717         # R
718         # R/subdir1
719         # R/subdir1/mydata567
720         # R/subdir1/subdir2/
721         # R/subdir1/subdir2/mydata992
722
723         d.addCallback(lambda res: self.bounce_client(0))
724         d.addCallback(self.log, "bounced client0")
725
726         d.addCallback(self._check_publish1)
727         d.addCallback(self.log, "did _check_publish1")
728         d.addCallback(self._check_publish2)
729         d.addCallback(self.log, "did _check_publish2")
730         d.addCallback(self._do_publish_private)
731         d.addCallback(self.log, "did _do_publish_private")
732         # now we also have (where "P" denotes a new dir):
733         #  P/personal/sekrit data
734         #  P/s2-rw -> /subdir1/subdir2/
735         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
736         d.addCallback(self._check_publish_private)
737         d.addCallback(self.log, "did _check_publish_private")
738         d.addCallback(self._test_web)
739         d.addCallback(self._test_control)
740         d.addCallback(self._test_cli)
741         # P now has four top-level children:
742         # P/personal/sekrit data
743         # P/s2-ro/
744         # P/s2-rw/
745         # P/test_put/  (empty)
746         d.addCallback(self._test_checker)
747         return d
748
749     def _test_introweb(self, res):
750         d = getPage(self.introweb_url, method="GET", followRedirect=True)
751         def _check(res):
752             try:
753                 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
754                                 in res)
755                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
756                 self.failUnless("Subscription Summary: storage: 5" in res)
757             except unittest.FailTest:
758                 print
759                 print "GET %s output was:" % self.introweb_url
760                 print res
761                 raise
762         d.addCallback(_check)
763         d.addCallback(lambda res:
764                       getPage(self.introweb_url + "?t=json",
765                               method="GET", followRedirect=True))
766         def _check_json(res):
767             data = simplejson.loads(res)
768             try:
769                 self.failUnlessEqual(data["subscription_summary"],
770                                      {"storage": 5})
771                 self.failUnlessEqual(data["announcement_summary"],
772                                      {"storage": 5, "stub_client": 5})
773                 self.failUnlessEqual(data["announcement_distinct_hosts"],
774                                      {"storage": 1, "stub_client": 1})
775             except unittest.FailTest:
776                 print
777                 print "GET %s?t=json output was:" % self.introweb_url
778                 print res
779                 raise
780         d.addCallback(_check_json)
781         return d
782
783     def _do_publish1(self, res):
784         ut = upload.Data(self.data, convergence=None)
785         c0 = self.clients[0]
786         d = c0.create_dirnode()
787         def _made_root(new_dirnode):
788             self._root_directory_uri = new_dirnode.get_uri()
789             return c0.create_node_from_uri(self._root_directory_uri)
790         d.addCallback(_made_root)
791         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
792         def _made_subdir1(subdir1_node):
793             self._subdir1_node = subdir1_node
794             d1 = subdir1_node.add_file(u"mydata567", ut)
795             d1.addCallback(self.log, "publish finished")
796             def _stash_uri(filenode):
797                 self.uri = filenode.get_uri()
798                 assert isinstance(self.uri, str), (self.uri, filenode)
799             d1.addCallback(_stash_uri)
800             return d1
801         d.addCallback(_made_subdir1)
802         return d
803
804     def _do_publish2(self, res):
805         ut = upload.Data(self.data, convergence=None)
806         d = self._subdir1_node.create_subdirectory(u"subdir2")
807         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
808         return d
809
810     def log(self, res, *args, **kwargs):
811         # print "MSG: %s  RES: %s" % (msg, args)
812         log.msg(*args, **kwargs)
813         return res
814
815     def _do_publish_private(self, res):
816         self.smalldata = "sssh, very secret stuff"
817         ut = upload.Data(self.smalldata, convergence=None)
818         d = self.clients[0].create_dirnode()
819         d.addCallback(self.log, "GOT private directory")
820         def _got_new_dir(privnode):
821             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
822             d1 = privnode.create_subdirectory(u"personal")
823             d1.addCallback(self.log, "made P/personal")
824             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
825             d1.addCallback(self.log, "made P/personal/sekrit data")
826             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
827             def _got_s2(s2node):
828                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
829                                       s2node.get_readonly_uri())
830                 d2.addCallback(lambda node:
831                                privnode.set_uri(u"s2-ro",
832                                                 s2node.get_readonly_uri(),
833                                                 s2node.get_readonly_uri()))
834                 return d2
835             d1.addCallback(_got_s2)
836             d1.addCallback(lambda res: privnode)
837             return d1
838         d.addCallback(_got_new_dir)
839         return d
840
841     def _check_publish1(self, res):
842         # this one uses the iterative API
843         c1 = self.clients[1]
844         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
845         d.addCallback(self.log, "check_publish1 got /")
846         d.addCallback(lambda root: root.get(u"subdir1"))
847         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
848         d.addCallback(lambda filenode: download_to_data(filenode))
849         d.addCallback(self.log, "get finished")
850         def _get_done(data):
851             self.failUnlessEqual(data, self.data)
852         d.addCallback(_get_done)
853         return d
854
855     def _check_publish2(self, res):
856         # this one uses the path-based API
857         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
858         d = rootnode.get_child_at_path(u"subdir1")
859         d.addCallback(lambda dirnode:
860                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
861         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
862         d.addCallback(lambda filenode: download_to_data(filenode))
863         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
864
865         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
866         def _got_filenode(filenode):
867             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
868             assert fnode == filenode
869         d.addCallback(_got_filenode)
870         return d
871
872     def _check_publish_private(self, resnode):
873         # this one uses the path-based API
874         self._private_node = resnode
875
876         d = self._private_node.get_child_at_path(u"personal")
877         def _got_personal(personal):
878             self._personal_node = personal
879             return personal
880         d.addCallback(_got_personal)
881
882         d.addCallback(lambda dirnode:
883                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
884         def get_path(path):
885             return self._private_node.get_child_at_path(path)
886
887         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
888         d.addCallback(lambda filenode: download_to_data(filenode))
889         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
890         d.addCallback(lambda res: get_path(u"s2-rw"))
891         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
892         d.addCallback(lambda res: get_path(u"s2-ro"))
893         def _got_s2ro(dirnode):
894             self.failUnless(dirnode.is_mutable(), dirnode)
895             self.failUnless(dirnode.is_readonly(), dirnode)
896             d1 = defer.succeed(None)
897             d1.addCallback(lambda res: dirnode.list())
898             d1.addCallback(self.log, "dirnode.list")
899
900             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
901
902             d1.addCallback(self.log, "doing add_file(ro)")
903             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
904             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
905
906             d1.addCallback(self.log, "doing get(ro)")
907             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
908             d1.addCallback(lambda filenode:
909                            self.failUnless(IFileNode.providedBy(filenode)))
910
911             d1.addCallback(self.log, "doing delete(ro)")
912             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
913
914             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
915
916             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
917
918             personal = self._personal_node
919             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
920
921             d1.addCallback(self.log, "doing move_child_to(ro)2")
922             d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
923
924             d1.addCallback(self.log, "finished with _got_s2ro")
925             return d1
926         d.addCallback(_got_s2ro)
927         def _got_home(dummy):
928             home = self._private_node
929             personal = self._personal_node
930             d1 = defer.succeed(None)
931             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
932             d1.addCallback(lambda res:
933                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
934
935             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
936             d1.addCallback(lambda res:
937                            home.move_child_to(u"sekrit", home, u"sekrit data"))
938
939             d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
940             d1.addCallback(lambda res:
941                            home.move_child_to(u"sekrit data", personal))
942
943             d1.addCallback(lambda res: home.build_manifest().when_done())
944             d1.addCallback(self.log, "manifest")
945             #  five items:
946             # P/
947             # P/personal/
948             # P/personal/sekrit data
949             # P/s2-rw  (same as P/s2-ro)
950             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
951             d1.addCallback(lambda res:
952                            self.failUnlessEqual(len(res["manifest"]), 5))
953             d1.addCallback(lambda res: home.start_deep_stats().when_done())
954             def _check_stats(stats):
955                 expected = {"count-immutable-files": 1,
956                             "count-mutable-files": 0,
957                             "count-literal-files": 1,
958                             "count-files": 2,
959                             "count-directories": 3,
960                             "size-immutable-files": 112,
961                             "size-literal-files": 23,
962                             #"size-directories": 616, # varies
963                             #"largest-directory": 616,
964                             "largest-directory-children": 3,
965                             "largest-immutable-file": 112,
966                             }
967                 for k,v in expected.iteritems():
968                     self.failUnlessEqual(stats[k], v,
969                                          "stats[%s] was %s, not %s" %
970                                          (k, stats[k], v))
971                 self.failUnless(stats["size-directories"] > 1300,
972                                 stats["size-directories"])
973                 self.failUnless(stats["largest-directory"] > 800,
974                                 stats["largest-directory"])
975                 self.failUnlessEqual(stats["size-files-histogram"],
976                                      [ (11, 31, 1), (101, 316, 1) ])
977             d1.addCallback(_check_stats)
978             return d1
979         d.addCallback(_got_home)
980         return d
981
982     def shouldFail(self, res, expected_failure, which, substring=None):
983         if isinstance(res, Failure):
984             res.trap(expected_failure)
985             if substring:
986                 self.failUnless(substring in str(res),
987                                 "substring '%s' not in '%s'"
988                                 % (substring, str(res)))
989         else:
990             self.fail("%s was supposed to raise %s, not get '%s'" %
991                       (which, expected_failure, res))
992
993     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
994         assert substring is None or isinstance(substring, str)
995         d = defer.maybeDeferred(callable, *args, **kwargs)
996         def done(res):
997             if isinstance(res, Failure):
998                 res.trap(expected_failure)
999                 if substring:
1000                     self.failUnless(substring in str(res),
1001                                     "substring '%s' not in '%s'"
1002                                     % (substring, str(res)))
1003             else:
1004                 self.fail("%s was supposed to raise %s, not get '%s'" %
1005                           (which, expected_failure, res))
1006         d.addBoth(done)
1007         return d
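    # typical use of shouldFail2 (taken from the directory tests above):
    #   self.shouldFail2(NotWriteableError, "mkdir(nope)", None,
    #                    dirnode.create_subdirectory, u"nope")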
1008
1009     def PUT(self, urlpath, data):
1010         url = self.webish_url + urlpath
1011         return getPage(url, method="PUT", postdata=data)
1012
1013     def GET(self, urlpath, followRedirect=False):
1014         url = self.webish_url + urlpath
1015         return getPage(url, method="GET", followRedirect=followRedirect)
1016
1017     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1018         sepbase = "boogabooga"
1019         sep = "--" + sepbase
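        # Build a multipart/form-data body by hand. For a single field
        # t="upload", the CRLF-joined result (ignoring the leading _charset
        # part) looks roughly like:
        #   --boogabooga
        #   Content-Disposition: form-data; name="t"
        #
        #   upload
        #   --boogabooga--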
1020         form = []
1021         form.append(sep)
1022         form.append('Content-Disposition: form-data; name="_charset"')
1023         form.append('')
1024         form.append('UTF-8')
1025         form.append(sep)
1026         for name, value in fields.iteritems():
1027             if isinstance(value, tuple):
1028                 filename, value = value
1029                 form.append('Content-Disposition: form-data; name="%s"; '
1030                             'filename="%s"' % (name, filename.encode("utf-8")))
1031             else:
1032                 form.append('Content-Disposition: form-data; name="%s"' % name)
1033             form.append('')
1034             form.append(str(value))
1035             form.append(sep)
1036         form[-1] += "--"
1037         body = ""
1038         headers = {}
1039         if fields:
1040             body = "\r\n".join(form) + "\r\n"
1041             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1042         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1043
1044     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1045               use_helper=False):
1046         if use_helper:
1047             url = self.helper_webish_url + urlpath
1048         else:
1049             url = self.webish_url + urlpath
1050         return getPage(url, method="POST", postdata=body, headers=headers,
1051                        followRedirect=followRedirect)
1052
1053     def _test_web(self, res):
1054         base = self.webish_url
1055         public = "uri/" + self._root_directory_uri
1056         d = getPage(base)
1057         def _got_welcome(page):
1058             # XXX This test is oversensitive to formatting
1059             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1060             self.failUnless(expected in page,
1061                             "I didn't see the right 'connected storage servers'"
1062                             " message in: %s" % page
1063                             )
1064             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1065             self.failUnless(expected in page,
1066                             "I didn't see the right 'My nodeid' message "
1067                             "in: %s" % page)
1068             self.failUnless("Helper: 0 active uploads" in page)
1069         d.addCallback(_got_welcome)
1070         d.addCallback(self.log, "done with _got_welcome")
1071
1072         # get the welcome page from the node that uses the helper too
1073         d.addCallback(lambda res: getPage(self.helper_webish_url))
1074         def _got_welcome_helper(page):
1075             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1076                             page)
1077             self.failUnless("Not running helper" in page)
1078         d.addCallback(_got_welcome_helper)
1079
1080         d.addCallback(lambda res: getPage(base + public))
1081         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1082         def _got_subdir1(page):
1083             # there ought to be an href for our file
1084             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1085             self.failUnless(">mydata567</a>" in page)
1086         d.addCallback(_got_subdir1)
1087         d.addCallback(self.log, "done with _got_subdir1")
1088         d.addCallback(lambda res:
1089                       getPage(base + public + "/subdir1/mydata567"))
1090         def _got_data(page):
1091             self.failUnlessEqual(page, self.data)
1092         d.addCallback(_got_data)
1093
1094         # download from a URI embedded in a URL
1095         d.addCallback(self.log, "_get_from_uri")
1096         def _get_from_uri(res):
1097             return getPage(base + "uri/%s?filename=%s"
1098                            % (self.uri, "mydata567"))
1099         d.addCallback(_get_from_uri)
1100         def _got_from_uri(page):
1101             self.failUnlessEqual(page, self.data)
1102         d.addCallback(_got_from_uri)
1103
1104         # download from a URI embedded in a URL, second form
1105         d.addCallback(self.log, "_get_from_uri2")
1106         def _get_from_uri2(res):
1107             return getPage(base + "uri?uri=%s" % (self.uri,))
1108         d.addCallback(_get_from_uri2)
1109         d.addCallback(_got_from_uri)
1110
1111         # download from a bogus URI, make sure we get a reasonable error
1112         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1113         def _get_from_bogus_uri(res):
1114             d1 = getPage(base + "uri/%s?filename=%s"
1115                          % (self.mangle_uri(self.uri), "mydata567"))
1116             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1117                        "410")
1118             return d1
1119         d.addCallback(_get_from_bogus_uri)
1120         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1121
1122         # upload a file with PUT
1123         d.addCallback(self.log, "about to try PUT")
1124         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1125                                            "new.txt contents"))
1126         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1127         d.addCallback(self.failUnlessEqual, "new.txt contents")
1128         # and again with something large enough to use multiple segments,
1129         # and hopefully trigger pauseProducing too
1130         def _new_happy_semantics(ign):
1131             for c in self.clients:
1132                 # these seem to get reset somewhere, so force 'happy' back down for this small grid
1133                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1134         d.addCallback(_new_happy_semantics)
1135         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1136                                            "big" * 500000)) # 1.5MB
1137         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1138         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1139
1140         # can we replace files in place?
1141         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1142                                            "NEWER contents"))
1143         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1144         d.addCallback(self.failUnlessEqual, "NEWER contents")
1145
1146         # test unlinked POST
1147         d.addCallback(lambda res: self.POST("uri", t="upload",
1148                                             file=("new.txt", "data" * 10000)))
1149         # and again using the helper, which exercises different upload-status
1150         # display code
1151         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1152                                             file=("foo.txt", "data2" * 10000)))
1153
1154         # check that the status page exists
1155         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1156         def _got_status(res):
1157             # find an interesting upload and download to look at. LIT files
1158             # are not interesting.
1159             h = self.clients[0].get_history()
1160             for ds in h.list_all_download_statuses():
1161                 if ds.get_size() > 200:
1162                     self._down_status = ds.get_counter()
1163             for us in h.list_all_upload_statuses():
1164                 if us.get_size() > 200:
1165                     self._up_status = us.get_counter()
1166             rs = list(h.list_all_retrieve_statuses())[0]
1167             self._retrieve_status = rs.get_counter()
1168             ps = list(h.list_all_publish_statuses())[0]
1169             self._publish_status = ps.get_counter()
1170             us = list(h.list_all_mapupdate_statuses())[0]
1171             self._update_status = us.get_counter()
1172
1173             # and that there are some upload- and download- status pages
1174             return self.GET("status/up-%d" % self._up_status)
1175         d.addCallback(_got_status)
1176         def _got_up(res):
1177             return self.GET("status/down-%d" % self._down_status)
1178         d.addCallback(_got_up)
1179         def _got_down(res):
1180             return self.GET("status/mapupdate-%d" % self._update_status)
1181         d.addCallback(_got_down)
1182         def _got_update(res):
1183             return self.GET("status/publish-%d" % self._publish_status)
1184         d.addCallback(_got_update)
1185         def _got_publish(res):
1186             return self.GET("status/retrieve-%d" % self._retrieve_status)
1187         d.addCallback(_got_publish)
1188
1189         # check that the helper status page exists
1190         d.addCallback(lambda res:
1191                       self.GET("helper_status", followRedirect=True))
1192         def _got_helper_status(res):
1193             self.failUnless("Bytes Fetched:" in res)
1194             # touch a couple of files in the helper's working directory to
1195             # exercise more code paths
1196             workdir = os.path.join(self.getdir("client0"), "helper")
1197             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1198             f = open(incfile, "wb")
1199             f.write("small file")
1200             f.close()
1201             then = time.time() - 86400*3
1202             now = time.time()
1203             os.utime(incfile, (now, then))
1204             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1205             f = open(encfile, "wb")
1206             f.write("less small file")
1207             f.close()
1208             os.utime(encfile, (now, then))
1209         d.addCallback(_got_helper_status)
1210         # and that the json form exists
1211         d.addCallback(lambda res:
1212                       self.GET("helper_status?t=json", followRedirect=True))
1213         def _got_helper_status_json(res):
1214             data = simplejson.loads(res)
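                 # the expected numbers come from the two spurious files written above:
                 # "small file" is 10 bytes (CHK_incoming) and "less small file" is 15
                 # bytes (CHK_encoding); the *_old counters match because os.utime()
                 # backdated both files by three days, presumably past whatever age
                 # cutoff the helper uses to classify files as old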
1215             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1216                                  1)
1217             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1218             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1219             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1220                                  10)
1221             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1222             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1223             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1224                                  15)
1225         d.addCallback(_got_helper_status_json)
1226
1227         # and check that client[3] (which uses a helper but does not run one
1228         # itself) doesn't explode when you ask for its status
1229         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1230         def _got_non_helper_status(res):
1231             self.failUnless("Upload and Download Status" in res)
1232         d.addCallback(_got_non_helper_status)
1233
1234         # or for helper status with t=json
1235         d.addCallback(lambda res:
1236                       getPage(self.helper_webish_url + "helper_status?t=json"))
1237         def _got_non_helper_status_json(res):
1238             data = simplejson.loads(res)
1239             self.failUnlessEqual(data, {})
1240         d.addCallback(_got_non_helper_status_json)
1241
1242         # see if the statistics page exists
1243         d.addCallback(lambda res: self.GET("statistics"))
1244         def _got_stats(res):
1245             self.failUnless("Node Statistics" in res)
1246             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1247         d.addCallback(_got_stats)
1248         d.addCallback(lambda res: self.GET("statistics?t=json"))
1249         def _got_stats_json(res):
1250             data = simplejson.loads(res)
1251             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1252             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1253         d.addCallback(_got_stats_json)
1254
1255         # TODO: mangle the second segment of a file, to test errors that
1256         # occur after we've already sent some good data, which uses a
1257         # different error path.
1258
1259         # TODO: download a URI with a form
1260         # TODO: create a directory by using a form
1261         # TODO: upload by using a form on the directory page
1262         #    url = base + "somedir/subdir1/freeform_post!!upload"
1263         # TODO: delete a file by using a button on the directory page
1264
1265         return d
1266
1267     def _test_runner(self, res):
1268         # exercise some of the diagnostic tools in runner.py
1269
1270         # find a share
1271         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1272             if "storage" not in dirpath:
1273                 continue
1274             if not filenames:
1275                 continue
1276             pieces = dirpath.split(os.sep)
1277             if (len(pieces) >= 4
1278                 and pieces[-4] == "storage"
1279                 and pieces[-3] == "shares"):
1280                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1281                 # are sharefiles here
1282                 filename = os.path.join(dirpath, filenames[0])
1283                 # peek at the magic to see if it is a chk share
1284                 magic = open(filename, "rb").read(4)
1285                 if magic == '\x00\x00\x00\x01':
1286                     break
1287         else:
1288             self.fail("unable to find any uri_extension files in %s"
1289                       % self.basedir)
1290         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1291
1292         out,err = StringIO(), StringIO()
1293         rc = runner.runner(["debug", "dump-share", "--offsets",
1294                             filename],
1295                            stdout=out, stderr=err)
1296         output = out.getvalue()
1297         self.failUnlessEqual(rc, 0)
1298
1299         # we only upload a single file, so we can assert some things about
1300         # its size and shares.
1301         self.failUnless(("share filename: %s" % filename) in output)
1302         self.failUnless("size: %d\n" % len(self.data) in output)
1303         self.failUnless("num_segments: 1\n" in output)
1304         # segment_size is always a multiple of needed_shares
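             # (mathutil.next_multiple rounds len(self.data) up to the next multiple
             # of 3, the needed_shares count)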
1305         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1306         self.failUnless("total_shares: 10\n" in output)
1307         # keys which are supposed to be present
1308         for key in ("size", "num_segments", "segment_size",
1309                     "needed_shares", "total_shares",
1310                     "codec_name", "codec_params", "tail_codec_params",
1311                     #"plaintext_hash", "plaintext_root_hash",
1312                     "crypttext_hash", "crypttext_root_hash",
1313                     "share_root_hash", "UEB_hash"):
1314             self.failUnless("%s: " % key in output, key)
1315         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1316
1317         # now use its storage index to find the other shares using the
1318         # 'find-shares' tool
1319         sharedir, shnum = os.path.split(filename)
1320         storagedir, storage_index_s = os.path.split(sharedir)
1321         out,err = StringIO(), StringIO()
1322         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1323         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1324         rc = runner.runner(cmd, stdout=out, stderr=err)
1325         self.failUnlessEqual(rc, 0)
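             # find-shares prints one share-file path per line; all ten shares of
             # the single uploaded file should be found across the node directories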
1326         out.seek(0)
1327         sharefiles = [sfn.strip() for sfn in out.readlines()]
1328         self.failUnlessEqual(len(sharefiles), 10)
1329
1330         # also exercise the 'catalog-shares' tool
1331         out,err = StringIO(), StringIO()
1332         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1333         cmd = ["debug", "catalog-shares"] + nodedirs
1334         rc = runner.runner(cmd, stdout=out, stderr=err)
1335         self.failUnlessEqual(rc, 0)
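             # catalog-shares prints one line per share file found under the given
             # node directories; the lines for our file begin with "CHK " plus its
             # storage index, and there should be ten of them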
1336         out.seek(0)
1337         descriptions = [sfn.strip() for sfn in out.readlines()]
1338         self.failUnlessEqual(len(descriptions), 30)
1339         matching = [line
1340                     for line in descriptions
1341                     if line.startswith("CHK %s " % storage_index_s)]
1342         self.failUnlessEqual(len(matching), 10)
1343
1344     def _test_control(self, res):
1345         # exercise the remote-control-the-client foolscap interfaces in
1346         # allmydata.control (mostly used for performance tests)
1347         c0 = self.clients[0]
1348         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1349         control_furl = open(control_furl_file, "r").read().strip()
1350         # it doesn't really matter which Tub we use to connect to the client,
1351         # so let's just use our IntroducerNode's
1352         d = self.introducer.tub.getReference(control_furl)
1353         d.addCallback(self._test_control2, control_furl_file)
1354         return d
1355     def _test_control2(self, rref, filename):
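             # round-trip a small local file (the control.furl file itself) through
             # the grid: upload it by filename, download the result to a new file,
             # and compare the two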
1356         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1357         downfile = os.path.join(self.basedir, "control.downfile")
1358         d.addCallback(lambda uri:
1359                       rref.callRemote("download_from_uri_to_file",
1360                                       uri, downfile))
1361         def _check(res):
1362             self.failUnlessEqual(res, downfile)
1363             data = open(downfile, "r").read()
1364             expected_data = open(filename, "r").read()
1365             self.failUnlessEqual(data, expected_data)
1366         d.addCallback(_check)
1367         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1368         if sys.platform == "linux2":
1369             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1370         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1371         return d
1372
1373     def _test_cli(self, res):
1374         # run various CLI commands (in a thread, since they use blocking
1375         # network calls)
1376
1377         private_uri = self._private_node.get_uri()
1378         client0_basedir = self.getdir("client0")
1379
1380         nodeargs = [
1381             "--node-directory", client0_basedir,
1382             ]
1383
1384         d = defer.succeed(None)
1385
1386         # for compatibility with earlier versions, private/root_dir.cap is
1387         # supposed to be treated as an alias named "tahoe:". Start by making
1388         # sure that works, before we add other aliases.
1389
1390         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1391         f = open(root_file, "w")
1392         f.write(private_uri)
1393         f.close()
1394
1395         def run(ignored, verb, *args, **kwargs):
1396             stdin = kwargs.get("stdin", "")
1397             newargs = [verb] + nodeargs + list(args)
1398             return self._run_cli(newargs, stdin=stdin)
1399
1400         def _check_ls((out,err), expected_children, unexpected_children=[]):
1401             self.failUnlessEqual(err, "")
1402             for s in expected_children:
1403                 self.failUnless(s in out, (s,out))
1404             for s in unexpected_children:
1405                 self.failIf(s in out, (s,out))
1406
1407         def _check_ls_root((out,err)):
1408             self.failUnless("personal" in out)
1409             self.failUnless("s2-ro" in out)
1410             self.failUnless("s2-rw" in out)
1411             self.failUnlessEqual(err, "")
1412
1413         # this should reference private_uri
1414         d.addCallback(run, "ls")
1415         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1416
1417         d.addCallback(run, "list-aliases")
1418         def _check_aliases_1((out,err)):
1419             self.failUnlessEqual(err, "")
1420             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1421         d.addCallback(_check_aliases_1)
1422
1423         # now that that's out of the way, remove root_dir.cap and work with
1424         # new files
1425         d.addCallback(lambda res: os.unlink(root_file))
1426         d.addCallback(run, "list-aliases")
1427         def _check_aliases_2((out,err)):
1428             self.failUnlessEqual(err, "")
1429             self.failUnlessEqual(out, "")
1430         d.addCallback(_check_aliases_2)
1431
1432         d.addCallback(run, "mkdir")
1433         def _got_dir((out,err)):
1434             self.failUnless(uri.from_string_dirnode(out.strip()))
1435             return out.strip()
1436         d.addCallback(_got_dir)
1437         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1438
1439         d.addCallback(run, "list-aliases")
1440         def _check_aliases_3((out,err)):
1441             self.failUnlessEqual(err, "")
1442             self.failUnless("tahoe: " in out)
1443         d.addCallback(_check_aliases_3)
1444
1445         def _check_empty_dir((out,err)):
1446             self.failUnlessEqual(out, "")
1447             self.failUnlessEqual(err, "")
1448         d.addCallback(run, "ls")
1449         d.addCallback(_check_empty_dir)
1450
1451         def _check_missing_dir((out,err)):
1452             # TODO: check that rc==2
1453             self.failUnlessEqual(out, "")
1454             self.failUnlessEqual(err, "No such file or directory\n")
1455         d.addCallback(run, "ls", "bogus")
1456         d.addCallback(_check_missing_dir)
1457
1458         files = []
1459         datas = []
1460         for i in range(10):
1461             fn = os.path.join(self.basedir, "file%d" % i)
1462             files.append(fn)
1463             data = "data to be uploaded: file%d\n" % i
1464             datas.append(data)
1465             open(fn,"wb").write(data)
1466
1467         def _check_stdout_against((out,err), filenum=None, data=None):
1468             self.failUnlessEqual(err, "")
1469             if filenum is not None:
1470                 self.failUnlessEqual(out, datas[filenum])
1471             if data is not None:
1472                 self.failUnlessEqual(out, data)
1473
1474         # test both forms of put: from a file, and from stdin
1475         #  tahoe put bar FOO
1476         d.addCallback(run, "put", files[0], "tahoe-file0")
1477         def _put_out((out,err)):
1478             self.failUnless("URI:LIT:" in out, out)
1479             self.failUnless("201 Created" in err, err)
1480             uri0 = out.strip()
1481             return run(None, "get", uri0)
1482         d.addCallback(_put_out)
1483         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1484
1485         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1486         #  tahoe put bar tahoe:FOO
1487         d.addCallback(run, "put", files[2], "tahoe:file2")
1488         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1489         def _check_put_mutable((out,err)):
1490             self._mutable_file3_uri = out.strip()
1491         d.addCallback(_check_put_mutable)
1492         d.addCallback(run, "get", "tahoe:file3")
1493         d.addCallback(_check_stdout_against, 3)
1494
1495         #  tahoe put FOO
1496         STDIN_DATA = "This is the file to upload from stdin."
1497         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1498         #  tahoe put tahoe:FOO
1499         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1500                       stdin="Other file from stdin.")
1501
1502         d.addCallback(run, "ls")
1503         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1504                                   "tahoe-file-stdin", "from-stdin"])
1505         d.addCallback(run, "ls", "subdir")
1506         d.addCallback(_check_ls, ["tahoe-file1"])
1507
1508         # tahoe mkdir FOO
1509         d.addCallback(run, "mkdir", "subdir2")
1510         d.addCallback(run, "ls")
1511         # TODO: extract the URI, set an alias with it
1512         d.addCallback(_check_ls, ["subdir2"])
1513
1514         # tahoe get: (to stdin and to a file)
1515         d.addCallback(run, "get", "tahoe-file0")
1516         d.addCallback(_check_stdout_against, 0)
1517         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1518         d.addCallback(_check_stdout_against, 1)
1519         outfile0 = os.path.join(self.basedir, "outfile0")
1520         d.addCallback(run, "get", "file2", outfile0)
1521         def _check_outfile0((out,err)):
1522             data = open(outfile0,"rb").read()
1523             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1524         d.addCallback(_check_outfile0)
1525         outfile1 = os.path.join(self.basedir, "outfile0")
1526         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1527         def _check_outfile1((out,err)):
1528             data = open(outfile1,"rb").read()
1529             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1530         d.addCallback(_check_outfile1)
1531
1532         d.addCallback(run, "rm", "tahoe-file0")
1533         d.addCallback(run, "rm", "tahoe:file2")
1534         d.addCallback(run, "ls")
1535         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1536
1537         d.addCallback(run, "ls", "-l")
1538         def _check_ls_l((out,err)):
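                 # `ls -l` marks immutable entries with a leading "-r--" and mutable
                 # ones with "-rw-", and shows the size in its own column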
1539             lines = out.split("\n")
1540             for l in lines:
1541                 if "tahoe-file-stdin" in l:
1542                     self.failUnless(l.startswith("-r-- "), l)
1543                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1544                 if "file3" in l:
1545                     self.failUnless(l.startswith("-rw- "), l) # mutable
1546         d.addCallback(_check_ls_l)
1547
1548         d.addCallback(run, "ls", "--uri")
1549         def _check_ls_uri((out,err)):
1550             lines = out.split("\n")
1551             for l in lines:
1552                 if "file3" in l:
1553                     self.failUnless(self._mutable_file3_uri in l)
1554         d.addCallback(_check_ls_uri)
1555
1556         d.addCallback(run, "ls", "--readonly-uri")
1557         def _check_ls_rouri((out,err)):
1558             lines = out.split("\n")
1559             for l in lines:
1560                 if "file3" in l:
1561                     rw_uri = self._mutable_file3_uri
1562                     u = uri.from_string_mutable_filenode(rw_uri)
1563                     ro_uri = u.get_readonly().to_string()
1564                     self.failUnless(ro_uri in l)
1565         d.addCallback(_check_ls_rouri)
1566
1567
1568         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1569         d.addCallback(run, "ls")
1570         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1571
1572         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1573         d.addCallback(run, "ls")
1574         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1575
1576         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1577         d.addCallback(run, "ls")
1578         d.addCallback(_check_ls, ["file3", "file3-copy"])
1579         d.addCallback(run, "get", "tahoe:file3-copy")
1580         d.addCallback(_check_stdout_against, 3)
1581
1582         # copy from disk into tahoe
1583         d.addCallback(run, "cp", files[4], "tahoe:file4")
1584         d.addCallback(run, "ls")
1585         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1586         d.addCallback(run, "get", "tahoe:file4")
1587         d.addCallback(_check_stdout_against, 4)
1588
1589         # copy from tahoe into disk
1590         target_filename = os.path.join(self.basedir, "file-out")
1591         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1592         def _check_cp_out((out,err)):
1593             self.failUnless(os.path.exists(target_filename))
1594             got = open(target_filename,"rb").read()
1595             self.failUnlessEqual(got, datas[4])
1596         d.addCallback(_check_cp_out)
1597
1598         # copy from disk to disk (silly case)
1599         target2_filename = os.path.join(self.basedir, "file-out-copy")
1600         d.addCallback(run, "cp", target_filename, target2_filename)
1601         def _check_cp_out2((out,err)):
1602             self.failUnless(os.path.exists(target2_filename))
1603             got = open(target2_filename,"rb").read()
1604             self.failUnlessEqual(got, datas[4])
1605         d.addCallback(_check_cp_out2)
1606
1607         # copy from tahoe into disk, overwriting an existing file
1608         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1609         def _check_cp_out3((out,err)):
1610             self.failUnless(os.path.exists(target_filename))
1611             got = open(target_filename,"rb").read()
1612             self.failUnlessEqual(got, datas[3])
1613         d.addCallback(_check_cp_out3)
1614
1615         # copy from disk into tahoe, overwriting an existing immutable file
1616         d.addCallback(run, "cp", files[5], "tahoe:file4")
1617         d.addCallback(run, "ls")
1618         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1619         d.addCallback(run, "get", "tahoe:file4")
1620         d.addCallback(_check_stdout_against, 5)
1621
1622         # copy from disk into tahoe, overwriting an existing mutable file
1623         d.addCallback(run, "cp", files[5], "tahoe:file3")
1624         d.addCallback(run, "ls")
1625         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1626         d.addCallback(run, "get", "tahoe:file3")
1627         d.addCallback(_check_stdout_against, 5)
1628
1629         # recursive copy: setup
1630         dn = os.path.join(self.basedir, "dir1")
1631         os.makedirs(dn)
1632         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1633         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1634         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1635         sdn2 = os.path.join(dn, "subdir2")
1636         os.makedirs(sdn2)
1637         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1638         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1639
1640         # from disk into tahoe
1641         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1642         d.addCallback(run, "ls")
1643         d.addCallback(_check_ls, ["dir1"])
1644         d.addCallback(run, "ls", "dir1")
1645         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1646                       ["rfile4", "rfile5"])
1647         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1648         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1649                       ["rfile1", "rfile2", "rfile3"])
1650         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1651         d.addCallback(_check_stdout_against, data="rfile4")
1652
1653         # and back out again
1654         dn_copy = os.path.join(self.basedir, "dir1-copy")
1655         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1656         def _check_cp_r_out((out,err)):
1657             def _cmp(name):
1658                 old = open(os.path.join(dn, name), "rb").read()
1659                 newfn = os.path.join(dn_copy, name)
1660                 self.failUnless(os.path.exists(newfn))
1661                 new = open(newfn, "rb").read()
1662                 self.failUnlessEqual(old, new)
1663             _cmp("rfile1")
1664             _cmp("rfile2")
1665             _cmp("rfile3")
1666             _cmp(os.path.join("subdir2", "rfile4"))
1667             _cmp(os.path.join("subdir2", "rfile5"))
1668         d.addCallback(_check_cp_r_out)
1669
1670         # and copy it a second time, which ought to overwrite the same files
1671         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1672
1673         # and again, only writing filecaps
1674         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1675         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1676         def _check_capsonly((out,err)):
1677             # these should all be LITs
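                 # (each rfile is only a few bytes, so its filecap is a LIT URI that
                 # embeds the data itself; that is why .data can be compared directly)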
1678             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1679             y = uri.from_string_filenode(x)
1680             self.failUnlessEqual(y.data, "rfile4")
1681         d.addCallback(_check_capsonly)
1682
1683         # and tahoe-to-tahoe
1684         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1685         d.addCallback(run, "ls")
1686         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1687         d.addCallback(run, "ls", "dir1-copy")
1688         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1689                       ["rfile4", "rfile5"])
1690         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1691         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1692                       ["rfile1", "rfile2", "rfile3"])
1693         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1694         d.addCallback(_check_stdout_against, data="rfile4")
1695
1696         # and copy it a second time, which ought to overwrite the same files
1697         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1698
1699         # tahoe_ls doesn't currently handle the error correctly: it tries to
1700         # JSON-parse a traceback.
1701 ##         def _ls_missing(res):
1702 ##             argv = ["ls"] + nodeargs + ["bogus"]
1703 ##             return self._run_cli(argv)
1704 ##         d.addCallback(_ls_missing)
1705 ##         def _check_ls_missing((out,err)):
1706 ##             print "OUT", out
1707 ##             print "ERR", err
1708 ##             self.failUnlessEqual(err, "")
1709 ##         d.addCallback(_check_ls_missing)
1710
1711         return d
1712
1713     def _run_cli(self, argv, stdin=""):
1714         #print "CLI:", argv
1715         stdout, stderr = StringIO(), StringIO()
1716         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1717                                   stdin=StringIO(stdin),
1718                                   stdout=stdout, stderr=stderr)
1719         def _done(res):
1720             return stdout.getvalue(), stderr.getvalue()
1721         d.addCallback(_done)
1722         return d
1723
1724     def _test_checker(self, res):
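             # run the checker, with and without verification, against a dirnode, a
             # CHK file node, and a LIT file node; LIT files have no shares to check,
             # so their check results are expected to be None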
1725         ut = upload.Data("too big to be literal" * 200, convergence=None)
1726         d = self._personal_node.add_file(u"big file", ut)
1727
1728         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1729         def _check_dirnode_results(r):
1730             self.failUnless(r.is_healthy())
1731         d.addCallback(_check_dirnode_results)
1732         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1733         d.addCallback(_check_dirnode_results)
1734
1735         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1736         def _got_chk_filenode(n):
1737             self.failUnless(isinstance(n, ImmutableFileNode))
1738             d = n.check(Monitor())
1739             def _check_filenode_results(r):
1740                 self.failUnless(r.is_healthy())
1741             d.addCallback(_check_filenode_results)
1742             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1743             d.addCallback(_check_filenode_results)
1744             return d
1745         d.addCallback(_got_chk_filenode)
1746
1747         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1748         def _got_lit_filenode(n):
1749             self.failUnless(isinstance(n, LiteralFileNode))
1750             d = n.check(Monitor())
1751             def _check_lit_filenode_results(r):
1752                 self.failUnlessEqual(r, None)
1753             d.addCallback(_check_lit_filenode_results)
1754             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1755             d.addCallback(_check_lit_filenode_results)
1756             return d
1757         d.addCallback(_got_lit_filenode)
1758         return d
1759