]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
tidy up DeadReferenceError handling, ignore them in add_lease calls
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 import allmydata
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18      NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin
28
29 LARGE_DATA = """
30 This is some data to publish to the virtual drive, which needs to be large
31 enough to not fit inside a LIT uri.
32 """
33
class CountingDataUploadable(upload.Data):
    # An uploadable that counts how many bytes have been read from it, and
    # can fire a Deferred (once) as soon as the count crosses a threshold.
    bytes_read = 0
    interrupt_after = None      # byte threshold, or None for "never fire"
    interrupt_after_d = None    # Deferred fired (with self) at the threshold

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm first so the Deferred only ever fires once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
46
47 class SystemTest(SystemTestMixin, unittest.TestCase):
48     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
49
50     def test_connections(self):
51         self.basedir = "system/SystemTest/test_connections"
52         d = self.set_up_nodes()
53         self.extra_node = None
54         d.addCallback(lambda res: self.add_extra_node(self.numclients))
55         def _check(extra_node):
56             self.extra_node = extra_node
57             for c in self.clients:
58                 all_peerids = c.get_storage_broker().get_all_serverids()
59                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
60                 sb = c.storage_broker
61                 permuted_peers = sb.get_servers_for_index("a")
62                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
63
64         d.addCallback(_check)
65         def _shutdown_extra_node(res):
66             if self.extra_node:
67                 return self.extra_node.stopService()
68             return res
69         d.addBoth(_shutdown_extra_node)
70         return d
71     # test_connections is subsumed by test_upload_and_download, and takes
72     # quite a while to run on a slow machine (because of all the TLS
73     # connections that must be established). If we ever rework the introducer
74     # code to such an extent that we're not sure if it works anymore, we can
75     # reinstate this test until it does.
76     del test_connections
77
78     def test_upload_and_download_random_key(self):
79         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80         return self._test_upload_and_download(convergence=None)
81
82     def test_upload_and_download_convergent(self):
83         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84         return self._test_upload_and_download(convergence="some convergence string")
85
86     def _test_upload_and_download(self, convergence):
87         # we use 4000 bytes of data, which will result in about 400k written
88         # to disk among all our simulated nodes
89         DATA = "Some data to upload\n" * 200
90         d = self.set_up_nodes()
91         def _check_connections(res):
92             for c in self.clients:
93                 all_peerids = c.get_storage_broker().get_all_serverids()
94                 self.failUnlessEqual(len(all_peerids), self.numclients)
95                 sb = c.storage_broker
96                 permuted_peers = sb.get_servers_for_index("a")
97                 self.failUnlessEqual(len(permuted_peers), self.numclients)
98         d.addCallback(_check_connections)
99
100         def _do_upload(res):
101             log.msg("UPLOADING")
102             u = self.clients[0].getServiceNamed("uploader")
103             self.uploader = u
104             # we crank the max segsize down to 1024b for the duration of this
105             # test, so we can exercise multiple segments. It is important
106             # that this is not a multiple of the segment size, so that the
107             # tail segment is not the same length as the others. This actualy
108             # gets rounded up to 1025 to be a multiple of the number of
109             # required shares (since we use 25 out of 100 FEC).
110             up = upload.Data(DATA, convergence=convergence)
111             up.max_segment_size = 1024
112             d1 = u.upload(up)
113             return d1
114         d.addCallback(_do_upload)
115         def _upload_done(results):
116             theuri = results.uri
117             log.msg("upload finished: uri is %s" % (theuri,))
118             self.uri = theuri
119             assert isinstance(self.uri, str), self.uri
120             self.cap = uri.from_string(self.uri)
121             self.n = self.clients[1].create_node_from_uri(self.uri)
122         d.addCallback(_upload_done)
123
124         def _upload_again(res):
125             # Upload again. If using convergent encryption then this ought to be
126             # short-circuited, however with the way we currently generate URIs
127             # (i.e. because they include the roothash), we have to do all of the
128             # encoding work, and only get to save on the upload part.
129             log.msg("UPLOADING AGAIN")
130             up = upload.Data(DATA, convergence=convergence)
131             up.max_segment_size = 1024
132             d1 = self.uploader.upload(up)
133         d.addCallback(_upload_again)
134
135         def _download_to_data(res):
136             log.msg("DOWNLOADING")
137             return download_to_data(self.n)
138         d.addCallback(_download_to_data)
139         def _download_to_data_done(data):
140             log.msg("download finished")
141             self.failUnlessEqual(data, DATA)
142         d.addCallback(_download_to_data_done)
143
144         def _test_read(res):
145             n = self.clients[1].create_node_from_uri(self.uri)
146             d = download_to_data(n)
147             def _read_done(data):
148                 self.failUnlessEqual(data, DATA)
149             d.addCallback(_read_done)
150             d.addCallback(lambda ign:
151                           n.read(MemoryConsumer(), offset=1, size=4))
152             def _read_portion_done(mc):
153                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
154             d.addCallback(_read_portion_done)
155             d.addCallback(lambda ign:
156                           n.read(MemoryConsumer(), offset=2, size=None))
157             def _read_tail_done(mc):
158                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
159             d.addCallback(_read_tail_done)
160             d.addCallback(lambda ign:
161                           n.read(MemoryConsumer(), size=len(DATA)+1000))
162             def _read_too_much(mc):
163                 self.failUnlessEqual("".join(mc.chunks), DATA)
164             d.addCallback(_read_too_much)
165
166             return d
167         d.addCallback(_test_read)
168
169         def _test_bad_read(res):
170             bad_u = uri.from_string_filenode(self.uri)
171             bad_u.key = self.flip_bit(bad_u.key)
172             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
173             # this should cause an error during download
174
175             d = self.shouldFail2(NoSharesError, "'download bad node'",
176                                  None,
177                                  bad_n.read, MemoryConsumer(), offset=2)
178             return d
179         d.addCallback(_test_bad_read)
180
181         def _download_nonexistent_uri(res):
182             baduri = self.mangle_uri(self.uri)
183             badnode = self.clients[1].create_node_from_uri(baduri)
184             log.msg("about to download non-existent URI", level=log.UNUSUAL,
185                     facility="tahoe.tests")
186             d1 = download_to_data(badnode)
187             def _baduri_should_fail(res):
188                 log.msg("finished downloading non-existend URI",
189                         level=log.UNUSUAL, facility="tahoe.tests")
190                 self.failUnless(isinstance(res, Failure))
191                 self.failUnless(res.check(NoSharesError),
192                                 "expected NoSharesError, got %s" % res)
193             d1.addBoth(_baduri_should_fail)
194             return d1
195         d.addCallback(_download_nonexistent_uri)
196
197         # add a new node, which doesn't accept shares, and only uses the
198         # helper for upload.
199         d.addCallback(lambda res: self.add_extra_node(self.numclients,
200                                                       self.helper_furl,
201                                                       add_to_sparent=True))
202         def _added(extra_node):
203             self.extra_node = extra_node
204         d.addCallback(_added)
205
206         HELPER_DATA = "Data that needs help to upload" * 1000
207         def _upload_with_helper(res):
208             u = upload.Data(HELPER_DATA, convergence=convergence)
209             d = self.extra_node.upload(u)
210             def _uploaded(results):
211                 n = self.clients[1].create_node_from_uri(results.uri)
212                 return download_to_data(n)
213             d.addCallback(_uploaded)
214             def _check(newdata):
215                 self.failUnlessEqual(newdata, HELPER_DATA)
216             d.addCallback(_check)
217             return d
218         d.addCallback(_upload_with_helper)
219
220         def _upload_duplicate_with_helper(res):
221             u = upload.Data(HELPER_DATA, convergence=convergence)
222             u.debug_stash_RemoteEncryptedUploadable = True
223             d = self.extra_node.upload(u)
224             def _uploaded(results):
225                 n = self.clients[1].create_node_from_uri(results.uri)
226                 return download_to_data(n)
227             d.addCallback(_uploaded)
228             def _check(newdata):
229                 self.failUnlessEqual(newdata, HELPER_DATA)
230                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
231                             "uploadable started uploading, should have been avoided")
232             d.addCallback(_check)
233             return d
234         if convergence is not None:
235             d.addCallback(_upload_duplicate_with_helper)
236
237         def _upload_resumable(res):
238             DATA = "Data that needs help to upload and gets interrupted" * 1000
239             u1 = CountingDataUploadable(DATA, convergence=convergence)
240             u2 = CountingDataUploadable(DATA, convergence=convergence)
241
242             # we interrupt the connection after about 5kB by shutting down
243             # the helper, then restartingit.
244             u1.interrupt_after = 5000
245             u1.interrupt_after_d = defer.Deferred()
246             u1.interrupt_after_d.addCallback(lambda res:
247                                              self.bounce_client(0))
248
249             # sneak into the helper and reduce its chunk size, so that our
250             # debug_interrupt will sever the connection on about the fifth
251             # chunk fetched. This makes sure that we've started to write the
252             # new shares before we abandon them, which exercises the
253             # abort/delete-partial-share code. TODO: find a cleaner way to do
254             # this. I know that this will affect later uses of the helper in
255             # this same test run, but I'm not currently worried about it.
256             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
257
258             d = self.extra_node.upload(u1)
259
260             def _should_not_finish(res):
261                 self.fail("interrupted upload should have failed, not finished"
262                           " with result %s" % (res,))
263             def _interrupted(f):
264                 f.trap(DeadReferenceError)
265
266                 # make sure we actually interrupted it before finishing the
267                 # file
268                 self.failUnless(u1.bytes_read < len(DATA),
269                                 "read %d out of %d total" % (u1.bytes_read,
270                                                              len(DATA)))
271
272                 log.msg("waiting for reconnect", level=log.NOISY,
273                         facility="tahoe.test.test_system")
274                 # now, we need to give the nodes a chance to notice that this
275                 # connection has gone away. When this happens, the storage
276                 # servers will be told to abort their uploads, removing the
277                 # partial shares. Unfortunately this involves TCP messages
278                 # going through the loopback interface, and we can't easily
279                 # predict how long that will take. If it were all local, we
280                 # could use fireEventually() to stall. Since we don't have
281                 # the right introduction hooks, the best we can do is use a
282                 # fixed delay. TODO: this is fragile.
283                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
284                 return u1.interrupt_after_d
285             d.addCallbacks(_should_not_finish, _interrupted)
286
287             def _disconnected(res):
288                 # check to make sure the storage servers aren't still hanging
289                 # on to the partial share: their incoming/ directories should
290                 # now be empty.
291                 log.msg("disconnected", level=log.NOISY,
292                         facility="tahoe.test.test_system")
293                 for i in range(self.numclients):
294                     incdir = os.path.join(self.getdir("client%d" % i),
295                                           "storage", "shares", "incoming")
296                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
297             d.addCallback(_disconnected)
298
299             # then we need to give the reconnector a chance to
300             # reestablish the connection to the helper.
301             d.addCallback(lambda res:
302                           log.msg("wait_for_connections", level=log.NOISY,
303                                   facility="tahoe.test.test_system"))
304             d.addCallback(lambda res: self.wait_for_connections())
305
306
307             d.addCallback(lambda res:
308                           log.msg("uploading again", level=log.NOISY,
309                                   facility="tahoe.test.test_system"))
310             d.addCallback(lambda res: self.extra_node.upload(u2))
311
312             def _uploaded(results):
313                 cap = results.uri
314                 log.msg("Second upload complete", level=log.NOISY,
315                         facility="tahoe.test.test_system")
316
317                 # this is really bytes received rather than sent, but it's
318                 # convenient and basically measures the same thing
319                 bytes_sent = results.ciphertext_fetched
320
321                 # We currently don't support resumption of upload if the data is
322                 # encrypted with a random key.  (Because that would require us
323                 # to store the key locally and re-use it on the next upload of
324                 # this file, which isn't a bad thing to do, but we currently
325                 # don't do it.)
326                 if convergence is not None:
327                     # Make sure we did not have to read the whole file the
328                     # second time around .
329                     self.failUnless(bytes_sent < len(DATA),
330                                 "resumption didn't save us any work:"
331                                 " read %d bytes out of %d total" %
332                                 (bytes_sent, len(DATA)))
333                 else:
334                     # Make sure we did have to read the whole file the second
335                     # time around -- because the one that we partially uploaded
336                     # earlier was encrypted with a different random key.
337                     self.failIf(bytes_sent < len(DATA),
338                                 "resumption saved us some work even though we were using random keys:"
339                                 " read %d bytes out of %d total" %
340                                 (bytes_sent, len(DATA)))
341                 n = self.clients[1].create_node_from_uri(cap)
342                 return download_to_data(n)
343             d.addCallback(_uploaded)
344
345             def _check(newdata):
346                 self.failUnlessEqual(newdata, DATA)
347                 # If using convergent encryption, then also check that the
348                 # helper has removed the temp file from its directories.
349                 if convergence is not None:
350                     basedir = os.path.join(self.getdir("client0"), "helper")
351                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
352                     self.failUnlessEqual(files, [])
353                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
354                     self.failUnlessEqual(files, [])
355             d.addCallback(_check)
356             return d
357         d.addCallback(_upload_resumable)
358
359         def _grab_stats(ignored):
360             # the StatsProvider doesn't normally publish a FURL:
361             # instead it passes a live reference to the StatsGatherer
362             # (if and when it connects). To exercise the remote stats
363             # interface, we manually publish client0's StatsProvider
364             # and use client1 to query it.
365             sp = self.clients[0].stats_provider
366             sp_furl = self.clients[0].tub.registerReference(sp)
367             d = self.clients[1].tub.getReference(sp_furl)
368             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
369             def _got_stats(stats):
370                 #print "STATS"
371                 #from pprint import pprint
372                 #pprint(stats)
373                 s = stats["stats"]
374                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
375                 c = stats["counters"]
376                 self.failUnless("storage_server.allocate" in c)
377             d.addCallback(_got_stats)
378             return d
379         d.addCallback(_grab_stats)
380
381         return d
382
383     def _find_shares(self, basedir):
384         shares = []
385         for (dirpath, dirnames, filenames) in os.walk(basedir):
386             if "storage" not in dirpath:
387                 continue
388             if not filenames:
389                 continue
390             pieces = dirpath.split(os.sep)
391             if (len(pieces) >= 5
392                 and pieces[-4] == "storage"
393                 and pieces[-3] == "shares"):
394                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
395                 # are sharefiles here
396                 assert pieces[-5].startswith("client")
397                 client_num = int(pieces[-5][-1])
398                 storage_index_s = pieces[-1]
399                 storage_index = si_a2b(storage_index_s)
400                 for sharename in filenames:
401                     shnum = int(sharename)
402                     filename = os.path.join(dirpath, sharename)
403                     data = (client_num, storage_index, filename, shnum)
404                     shares.append(data)
405         if not shares:
406             self.fail("unable to find any share files in %s" % basedir)
407         return shares
408
409     def _corrupt_mutable_share(self, filename, which):
410         msf = MutableShareFile(filename)
411         datav = msf.readv([ (0, 1000000) ])
412         final_share = datav[0]
413         assert len(final_share) < 1000000 # ought to be truncated
414         pieces = mutable_layout.unpack_share(final_share)
415         (seqnum, root_hash, IV, k, N, segsize, datalen,
416          verification_key, signature, share_hash_chain, block_hash_tree,
417          share_data, enc_privkey) = pieces
418
419         if which == "seqnum":
420             seqnum = seqnum + 15
421         elif which == "R":
422             root_hash = self.flip_bit(root_hash)
423         elif which == "IV":
424             IV = self.flip_bit(IV)
425         elif which == "segsize":
426             segsize = segsize + 15
427         elif which == "pubkey":
428             verification_key = self.flip_bit(verification_key)
429         elif which == "signature":
430             signature = self.flip_bit(signature)
431         elif which == "share_hash_chain":
432             nodenum = share_hash_chain.keys()[0]
433             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
434         elif which == "block_hash_tree":
435             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
436         elif which == "share_data":
437             share_data = self.flip_bit(share_data)
438         elif which == "encprivkey":
439             enc_privkey = self.flip_bit(enc_privkey)
440
441         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
442                                             segsize, datalen)
443         final_share = mutable_layout.pack_share(prefix,
444                                                 verification_key,
445                                                 signature,
446                                                 share_hash_chain,
447                                                 block_hash_tree,
448                                                 share_data,
449                                                 enc_privkey)
450         msf.writev( [(0, final_share)], None)
451
452
453     def test_mutable(self):
454         self.basedir = "system/SystemTest/test_mutable"
455         DATA = "initial contents go here."  # 25 bytes % 3 != 0
456         NEWDATA = "new contents yay"
457         NEWERDATA = "this is getting old"
458
459         d = self.set_up_nodes(use_key_generator=True)
460
461         def _create_mutable(res):
462             c = self.clients[0]
463             log.msg("starting create_mutable_file")
464             d1 = c.create_mutable_file(DATA)
465             def _done(res):
466                 log.msg("DONE: %s" % (res,))
467                 self._mutable_node_1 = res
468                 uri = res.get_uri()
469             d1.addCallback(_done)
470             return d1
471         d.addCallback(_create_mutable)
472
473         def _test_debug(res):
474             # find a share. It is important to run this while there is only
475             # one slot in the grid.
476             shares = self._find_shares(self.basedir)
477             (client_num, storage_index, filename, shnum) = shares[0]
478             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
479                     % filename)
480             log.msg(" for clients[%d]" % client_num)
481
482             out,err = StringIO(), StringIO()
483             rc = runner.runner(["debug", "dump-share", "--offsets",
484                                 filename],
485                                stdout=out, stderr=err)
486             output = out.getvalue()
487             self.failUnlessEqual(rc, 0)
488             try:
489                 self.failUnless("Mutable slot found:\n" in output)
490                 self.failUnless("share_type: SDMF\n" in output)
491                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
492                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
493                 self.failUnless(" num_extra_leases: 0\n" in output)
494                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
495                                 in output)
496                 self.failUnless(" SDMF contents:\n" in output)
497                 self.failUnless("  seqnum: 1\n" in output)
498                 self.failUnless("  required_shares: 3\n" in output)
499                 self.failUnless("  total_shares: 10\n" in output)
500                 self.failUnless("  segsize: 27\n" in output, (output, filename))
501                 self.failUnless("  datalen: 25\n" in output)
502                 # the exact share_hash_chain nodes depends upon the sharenum,
503                 # and is more of a hassle to compute than I want to deal with
504                 # now
505                 self.failUnless("  share_hash_chain: " in output)
506                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
507                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
508                             base32.b2a(storage_index))
509                 self.failUnless(expected in output)
510             except unittest.FailTest:
511                 print
512                 print "dump-share output was:"
513                 print output
514                 raise
515         d.addCallback(_test_debug)
516
517         # test retrieval
518
519         # first, let's see if we can use the existing node to retrieve the
520         # contents. This allows it to use the cached pubkey and maybe the
521         # latest-known sharemap.
522
523         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
524         def _check_download_1(res):
525             self.failUnlessEqual(res, DATA)
526             # now we see if we can retrieve the data from a new node,
527             # constructed using the URI of the original one. We do this test
528             # on the same client that uploaded the data.
529             uri = self._mutable_node_1.get_uri()
530             log.msg("starting retrieve1")
531             newnode = self.clients[0].create_node_from_uri(uri)
532             newnode_2 = self.clients[0].create_node_from_uri(uri)
533             self.failUnlessIdentical(newnode, newnode_2)
534             return newnode.download_best_version()
535         d.addCallback(_check_download_1)
536
537         def _check_download_2(res):
538             self.failUnlessEqual(res, DATA)
539             # same thing, but with a different client
540             uri = self._mutable_node_1.get_uri()
541             newnode = self.clients[1].create_node_from_uri(uri)
542             log.msg("starting retrieve2")
543             d1 = newnode.download_best_version()
544             d1.addCallback(lambda res: (res, newnode))
545             return d1
546         d.addCallback(_check_download_2)
547
548         def _check_download_3((res, newnode)):
549             self.failUnlessEqual(res, DATA)
550             # replace the data
551             log.msg("starting replace1")
552             d1 = newnode.overwrite(NEWDATA)
553             d1.addCallback(lambda res: newnode.download_best_version())
554             return d1
555         d.addCallback(_check_download_3)
556
557         def _check_download_4(res):
558             self.failUnlessEqual(res, NEWDATA)
559             # now create an even newer node and replace the data on it. This
560             # new node has never been used for download before.
561             uri = self._mutable_node_1.get_uri()
562             newnode1 = self.clients[2].create_node_from_uri(uri)
563             newnode2 = self.clients[3].create_node_from_uri(uri)
564             self._newnode3 = self.clients[3].create_node_from_uri(uri)
565             log.msg("starting replace2")
566             d1 = newnode1.overwrite(NEWERDATA)
567             d1.addCallback(lambda res: newnode2.download_best_version())
568             return d1
569         d.addCallback(_check_download_4)
570
571         def _check_download_5(res):
572             log.msg("finished replace2")
573             self.failUnlessEqual(res, NEWERDATA)
574         d.addCallback(_check_download_5)
575
576         def _corrupt_shares(res):
577             # run around and flip bits in all but k of the shares, to test
578             # the hash checks
579             shares = self._find_shares(self.basedir)
580             ## sort by share number
581             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
582             where = dict([ (shnum, filename)
583                            for (client_num, storage_index, filename, shnum)
584                            in shares ])
585             assert len(where) == 10 # this test is designed for 3-of-10
586             for shnum, filename in where.items():
587                 # shares 7,8,9 are left alone. read will check
588                 # (share_hash_chain, block_hash_tree, share_data). New
589                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
590                 # segsize, signature).
591                 if shnum == 0:
592                     # read: this will trigger "pubkey doesn't match
593                     # fingerprint".
594                     self._corrupt_mutable_share(filename, "pubkey")
595                     self._corrupt_mutable_share(filename, "encprivkey")
596                 elif shnum == 1:
597                     # triggers "signature is invalid"
598                     self._corrupt_mutable_share(filename, "seqnum")
599                 elif shnum == 2:
600                     # triggers "signature is invalid"
601                     self._corrupt_mutable_share(filename, "R")
602                 elif shnum == 3:
603                     # triggers "signature is invalid"
604                     self._corrupt_mutable_share(filename, "segsize")
605                 elif shnum == 4:
606                     self._corrupt_mutable_share(filename, "share_hash_chain")
607                 elif shnum == 5:
608                     self._corrupt_mutable_share(filename, "block_hash_tree")
609                 elif shnum == 6:
610                     self._corrupt_mutable_share(filename, "share_data")
611                 # other things to correct: IV, signature
612                 # 7,8,9 are left alone
613
614                 # note that initial_query_count=5 means that we'll hit the
615                 # first 5 servers in effectively random order (based upon
616                 # response time), so we won't necessarily ever get a "pubkey
617                 # doesn't match fingerprint" error (if we hit shnum>=1 before
618                 # shnum=0, we pull the pubkey from there). To get repeatable
619                 # specific failures, we need to set initial_query_count=1,
620                 # but of course that will change the sequencing behavior of
621                 # the retrieval process. TODO: find a reasonable way to make
622                 # this a parameter, probably when we expand this test to test
623                 # for one failure mode at a time.
624
625                 # when we retrieve this, we should get three signature
626                 # failures (where we've mangled seqnum, R, and segsize). The
627                 # pubkey mangling
628         d.addCallback(_corrupt_shares)
629
630         d.addCallback(lambda res: self._newnode3.download_best_version())
631         d.addCallback(_check_download_5)
632
633         def _check_empty_file(res):
634             # make sure we can create empty files, this usually screws up the
635             # segsize math
636             d1 = self.clients[2].create_mutable_file("")
637             d1.addCallback(lambda newnode: newnode.download_best_version())
638             d1.addCallback(lambda res: self.failUnlessEqual("", res))
639             return d1
640         d.addCallback(_check_empty_file)
641
642         d.addCallback(lambda res: self.clients[0].create_dirnode())
643         def _created_dirnode(dnode):
644             log.msg("_created_dirnode(%s)" % (dnode,))
645             d1 = dnode.list()
646             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
647             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
648             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
649             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
650             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
651             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
652             d1.addCallback(lambda res: dnode.build_manifest().when_done())
653             d1.addCallback(lambda res:
654                            self.failUnlessEqual(len(res["manifest"]), 1))
655             return d1
656         d.addCallback(_created_dirnode)
657
658         def wait_for_c3_kg_conn():
659             return self.clients[3]._key_generator is not None
660         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
661
662         def check_kg_poolsize(junk, size_delta):
663             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
664                                  self.key_generator_svc.key_generator.pool_size + size_delta)
665
666         d.addCallback(check_kg_poolsize, 0)
667         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
668         d.addCallback(check_kg_poolsize, -1)
669         d.addCallback(lambda junk: self.clients[3].create_dirnode())
670         d.addCallback(check_kg_poolsize, -2)
671         # use_helper induces use of clients[3], which is the using-key_gen client
672         d.addCallback(lambda junk:
673                       self.POST("uri?t=mkdir&name=george", use_helper=True))
674         d.addCallback(check_kg_poolsize, -3)
675
676         return d
677
678     def flip_bit(self, good):
679         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
680
681     def mangle_uri(self, gooduri):
682         # change the key, which changes the storage index, which means we'll
683         # be asking about the wrong file, so nobody will have any shares
684         u = uri.from_string(gooduri)
685         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
686                             uri_extension_hash=u.uri_extension_hash,
687                             needed_shares=u.needed_shares,
688                             total_shares=u.total_shares,
689                             size=u.size)
690         return u2.to_string()
691
692     # TODO: add a test which mangles the uri_extension_hash instead, and
693     # should fail due to not being able to get a valid uri_extension block.
694     # Also a test which sneakily mangles the uri_extension block to change
695     # some of the validation data, so it will fail in the post-download phase
696     # when the file's crypttext integrity check fails. Do the same thing for
697     # the key, which should cause the download to fail the post-download
698     # plaintext_hash check.
699
    def test_vdrive(self):
        """System test: publish a small directory tree, bounce a client,
        then verify the tree through the node API, the webapi, the control
        port, the CLI, and the checker. Returns the Deferred for the whole
        sequence; each _do_*/_check_*/_test_* step is a method on this
        class chained via addCallback."""
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client0 to prove the published data survives a node bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
742
    def _test_introweb(self, res):
        """Fetch the introducer's web page (HTML and ?t=json forms) and
        verify the announcement/subscription summaries match the five
        storage servers this test sets up. On an assertion failure, dump
        the fetched page to stdout before re-raising, to aid debugging."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                # all test nodes run on one host, so only one distinct host
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
776
777     def _do_publish1(self, res):
778         ut = upload.Data(self.data, convergence=None)
779         c0 = self.clients[0]
780         d = c0.create_dirnode()
781         def _made_root(new_dirnode):
782             self._root_directory_uri = new_dirnode.get_uri()
783             return c0.create_node_from_uri(self._root_directory_uri)
784         d.addCallback(_made_root)
785         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
786         def _made_subdir1(subdir1_node):
787             self._subdir1_node = subdir1_node
788             d1 = subdir1_node.add_file(u"mydata567", ut)
789             d1.addCallback(self.log, "publish finished")
790             def _stash_uri(filenode):
791                 self.uri = filenode.get_uri()
792                 assert isinstance(self.uri, str), (self.uri, filenode)
793             d1.addCallback(_stash_uri)
794             return d1
795         d.addCallback(_made_subdir1)
796         return d
797
798     def _do_publish2(self, res):
799         ut = upload.Data(self.data, convergence=None)
800         d = self._subdir1_node.create_subdirectory(u"subdir2")
801         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
802         return d
803
    def log(self, res, *args, **kwargs):
        # Passthrough callback for deferred chains: emit a log message via
        # allmydata.util.log and hand the incoming result through unchanged,
        # so it can be inserted between addCallback steps without altering
        # the chain's value.
        log.msg(*args, **kwargs)
        return res
808
    def _do_publish_private(self, res):
        """Build the private tree P: a fresh dirnode holding
        personal/'sekrit data' (a small uploaded file) plus 's2-rw' and
        's2-ro' links to the existing subdir1/subdir2 (writable and
        read-only respectively). Fires with the new private dirnode."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                # link subdir2 in twice: once read-write, once read-only
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            # make the outer deferred fire with the private dirnode itself
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d
834
835     def _check_publish1(self, res):
836         # this one uses the iterative API
837         c1 = self.clients[1]
838         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
839         d.addCallback(self.log, "check_publish1 got /")
840         d.addCallback(lambda root: root.get(u"subdir1"))
841         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
842         d.addCallback(lambda filenode: download_to_data(filenode))
843         d.addCallback(self.log, "get finished")
844         def _get_done(data):
845             self.failUnlessEqual(data, self.data)
846         d.addCallback(_get_done)
847         return d
848
849     def _check_publish2(self, res):
850         # this one uses the path-based API
851         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
852         d = rootnode.get_child_at_path(u"subdir1")
853         d.addCallback(lambda dirnode:
854                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
855         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
856         d.addCallback(lambda filenode: download_to_data(filenode))
857         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
858
859         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
860         def _got_filenode(filenode):
861             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
862             assert fnode == filenode
863         d.addCallback(_got_filenode)
864         return d
865
    def _check_publish_private(self, resnode):
        """Exercise the private tree P (built by _do_publish_private) via
        the path-based API: read back 'sekrit data', confirm s2-rw is
        mutable and s2-ro is read-only (all mutations on it must raise
        NotMutableError), then shuffle 'sekrit data' around with
        move_child_to and check the manifest and deep-stats numbers."""
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # every mutating operation on the read-only link must fail
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            # reads through the read-only link still work
            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move 'sekrit data' around the writable tree and back again,
            # then verify the manifest and deep-stats
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with encoding details, so only
                # sanity-check lower bounds
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
975
976     def shouldFail(self, res, expected_failure, which, substring=None):
977         if isinstance(res, Failure):
978             res.trap(expected_failure)
979             if substring:
980                 self.failUnless(substring in str(res),
981                                 "substring '%s' not in '%s'"
982                                 % (substring, str(res)))
983         else:
984             self.fail("%s was supposed to raise %s, not get '%s'" %
985                       (which, expected_failure, res))
986
987     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
988         assert substring is None or isinstance(substring, str)
989         d = defer.maybeDeferred(callable, *args, **kwargs)
990         def done(res):
991             if isinstance(res, Failure):
992                 res.trap(expected_failure)
993                 if substring:
994                     self.failUnless(substring in str(res),
995                                     "substring '%s' not in '%s'"
996                                     % (substring, str(res)))
997             else:
998                 self.fail("%s was supposed to raise %s, not get '%s'" %
999                           (which, expected_failure, res))
1000         d.addBoth(done)
1001         return d
1002
1003     def PUT(self, urlpath, data):
1004         url = self.webish_url + urlpath
1005         return getPage(url, method="PUT", postdata=data)
1006
1007     def GET(self, urlpath, followRedirect=False):
1008         url = self.webish_url + urlpath
1009         return getPage(url, method="GET", followRedirect=followRedirect)
1010
1011     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1012         sepbase = "boogabooga"
1013         sep = "--" + sepbase
1014         form = []
1015         form.append(sep)
1016         form.append('Content-Disposition: form-data; name="_charset"')
1017         form.append('')
1018         form.append('UTF-8')
1019         form.append(sep)
1020         for name, value in fields.iteritems():
1021             if isinstance(value, tuple):
1022                 filename, value = value
1023                 form.append('Content-Disposition: form-data; name="%s"; '
1024                             'filename="%s"' % (name, filename.encode("utf-8")))
1025             else:
1026                 form.append('Content-Disposition: form-data; name="%s"' % name)
1027             form.append('')
1028             form.append(str(value))
1029             form.append(sep)
1030         form[-1] += "--"
1031         body = ""
1032         headers = {}
1033         if fields:
1034             body = "\r\n".join(form) + "\r\n"
1035             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1036         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1037
1038     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1039               use_helper=False):
1040         if use_helper:
1041             url = self.helper_webish_url + urlpath
1042         else:
1043             url = self.webish_url + urlpath
1044         return getPage(url, method="POST", postdata=body, headers=headers,
1045                        followRedirect=followRedirect)
1046
    def _test_web(self, res):
        """Exercise the webapi end-to-end: welcome pages (with and without
        the helper), directory listings, downloads by path and by URI
        (including a mangled URI that must yield a 410), PUT uploads small
        and multi-segment, in-place replacement, unlinked POST uploads,
        and the various status/helper-status/statistics pages in both HTML
        and JSON form. Returns the Deferred for the whole chain.

        NOTE(review): the expected strings below are scraped from rendered
        HTML, so this test is tightly coupled to the web templates."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            # backdate the mtime so the helper sees an "old" incoming file
            then = time.time() - 86400*3
            now = time.time()
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # sizes match the "small file" / "less small file" bytes above
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1255
    def _test_runner(self, res):
        """Exercise some of the diagnostic tools in runner.py: 'debug
        dump-share', 'debug find-shares', and 'debug catalog-shares'."""

        # find a share: walk every node's storage directory looking for a
        # directory shaped like .../storage/shares/$START/$SINDEX that
        # actually contains share files
        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 4
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX , and there
                # are sharefiles here
                filename = os.path.join(dirpath, filenames[0])
                # peek at the magic to see if it is a chk share: presumably
                # the version-1 magic marks an immutable (CHK) share, as
                # opposed to a mutable one -- TODO confirm against
                # allmydata.storage share formats
                magic = open(filename, "rb").read(4)
                if magic == '\x00\x00\x00\x01':
                    break
        else:
            # for/else: only reached when the walk finished without 'break',
            # i.e. no CHK share file was found anywhere under basedir
            self.fail("unable to find any uri_extension files in %s"
                      % self.basedir)
        log.msg("test_system.SystemTest._test_runner using %s" % filename)

        # 'debug dump-share' should succeed and print a human-readable
        # description of the share
        out,err = StringIO(), StringIO()
        rc = runner.runner(["debug", "dump-share", "--offsets",
                            filename],
                           stdout=out, stderr=err)
        output = out.getvalue()
        self.failUnlessEqual(rc, 0)

        # we only upload a single file, so we can assert some things about
        # its size and shares.
        self.failUnless(("share filename: %s" % filename) in output)
        self.failUnless("size: %d\n" % len(self.data) in output)
        self.failUnless("num_segments: 1\n" in output)
        # segment_size is always a multiple of needed_shares
        self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
        self.failUnless("total_shares: 10\n" in output)
        # keys which are supposed to be present
        for key in ("size", "num_segments", "segment_size",
                    "needed_shares", "total_shares",
                    "codec_name", "codec_params", "tail_codec_params",
                    #"plaintext_hash", "plaintext_root_hash",
                    "crypttext_hash", "crypttext_root_hash",
                    "share_root_hash", "UEB_hash"):
            self.failUnless("%s: " % key in output, key)
        self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)

        # now use its storage index to find the other shares using the
        # 'find-shares' tool
        sharedir, shnum = os.path.split(filename)
        storagedir, storage_index_s = os.path.split(sharedir)
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "find-shares", storage_index_s] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        # find-shares prints one share path per line; 10-of-10 encoding
        # means all ten shares should be located
        sharefiles = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(sharefiles), 10)

        # also exercise the 'catalog-shares' tool
        out,err = StringIO(), StringIO()
        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
        cmd = ["debug", "catalog-shares"] + nodedirs
        rc = runner.runner(cmd, stdout=out, stderr=err)
        self.failUnlessEqual(rc, 0)
        out.seek(0)
        # one description line per share across all nodes; exactly ten of
        # them should be CHK shares with our storage index
        descriptions = [sfn.strip() for sfn in out.readlines()]
        self.failUnlessEqual(len(descriptions), 30)
        matching = [line
                    for line in descriptions
                    if line.startswith("CHK %s " % storage_index_s)]
        self.failUnlessEqual(len(matching), 10)
