]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
hush pyflakes-0.4.0 warnings: remove trivial unused variables. For #900.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 import allmydata
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18      NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotMutableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin
28
# Immutable-file test payload. It must be large enough that it will not fit
# inside a LIT (literal) URI, so uploads of it exercise the real CHK path.
LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""
33
class CountingDataUploadable(upload.Data):
    """An upload.Data that counts how many bytes have been read from it,
    and can fire a Deferred once the count passes a threshold (used to
    simulate an upload that gets interrupted partway through)."""
    bytes_read = 0          # running total of bytes handed out by read()
    interrupt_after = None  # byte threshold; None means "never fire"
    interrupt_after_d = None  # Deferred fired once the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # clear the threshold first so the Deferred fires exactly once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
46
47 class SystemTest(SystemTestMixin, unittest.TestCase):
48     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
49
50     def test_connections(self):
51         self.basedir = "system/SystemTest/test_connections"
52         d = self.set_up_nodes()
53         self.extra_node = None
54         d.addCallback(lambda res: self.add_extra_node(self.numclients))
55         def _check(extra_node):
56             self.extra_node = extra_node
57             for c in self.clients:
58                 all_peerids = c.get_storage_broker().get_all_serverids()
59                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
60                 sb = c.storage_broker
61                 permuted_peers = sb.get_servers_for_index("a")
62                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
63
64         d.addCallback(_check)
65         def _shutdown_extra_node(res):
66             if self.extra_node:
67                 return self.extra_node.stopService()
68             return res
69         d.addBoth(_shutdown_extra_node)
70         return d
71     # test_connections is subsumed by test_upload_and_download, and takes
72     # quite a while to run on a slow machine (because of all the TLS
73     # connections that must be established). If we ever rework the introducer
74     # code to such an extent that we're not sure if it works anymore, we can
75     # reinstate this test until it does.
76     del test_connections
77
78     def test_upload_and_download_random_key(self):
79         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80         return self._test_upload_and_download(convergence=None)
81
82     def test_upload_and_download_convergent(self):
83         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84         return self._test_upload_and_download(convergence="some convergence string")
85
    def _test_upload_and_download(self, convergence):
        """Round-trip an immutable file through the grid.

        Uploads a multi-segment file from client0, downloads it from
        client1 (whole, partial, and past-the-end reads), checks failure
        modes (flipped read-key bit, nonexistent URI), then exercises
        helper-assisted upload, interrupted-upload resumption, and the
        remote stats-provider interface.

        'convergence' is the convergence secret string, or None for a
        random per-upload key; duplicate-upload short-circuiting and
        resumption savings only apply when it is not None.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            # every client should see all numclients storage servers
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actualy
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            # download through a different client than the one that uploaded
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            # NOTE(review): d1 is never returned, so the callback chain does
            # not wait for this second upload to complete -- confirm that is
            # intentional and not a dropped Deferred.
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            # exercise read() with various offset/size combinations
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                # size=None means "read to the end of the file"
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                # asking for more than the file holds returns the whole file
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            # flip one bit of the read-key: decryption/validation must fail
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                # addBoth hands us either a result or a Failure; we require
                # a NoSharesError Failure here
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            # with convergence, a duplicate upload should be short-circuited
            # before any ciphertext is pushed to the helper
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.uri)
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                # the debug attribute only appears if uploading actually began
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restartingit.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around .
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d
382
383     def _find_shares(self, basedir):
384         shares = []
385         for (dirpath, dirnames, filenames) in os.walk(basedir):
386             if "storage" not in dirpath:
387                 continue
388             if not filenames:
389                 continue
390             pieces = dirpath.split(os.sep)
391             if (len(pieces) >= 5
392                 and pieces[-4] == "storage"
393                 and pieces[-3] == "shares"):
394                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
395                 # are sharefiles here
396                 assert pieces[-5].startswith("client")
397                 client_num = int(pieces[-5][-1])
398                 storage_index_s = pieces[-1]
399                 storage_index = si_a2b(storage_index_s)
400                 for sharename in filenames:
401                     shnum = int(sharename)
402                     filename = os.path.join(dirpath, sharename)
403                     data = (client_num, storage_index, filename, shnum)
404                     shares.append(data)
405         if not shares:
406             self.fail("unable to find any share files in %s" % basedir)
407         return shares
408
409     def _corrupt_mutable_share(self, filename, which):
410         msf = MutableShareFile(filename)
411         datav = msf.readv([ (0, 1000000) ])
412         final_share = datav[0]
413         assert len(final_share) < 1000000 # ought to be truncated
414         pieces = mutable_layout.unpack_share(final_share)
415         (seqnum, root_hash, IV, k, N, segsize, datalen,
416          verification_key, signature, share_hash_chain, block_hash_tree,
417          share_data, enc_privkey) = pieces
418
419         if which == "seqnum":
420             seqnum = seqnum + 15
421         elif which == "R":
422             root_hash = self.flip_bit(root_hash)
423         elif which == "IV":
424             IV = self.flip_bit(IV)
425         elif which == "segsize":
426             segsize = segsize + 15
427         elif which == "pubkey":
428             verification_key = self.flip_bit(verification_key)
429         elif which == "signature":
430             signature = self.flip_bit(signature)
431         elif which == "share_hash_chain":
432             nodenum = share_hash_chain.keys()[0]
433             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
434         elif which == "block_hash_tree":
435             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
436         elif which == "share_data":
437             share_data = self.flip_bit(share_data)
438         elif which == "encprivkey":
439             enc_privkey = self.flip_bit(enc_privkey)
440
441         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
442                                             segsize, datalen)
443         final_share = mutable_layout.pack_share(prefix,
444                                                 verification_key,
445                                                 signature,
446                                                 share_hash_chain,
447                                                 block_hash_tree,
448                                                 share_data,
449                                                 enc_privkey)
450         msf.writev( [(0, final_share)], None)
451
452
453     def test_mutable(self):
454         self.basedir = "system/SystemTest/test_mutable"
455         DATA = "initial contents go here."  # 25 bytes % 3 != 0
456         NEWDATA = "new contents yay"
457         NEWERDATA = "this is getting old"
458
459         d = self.set_up_nodes(use_key_generator=True)
460
461         def _create_mutable(res):
462             c = self.clients[0]
463             log.msg("starting create_mutable_file")
464             d1 = c.create_mutable_file(DATA)
465             def _done(res):
466                 log.msg("DONE: %s" % (res,))
467                 self._mutable_node_1 = res
468             d1.addCallback(_done)
469             return d1
470         d.addCallback(_create_mutable)
471
472         def _test_debug(res):
473             # find a share. It is important to run this while there is only
474             # one slot in the grid.
475             shares = self._find_shares(self.basedir)
476             (client_num, storage_index, filename, shnum) = shares[0]
477             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
478                     % filename)
479             log.msg(" for clients[%d]" % client_num)
480
481             out,err = StringIO(), StringIO()
482             rc = runner.runner(["debug", "dump-share", "--offsets",
483                                 filename],
484                                stdout=out, stderr=err)
485             output = out.getvalue()
486             self.failUnlessEqual(rc, 0)
487             try:
488                 self.failUnless("Mutable slot found:\n" in output)
489                 self.failUnless("share_type: SDMF\n" in output)
490                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
491                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
492                 self.failUnless(" num_extra_leases: 0\n" in output)
493                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
494                                 in output)
495                 self.failUnless(" SDMF contents:\n" in output)
496                 self.failUnless("  seqnum: 1\n" in output)
497                 self.failUnless("  required_shares: 3\n" in output)
498                 self.failUnless("  total_shares: 10\n" in output)
499                 self.failUnless("  segsize: 27\n" in output, (output, filename))
500                 self.failUnless("  datalen: 25\n" in output)
501                 # the exact share_hash_chain nodes depends upon the sharenum,
502                 # and is more of a hassle to compute than I want to deal with
503                 # now
504                 self.failUnless("  share_hash_chain: " in output)
505                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
506                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
507                             base32.b2a(storage_index))
508                 self.failUnless(expected in output)
509             except unittest.FailTest:
510                 print
511                 print "dump-share output was:"
512                 print output
513                 raise
514         d.addCallback(_test_debug)
515
516         # test retrieval
517
518         # first, let's see if we can use the existing node to retrieve the
519         # contents. This allows it to use the cached pubkey and maybe the
520         # latest-known sharemap.
521
522         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
523         def _check_download_1(res):
524             self.failUnlessEqual(res, DATA)
525             # now we see if we can retrieve the data from a new node,
526             # constructed using the URI of the original one. We do this test
527             # on the same client that uploaded the data.
528             uri = self._mutable_node_1.get_uri()
529             log.msg("starting retrieve1")
530             newnode = self.clients[0].create_node_from_uri(uri)
531             newnode_2 = self.clients[0].create_node_from_uri(uri)
532             self.failUnlessIdentical(newnode, newnode_2)
533             return newnode.download_best_version()
534         d.addCallback(_check_download_1)
535
536         def _check_download_2(res):
537             self.failUnlessEqual(res, DATA)
538             # same thing, but with a different client
539             uri = self._mutable_node_1.get_uri()
540             newnode = self.clients[1].create_node_from_uri(uri)
541             log.msg("starting retrieve2")
542             d1 = newnode.download_best_version()
543             d1.addCallback(lambda res: (res, newnode))
544             return d1
545         d.addCallback(_check_download_2)
546
547         def _check_download_3((res, newnode)):
548             self.failUnlessEqual(res, DATA)
549             # replace the data
550             log.msg("starting replace1")
551             d1 = newnode.overwrite(NEWDATA)
552             d1.addCallback(lambda res: newnode.download_best_version())
553             return d1
554         d.addCallback(_check_download_3)
555
556         def _check_download_4(res):
557             self.failUnlessEqual(res, NEWDATA)
558             # now create an even newer node and replace the data on it. This
559             # new node has never been used for download before.
560             uri = self._mutable_node_1.get_uri()
561             newnode1 = self.clients[2].create_node_from_uri(uri)
562             newnode2 = self.clients[3].create_node_from_uri(uri)
563             self._newnode3 = self.clients[3].create_node_from_uri(uri)
564             log.msg("starting replace2")
565             d1 = newnode1.overwrite(NEWERDATA)
566             d1.addCallback(lambda res: newnode2.download_best_version())
567             return d1
568         d.addCallback(_check_download_4)
569
570         def _check_download_5(res):
571             log.msg("finished replace2")
572             self.failUnlessEqual(res, NEWERDATA)
573         d.addCallback(_check_download_5)
574
575         def _corrupt_shares(res):
576             # run around and flip bits in all but k of the shares, to test
577             # the hash checks
578             shares = self._find_shares(self.basedir)
579             ## sort by share number
580             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
581             where = dict([ (shnum, filename)
582                            for (client_num, storage_index, filename, shnum)
583                            in shares ])
584             assert len(where) == 10 # this test is designed for 3-of-10
585             for shnum, filename in where.items():
586                 # shares 7,8,9 are left alone. read will check
587                 # (share_hash_chain, block_hash_tree, share_data). New
588                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
589                 # segsize, signature).
590                 if shnum == 0:
591                     # read: this will trigger "pubkey doesn't match
592                     # fingerprint".
593                     self._corrupt_mutable_share(filename, "pubkey")
594                     self._corrupt_mutable_share(filename, "encprivkey")
595                 elif shnum == 1:
596                     # triggers "signature is invalid"
597                     self._corrupt_mutable_share(filename, "seqnum")
598                 elif shnum == 2:
599                     # triggers "signature is invalid"
600                     self._corrupt_mutable_share(filename, "R")
601                 elif shnum == 3:
602                     # triggers "signature is invalid"
603                     self._corrupt_mutable_share(filename, "segsize")
604                 elif shnum == 4:
605                     self._corrupt_mutable_share(filename, "share_hash_chain")
606                 elif shnum == 5:
607                     self._corrupt_mutable_share(filename, "block_hash_tree")
608                 elif shnum == 6:
609                     self._corrupt_mutable_share(filename, "share_data")
610                 # other things to correct: IV, signature
611                 # 7,8,9 are left alone
612
613                 # note that initial_query_count=5 means that we'll hit the
614                 # first 5 servers in effectively random order (based upon
615                 # response time), so we won't necessarily ever get a "pubkey
616                 # doesn't match fingerprint" error (if we hit shnum>=1 before
617                 # shnum=0, we pull the pubkey from there). To get repeatable
618                 # specific failures, we need to set initial_query_count=1,
619                 # but of course that will change the sequencing behavior of
620                 # the retrieval process. TODO: find a reasonable way to make
621                 # this a parameter, probably when we expand this test to test
622                 # for one failure mode at a time.
623
624                 # when we retrieve this, we should get three signature
625                 # failures (where we've mangled seqnum, R, and segsize). The
626                 # pubkey mangling
627         d.addCallback(_corrupt_shares)
628
629         d.addCallback(lambda res: self._newnode3.download_best_version())
630         d.addCallback(_check_download_5)
631
632         def _check_empty_file(res):
633             # make sure we can create empty files, this usually screws up the
634             # segsize math
635             d1 = self.clients[2].create_mutable_file("")
636             d1.addCallback(lambda newnode: newnode.download_best_version())
637             d1.addCallback(lambda res: self.failUnlessEqual("", res))
638             return d1
639         d.addCallback(_check_empty_file)
640
641         d.addCallback(lambda res: self.clients[0].create_dirnode())
642         def _created_dirnode(dnode):
643             log.msg("_created_dirnode(%s)" % (dnode,))
644             d1 = dnode.list()
645             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
646             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
647             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
648             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
649             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
650             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
651             d1.addCallback(lambda res: dnode.build_manifest().when_done())
652             d1.addCallback(lambda res:
653                            self.failUnlessEqual(len(res["manifest"]), 1))
654             return d1
655         d.addCallback(_created_dirnode)
656
657         def wait_for_c3_kg_conn():
658             return self.clients[3]._key_generator is not None
659         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
660
661         def check_kg_poolsize(junk, size_delta):
662             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
663                                  self.key_generator_svc.key_generator.pool_size + size_delta)
664
665         d.addCallback(check_kg_poolsize, 0)
666         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
667         d.addCallback(check_kg_poolsize, -1)
668         d.addCallback(lambda junk: self.clients[3].create_dirnode())
669         d.addCallback(check_kg_poolsize, -2)
670         # use_helper induces use of clients[3], which is the using-key_gen client
671         d.addCallback(lambda junk:
672                       self.POST("uri?t=mkdir&name=george", use_helper=True))
673         d.addCallback(check_kg_poolsize, -3)
674
675         return d
676
677     def flip_bit(self, good):
678         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
679
680     def mangle_uri(self, gooduri):
681         # change the key, which changes the storage index, which means we'll
682         # be asking about the wrong file, so nobody will have any shares
683         u = uri.from_string(gooduri)
684         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
685                             uri_extension_hash=u.uri_extension_hash,
686                             needed_shares=u.needed_shares,
687                             total_shares=u.total_shares,
688                             size=u.size)
689         return u2.to_string()
690
691     # TODO: add a test which mangles the uri_extension_hash instead, and
692     # should fail due to not being able to get a valid uri_extension block.
693     # Also a test which sneakily mangles the uri_extension block to change
694     # some of the validation data, so it will fail in the post-download phase
695     # when the file's crypttext integrity check fails. Do the same thing for
696     # the key, which should cause the download to fail the post-download
697     # plaintext_hash check.
698
    def test_filesystem(self):
        """End-to-end filesystem test: publish a small tree of directories
        and files, bounce a client, then exercise the web UI, the control
        port, the CLI tools, and the checker against that tree.

        Each _test_* / _do_* / _check_* step is chained on one Deferred, so
        order matters: later steps read state (URIs, nodes) stashed on self
        by earlier ones.
        """
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # bounce a client to confirm the published data survives a restart
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
741
    def _test_introweb(self, res):
        """Fetch the introducer's status page (HTML and ?t=json forms) and
        check the version string plus announcement/subscription summaries.

        The hard-coded counts assume the grid built by set_up_nodes: 5
        clients, all announcing storage + stub_client from a single host.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the whole page so buildbot failures are diagnosable
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
775
776     def _do_publish1(self, res):
777         ut = upload.Data(self.data, convergence=None)
778         c0 = self.clients[0]
779         d = c0.create_dirnode()
780         def _made_root(new_dirnode):
781             self._root_directory_uri = new_dirnode.get_uri()
782             return c0.create_node_from_uri(self._root_directory_uri)
783         d.addCallback(_made_root)
784         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
785         def _made_subdir1(subdir1_node):
786             self._subdir1_node = subdir1_node
787             d1 = subdir1_node.add_file(u"mydata567", ut)
788             d1.addCallback(self.log, "publish finished")
789             def _stash_uri(filenode):
790                 self.uri = filenode.get_uri()
791                 assert isinstance(self.uri, str), (self.uri, filenode)
792             d1.addCallback(_stash_uri)
793             return d1
794         d.addCallback(_made_subdir1)
795         return d
796
797     def _do_publish2(self, res):
798         ut = upload.Data(self.data, convergence=None)
799         d = self._subdir1_node.create_subdirectory(u"subdir2")
800         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
801         return d
802
803     def log(self, res, *args, **kwargs):
804         # print "MSG: %s  RES: %s" % (msg, args)
805         log.msg(*args, **kwargs)
806         return res
807
808     def _do_publish_private(self, res):
809         self.smalldata = "sssh, very secret stuff"
810         ut = upload.Data(self.smalldata, convergence=None)
811         d = self.clients[0].create_dirnode()
812         d.addCallback(self.log, "GOT private directory")
813         def _got_new_dir(privnode):
814             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
815             d1 = privnode.create_subdirectory(u"personal")
816             d1.addCallback(self.log, "made P/personal")
817             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
818             d1.addCallback(self.log, "made P/personal/sekrit data")
819             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
820             def _got_s2(s2node):
821                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
822                                       s2node.get_readonly_uri())
823                 d2.addCallback(lambda node:
824                                privnode.set_uri(u"s2-ro",
825                                                 s2node.get_readonly_uri(),
826                                                 s2node.get_readonly_uri()))
827                 return d2
828             d1.addCallback(_got_s2)
829             d1.addCallback(lambda res: privnode)
830             return d1
831         d.addCallback(_got_new_dir)
832         return d
833
834     def _check_publish1(self, res):
835         # this one uses the iterative API
836         c1 = self.clients[1]
837         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
838         d.addCallback(self.log, "check_publish1 got /")
839         d.addCallback(lambda root: root.get(u"subdir1"))
840         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
841         d.addCallback(lambda filenode: download_to_data(filenode))
842         d.addCallback(self.log, "get finished")
843         def _get_done(data):
844             self.failUnlessEqual(data, self.data)
845         d.addCallback(_get_done)
846         return d
847
848     def _check_publish2(self, res):
849         # this one uses the path-based API
850         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
851         d = rootnode.get_child_at_path(u"subdir1")
852         d.addCallback(lambda dirnode:
853                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
854         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
855         d.addCallback(lambda filenode: download_to_data(filenode))
856         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
857
858         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
859         def _got_filenode(filenode):
860             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
861             assert fnode == filenode
862         d.addCallback(_got_filenode)
863         return d
864
    def _check_publish_private(self, resnode):
        """Check the private tree built by _do_publish_private: read back
        the secret file, confirm every mutating operation on the read-only
        s2-ro link fails with NotMutableError, then exercise move_child_to
        and deep-traversal (manifest + deep-stats) on the writable tree.

        Uses the path-based API throughout. Stashes the private and
        personal dirnodes on self for later steps.
        """
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            # resolve *path* relative to the private root
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only link: reads must work, every mutation must fail
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move the secret file around the writable tree, ending back at
            # P/personal/sekrit data, then deep-traverse the whole tree
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with encoding details, so only check
                # loose lower bounds
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
974
975     def shouldFail(self, res, expected_failure, which, substring=None):
976         if isinstance(res, Failure):
977             res.trap(expected_failure)
978             if substring:
979                 self.failUnless(substring in str(res),
980                                 "substring '%s' not in '%s'"
981                                 % (substring, str(res)))
982         else:
983             self.fail("%s was supposed to raise %s, not get '%s'" %
984                       (which, expected_failure, res))
985
986     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
987         assert substring is None or isinstance(substring, str)
988         d = defer.maybeDeferred(callable, *args, **kwargs)
989         def done(res):
990             if isinstance(res, Failure):
991                 res.trap(expected_failure)
992                 if substring:
993                     self.failUnless(substring in str(res),
994                                     "substring '%s' not in '%s'"
995                                     % (substring, str(res)))
996             else:
997                 self.fail("%s was supposed to raise %s, not get '%s'" %
998                           (which, expected_failure, res))
999         d.addBoth(done)
1000         return d
1001
1002     def PUT(self, urlpath, data):
1003         url = self.webish_url + urlpath
1004         return getPage(url, method="PUT", postdata=data)
1005
1006     def GET(self, urlpath, followRedirect=False):
1007         url = self.webish_url + urlpath
1008         return getPage(url, method="GET", followRedirect=followRedirect)
1009
1010     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1011         sepbase = "boogabooga"
1012         sep = "--" + sepbase
1013         form = []
1014         form.append(sep)
1015         form.append('Content-Disposition: form-data; name="_charset"')
1016         form.append('')
1017         form.append('UTF-8')
1018         form.append(sep)
1019         for name, value in fields.iteritems():
1020             if isinstance(value, tuple):
1021                 filename, value = value
1022                 form.append('Content-Disposition: form-data; name="%s"; '
1023                             'filename="%s"' % (name, filename.encode("utf-8")))
1024             else:
1025                 form.append('Content-Disposition: form-data; name="%s"' % name)
1026             form.append('')
1027             form.append(str(value))
1028             form.append(sep)
1029         form[-1] += "--"
1030         body = ""
1031         headers = {}
1032         if fields:
1033             body = "\r\n".join(form) + "\r\n"
1034             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1035         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1036
1037     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1038               use_helper=False):
1039         if use_helper:
1040             url = self.helper_webish_url + urlpath
1041         else:
1042             url = self.webish_url + urlpath
1043         return getPage(url, method="POST", postdata=body, headers=headers,
1044                        followRedirect=followRedirect)
1045
    def _test_web(self, res):
        """Drive the webapi/WUI end to end: welcome pages (direct and via
        the helper-using node), directory listings, downloads by URI
        (including a deliberately bogus URI), PUT/POST uploads, the
        upload/download/mutable status pages, helper status, and the
        statistics pages. The hard-coded counts at the end reflect the
        uploads/downloads performed by the preceding steps of
        test_filesystem."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            then = time.time() - 86400*3
            now = time.time()
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1254
1255     def _test_runner(self, res):
1256         # exercise some of the diagnostic tools in runner.py
1257
1258         # find a share
1259         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1260             if "storage" not in dirpath:
1261                 continue
1262             if not filenames:
1263                 continue
1264             pieces = dirpath.split(os.sep)
1265             if (len(pieces) >= 4
1266                 and pieces[-4] == "storage"
1267                 and pieces[-3] == "shares"):
1268                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1269                 # are sharefiles here
1270                 filename = os.path.join(dirpath, filenames[0])
1271                 # peek at the magic to see if it is a chk share
1272                 magic = open(filename, "rb").read(4)
1273                 if magic == '\x00\x00\x00\x01':
1274                     break
1275         else:
1276             self.fail("unable to find any uri_extension files in %s"
1277                       % self.basedir)
1278         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1279
1280         out,err = StringIO(), StringIO()
1281         rc = runner.runner(["debug", "dump-share", "--offsets",
1282                             filename],
1283                            stdout=out, stderr=err)
1284         output = out.getvalue()
1285         self.failUnlessEqual(rc, 0)
1286
1287         # we only upload a single file, so we can assert some things about
1288         # its size and shares.
1289         self.failUnless(("share filename: %s" % filename) in output)
1290         self.failUnless("size: %d\n" % len(self.data) in output)
1291         self.failUnless("num_segments: 1\n" in output)
1292         # segment_size is always a multiple of needed_shares
1293         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1294         self.failUnless("total_shares: 10\n" in output)
1295         # keys which are supposed to be present
1296         for key in ("size", "num_segments", "segment_size",
1297                     "needed_shares", "total_shares",
1298                     "codec_name", "codec_params", "tail_codec_params",
1299                     #"plaintext_hash", "plaintext_root_hash",
1300                     "crypttext_hash", "crypttext_root_hash",
1301                     "share_root_hash", "UEB_hash"):
1302             self.failUnless("%s: " % key in output, key)
1303         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1304
1305         # now use its storage index to find the other shares using the
1306         # 'find-shares' tool
1307         sharedir, shnum = os.path.split(filename)
1308         storagedir, storage_index_s = os.path.split(sharedir)
1309         out,err = StringIO(), StringIO()
1310         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1311         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1312         rc = runner.runner(cmd, stdout=out, stderr=err)
1313         self.failUnlessEqual(rc, 0)
1314         out.seek(0)
1315         sharefiles = [sfn.strip() for sfn in out.readlines()]
1316         self.failUnlessEqual(len(sharefiles), 10)
1317
1318         # also exercise the 'catalog-shares' tool
1319         out,err = StringIO(), StringIO()
1320         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1321         cmd = ["debug", "catalog-shares"] + nodedirs
1322         rc = runner.runner(cmd, stdout=out, stderr=err)
1323         self.failUnlessEqual(rc, 0)
1324         out.seek(0)
1325         descriptions = [sfn.strip() for sfn in out.readlines()]
1326         self.failUnlessEqual(len(descriptions), 30)
1327         matching = [line
1328                     for line in descriptions
1329                     if line.startswith("CHK %s " % storage_index_s)]
1330         self.failUnlessEqual(len(matching), 10)
1331
1332     def _test_control(self, res):
1333         # exercise the remote-control-the-client foolscap interfaces in
1334         # allmydata.control (mostly used for performance tests)
1335         c0 = self.clients[0]
1336         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1337         control_furl = open(control_furl_file, "r").read().strip()
1338         # it doesn't really matter which Tub we use to connect to the client,
1339         # so let's just use our IntroducerNode's
1340         d = self.introducer.tub.getReference(control_furl)
1341         d.addCallback(self._test_control2, control_furl_file)
1342         return d
1343     def _test_control2(self, rref, filename):
1344         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1345         downfile = os.path.join(self.basedir, "control.downfile")
1346         d.addCallback(lambda uri:
1347                       rref.callRemote("download_from_uri_to_file",
1348                                       uri, downfile))
1349         def _check(res):
1350             self.failUnlessEqual(res, downfile)
1351             data = open(downfile, "r").read()
1352             expected_data = open(filename, "r").read()
1353             self.failUnlessEqual(data, expected_data)
1354         d.addCallback(_check)
1355         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1356         if sys.platform == "linux2":
1357             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1358         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1359         return d
1360
1361     def _test_cli(self, res):
1362         # run various CLI commands (in a thread, since they use blocking
1363         # network calls)
1364
1365         private_uri = self._private_node.get_uri()
1366         client0_basedir = self.getdir("client0")
1367
1368         nodeargs = [
1369             "--node-directory", client0_basedir,
1370             ]
1371
1372         d = defer.succeed(None)
1373
1374         # for compatibility with earlier versions, private/root_dir.cap is
1375         # supposed to be treated as an alias named "tahoe:". Start by making
1376         # sure that works, before we add other aliases.
1377
1378         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1379         f = open(root_file, "w")
1380         f.write(private_uri)
1381         f.close()
1382
1383         def run(ignored, verb, *args, **kwargs):
1384             stdin = kwargs.get("stdin", "")
1385             newargs = [verb] + nodeargs + list(args)
1386             return self._run_cli(newargs, stdin=stdin)
1387
1388         def _check_ls((out,err), expected_children, unexpected_children=[]):
1389             self.failUnlessEqual(err, "")
1390             for s in expected_children:
1391                 self.failUnless(s in out, (s,out))
1392             for s in unexpected_children:
1393                 self.failIf(s in out, (s,out))
1394
1395         def _check_ls_root((out,err)):
1396             self.failUnless("personal" in out)
1397             self.failUnless("s2-ro" in out)
1398             self.failUnless("s2-rw" in out)
1399             self.failUnlessEqual(err, "")
1400
1401         # this should reference private_uri
1402         d.addCallback(run, "ls")
1403         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1404
1405         d.addCallback(run, "list-aliases")
1406         def _check_aliases_1((out,err)):
1407             self.failUnlessEqual(err, "")
1408             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1409         d.addCallback(_check_aliases_1)
1410
1411         # now that that's out of the way, remove root_dir.cap and work with
1412         # new files
1413         d.addCallback(lambda res: os.unlink(root_file))
1414         d.addCallback(run, "list-aliases")
1415         def _check_aliases_2((out,err)):
1416             self.failUnlessEqual(err, "")
1417             self.failUnlessEqual(out, "")
1418         d.addCallback(_check_aliases_2)
1419
1420         d.addCallback(run, "mkdir")
1421         def _got_dir( (out,err) ):
1422             self.failUnless(uri.from_string_dirnode(out.strip()))
1423             return out.strip()
1424         d.addCallback(_got_dir)
1425         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1426
1427         d.addCallback(run, "list-aliases")
1428         def _check_aliases_3((out,err)):
1429             self.failUnlessEqual(err, "")
1430             self.failUnless("tahoe: " in out)
1431         d.addCallback(_check_aliases_3)
1432
1433         def _check_empty_dir((out,err)):
1434             self.failUnlessEqual(out, "")
1435             self.failUnlessEqual(err, "")
1436         d.addCallback(run, "ls")
1437         d.addCallback(_check_empty_dir)
1438
1439         def _check_missing_dir((out,err)):
1440             # TODO: check that rc==2
1441             self.failUnlessEqual(out, "")
1442             self.failUnlessEqual(err, "No such file or directory\n")
1443         d.addCallback(run, "ls", "bogus")
1444         d.addCallback(_check_missing_dir)
1445
1446         files = []
1447         datas = []
1448         for i in range(10):
1449             fn = os.path.join(self.basedir, "file%d" % i)
1450             files.append(fn)
1451             data = "data to be uploaded: file%d\n" % i
1452             datas.append(data)
1453             open(fn,"wb").write(data)
1454
1455         def _check_stdout_against((out,err), filenum=None, data=None):
1456             self.failUnlessEqual(err, "")
1457             if filenum is not None:
1458                 self.failUnlessEqual(out, datas[filenum])
1459             if data is not None:
1460                 self.failUnlessEqual(out, data)
1461
1462         # test all both forms of put: from a file, and from stdin
1463         #  tahoe put bar FOO
1464         d.addCallback(run, "put", files[0], "tahoe-file0")
1465         def _put_out((out,err)):
1466             self.failUnless("URI:LIT:" in out, out)
1467             self.failUnless("201 Created" in err, err)
1468             uri0 = out.strip()
1469             return run(None, "get", uri0)
1470         d.addCallback(_put_out)
1471         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1472
1473         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1474         #  tahoe put bar tahoe:FOO
1475         d.addCallback(run, "put", files[2], "tahoe:file2")
1476         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1477         def _check_put_mutable((out,err)):
1478             self._mutable_file3_uri = out.strip()
1479         d.addCallback(_check_put_mutable)
1480         d.addCallback(run, "get", "tahoe:file3")
1481         d.addCallback(_check_stdout_against, 3)
1482
1483         #  tahoe put FOO
1484         STDIN_DATA = "This is the file to upload from stdin."
1485         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1486         #  tahoe put tahoe:FOO
1487         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1488                       stdin="Other file from stdin.")
1489
1490         d.addCallback(run, "ls")
1491         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1492                                   "tahoe-file-stdin", "from-stdin"])
1493         d.addCallback(run, "ls", "subdir")
1494         d.addCallback(_check_ls, ["tahoe-file1"])
1495
1496         # tahoe mkdir FOO
1497         d.addCallback(run, "mkdir", "subdir2")
1498         d.addCallback(run, "ls")
1499         # TODO: extract the URI, set an alias with it
1500         d.addCallback(_check_ls, ["subdir2"])
1501
1502         # tahoe get: (to stdin and to a file)
1503         d.addCallback(run, "get", "tahoe-file0")
1504         d.addCallback(_check_stdout_against, 0)
1505         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1506         d.addCallback(_check_stdout_against, 1)
1507         outfile0 = os.path.join(self.basedir, "outfile0")
1508         d.addCallback(run, "get", "file2", outfile0)
1509         def _check_outfile0((out,err)):
1510             data = open(outfile0,"rb").read()
1511             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1512         d.addCallback(_check_outfile0)
1513         outfile1 = os.path.join(self.basedir, "outfile0")
1514         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1515         def _check_outfile1((out,err)):
1516             data = open(outfile1,"rb").read()
1517             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1518         d.addCallback(_check_outfile1)
1519
1520         d.addCallback(run, "rm", "tahoe-file0")
1521         d.addCallback(run, "rm", "tahoe:file2")
1522         d.addCallback(run, "ls")
1523         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1524
1525         d.addCallback(run, "ls", "-l")
1526         def _check_ls_l((out,err)):
1527             lines = out.split("\n")
1528             for l in lines:
1529                 if "tahoe-file-stdin" in l:
1530                     self.failUnless(l.startswith("-r-- "), l)
1531                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1532                 if "file3" in l:
1533                     self.failUnless(l.startswith("-rw- "), l) # mutable
1534         d.addCallback(_check_ls_l)
1535
1536         d.addCallback(run, "ls", "--uri")
1537         def _check_ls_uri((out,err)):
1538             lines = out.split("\n")
1539             for l in lines:
1540                 if "file3" in l:
1541                     self.failUnless(self._mutable_file3_uri in l)
1542         d.addCallback(_check_ls_uri)
1543
1544         d.addCallback(run, "ls", "--readonly-uri")
1545         def _check_ls_rouri((out,err)):
1546             lines = out.split("\n")
1547             for l in lines:
1548                 if "file3" in l:
1549                     rw_uri = self._mutable_file3_uri
1550                     u = uri.from_string_mutable_filenode(rw_uri)
1551                     ro_uri = u.get_readonly().to_string()
1552                     self.failUnless(ro_uri in l)
1553         d.addCallback(_check_ls_rouri)
1554
1555
1556         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1557         d.addCallback(run, "ls")
1558         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1559
1560         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1561         d.addCallback(run, "ls")
1562         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1563
1564         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1565         d.addCallback(run, "ls")
1566         d.addCallback(_check_ls, ["file3", "file3-copy"])
1567         d.addCallback(run, "get", "tahoe:file3-copy")
1568         d.addCallback(_check_stdout_against, 3)
1569
1570         # copy from disk into tahoe
1571         d.addCallback(run, "cp", files[4], "tahoe:file4")
1572         d.addCallback(run, "ls")
1573         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1574         d.addCallback(run, "get", "tahoe:file4")
1575         d.addCallback(_check_stdout_against, 4)
1576
1577         # copy from tahoe into disk
1578         target_filename = os.path.join(self.basedir, "file-out")
1579         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1580         def _check_cp_out((out,err)):
1581             self.failUnless(os.path.exists(target_filename))
1582             got = open(target_filename,"rb").read()
1583             self.failUnlessEqual(got, datas[4])
1584         d.addCallback(_check_cp_out)
1585
1586         # copy from disk to disk (silly case)
1587         target2_filename = os.path.join(self.basedir, "file-out-copy")
1588         d.addCallback(run, "cp", target_filename, target2_filename)
1589         def _check_cp_out2((out,err)):
1590             self.failUnless(os.path.exists(target2_filename))
1591             got = open(target2_filename,"rb").read()
1592             self.failUnlessEqual(got, datas[4])
1593         d.addCallback(_check_cp_out2)
1594
1595         # copy from tahoe into disk, overwriting an existing file
1596         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1597         def _check_cp_out3((out,err)):
1598             self.failUnless(os.path.exists(target_filename))
1599             got = open(target_filename,"rb").read()
1600             self.failUnlessEqual(got, datas[3])
1601         d.addCallback(_check_cp_out3)
1602
1603         # copy from disk into tahoe, overwriting an existing immutable file
1604         d.addCallback(run, "cp", files[5], "tahoe:file4")
1605         d.addCallback(run, "ls")
1606         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1607         d.addCallback(run, "get", "tahoe:file4")
1608         d.addCallback(_check_stdout_against, 5)
1609
1610         # copy from disk into tahoe, overwriting an existing mutable file
1611         d.addCallback(run, "cp", files[5], "tahoe:file3")
1612         d.addCallback(run, "ls")
1613         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1614         d.addCallback(run, "get", "tahoe:file3")
1615         d.addCallback(_check_stdout_against, 5)
1616
1617         # recursive copy: setup
1618         dn = os.path.join(self.basedir, "dir1")
1619         os.makedirs(dn)
1620         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1621         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1622         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1623         sdn2 = os.path.join(dn, "subdir2")
1624         os.makedirs(sdn2)
1625         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1626         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1627
1628         # from disk into tahoe
1629         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1630         d.addCallback(run, "ls")
1631         d.addCallback(_check_ls, ["dir1"])
1632         d.addCallback(run, "ls", "dir1")
1633         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1634                       ["rfile4", "rfile5"])
1635         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1636         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1637                       ["rfile1", "rfile2", "rfile3"])
1638         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1639         d.addCallback(_check_stdout_against, data="rfile4")
1640
1641         # and back out again
1642         dn_copy = os.path.join(self.basedir, "dir1-copy")
1643         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1644         def _check_cp_r_out((out,err)):
1645             def _cmp(name):
1646                 old = open(os.path.join(dn, name), "rb").read()
1647                 newfn = os.path.join(dn_copy, name)
1648                 self.failUnless(os.path.exists(newfn))
1649                 new = open(newfn, "rb").read()
1650                 self.failUnlessEqual(old, new)
1651             _cmp("rfile1")
1652             _cmp("rfile2")
1653             _cmp("rfile3")
1654             _cmp(os.path.join("subdir2", "rfile4"))
1655             _cmp(os.path.join("subdir2", "rfile5"))
1656         d.addCallback(_check_cp_r_out)
1657
1658         # and copy it a second time, which ought to overwrite the same files
1659         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1660
1661         # and again, only writing filecaps
1662         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1663         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1664         def _check_capsonly((out,err)):
1665             # these should all be LITs
1666             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1667             y = uri.from_string_filenode(x)
1668             self.failUnlessEqual(y.data, "rfile4")
1669         d.addCallback(_check_capsonly)
1670
1671         # and tahoe-to-tahoe
1672         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1673         d.addCallback(run, "ls")
1674         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1675         d.addCallback(run, "ls", "dir1-copy")
1676         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1677                       ["rfile4", "rfile5"])
1678         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1679         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1680                       ["rfile1", "rfile2", "rfile3"])
1681         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1682         d.addCallback(_check_stdout_against, data="rfile4")
1683
1684         # and copy it a second time, which ought to overwrite the same files
1685         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1686
1687         # tahoe_ls doesn't currently handle the error correctly: it tries to
1688         # JSON-parse a traceback.
1689 ##         def _ls_missing(res):
1690 ##             argv = ["ls"] + nodeargs + ["bogus"]
1691 ##             return self._run_cli(argv)
1692 ##         d.addCallback(_ls_missing)
1693 ##         def _check_ls_missing((out,err)):
1694 ##             print "OUT", out
1695 ##             print "ERR", err
1696 ##             self.failUnlessEqual(err, "")
1697 ##         d.addCallback(_check_ls_missing)
1698
1699         return d
1700
1701     def _run_cli(self, argv, stdin=""):
1702         #print "CLI:", argv
1703         stdout, stderr = StringIO(), StringIO()
1704         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1705                                   stdin=StringIO(stdin),
1706                                   stdout=stdout, stderr=stderr)
1707         def _done(res):
1708             return stdout.getvalue(), stderr.getvalue()
1709         d.addCallback(_done)
1710         return d
1711
1712     def _test_checker(self, res):
1713         ut = upload.Data("too big to be literal" * 200, convergence=None)
1714         d = self._personal_node.add_file(u"big file", ut)
1715
1716         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1717         def _check_dirnode_results(r):
1718             self.failUnless(r.is_healthy())
1719         d.addCallback(_check_dirnode_results)
1720         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1721         d.addCallback(_check_dirnode_results)
1722
1723         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1724         def _got_chk_filenode(n):
1725             self.failUnless(isinstance(n, ImmutableFileNode))
1726             d = n.check(Monitor())
1727             def _check_filenode_results(r):
1728                 self.failUnless(r.is_healthy())
1729             d.addCallback(_check_filenode_results)
1730             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1731             d.addCallback(_check_filenode_results)
1732             return d
1733         d.addCallback(_got_chk_filenode)
1734
1735         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1736         def _got_lit_filenode(n):
1737             self.failUnless(isinstance(n, LiteralFileNode))
1738             d = n.check(Monitor())
1739             def _check_lit_filenode_results(r):
1740                 self.failUnlessEqual(r, None)
1741             d.addCallback(_check_lit_filenode_results)
1742             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1743             d.addCallback(_check_lit_filenode_results)
1744             return d
1745         d.addCallback(_got_lit_filenode)
1746         return d
1747