from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from foolscap.api import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""

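# Hedged aside (not exercised by the tests below): data small enough to be
# "literal" is embedded directly in its cap (a URI:LIT: string) instead of
# being erasure-coded into shares, which is why LARGE_DATA above must be long
# enough to force a real CHK upload.
def _looks_like_lit_cap(cap_string):
    # LIT caps carry the file bytes inline; CHK caps only carry the key,
    # UEB hash, share counts, and size.
    return cap_string.startswith("URI:LIT:")
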
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

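# Hedged usage sketch (mirrors _upload_resumable below, and is not called by
# the tests): the uploadable fires interrupt_after_d once read() has handed
# out more than interrupt_after bytes, letting a test disrupt the grid in the
# middle of an upload. "disrupt" is a stand-in for whatever the caller wants
# to happen at that point.
def _example_interruptible_uploadable(data, disrupt):
    u = CountingDataUploadable(data, convergence=None)
    u.interrupt_after = 5000
    u.interrupt_after_d = defer.Deferred()
    u.interrupt_after_d.addCallback(lambda ign: disrupt())
    return u
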
class SystemTest(SystemTestMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the file size is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # This actually gets rounded up to 1025 to be a multiple of the
            # number of required shares (since we use 25 out of 100 FEC).
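            # (Hedged arithmetic: the smallest multiple of 25 at or above
            # 1024 is 1025, as computed by e.g. mathutil.next_multiple(1024,
            # 25), so this 4000-byte upload becomes three 1025-byte segments
            # plus a 925-byte tail segment.)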
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do all
            # of the encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_all_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

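    # Hedged sketch of the on-disk layout that _find_all_shares() above walks
    # (the $START prefix directory and the base32 $SINDEX vary per upload):
    #
    #   clientN/storage/shares/$START/$SINDEX/<shnum>
    #
    # i.e. the leaf directory is named after the storage index and each file
    # inside it is named after its share number.
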
    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


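    # Hedged usage note for the helper above (see _corrupt_shares in
    # test_mutable below): e.g. self._corrupt_mutable_share(filename, "seqnum")
    # unpacks the SDMF share, bumps the sequence number by 15, re-packs it,
    # and writes it back in place; retrieval should then report "signature is
    # invalid" for that share.
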
    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses
        # the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
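    # For example, flip_bit("abc") == "abb": only the low bit of the last byte
    # is inverted, which is enough to invalidate a key or hash while keeping
    # its length unchanged.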

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

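    # Hedged usage sketch (matching the calls in _check_publish_private
    # above): shouldFail2 wraps the call in maybeDeferred and fails the test
    # unless it errbacks with the given exception type, optionally checking
    # the message for substring:
    #
    #   d = self.shouldFail2(NotWriteableError, "mkdir(nope)", None,
    #                        dirnode.create_subdirectory, u"nope")
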
    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)

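    # For reference, POST("uri", t="upload") builds a body shaped like this
    # (sketch derived from the loop above; the parts are joined with CRLF and
    # the boundary is the fixed string "boogabooga"):
    #
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   upload
    #   --boogabooga--
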
1047     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1048               use_helper=False):
1049         if use_helper:
1050             url = self.helper_webish_url + urlpath
1051         else:
1052             url = self.webish_url + urlpath
1053         return getPage(url, method="POST", postdata=body, headers=headers,
1054                        followRedirect=followRedirect)
1055
1056     def _test_web(self, res):
1057         base = self.webish_url
1058         public = "uri/" + self._root_directory_uri
1059         d = getPage(base)
1060         def _got_welcome(page):
1061             # XXX This test is oversensitive to formatting
1062             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1063             self.failUnless(expected in page,
1064                             "I didn't see the right 'connected storage servers'"
1065                             " message in: %s" % page
1066                             )
1067             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1068             self.failUnless(expected in page,
1069                             "I didn't see the right 'My nodeid' message "
1070                             "in: %s" % page)
1071             self.failUnless("Helper: 0 active uploads" in page)
1072         d.addCallback(_got_welcome)
1073         d.addCallback(self.log, "done with _got_welcome")
1074
1075         # get the welcome page from the node that uses the helper too
1076         d.addCallback(lambda res: getPage(self.helper_webish_url))
1077         def _got_welcome_helper(page):
1078             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1079                             page)
1080             self.failUnless("Not running helper" in page)
1081         d.addCallback(_got_welcome_helper)
1082
1083         d.addCallback(lambda res: getPage(base + public))
1084         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1085         def _got_subdir1(page):
1086             # there ought to be an href for our file
1087             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1088             self.failUnless(">mydata567</a>" in page)
1089         d.addCallback(_got_subdir1)
1090         d.addCallback(self.log, "done with _got_subdir1")
1091         d.addCallback(lambda res:
1092                       getPage(base + public + "/subdir1/mydata567"))
1093         def _got_data(page):
1094             self.failUnlessEqual(page, self.data)
1095         d.addCallback(_got_data)
1096
1097         # download from a URI embedded in a URL
1098         d.addCallback(self.log, "_get_from_uri")
1099         def _get_from_uri(res):
1100             return getPage(base + "uri/%s?filename=%s"
1101                            % (self.uri, "mydata567"))
1102         d.addCallback(_get_from_uri)
1103         def _got_from_uri(page):
1104             self.failUnlessEqual(page, self.data)
1105         d.addCallback(_got_from_uri)
1106
1107         # download from a URI embedded in a URL, second form
1108         d.addCallback(self.log, "_get_from_uri2")
1109         def _get_from_uri2(res):
1110             return getPage(base + "uri?uri=%s" % (self.uri,))
1111         d.addCallback(_get_from_uri2)
1112         d.addCallback(_got_from_uri)
1113
1114         # download from a bogus URI, make sure we get a reasonable error
1115         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1116         def _get_from_bogus_uri(res):
1117             d1 = getPage(base + "uri/%s?filename=%s"
1118                          % (self.mangle_uri(self.uri), "mydata567"))
1119             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1120                        "410")
1121             return d1
1122         d.addCallback(_get_from_bogus_uri)
1123         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1124
1125         # upload a file with PUT
1126         d.addCallback(self.log, "about to try PUT")
1127         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1128                                            "new.txt contents"))
1129         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1130         d.addCallback(self.failUnlessEqual, "new.txt contents")
1131         # and again with something large enough to use multiple segments,
1132         # and hopefully trigger pauseProducing too
1133         def _new_happy_semantics(ign):
1134             for c in self.clients:
1135                 # these parameters seem to get reset elsewhere, so re-apply 'happy' here
1136                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1137         d.addCallback(_new_happy_semantics)
1138         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1139                                            "big" * 500000)) # 1.5MB
1140         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1141         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1142
1143         # can we replace files in place?
1144         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1145                                            "NEWER contents"))
1146         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1147         d.addCallback(self.failUnlessEqual, "NEWER contents")
1148
1149         # test unlinked POST
1150         d.addCallback(lambda res: self.POST("uri", t="upload",
1151                                             file=("new.txt", "data" * 10000)))
1152         # and again using the helper, which exercises different upload-status
1153         # display code
1154         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1155                                             file=("foo.txt", "data2" * 10000)))
1156
1157         # check that the status page exists
1158         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1159         def _got_status(res):
1160             # find an interesting upload and download to look at. LIT files
1161             # are not interesting.
1162             h = self.clients[0].get_history()
1163             for ds in h.list_all_download_statuses():
1164                 if ds.get_size() > 200:
1165                     self._down_status = ds.get_counter()
1166             for us in h.list_all_upload_statuses():
1167                 if us.get_size() > 200:
1168                     self._up_status = us.get_counter()
1169             rs = list(h.list_all_retrieve_statuses())[0]
1170             self._retrieve_status = rs.get_counter()
1171             ps = list(h.list_all_publish_statuses())[0]
1172             self._publish_status = ps.get_counter()
1173             us = list(h.list_all_mapupdate_statuses())[0]
1174             self._update_status = us.get_counter()
1175
1176             # and that there are some upload- and download-status pages
1177             return self.GET("status/up-%d" % self._up_status)
1178         d.addCallback(_got_status)
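             # each of the following callbacks runs only if the previous status
             # page was fetched without error, then requests the next one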
1179         def _got_up(res):
1180             return self.GET("status/down-%d" % self._down_status)
1181         d.addCallback(_got_up)
1182         def _got_down(res):
1183             return self.GET("status/mapupdate-%d" % self._update_status)
1184         d.addCallback(_got_down)
1185         def _got_update(res):
1186             return self.GET("status/publish-%d" % self._publish_status)
1187         d.addCallback(_got_update)
1188         def _got_publish(res):
1189             return self.GET("status/retrieve-%d" % self._retrieve_status)
1190         d.addCallback(_got_publish)
1191
1192         # check that the helper status page exists
1193         d.addCallback(lambda res:
1194                       self.GET("helper_status", followRedirect=True))
1195         def _got_helper_status(res):
1196             self.failUnless("Bytes Fetched:" in res)
1197             # touch a couple of files in the helper's working directory to
1198             # exercise more code paths
1199             workdir = os.path.join(self.getdir("client0"), "helper")
1200             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1201             f = open(incfile, "wb")
1202             f.write("small file")
1203             f.close()
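                 # backdate both spurious files so they are counted in the
                 # 'old' incoming/encoding size totals checked in the JSON
                 # form below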
1204             then = time.time() - 86400*3
1205             now = time.time()
1206             os.utime(incfile, (now, then))
1207             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1208             f = open(encfile, "wb")
1209             f.write("less small file")
1210             f.close()
1211             os.utime(encfile, (now, then))
1212         d.addCallback(_got_helper_status)
1213         # and that the json form exists
1214         d.addCallback(lambda res:
1215                       self.GET("helper_status?t=json", followRedirect=True))
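             # the incoming/encoding counts and sizes should reflect the two
             # spurious files written above (10 and 15 bytes)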
1216         def _got_helper_status_json(res):
1217             data = simplejson.loads(res)
1218             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1219                                  1)
1220             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1221             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1222             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1223                                  10)
1224             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1225             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1226             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1227                                  15)
1228         d.addCallback(_got_helper_status_json)
1229
1230         # and check that client[3] (which uses a helper but does not run one
1231         # itself) doesn't explode when you ask for its status
1232         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1233         def _got_non_helper_status(res):
1234             self.failUnless("Upload and Download Status" in res)
1235         d.addCallback(_got_non_helper_status)
1236
1237         # or for helper status with t=json
1238         d.addCallback(lambda res:
1239                       getPage(self.helper_webish_url + "helper_status?t=json"))
1240         def _got_non_helper_status_json(res):
1241             data = simplejson.loads(res)
1242             self.failUnlessEqual(data, {})
1243         d.addCallback(_got_non_helper_status_json)
1244
1245         # see if the statistics page exists
1246         d.addCallback(lambda res: self.GET("statistics"))
1247         def _got_stats(res):
1248             self.failUnless("Node Statistics" in res)
1249             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1250         d.addCallback(_got_stats)
1251         d.addCallback(lambda res: self.GET("statistics?t=json"))
1252         def _got_stats_json(res):
1253             data = simplejson.loads(res)
1254             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1255             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1256         d.addCallback(_got_stats_json)
1257
1258         # TODO: mangle the second segment of a file, to test errors that
1259         # occur after we've already sent some good data, which uses a
1260         # different error path.
1261
1262         # TODO: download a URI with a form
1263         # TODO: create a directory by using a form
1264         # TODO: upload by using a form on the directory page
1265         #    url = base + "somedir/subdir1/freeform_post!!upload"
1266         # TODO: delete a file by using a button on the directory page
1267
1268         return d
1269
1270     def _test_runner(self, res):
1271         # exercise some of the diagnostic tools in runner.py
1272
1273         # find a share
1274         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1275             if "storage" not in dirpath:
1276                 continue
1277             if not filenames:
1278                 continue
1279             pieces = dirpath.split(os.sep)
1280             if (len(pieces) >= 4
1281                 and pieces[-4] == "storage"
1282                 and pieces[-3] == "shares"):
1283                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1284                 # are sharefiles here
1285                 filename = os.path.join(dirpath, filenames[0])
1286                 # peek at the magic to see if it is a chk share
1287                 magic = open(filename, "rb").read(4)
1288                 if magic == '\x00\x00\x00\x01':
1289                     break
1290         else:
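                 # for/else: we only get here if the walk finished without
                 # break-ing, i.e. no CHK share file was found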
1291             self.fail("unable to find any uri_extension files in %r"
1292                       % self.basedir)
1293         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1294
1295         out,err = StringIO(), StringIO()
1296         rc = runner.runner(["debug", "dump-share", "--offsets",
1297                             unicode_to_argv(filename)],
1298                            stdout=out, stderr=err)
1299         output = out.getvalue()
1300         self.failUnlessEqual(rc, 0)
1301
1302         # we only upload a single file, so we can assert some things about
1303         # its size and shares.
1304         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1305         self.failUnlessIn("size: %d\n" % len(self.data), output)
1306         self.failUnlessIn("num_segments: 1\n", output)
1307         # segment_size is always a multiple of needed_shares
1308         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1309         self.failUnlessIn("total_shares: 10\n", output)
1310         # keys which are supposed to be present
1311         for key in ("size", "num_segments", "segment_size",
1312                     "needed_shares", "total_shares",
1313                     "codec_name", "codec_params", "tail_codec_params",
1314                     #"plaintext_hash", "plaintext_root_hash",
1315                     "crypttext_hash", "crypttext_root_hash",
1316                     "share_root_hash", "UEB_hash"):
1317             self.failUnlessIn("%s: " % key, output)
1318         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1319
1320         # now use its storage index to find the other shares using the
1321         # 'find-shares' tool
1322         sharedir, shnum = os.path.split(filename)
1323         storagedir, storage_index_s = os.path.split(sharedir)
1324         storage_index_s = str(storage_index_s)
1325         out,err = StringIO(), StringIO()
1326         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1327         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1328         rc = runner.runner(cmd, stdout=out, stderr=err)
1329         self.failUnlessEqual(rc, 0)
1330         out.seek(0)
1331         sharefiles = [sfn.strip() for sfn in out.readlines()]
1332         self.failUnlessEqual(len(sharefiles), 10)
1333
1334         # also exercise the 'catalog-shares' tool
1335         out,err = StringIO(), StringIO()
1336         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1337         cmd = ["debug", "catalog-shares"] + nodedirs
1338         rc = runner.runner(cmd, stdout=out, stderr=err)
1339         self.failUnlessEqual(rc, 0)
1340         out.seek(0)
1341         descriptions = [sfn.strip() for sfn in out.readlines()]
1342         self.failUnlessEqual(len(descriptions), 30)
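             # of everything cataloged, exactly the 10 shares of our CHK file
             # should match its storage index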
1343         matching = [line
1344                     for line in descriptions
1345                     if line.startswith("CHK %s " % storage_index_s)]
1346         self.failUnlessEqual(len(matching), 10)
1347
1348     def _test_control(self, res):
1349         # exercise the remote-control-the-client foolscap interfaces in
1350         # allmydata.control (mostly used for performance tests)
1351         c0 = self.clients[0]
1352         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1353         control_furl = open(control_furl_file, "r").read().strip()
1354         # it doesn't really matter which Tub we use to connect to the client,
1355         # so let's just use our IntroducerNode's
1356         d = self.introducer.tub.getReference(control_furl)
1357         d.addCallback(self._test_control2, control_furl_file)
1358         return d
1359     def _test_control2(self, rref, filename):
1360         d = rref.callRemote("upload_from_file_to_uri",
1361                             filename.encode(get_filesystem_encoding()), convergence=None)
1362         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1363         d.addCallback(lambda uri:
1364                       rref.callRemote("download_from_uri_to_file",
1365                                       uri, downfile))
1366         def _check(res):
1367             self.failUnlessEqual(res, downfile)
1368             data = open(downfile, "r").read()
1369             expected_data = open(filename, "r").read()
1370             self.failUnlessEqual(data, expected_data)
1371         d.addCallback(_check)
1372         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1373         if sys.platform == "linux2":
1374             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1375         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1376         return d
1377
1378     def _test_cli(self, res):
1379         # run various CLI commands (in a thread, since they use blocking
1380         # network calls)
1381
1382         private_uri = self._private_node.get_uri()
1383         client0_basedir = self.getdir("client0")
1384
1385         nodeargs = [
1386             "--node-directory", client0_basedir,
1387             ]
1388
1389         d = defer.succeed(None)
1390
1391         # for compatibility with earlier versions, private/root_dir.cap is
1392         # supposed to be treated as an alias named "tahoe:". Start by making
1393         # sure that works, before we add other aliases.
1394
1395         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1396         f = open(root_file, "w")
1397         f.write(private_uri)
1398         f.close()
1399
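             # helper: run a single CLI verb against client0's node directory
             # and return its (stdout, stderr)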
1400         def run(ignored, verb, *args, **kwargs):
1401             stdin = kwargs.get("stdin", "")
1402             newargs = [verb] + nodeargs + list(args)
1403             return self._run_cli(newargs, stdin=stdin)
1404
1405         def _check_ls((out,err), expected_children, unexpected_children=[]):
1406             self.failUnlessEqual(err, "")
1407             for s in expected_children:
1408                 self.failUnless(s in out, (s,out))
1409             for s in unexpected_children:
1410                 self.failIf(s in out, (s,out))
1411
1412         def _check_ls_root((out,err)):
1413             self.failUnless("personal" in out)
1414             self.failUnless("s2-ro" in out)
1415             self.failUnless("s2-rw" in out)
1416             self.failUnlessEqual(err, "")
1417
1418         # this should reference private_uri
1419         d.addCallback(run, "ls")
1420         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1421
1422         d.addCallback(run, "list-aliases")
1423         def _check_aliases_1((out,err)):
1424             self.failUnlessEqual(err, "")
1425             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1426         d.addCallback(_check_aliases_1)
1427
1428         # now that that's out of the way, remove root_dir.cap and work with
1429         # new files
1430         d.addCallback(lambda res: os.unlink(root_file))
1431         d.addCallback(run, "list-aliases")
1432         def _check_aliases_2((out,err)):
1433             self.failUnlessEqual(err, "")
1434             self.failUnlessEqual(out, "")
1435         d.addCallback(_check_aliases_2)
1436
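             # create a fresh directory and register its cap as the 'tahoe:'
             # alias for the rest of the test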
1437         d.addCallback(run, "mkdir")
1438         def _got_dir( (out,err) ):
1439             self.failUnless(uri.from_string_dirnode(out.strip()))
1440             return out.strip()
1441         d.addCallback(_got_dir)
1442         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1443
1444         d.addCallback(run, "list-aliases")
1445         def _check_aliases_3((out,err)):
1446             self.failUnlessEqual(err, "")
1447             self.failUnless("tahoe: " in out)
1448         d.addCallback(_check_aliases_3)
1449
1450         def _check_empty_dir((out,err)):
1451             self.failUnlessEqual(out, "")
1452             self.failUnlessEqual(err, "")
1453         d.addCallback(run, "ls")
1454         d.addCallback(_check_empty_dir)
1455
1456         def _check_missing_dir((out,err)):
1457             # TODO: check that rc==2
1458             self.failUnlessEqual(out, "")
1459             self.failUnlessEqual(err, "No such file or directory\n")
1460         d.addCallback(run, "ls", "bogus")
1461         d.addCallback(_check_missing_dir)
1462
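             # write ten small local files; the put/get/cp steps below upload
             # them and compare results against their contents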
1463         files = []
1464         datas = []
1465         for i in range(10):
1466             fn = os.path.join(self.basedir, "file%d" % i)
1467             files.append(fn)
1468             data = "data to be uploaded: file%d\n" % i
1469             datas.append(data)
1470             open(fn,"wb").write(data)
1471
1472         def _check_stdout_against((out,err), filenum=None, data=None):
1473             self.failUnlessEqual(err, "")
1474             if filenum is not None:
1475                 self.failUnlessEqual(out, datas[filenum])
1476             if data is not None:
1477                 self.failUnlessEqual(out, data)
1478
1479         # test both forms of put: from a file, and from stdin
1480         #  tahoe put bar FOO
1481         d.addCallback(run, "put", files[0], "tahoe-file0")
1482         def _put_out((out,err)):
1483             self.failUnless("URI:LIT:" in out, out)
1484             self.failUnless("201 Created" in err, err)
1485             uri0 = out.strip()
1486             return run(None, "get", uri0)
1487         d.addCallback(_put_out)
1488         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1489
1490         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1491         #  tahoe put bar tahoe:FOO
1492         d.addCallback(run, "put", files[2], "tahoe:file2")
1493         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1494         def _check_put_mutable((out,err)):
1495             self._mutable_file3_uri = out.strip()
1496         d.addCallback(_check_put_mutable)
1497         d.addCallback(run, "get", "tahoe:file3")
1498         d.addCallback(_check_stdout_against, 3)
1499
1500         #  tahoe put FOO
1501         STDIN_DATA = "This is the file to upload from stdin."
1502         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1503         #  tahoe put tahoe:FOO
1504         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1505                       stdin="Other file from stdin.")
1506
1507         d.addCallback(run, "ls")
1508         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1509                                   "tahoe-file-stdin", "from-stdin"])
1510         d.addCallback(run, "ls", "subdir")
1511         d.addCallback(_check_ls, ["tahoe-file1"])
1512
1513         # tahoe mkdir FOO
1514         d.addCallback(run, "mkdir", "subdir2")
1515         d.addCallback(run, "ls")
1516         # TODO: extract the URI, set an alias with it
1517         d.addCallback(_check_ls, ["subdir2"])
1518
1519         # tahoe get: (to stdin and to a file)
1520         d.addCallback(run, "get", "tahoe-file0")
1521         d.addCallback(_check_stdout_against, 0)
1522         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1523         d.addCallback(_check_stdout_against, 1)
1524         outfile0 = os.path.join(self.basedir, "outfile0")
1525         d.addCallback(run, "get", "file2", outfile0)
1526         def _check_outfile0((out,err)):
1527             data = open(outfile0,"rb").read()
1528             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1529         d.addCallback(_check_outfile0)
1530         outfile1 = os.path.join(self.basedir, "outfile1")
1531         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1532         def _check_outfile1((out,err)):
1533             data = open(outfile1,"rb").read()
1534             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1535         d.addCallback(_check_outfile1)
1536
1537         d.addCallback(run, "rm", "tahoe-file0")
1538         d.addCallback(run, "rm", "tahoe:file2")
1539         d.addCallback(run, "ls")
1540         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1541
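             # 'ls -l' should flag immutable files as read-only (-r--) and
             # mutable files as writeable (-rw-), and show their sizes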
1542         d.addCallback(run, "ls", "-l")
1543         def _check_ls_l((out,err)):
1544             lines = out.split("\n")
1545             for l in lines:
1546                 if "tahoe-file-stdin" in l:
1547                     self.failUnless(l.startswith("-r-- "), l)
1548                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1549                 if "file3" in l:
1550                     self.failUnless(l.startswith("-rw- "), l) # mutable
1551         d.addCallback(_check_ls_l)
1552
1553         d.addCallback(run, "ls", "--uri")
1554         def _check_ls_uri((out,err)):
1555             lines = out.split("\n")
1556             for l in lines:
1557                 if "file3" in l:
1558                     self.failUnless(self._mutable_file3_uri in l)
1559         d.addCallback(_check_ls_uri)
1560
1561         d.addCallback(run, "ls", "--readonly-uri")
1562         def _check_ls_rouri((out,err)):
1563             lines = out.split("\n")
1564             for l in lines:
1565                 if "file3" in l:
1566                     rw_uri = self._mutable_file3_uri
1567                     u = uri.from_string_mutable_filenode(rw_uri)
1568                     ro_uri = u.get_readonly().to_string()
1569                     self.failUnless(ro_uri in l)
1570         d.addCallback(_check_ls_rouri)
1571
1572
1573         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1574         d.addCallback(run, "ls")
1575         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1576
1577         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1578         d.addCallback(run, "ls")
1579         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1580
1581         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1582         d.addCallback(run, "ls")
1583         d.addCallback(_check_ls, ["file3", "file3-copy"])
1584         d.addCallback(run, "get", "tahoe:file3-copy")
1585         d.addCallback(_check_stdout_against, 3)
1586
1587         # copy from disk into tahoe
1588         d.addCallback(run, "cp", files[4], "tahoe:file4")
1589         d.addCallback(run, "ls")
1590         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1591         d.addCallback(run, "get", "tahoe:file4")
1592         d.addCallback(_check_stdout_against, 4)
1593
1594         # copy from tahoe into disk
1595         target_filename = os.path.join(self.basedir, "file-out")
1596         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1597         def _check_cp_out((out,err)):
1598             self.failUnless(os.path.exists(target_filename))
1599             got = open(target_filename,"rb").read()
1600             self.failUnlessEqual(got, datas[4])
1601         d.addCallback(_check_cp_out)
1602
1603         # copy from disk to disk (silly case)
1604         target2_filename = os.path.join(self.basedir, "file-out-copy")
1605         d.addCallback(run, "cp", target_filename, target2_filename)
1606         def _check_cp_out2((out,err)):
1607             self.failUnless(os.path.exists(target2_filename))
1608             got = open(target2_filename,"rb").read()
1609             self.failUnlessEqual(got, datas[4])
1610         d.addCallback(_check_cp_out2)
1611
1612         # copy from tahoe into disk, overwriting an existing file
1613         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1614         def _check_cp_out3((out,err)):
1615             self.failUnless(os.path.exists(target_filename))
1616             got = open(target_filename,"rb").read()
1617             self.failUnlessEqual(got, datas[3])
1618         d.addCallback(_check_cp_out3)
1619
1620         # copy from disk into tahoe, overwriting an existing immutable file
1621         d.addCallback(run, "cp", files[5], "tahoe:file4")
1622         d.addCallback(run, "ls")
1623         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1624         d.addCallback(run, "get", "tahoe:file4")
1625         d.addCallback(_check_stdout_against, 5)
1626
1627         # copy from disk into tahoe, overwriting an existing mutable file
1628         d.addCallback(run, "cp", files[5], "tahoe:file3")
1629         d.addCallback(run, "ls")
1630         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1631         d.addCallback(run, "get", "tahoe:file3")
1632         d.addCallback(_check_stdout_against, 5)
1633
1634         # recursive copy: setup
1635         dn = os.path.join(self.basedir, "dir1")
1636         os.makedirs(dn)
1637         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1638         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1639         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1640         sdn2 = os.path.join(dn, "subdir2")
1641         os.makedirs(sdn2)
1642         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1643         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1644
1645         # from disk into tahoe
1646         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1647         d.addCallback(run, "ls")
1648         d.addCallback(_check_ls, ["dir1"])
1649         d.addCallback(run, "ls", "dir1")
1650         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1651                       ["rfile4", "rfile5"])
1652         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1653         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1654                       ["rfile1", "rfile2", "rfile3"])
1655         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1656         d.addCallback(_check_stdout_against, data="rfile4")
1657
1658         # and back out again
1659         dn_copy = os.path.join(self.basedir, "dir1-copy")
1660         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1661         def _check_cp_r_out((out,err)):
1662             def _cmp(name):
1663                 old = open(os.path.join(dn, name), "rb").read()
1664                 newfn = os.path.join(dn_copy, name)
1665                 self.failUnless(os.path.exists(newfn))
1666                 new = open(newfn, "rb").read()
1667                 self.failUnlessEqual(old, new)
1668             _cmp("rfile1")
1669             _cmp("rfile2")
1670             _cmp("rfile3")
1671             _cmp(os.path.join("subdir2", "rfile4"))
1672             _cmp(os.path.join("subdir2", "rfile5"))
1673         d.addCallback(_check_cp_r_out)
1674
1675         # and copy it a second time, which ought to overwrite the same files
1676         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1677
1678         # and again, only writing filecaps
1679         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1680         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
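             # with --caps-only the local copies contain filecaps instead of
             # file contents; these files are tiny, so the caps are LIT URIs
             # that embed the data itself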
1681         def _check_capsonly((out,err)):
1682             # these should all be LITs
1683             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1684             y = uri.from_string_filenode(x)
1685             self.failUnlessEqual(y.data, "rfile4")
1686         d.addCallback(_check_capsonly)
1687
1688         # and tahoe-to-tahoe
1689         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1690         d.addCallback(run, "ls")
1691         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1692         d.addCallback(run, "ls", "dir1-copy")
1693         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1694                       ["rfile4", "rfile5"])
1695         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1696         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1697                       ["rfile1", "rfile2", "rfile3"])
1698         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1699         d.addCallback(_check_stdout_against, data="rfile4")
1700
1701         # and copy it a second time, which ought to overwrite the same files
1702         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1703
1704         # tahoe_ls doesn't currently handle the error correctly: it tries to
1705         # JSON-parse a traceback.
1706 ##         def _ls_missing(res):
1707 ##             argv = ["ls"] + nodeargs + ["bogus"]
1708 ##             return self._run_cli(argv)
1709 ##         d.addCallback(_ls_missing)
1710 ##         def _check_ls_missing((out,err)):
1711 ##             print "OUT", out
1712 ##             print "ERR", err
1713 ##             self.failUnlessEqual(err, "")
1714 ##         d.addCallback(_check_ls_missing)
1715
1716         return d
1717
1718     def _run_cli(self, argv, stdin=""):
1719         #print "CLI:", argv
1720         stdout, stderr = StringIO(), StringIO()
1721         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1722                                   stdin=StringIO(stdin),
1723                                   stdout=stdout, stderr=stderr)
1724         def _done(res):
1725             return stdout.getvalue(), stderr.getvalue()
1726         d.addCallback(_done)
1727         return d
1728
1729     def _test_checker(self, res):
1730         ut = upload.Data("too big to be literal" * 200, convergence=None)
1731         d = self._personal_node.add_file(u"big file", ut)
1732
1733         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1734         def _check_dirnode_results(r):
1735             self.failUnless(r.is_healthy())
1736         d.addCallback(_check_dirnode_results)
1737         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1738         d.addCallback(_check_dirnode_results)
1739
1740         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1741         def _got_chk_filenode(n):
1742             self.failUnless(isinstance(n, ImmutableFileNode))
1743             d = n.check(Monitor())
1744             def _check_filenode_results(r):
1745                 self.failUnless(r.is_healthy())
1746             d.addCallback(_check_filenode_results)
1747             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1748             d.addCallback(_check_filenode_results)
1749             return d
1750         d.addCallback(_got_chk_filenode)
1751
1752         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
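             # "sekrit data" is small enough to be stored as a LIT file; LIT
             # caps embed the data directly, so check() has nothing to verify
             # and returns None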
1753         def _got_lit_filenode(n):
1754             self.failUnless(isinstance(n, LiteralFileNode))
1755             d = n.check(Monitor())
1756             def _check_lit_filenode_results(r):
1757                 self.failUnlessEqual(r, None)
1758             d.addCallback(_check_lit_filenode_results)
1759             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1760             d.addCallback(_check_lit_filenode_results)
1761             return d
1762         d.addCallback(_got_lit_filenode)
1763         return d
1764