1332
1333     def _test_control(self, res):
1334         # exercise the remote-control-the-client foolscap interfaces in
1335         # allmydata.control (mostly used for performance tests)
1336         c0 = self.clients[0]
1337         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1338         control_furl = open(control_furl_file, "r").read().strip()
1339         # it doesn't really matter which Tub we use to connect to the client,
1340         # so let's just use our IntroducerNode's
1341         d = self.introducer.tub.getReference(control_furl)
1342         d.addCallback(self._test_control2, control_furl_file)
1343         return d
1344     def _test_control2(self, rref, filename):
1345         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1346         downfile = os.path.join(self.basedir, "control.downfile")
1347         d.addCallback(lambda uri:
1348                       rref.callRemote("download_from_uri_to_file",
1349                                       uri, downfile))
1350         def _check(res):
1351             self.failUnlessEqual(res, downfile)
1352             data = open(downfile, "r").read()
1353             expected_data = open(filename, "r").read()
1354             self.failUnlessEqual(data, expected_data)
1355         d.addCallback(_check)
1356         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1357         if sys.platform == "linux2":
1358             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1359         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1360         return d
1361
1362     def _test_cli(self, res):
1363         # run various CLI commands (in a thread, since they use blocking
1364         # network calls)
1365
1366         private_uri = self._private_node.get_uri()
1367         some_uri = self._root_directory_uri
1368         client0_basedir = self.getdir("client0")
1369
1370         nodeargs = [
1371             "--node-directory", client0_basedir,
1372             ]
1373         TESTDATA = "I will not write the same thing over and over.\n" * 100
1374
1375         d = defer.succeed(None)
1376
1377         # for compatibility with earlier versions, private/root_dir.cap is
1378         # supposed to be treated as an alias named "tahoe:". Start by making
1379         # sure that works, before we add other aliases.
1380
1381         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1382         f = open(root_file, "w")
1383         f.write(private_uri)
1384         f.close()
1385
1386         def run(ignored, verb, *args, **kwargs):
1387             stdin = kwargs.get("stdin", "")
1388             newargs = [verb] + nodeargs + list(args)
1389             return self._run_cli(newargs, stdin=stdin)
1390
1391         def _check_ls((out,err), expected_children, unexpected_children=[]):
1392             self.failUnlessEqual(err, "")
1393             for s in expected_children:
1394                 self.failUnless(s in out, (s,out))
1395             for s in unexpected_children:
1396                 self.failIf(s in out, (s,out))
1397
1398         def _check_ls_root((out,err)):
1399             self.failUnless("personal" in out)
1400             self.failUnless("s2-ro" in out)
1401             self.failUnless("s2-rw" in out)
1402             self.failUnlessEqual(err, "")
1403
1404         # this should reference private_uri
1405         d.addCallback(run, "ls")
1406         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1407
1408         d.addCallback(run, "list-aliases")
1409         def _check_aliases_1((out,err)):
1410             self.failUnlessEqual(err, "")
1411             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1412         d.addCallback(_check_aliases_1)
1413
1414         # now that that's out of the way, remove root_dir.cap and work with
1415         # new files
1416         d.addCallback(lambda res: os.unlink(root_file))
1417         d.addCallback(run, "list-aliases")
1418         def _check_aliases_2((out,err)):
1419             self.failUnlessEqual(err, "")
1420             self.failUnlessEqual(out, "")
1421         d.addCallback(_check_aliases_2)
1422
1423         d.addCallback(run, "mkdir")
1424         def _got_dir( (out,err) ):
1425             self.failUnless(uri.from_string_dirnode(out.strip()))
1426             return out.strip()
1427         d.addCallback(_got_dir)
1428         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1429
1430         d.addCallback(run, "list-aliases")
1431         def _check_aliases_3((out,err)):
1432             self.failUnlessEqual(err, "")
1433             self.failUnless("tahoe: " in out)
1434         d.addCallback(_check_aliases_3)
1435
1436         def _check_empty_dir((out,err)):
1437             self.failUnlessEqual(out, "")
1438             self.failUnlessEqual(err, "")
1439         d.addCallback(run, "ls")
1440         d.addCallback(_check_empty_dir)
1441
1442         def _check_missing_dir((out,err)):
1443             # TODO: check that rc==2
1444             self.failUnlessEqual(out, "")
1445             self.failUnlessEqual(err, "No such file or directory\n")
1446         d.addCallback(run, "ls", "bogus")
1447         d.addCallback(_check_missing_dir)
1448
1449         files = []
1450         datas = []
1451         for i in range(10):
1452             fn = os.path.join(self.basedir, "file%d" % i)
1453             files.append(fn)
1454             data = "data to be uploaded: file%d\n" % i
1455             datas.append(data)
1456             open(fn,"wb").write(data)
1457
1458         def _check_stdout_against((out,err), filenum=None, data=None):
1459             self.failUnlessEqual(err, "")
1460             if filenum is not None:
1461                 self.failUnlessEqual(out, datas[filenum])
1462             if data is not None:
1463                 self.failUnlessEqual(out, data)
1464
1465         # test all both forms of put: from a file, and from stdin
1466         #  tahoe put bar FOO
1467         d.addCallback(run, "put", files[0], "tahoe-file0")
1468         def _put_out((out,err)):
1469             self.failUnless("URI:LIT:" in out, out)
1470             self.failUnless("201 Created" in err, err)
1471             uri0 = out.strip()
1472             return run(None, "get", uri0)
1473         d.addCallback(_put_out)
1474         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1475
1476         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1477         #  tahoe put bar tahoe:FOO
1478         d.addCallback(run, "put", files[2], "tahoe:file2")
1479         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1480         def _check_put_mutable((out,err)):
1481             self._mutable_file3_uri = out.strip()
1482         d.addCallback(_check_put_mutable)
1483         d.addCallback(run, "get", "tahoe:file3")
1484         d.addCallback(_check_stdout_against, 3)
1485
1486         #  tahoe put FOO
1487         STDIN_DATA = "This is the file to upload from stdin."
1488         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1489         #  tahoe put tahoe:FOO
1490         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1491                       stdin="Other file from stdin.")
1492
1493         d.addCallback(run, "ls")
1494         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1495                                   "tahoe-file-stdin", "from-stdin"])
1496         d.addCallback(run, "ls", "subdir")
1497         d.addCallback(_check_ls, ["tahoe-file1"])
1498
1499         # tahoe mkdir FOO
1500         d.addCallback(run, "mkdir", "subdir2")
1501         d.addCallback(run, "ls")
1502         # TODO: extract the URI, set an alias with it
1503         d.addCallback(_check_ls, ["subdir2"])
1504
1505         # tahoe get: (to stdin and to a file)
1506         d.addCallback(run, "get", "tahoe-file0")
1507         d.addCallback(_check_stdout_against, 0)
1508         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1509         d.addCallback(_check_stdout_against, 1)
1510         outfile0 = os.path.join(self.basedir, "outfile0")
1511         d.addCallback(run, "get", "file2", outfile0)
1512         def _check_outfile0((out,err)):
1513             data = open(outfile0,"rb").read()
1514             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1515         d.addCallback(_check_outfile0)
1516         outfile1 = os.path.join(self.basedir, "outfile0")
1517         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1518         def _check_outfile1((out,err)):
1519             data = open(outfile1,"rb").read()
1520             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1521         d.addCallback(_check_outfile1)
1522
1523         d.addCallback(run, "rm", "tahoe-file0")
1524         d.addCallback(run, "rm", "tahoe:file2")
1525         d.addCallback(run, "ls")
1526         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1527
1528         d.addCallback(run, "ls", "-l")
1529         def _check_ls_l((out,err)):
1530             lines = out.split("\n")
1531             for l in lines:
1532                 if "tahoe-file-stdin" in l:
1533                     self.failUnless(l.startswith("-r-- "), l)
1534                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1535                 if "file3" in l:
1536                     self.failUnless(l.startswith("-rw- "), l) # mutable
1537         d.addCallback(_check_ls_l)
1538
1539         d.addCallback(run, "ls", "--uri")
1540         def _check_ls_uri((out,err)):
1541             lines = out.split("\n")
1542             for l in lines:
1543                 if "file3" in l:
1544                     self.failUnless(self._mutable_file3_uri in l)
1545         d.addCallback(_check_ls_uri)
1546
1547         d.addCallback(run, "ls", "--readonly-uri")
1548         def _check_ls_rouri((out,err)):
1549             lines = out.split("\n")
1550             for l in lines:
1551                 if "file3" in l:
1552                     rw_uri = self._mutable_file3_uri
1553                     u = uri.from_string_mutable_filenode(rw_uri)
1554                     ro_uri = u.get_readonly().to_string()
1555                     self.failUnless(ro_uri in l)
1556         d.addCallback(_check_ls_rouri)
1557
1558
1559         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1560         d.addCallback(run, "ls")
1561         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1562
1563         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1564         d.addCallback(run, "ls")
1565         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1566
1567         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1568         d.addCallback(run, "ls")
1569         d.addCallback(_check_ls, ["file3", "file3-copy"])
1570         d.addCallback(run, "get", "tahoe:file3-copy")
1571         d.addCallback(_check_stdout_against, 3)
1572
1573         # copy from disk into tahoe
1574         d.addCallback(run, "cp", files[4], "tahoe:file4")
1575         d.addCallback(run, "ls")
1576         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1577         d.addCallback(run, "get", "tahoe:file4")
1578         d.addCallback(_check_stdout_against, 4)
1579
1580         # copy from tahoe into disk
1581         target_filename = os.path.join(self.basedir, "file-out")
1582         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1583         def _check_cp_out((out,err)):
1584             self.failUnless(os.path.exists(target_filename))
1585             got = open(target_filename,"rb").read()
1586             self.failUnlessEqual(got, datas[4])
1587         d.addCallback(_check_cp_out)
1588
1589         # copy from disk to disk (silly case)
1590         target2_filename = os.path.join(self.basedir, "file-out-copy")
1591         d.addCallback(run, "cp", target_filename, target2_filename)
1592         def _check_cp_out2((out,err)):
1593             self.failUnless(os.path.exists(target2_filename))
1594             got = open(target2_filename,"rb").read()
1595             self.failUnlessEqual(got, datas[4])
1596         d.addCallback(_check_cp_out2)
1597
1598         # copy from tahoe into disk, overwriting an existing file
1599         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1600         def _check_cp_out3((out,err)):
1601             self.failUnless(os.path.exists(target_filename))
1602             got = open(target_filename,"rb").read()
1603             self.failUnlessEqual(got, datas[3])
1604         d.addCallback(_check_cp_out3)
1605
1606         # copy from disk into tahoe, overwriting an existing immutable file
1607         d.addCallback(run, "cp", files[5], "tahoe:file4")
1608         d.addCallback(run, "ls")
1609         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1610         d.addCallback(run, "get", "tahoe:file4")
1611         d.addCallback(_check_stdout_against, 5)
1612
1613         # copy from disk into tahoe, overwriting an existing mutable file
1614         d.addCallback(run, "cp", files[5], "tahoe:file3")
1615         d.addCallback(run, "ls")
1616         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1617         d.addCallback(run, "get", "tahoe:file3")
1618         d.addCallback(_check_stdout_against, 5)
1619
1620         # recursive copy: setup
1621         dn = os.path.join(self.basedir, "dir1")
1622         os.makedirs(dn)
1623         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1624         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1625         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1626         sdn2 = os.path.join(dn, "subdir2")
1627         os.makedirs(sdn2)
1628         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1629         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1630
1631         # from disk into tahoe
1632         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1633         d.addCallback(run, "ls")
1634         d.addCallback(_check_ls, ["dir1"])
1635         d.addCallback(run, "ls", "dir1")
1636         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1637                       ["rfile4", "rfile5"])
1638         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1639         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1640                       ["rfile1", "rfile2", "rfile3"])
1641         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1642         d.addCallback(_check_stdout_against, data="rfile4")
1643
1644         # and back out again
1645         dn_copy = os.path.join(self.basedir, "dir1-copy")
1646         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1647         def _check_cp_r_out((out,err)):
1648             def _cmp(name):
1649                 old = open(os.path.join(dn, name), "rb").read()
1650                 newfn = os.path.join(dn_copy, name)
1651                 self.failUnless(os.path.exists(newfn))
1652                 new = open(newfn, "rb").read()
1653                 self.failUnlessEqual(old, new)
1654             _cmp("rfile1")
1655             _cmp("rfile2")
1656             _cmp("rfile3")
1657             _cmp(os.path.join("subdir2", "rfile4"))
1658             _cmp(os.path.join("subdir2", "rfile5"))
1659         d.addCallback(_check_cp_r_out)
1660
1661         # and copy it a second time, which ought to overwrite the same files
1662         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1663
1664         # and again, only writing filecaps
1665         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1666         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1667         def _check_capsonly((out,err)):
1668             # these should all be LITs
1669             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1670             y = uri.from_string_filenode(x)
1671             self.failUnlessEqual(y.data, "rfile4")
1672         d.addCallback(_check_capsonly)
1673
1674         # and tahoe-to-tahoe
1675         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1676         d.addCallback(run, "ls")
1677         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1678         d.addCallback(run, "ls", "dir1-copy")
1679         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1680                       ["rfile4", "rfile5"])
1681         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1682         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1683                       ["rfile1", "rfile2", "rfile3"])
1684         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1685         d.addCallback(_check_stdout_against, data="rfile4")
1686
1687         # and copy it a second time, which ought to overwrite the same files
1688         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1689
1690         # tahoe_ls doesn't currently handle the error correctly: it tries to
1691         # JSON-parse a traceback.
1692 ##         def _ls_missing(res):
1693 ##             argv = ["ls"] + nodeargs + ["bogus"]
1694 ##             return self._run_cli(argv)
1695 ##         d.addCallback(_ls_missing)
1696 ##         def _check_ls_missing((out,err)):
1697 ##             print "OUT", out
1698 ##             print "ERR", err
1699 ##             self.failUnlessEqual(err, "")
1700 ##         d.addCallback(_check_ls_missing)
1701
1702         return d
1703
1704     def _run_cli(self, argv, stdin=""):
1705         #print "CLI:", argv
1706         stdout, stderr = StringIO(), StringIO()
1707         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1708                                   stdin=StringIO(stdin),
1709                                   stdout=stdout, stderr=stderr)
1710         def _done(res):
1711             return stdout.getvalue(), stderr.getvalue()
1712         d.addCallback(_done)
1713         return d
1714
1715     def _test_checker(self, res):
1716         ut = upload.Data("too big to be literal" * 200, convergence=None)
1717         d = self._personal_node.add_file(u"big file", ut)
1718
1719         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1720         def _check_dirnode_results(r):
1721             self.failUnless(r.is_healthy())
1722         d.addCallback(_check_dirnode_results)
1723         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1724         d.addCallback(_check_dirnode_results)
1725
1726         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1727         def _got_chk_filenode(n):
1728             self.failUnless(isinstance(n, ImmutableFileNode))
1729             d = n.check(Monitor())
1730             def _check_filenode_results(r):
1731                 self.failUnless(r.is_healthy())
1732             d.addCallback(_check_filenode_results)
1733             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1734             d.addCallback(_check_filenode_results)
1735             return d
1736         d.addCallback(_got_chk_filenode)
1737
1738         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1739         def _got_lit_filenode(n):
1740             self.failUnless(isinstance(n, LiteralFileNode))
1741             d = n.check(Monitor())
1742             def _check_lit_filenode_results(r):
1743                 self.failUnlessEqual(r, None)
1744             d.addCallback(_check_lit_filenode_results)
1745             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1746             d.addCallback(_check_lit_filenode_results)
1747             return d
1748         d.addCallback(_got_lit_filenode)
1749         return d
1750