]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
Alter various unit tests to work with the new happy behavior
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 import allmydata
8 from allmydata import uri
9 from allmydata.storage.mutable import MutableShareFile
10 from allmydata.storage.server import si_a2b
11 from allmydata.immutable import offloaded, upload
12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
13 from allmydata.util import idlib, mathutil
14 from allmydata.util import log, base32
15 from allmydata.util.consumer import MemoryConsumer, download_to_data
16 from allmydata.scripts import runner
17 from allmydata.interfaces import IDirectoryNode, IFileNode, \
18      NoSuchChildError, NoSharesError
19 from allmydata.monitor import Monitor
20 from allmydata.mutable.common import NotWriteableError
21 from allmydata.mutable import layout as mutable_layout
22 from foolscap.api import DeadReferenceError
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
27 from allmydata.test.common import SystemTestMixin
28
# Test payload that (per its own text) must be big enough to miss the LIT-uri
# fast path, so uploads of it produce real shares on the storage servers.
LARGE_DATA = """
This is some data to publish to the remote grid.., which needs to be large
enough to not fit inside a LIT uri.
"""
33
class CountingDataUploadable(upload.Data):
    """An upload.Data that tallies how many bytes have been read from it.

    If interrupt_after is set, the Deferred in interrupt_after_d is fired
    (once, with this uploadable) as soon as the running total passes that
    threshold — used by tests to interrupt an upload partway through.
    """
    bytes_read = 0            # running total of bytes handed out by read()
    interrupt_after = None    # byte threshold; None means never trigger
    interrupt_after_d = None  # Deferred fired when the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm before firing so the Deferred triggers at most once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
46
47 class SystemTest(SystemTestMixin, unittest.TestCase):
48     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
49
50     def test_connections(self):
51         self.basedir = "system/SystemTest/test_connections"
52         d = self.set_up_nodes()
53         self.extra_node = None
54         d.addCallback(lambda res: self.add_extra_node(self.numclients))
55         def _check(extra_node):
56             self.extra_node = extra_node
57             for c in self.clients:
58                 all_peerids = c.get_storage_broker().get_all_serverids()
59                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
60                 sb = c.storage_broker
61                 permuted_peers = sb.get_servers_for_index("a")
62                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
63
64         d.addCallback(_check)
65         def _shutdown_extra_node(res):
66             if self.extra_node:
67                 return self.extra_node.stopService()
68             return res
69         d.addBoth(_shutdown_extra_node)
70         return d
71     # test_connections is subsumed by test_upload_and_download, and takes
72     # quite a while to run on a slow machine (because of all the TLS
73     # connections that must be established). If we ever rework the introducer
74     # code to such an extent that we're not sure if it works anymore, we can
75     # reinstate this test until it does.
76     del test_connections
77
78     def test_upload_and_download_random_key(self):
79         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
80         return self._test_upload_and_download(convergence=None)
81
82     def test_upload_and_download_convergent(self):
83         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
84         return self._test_upload_and_download(convergence="some convergence string")
85
86     def _test_upload_and_download(self, convergence):
87         # we use 4000 bytes of data, which will result in about 400k written
88         # to disk among all our simulated nodes
89         DATA = "Some data to upload\n" * 200
90         d = self.set_up_nodes()
91         def _check_connections(res):
92             for c in self.clients:
93                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
94                 all_peerids = c.get_storage_broker().get_all_serverids()
95                 self.failUnlessEqual(len(all_peerids), self.numclients)
96                 sb = c.storage_broker
97                 permuted_peers = sb.get_servers_for_index("a")
98                 self.failUnlessEqual(len(permuted_peers), self.numclients)
99         d.addCallback(_check_connections)
100
101         def _do_upload(res):
102             log.msg("UPLOADING")
103             u = self.clients[0].getServiceNamed("uploader")
104             self.uploader = u
105             # we crank the max segsize down to 1024b for the duration of this
106             # test, so we can exercise multiple segments. It is important
107             # that this is not a multiple of the segment size, so that the
108             # tail segment is not the same length as the others. This actualy
109             # gets rounded up to 1025 to be a multiple of the number of
110             # required shares (since we use 25 out of 100 FEC).
111             up = upload.Data(DATA, convergence=convergence)
112             up.max_segment_size = 1024
113             d1 = u.upload(up)
114             return d1
115         d.addCallback(_do_upload)
116         def _upload_done(results):
117             theuri = results.uri
118             log.msg("upload finished: uri is %s" % (theuri,))
119             self.uri = theuri
120             assert isinstance(self.uri, str), self.uri
121             self.cap = uri.from_string(self.uri)
122             self.n = self.clients[1].create_node_from_uri(self.uri)
123         d.addCallback(_upload_done)
124
125         def _upload_again(res):
126             # Upload again. If using convergent encryption then this ought to be
127             # short-circuited, however with the way we currently generate URIs
128             # (i.e. because they include the roothash), we have to do all of the
129             # encoding work, and only get to save on the upload part.
130             log.msg("UPLOADING AGAIN")
131             up = upload.Data(DATA, convergence=convergence)
132             up.max_segment_size = 1024
133             return self.uploader.upload(up)
134         d.addCallback(_upload_again)
135
136         def _download_to_data(res):
137             log.msg("DOWNLOADING")
138             return download_to_data(self.n)
139         d.addCallback(_download_to_data)
140         def _download_to_data_done(data):
141             log.msg("download finished")
142             self.failUnlessEqual(data, DATA)
143         d.addCallback(_download_to_data_done)
144
145         def _test_read(res):
146             n = self.clients[1].create_node_from_uri(self.uri)
147             d = download_to_data(n)
148             def _read_done(data):
149                 self.failUnlessEqual(data, DATA)
150             d.addCallback(_read_done)
151             d.addCallback(lambda ign:
152                           n.read(MemoryConsumer(), offset=1, size=4))
153             def _read_portion_done(mc):
154                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
155             d.addCallback(_read_portion_done)
156             d.addCallback(lambda ign:
157                           n.read(MemoryConsumer(), offset=2, size=None))
158             def _read_tail_done(mc):
159                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
160             d.addCallback(_read_tail_done)
161             d.addCallback(lambda ign:
162                           n.read(MemoryConsumer(), size=len(DATA)+1000))
163             def _read_too_much(mc):
164                 self.failUnlessEqual("".join(mc.chunks), DATA)
165             d.addCallback(_read_too_much)
166
167             return d
168         d.addCallback(_test_read)
169
170         def _test_bad_read(res):
171             bad_u = uri.from_string_filenode(self.uri)
172             bad_u.key = self.flip_bit(bad_u.key)
173             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
174             # this should cause an error during download
175
176             d = self.shouldFail2(NoSharesError, "'download bad node'",
177                                  None,
178                                  bad_n.read, MemoryConsumer(), offset=2)
179             return d
180         d.addCallback(_test_bad_read)
181
182         def _download_nonexistent_uri(res):
183             baduri = self.mangle_uri(self.uri)
184             badnode = self.clients[1].create_node_from_uri(baduri)
185             log.msg("about to download non-existent URI", level=log.UNUSUAL,
186                     facility="tahoe.tests")
187             d1 = download_to_data(badnode)
188             def _baduri_should_fail(res):
189                 log.msg("finished downloading non-existend URI",
190                         level=log.UNUSUAL, facility="tahoe.tests")
191                 self.failUnless(isinstance(res, Failure))
192                 self.failUnless(res.check(NoSharesError),
193                                 "expected NoSharesError, got %s" % res)
194             d1.addBoth(_baduri_should_fail)
195             return d1
196         d.addCallback(_download_nonexistent_uri)
197
198         # add a new node, which doesn't accept shares, and only uses the
199         # helper for upload.
200         d.addCallback(lambda res: self.add_extra_node(self.numclients,
201                                                       self.helper_furl,
202                                                       add_to_sparent=True))
203         def _added(extra_node):
204             self.extra_node = extra_node
205             self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
206         d.addCallback(_added)
207
208         HELPER_DATA = "Data that needs help to upload" * 1000
209         def _upload_with_helper(res):
210             u = upload.Data(HELPER_DATA, convergence=convergence)
211             d = self.extra_node.upload(u)
212             def _uploaded(results):
213                 n = self.clients[1].create_node_from_uri(results.uri)
214                 return download_to_data(n)
215             d.addCallback(_uploaded)
216             def _check(newdata):
217                 self.failUnlessEqual(newdata, HELPER_DATA)
218             d.addCallback(_check)
219             return d
220         d.addCallback(_upload_with_helper)
221
222         def _upload_duplicate_with_helper(res):
223             u = upload.Data(HELPER_DATA, convergence=convergence)
224             u.debug_stash_RemoteEncryptedUploadable = True
225             d = self.extra_node.upload(u)
226             def _uploaded(results):
227                 n = self.clients[1].create_node_from_uri(results.uri)
228                 return download_to_data(n)
229             d.addCallback(_uploaded)
230             def _check(newdata):
231                 self.failUnlessEqual(newdata, HELPER_DATA)
232                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
233                             "uploadable started uploading, should have been avoided")
234             d.addCallback(_check)
235             return d
236         if convergence is not None:
237             d.addCallback(_upload_duplicate_with_helper)
238
239         def _upload_resumable(res):
240             DATA = "Data that needs help to upload and gets interrupted" * 1000
241             u1 = CountingDataUploadable(DATA, convergence=convergence)
242             u2 = CountingDataUploadable(DATA, convergence=convergence)
243
244             # we interrupt the connection after about 5kB by shutting down
245             # the helper, then restartingit.
246             u1.interrupt_after = 5000
247             u1.interrupt_after_d = defer.Deferred()
248             u1.interrupt_after_d.addCallback(lambda res:
249                                              self.bounce_client(0))
250
251             # sneak into the helper and reduce its chunk size, so that our
252             # debug_interrupt will sever the connection on about the fifth
253             # chunk fetched. This makes sure that we've started to write the
254             # new shares before we abandon them, which exercises the
255             # abort/delete-partial-share code. TODO: find a cleaner way to do
256             # this. I know that this will affect later uses of the helper in
257             # this same test run, but I'm not currently worried about it.
258             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
259
260             d = self.extra_node.upload(u1)
261
262             def _should_not_finish(res):
263                 self.fail("interrupted upload should have failed, not finished"
264                           " with result %s" % (res,))
265             def _interrupted(f):
266                 f.trap(DeadReferenceError)
267
268                 # make sure we actually interrupted it before finishing the
269                 # file
270                 self.failUnless(u1.bytes_read < len(DATA),
271                                 "read %d out of %d total" % (u1.bytes_read,
272                                                              len(DATA)))
273
274                 log.msg("waiting for reconnect", level=log.NOISY,
275                         facility="tahoe.test.test_system")
276                 # now, we need to give the nodes a chance to notice that this
277                 # connection has gone away. When this happens, the storage
278                 # servers will be told to abort their uploads, removing the
279                 # partial shares. Unfortunately this involves TCP messages
280                 # going through the loopback interface, and we can't easily
281                 # predict how long that will take. If it were all local, we
282                 # could use fireEventually() to stall. Since we don't have
283                 # the right introduction hooks, the best we can do is use a
284                 # fixed delay. TODO: this is fragile.
285                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
286                 return u1.interrupt_after_d
287             d.addCallbacks(_should_not_finish, _interrupted)
288
289             def _disconnected(res):
290                 # check to make sure the storage servers aren't still hanging
291                 # on to the partial share: their incoming/ directories should
292                 # now be empty.
293                 log.msg("disconnected", level=log.NOISY,
294                         facility="tahoe.test.test_system")
295                 for i in range(self.numclients):
296                     incdir = os.path.join(self.getdir("client%d" % i),
297                                           "storage", "shares", "incoming")
298                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
299             d.addCallback(_disconnected)
300
301             # then we need to give the reconnector a chance to
302             # reestablish the connection to the helper.
303             d.addCallback(lambda res:
304                           log.msg("wait_for_connections", level=log.NOISY,
305                                   facility="tahoe.test.test_system"))
306             d.addCallback(lambda res: self.wait_for_connections())
307
308
309             d.addCallback(lambda res:
310                           log.msg("uploading again", level=log.NOISY,
311                                   facility="tahoe.test.test_system"))
312             d.addCallback(lambda res: self.extra_node.upload(u2))
313
314             def _uploaded(results):
315                 cap = results.uri
316                 log.msg("Second upload complete", level=log.NOISY,
317                         facility="tahoe.test.test_system")
318
319                 # this is really bytes received rather than sent, but it's
320                 # convenient and basically measures the same thing
321                 bytes_sent = results.ciphertext_fetched
322
323                 # We currently don't support resumption of upload if the data is
324                 # encrypted with a random key.  (Because that would require us
325                 # to store the key locally and re-use it on the next upload of
326                 # this file, which isn't a bad thing to do, but we currently
327                 # don't do it.)
328                 if convergence is not None:
329                     # Make sure we did not have to read the whole file the
330                     # second time around .
331                     self.failUnless(bytes_sent < len(DATA),
332                                 "resumption didn't save us any work:"
333                                 " read %d bytes out of %d total" %
334                                 (bytes_sent, len(DATA)))
335                 else:
336                     # Make sure we did have to read the whole file the second
337                     # time around -- because the one that we partially uploaded
338                     # earlier was encrypted with a different random key.
339                     self.failIf(bytes_sent < len(DATA),
340                                 "resumption saved us some work even though we were using random keys:"
341                                 " read %d bytes out of %d total" %
342                                 (bytes_sent, len(DATA)))
343                 n = self.clients[1].create_node_from_uri(cap)
344                 return download_to_data(n)
345             d.addCallback(_uploaded)
346
347             def _check(newdata):
348                 self.failUnlessEqual(newdata, DATA)
349                 # If using convergent encryption, then also check that the
350                 # helper has removed the temp file from its directories.
351                 if convergence is not None:
352                     basedir = os.path.join(self.getdir("client0"), "helper")
353                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
354                     self.failUnlessEqual(files, [])
355                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
356                     self.failUnlessEqual(files, [])
357             d.addCallback(_check)
358             return d
359         d.addCallback(_upload_resumable)
360
361         def _grab_stats(ignored):
362             # the StatsProvider doesn't normally publish a FURL:
363             # instead it passes a live reference to the StatsGatherer
364             # (if and when it connects). To exercise the remote stats
365             # interface, we manually publish client0's StatsProvider
366             # and use client1 to query it.
367             sp = self.clients[0].stats_provider
368             sp_furl = self.clients[0].tub.registerReference(sp)
369             d = self.clients[1].tub.getReference(sp_furl)
370             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
371             def _got_stats(stats):
372                 #print "STATS"
373                 #from pprint import pprint
374                 #pprint(stats)
375                 s = stats["stats"]
376                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
377                 c = stats["counters"]
378                 self.failUnless("storage_server.allocate" in c)
379             d.addCallback(_got_stats)
380             return d
381         d.addCallback(_grab_stats)
382
383         return d
384
385     def _find_shares(self, basedir):
386         shares = []
387         for (dirpath, dirnames, filenames) in os.walk(basedir):
388             if "storage" not in dirpath:
389                 continue
390             if not filenames:
391                 continue
392             pieces = dirpath.split(os.sep)
393             if (len(pieces) >= 5
394                 and pieces[-4] == "storage"
395                 and pieces[-3] == "shares"):
396                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
397                 # are sharefiles here
398                 assert pieces[-5].startswith("client")
399                 client_num = int(pieces[-5][-1])
400                 storage_index_s = pieces[-1]
401                 storage_index = si_a2b(storage_index_s)
402                 for sharename in filenames:
403                     shnum = int(sharename)
404                     filename = os.path.join(dirpath, sharename)
405                     data = (client_num, storage_index, filename, shnum)
406                     shares.append(data)
407         if not shares:
408             self.fail("unable to find any share files in %s" % basedir)
409         return shares
410
411     def _corrupt_mutable_share(self, filename, which):
412         msf = MutableShareFile(filename)
413         datav = msf.readv([ (0, 1000000) ])
414         final_share = datav[0]
415         assert len(final_share) < 1000000 # ought to be truncated
416         pieces = mutable_layout.unpack_share(final_share)
417         (seqnum, root_hash, IV, k, N, segsize, datalen,
418          verification_key, signature, share_hash_chain, block_hash_tree,
419          share_data, enc_privkey) = pieces
420
421         if which == "seqnum":
422             seqnum = seqnum + 15
423         elif which == "R":
424             root_hash = self.flip_bit(root_hash)
425         elif which == "IV":
426             IV = self.flip_bit(IV)
427         elif which == "segsize":
428             segsize = segsize + 15
429         elif which == "pubkey":
430             verification_key = self.flip_bit(verification_key)
431         elif which == "signature":
432             signature = self.flip_bit(signature)
433         elif which == "share_hash_chain":
434             nodenum = share_hash_chain.keys()[0]
435             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
436         elif which == "block_hash_tree":
437             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
438         elif which == "share_data":
439             share_data = self.flip_bit(share_data)
440         elif which == "encprivkey":
441             enc_privkey = self.flip_bit(enc_privkey)
442
443         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
444                                             segsize, datalen)
445         final_share = mutable_layout.pack_share(prefix,
446                                                 verification_key,
447                                                 signature,
448                                                 share_hash_chain,
449                                                 block_hash_tree,
450                                                 share_data,
451                                                 enc_privkey)
452         msf.writev( [(0, final_share)], None)
453
454
455     def test_mutable(self):
456         self.basedir = "system/SystemTest/test_mutable"
457         DATA = "initial contents go here."  # 25 bytes % 3 != 0
458         NEWDATA = "new contents yay"
459         NEWERDATA = "this is getting old"
460
461         d = self.set_up_nodes(use_key_generator=True)
462
463         def _create_mutable(res):
464             c = self.clients[0]
465             log.msg("starting create_mutable_file")
466             d1 = c.create_mutable_file(DATA)
467             def _done(res):
468                 log.msg("DONE: %s" % (res,))
469                 self._mutable_node_1 = res
470             d1.addCallback(_done)
471             return d1
472         d.addCallback(_create_mutable)
473
474         def _test_debug(res):
475             # find a share. It is important to run this while there is only
476             # one slot in the grid.
477             shares = self._find_shares(self.basedir)
478             (client_num, storage_index, filename, shnum) = shares[0]
479             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
480                     % filename)
481             log.msg(" for clients[%d]" % client_num)
482
483             out,err = StringIO(), StringIO()
484             rc = runner.runner(["debug", "dump-share", "--offsets",
485                                 filename],
486                                stdout=out, stderr=err)
487             output = out.getvalue()
488             self.failUnlessEqual(rc, 0)
489             try:
490                 self.failUnless("Mutable slot found:\n" in output)
491                 self.failUnless("share_type: SDMF\n" in output)
492                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
493                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
494                 self.failUnless(" num_extra_leases: 0\n" in output)
495                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
496                                 in output)
497                 self.failUnless(" SDMF contents:\n" in output)
498                 self.failUnless("  seqnum: 1\n" in output)
499                 self.failUnless("  required_shares: 3\n" in output)
500                 self.failUnless("  total_shares: 10\n" in output)
501                 self.failUnless("  segsize: 27\n" in output, (output, filename))
502                 self.failUnless("  datalen: 25\n" in output)
503                 # the exact share_hash_chain nodes depends upon the sharenum,
504                 # and is more of a hassle to compute than I want to deal with
505                 # now
506                 self.failUnless("  share_hash_chain: " in output)
507                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
508                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
509                             base32.b2a(storage_index))
510                 self.failUnless(expected in output)
511             except unittest.FailTest:
512                 print
513                 print "dump-share output was:"
514                 print output
515                 raise
516         d.addCallback(_test_debug)
517
518         # test retrieval
519
520         # first, let's see if we can use the existing node to retrieve the
521         # contents. This allows it to use the cached pubkey and maybe the
522         # latest-known sharemap.
523
524         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
525         def _check_download_1(res):
526             self.failUnlessEqual(res, DATA)
527             # now we see if we can retrieve the data from a new node,
528             # constructed using the URI of the original one. We do this test
529             # on the same client that uploaded the data.
530             uri = self._mutable_node_1.get_uri()
531             log.msg("starting retrieve1")
532             newnode = self.clients[0].create_node_from_uri(uri)
533             newnode_2 = self.clients[0].create_node_from_uri(uri)
534             self.failUnlessIdentical(newnode, newnode_2)
535             return newnode.download_best_version()
536         d.addCallback(_check_download_1)
537
538         def _check_download_2(res):
539             self.failUnlessEqual(res, DATA)
540             # same thing, but with a different client
541             uri = self._mutable_node_1.get_uri()
542             newnode = self.clients[1].create_node_from_uri(uri)
543             log.msg("starting retrieve2")
544             d1 = newnode.download_best_version()
545             d1.addCallback(lambda res: (res, newnode))
546             return d1
547         d.addCallback(_check_download_2)
548
549         def _check_download_3((res, newnode)):
550             self.failUnlessEqual(res, DATA)
551             # replace the data
552             log.msg("starting replace1")
553             d1 = newnode.overwrite(NEWDATA)
554             d1.addCallback(lambda res: newnode.download_best_version())
555             return d1
556         d.addCallback(_check_download_3)
557
558         def _check_download_4(res):
559             self.failUnlessEqual(res, NEWDATA)
560             # now create an even newer node and replace the data on it. This
561             # new node has never been used for download before.
562             uri = self._mutable_node_1.get_uri()
563             newnode1 = self.clients[2].create_node_from_uri(uri)
564             newnode2 = self.clients[3].create_node_from_uri(uri)
565             self._newnode3 = self.clients[3].create_node_from_uri(uri)
566             log.msg("starting replace2")
567             d1 = newnode1.overwrite(NEWERDATA)
568             d1.addCallback(lambda res: newnode2.download_best_version())
569             return d1
570         d.addCallback(_check_download_4)
571
572         def _check_download_5(res):
573             log.msg("finished replace2")
574             self.failUnlessEqual(res, NEWERDATA)
575         d.addCallback(_check_download_5)
576
577         def _corrupt_shares(res):
578             # run around and flip bits in all but k of the shares, to test
579             # the hash checks
580             shares = self._find_shares(self.basedir)
581             ## sort by share number
582             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
583             where = dict([ (shnum, filename)
584                            for (client_num, storage_index, filename, shnum)
585                            in shares ])
586             assert len(where) == 10 # this test is designed for 3-of-10
587             for shnum, filename in where.items():
588                 # shares 7,8,9 are left alone. read will check
589                 # (share_hash_chain, block_hash_tree, share_data). New
590                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
591                 # segsize, signature).
592                 if shnum == 0:
593                     # read: this will trigger "pubkey doesn't match
594                     # fingerprint".
595                     self._corrupt_mutable_share(filename, "pubkey")
596                     self._corrupt_mutable_share(filename, "encprivkey")
597                 elif shnum == 1:
598                     # triggers "signature is invalid"
599                     self._corrupt_mutable_share(filename, "seqnum")
600                 elif shnum == 2:
601                     # triggers "signature is invalid"
602                     self._corrupt_mutable_share(filename, "R")
603                 elif shnum == 3:
604                     # triggers "signature is invalid"
605                     self._corrupt_mutable_share(filename, "segsize")
606                 elif shnum == 4:
607                     self._corrupt_mutable_share(filename, "share_hash_chain")
608                 elif shnum == 5:
609                     self._corrupt_mutable_share(filename, "block_hash_tree")
610                 elif shnum == 6:
611                     self._corrupt_mutable_share(filename, "share_data")
612                 # other things to correct: IV, signature
613                 # 7,8,9 are left alone
614
615                 # note that initial_query_count=5 means that we'll hit the
616                 # first 5 servers in effectively random order (based upon
617                 # response time), so we won't necessarily ever get a "pubkey
618                 # doesn't match fingerprint" error (if we hit shnum>=1 before
619                 # shnum=0, we pull the pubkey from there). To get repeatable
620                 # specific failures, we need to set initial_query_count=1,
621                 # but of course that will change the sequencing behavior of
622                 # the retrieval process. TODO: find a reasonable way to make
623                 # this a parameter, probably when we expand this test to test
624                 # for one failure mode at a time.
625
626                 # when we retrieve this, we should get three signature
627                 # failures (where we've mangled seqnum, R, and segsize). The
628                 # pubkey mangling
629         d.addCallback(_corrupt_shares)
630
631         d.addCallback(lambda res: self._newnode3.download_best_version())
632         d.addCallback(_check_download_5)
633
634         def _check_empty_file(res):
635             # make sure we can create empty files, this usually screws up the
636             # segsize math
637             d1 = self.clients[2].create_mutable_file("")
638             d1.addCallback(lambda newnode: newnode.download_best_version())
639             d1.addCallback(lambda res: self.failUnlessEqual("", res))
640             return d1
641         d.addCallback(_check_empty_file)
642
643         d.addCallback(lambda res: self.clients[0].create_dirnode())
644         def _created_dirnode(dnode):
645             log.msg("_created_dirnode(%s)" % (dnode,))
646             d1 = dnode.list()
647             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
648             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
649             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
650             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
651             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
652             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
653             d1.addCallback(lambda res: dnode.build_manifest().when_done())
654             d1.addCallback(lambda res:
655                            self.failUnlessEqual(len(res["manifest"]), 1))
656             return d1
657         d.addCallback(_created_dirnode)
658
659         def wait_for_c3_kg_conn():
660             return self.clients[3]._key_generator is not None
661         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
662
663         def check_kg_poolsize(junk, size_delta):
664             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
665                                  self.key_generator_svc.key_generator.pool_size + size_delta)
666
667         d.addCallback(check_kg_poolsize, 0)
668         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
669         d.addCallback(check_kg_poolsize, -1)
670         d.addCallback(lambda junk: self.clients[3].create_dirnode())
671         d.addCallback(check_kg_poolsize, -2)
672         # use_helper induces use of clients[3], which is the using-key_gen client
673         d.addCallback(lambda junk:
674                       self.POST("uri?t=mkdir&name=george", use_helper=True))
675         d.addCallback(check_kg_poolsize, -3)
676
677         return d
678
679     def flip_bit(self, good):
680         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
681
682     def mangle_uri(self, gooduri):
683         # change the key, which changes the storage index, which means we'll
684         # be asking about the wrong file, so nobody will have any shares
685         u = uri.from_string(gooduri)
686         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
687                             uri_extension_hash=u.uri_extension_hash,
688                             needed_shares=u.needed_shares,
689                             total_shares=u.total_shares,
690                             size=u.size)
691         return u2.to_string()
692
693     # TODO: add a test which mangles the uri_extension_hash instead, and
694     # should fail due to not being able to get a valid uri_extension block.
695     # Also a test which sneakily mangles the uri_extension block to change
696     # some of the validation data, so it will fail in the post-download phase
697     # when the file's crypttext integrity check fails. Do the same thing for
698     # the key, which should cause the download to fail the post-download
699     # plaintext_hash check.
700
    def test_filesystem(self):
        """End-to-end exercise of the virtual filesystem.

        Bring up a grid, publish a small directory tree, bounce client 0,
        then drive the introducer web page, webapi, control port, CLI, and
        checker against the same tree. The individual steps are the
        _do_* / _check_* / _test_* methods, chained on one Deferred.
        """
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            # servers-of-happiness: this small grid cannot satisfy the
            # default, so require only 1 to keep uploads from failing
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client 0 to make sure published data survives a bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
747
    def _test_introweb(self, res):
        """Check the introducer's status page, in both HTML and JSON form.

        The grid runs 5 clients on one host, each announcing a storage
        server and a stub_client, so the summaries below are exact.
        """
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                # the page advertises our own version string
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the whole page so a failure is diagnosable from logs
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                # everything runs on one host here, so only one distinct
                # host should be counted per service type
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
781
782     def _do_publish1(self, res):
783         ut = upload.Data(self.data, convergence=None)
784         c0 = self.clients[0]
785         d = c0.create_dirnode()
786         def _made_root(new_dirnode):
787             self._root_directory_uri = new_dirnode.get_uri()
788             return c0.create_node_from_uri(self._root_directory_uri)
789         d.addCallback(_made_root)
790         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
791         def _made_subdir1(subdir1_node):
792             self._subdir1_node = subdir1_node
793             d1 = subdir1_node.add_file(u"mydata567", ut)
794             d1.addCallback(self.log, "publish finished")
795             def _stash_uri(filenode):
796                 self.uri = filenode.get_uri()
797                 assert isinstance(self.uri, str), (self.uri, filenode)
798             d1.addCallback(_stash_uri)
799             return d1
800         d.addCallback(_made_subdir1)
801         return d
802
803     def _do_publish2(self, res):
804         ut = upload.Data(self.data, convergence=None)
805         d = self._subdir1_node.create_subdirectory(u"subdir2")
806         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
807         return d
808
809     def log(self, res, *args, **kwargs):
810         # print "MSG: %s  RES: %s" % (msg, args)
811         log.msg(*args, **kwargs)
812         return res
813
    def _do_publish_private(self, res):
        """Build the private tree "P".

        Creates P/personal/sekrit data, plus s2-rw and s2-ro links to the
        existing /subdir1/subdir2. Fires with the new private dirnode.
        """
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                # link subdir2 in twice: once writable, once read-only
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)  # fire with the dirnode itself
            return d1
        d.addCallback(_got_new_dir)
        return d
839
840     def _check_publish1(self, res):
841         # this one uses the iterative API
842         c1 = self.clients[1]
843         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
844         d.addCallback(self.log, "check_publish1 got /")
845         d.addCallback(lambda root: root.get(u"subdir1"))
846         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
847         d.addCallback(lambda filenode: download_to_data(filenode))
848         d.addCallback(self.log, "get finished")
849         def _get_done(data):
850             self.failUnlessEqual(data, self.data)
851         d.addCallback(_get_done)
852         return d
853
854     def _check_publish2(self, res):
855         # this one uses the path-based API
856         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
857         d = rootnode.get_child_at_path(u"subdir1")
858         d.addCallback(lambda dirnode:
859                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
860         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
861         d.addCallback(lambda filenode: download_to_data(filenode))
862         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
863
864         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
865         def _got_filenode(filenode):
866             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
867             assert fnode == filenode
868         d.addCallback(_got_filenode)
869         return d
870
    def _check_publish_private(self, resnode):
        """Verify the private tree built by _do_publish_private.

        Checks file contents, enforces that every mutating operation on the
        s2-ro link fails with NotWriteableError, moves a child between
        directories, and finally checks manifest/deep-stats numbers.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            # resolve *path* relative to the private root
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only link is mutable-format but must refuse writes
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # exercise move_child_to: P/personal -> P, rename, move back
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                # directory sizes vary with serialization details, so only
                # check rough lower bounds
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
980
981     def shouldFail(self, res, expected_failure, which, substring=None):
982         if isinstance(res, Failure):
983             res.trap(expected_failure)
984             if substring:
985                 self.failUnless(substring in str(res),
986                                 "substring '%s' not in '%s'"
987                                 % (substring, str(res)))
988         else:
989             self.fail("%s was supposed to raise %s, not get '%s'" %
990                       (which, expected_failure, res))
991
992     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
993         assert substring is None or isinstance(substring, str)
994         d = defer.maybeDeferred(callable, *args, **kwargs)
995         def done(res):
996             if isinstance(res, Failure):
997                 res.trap(expected_failure)
998                 if substring:
999                     self.failUnless(substring in str(res),
1000                                     "substring '%s' not in '%s'"
1001                                     % (substring, str(res)))
1002             else:
1003                 self.fail("%s was supposed to raise %s, not get '%s'" %
1004                           (which, expected_failure, res))
1005         d.addBoth(done)
1006         return d
1007
1008     def PUT(self, urlpath, data):
1009         url = self.webish_url + urlpath
1010         return getPage(url, method="PUT", postdata=data)
1011
1012     def GET(self, urlpath, followRedirect=False):
1013         url = self.webish_url + urlpath
1014         return getPage(url, method="GET", followRedirect=followRedirect)
1015
1016     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1017         sepbase = "boogabooga"
1018         sep = "--" + sepbase
1019         form = []
1020         form.append(sep)
1021         form.append('Content-Disposition: form-data; name="_charset"')
1022         form.append('')
1023         form.append('UTF-8')
1024         form.append(sep)
1025         for name, value in fields.iteritems():
1026             if isinstance(value, tuple):
1027                 filename, value = value
1028                 form.append('Content-Disposition: form-data; name="%s"; '
1029                             'filename="%s"' % (name, filename.encode("utf-8")))
1030             else:
1031                 form.append('Content-Disposition: form-data; name="%s"' % name)
1032             form.append('')
1033             form.append(str(value))
1034             form.append(sep)
1035         form[-1] += "--"
1036         body = ""
1037         headers = {}
1038         if fields:
1039             body = "\r\n".join(form) + "\r\n"
1040             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1041         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1042
1043     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1044               use_helper=False):
1045         if use_helper:
1046             url = self.helper_webish_url + urlpath
1047         else:
1048             url = self.webish_url + urlpath
1049         return getPage(url, method="POST", postdata=body, headers=headers,
1050                        followRedirect=followRedirect)
1051
    def _test_web(self, res):
        """Drive the webapi: welcome pages, directory listings, downloads
        by URI, PUT/POST uploads (direct and via the helper), and the
        status/statistics pages of both the main node and the helper."""
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            # the mangled URI has a different storage index, so no server
            # has shares for it: expect an HTTP 410 Gone
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        def _new_happy_semantics(ign):
            for c in self.clients:
                # these get reset somewhere? Whatever.
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            h = self.clients[0].get_history()
            for ds in h.list_all_download_statuses():
                if ds.get_size() > 200:
                    self._down_status = ds.get_counter()
            for us in h.list_all_upload_statuses():
                if us.get_size() > 200:
                    self._up_status = us.get_counter()
            rs = list(h.list_all_retrieve_statuses())[0]
            self._retrieve_status = rs.get_counter()
            ps = list(h.list_all_publish_statuses())[0]
            self._publish_status = ps.get_counter()
            us = list(h.list_all_mapupdate_statuses())[0]
            self._update_status = us.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/mapupdate-%d" % self._update_status)
        d.addCallback(_got_down)
        def _got_update(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_update)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # check that the helper status page exists
        d.addCallback(lambda res:
                      self.GET("helper_status", followRedirect=True))
        def _got_helper_status(res):
            self.failUnless("Bytes Fetched:" in res)
            # touch a couple of files in the helper's working directory to
            # exercise more code paths
            workdir = os.path.join(self.getdir("client0"), "helper")
            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
            f = open(incfile, "wb")
            f.write("small file")
            f.close()
            then = time.time() - 86400*3
            now = time.time()
            # backdate the mtime so the "old file" code paths run too
            os.utime(incfile, (now, then))
            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
            f = open(encfile, "wb")
            f.write("less small file")
            f.close()
            os.utime(encfile, (now, then))
        d.addCallback(_got_helper_status)
        # and that the json form exists
        d.addCallback(lambda res:
                      self.GET("helper_status?t=json", followRedirect=True))
        def _got_helper_status_json(res):
            # the counts/sizes reflect the two spurious files planted above
            data = simplejson.loads(res)
            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
                                 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
                                 10)
            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
                                 15)
        d.addCallback(_got_helper_status_json)

        # and check that client[3] (which uses a helper but does not run one
        # itself) doesn't explode when you ask for its status
        d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
        def _got_non_helper_status(res):
            self.failUnless("Upload and Download Status" in res)
        d.addCallback(_got_non_helper_status)

        # or for helper status with t=json
        d.addCallback(lambda res:
                      getPage(self.helper_webish_url + "helper_status?t=json"))
        def _got_non_helper_status_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data, {})
        d.addCallback(_got_non_helper_status_json)

        # see if the statistics page exists
        d.addCallback(lambda res: self.GET("statistics"))
        def _got_stats(res):
            self.failUnless("Node Statistics" in res)
            self.failUnless("  'downloader.files_downloaded': 5," in res, res)
        d.addCallback(_got_stats)
        d.addCallback(lambda res: self.GET("statistics?t=json"))
        def _got_stats_json(res):
            data = simplejson.loads(res)
            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
        d.addCallback(_got_stats_json)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1265
1266     def _test_runner(self, res):
1267         # exercise some of the diagnostic tools in runner.py
1268
1269         # find a share
1270         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1271             if "storage" not in dirpath:
1272                 continue
1273             if not filenames:
1274                 continue
1275             pieces = dirpath.split(os.sep)
1276             if (len(pieces) >= 4
1277                 and pieces[-4] == "storage"
1278                 and pieces[-3] == "shares"):
1279                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1280                 # are sharefiles here
1281                 filename = os.path.join(dirpath, filenames[0])
1282                 # peek at the magic to see if it is a chk share
1283                 magic = open(filename, "rb").read(4)
1284                 if magic == '\x00\x00\x00\x01':
1285                     break
1286         else:
1287             self.fail("unable to find any uri_extension files in %s"
1288                       % self.basedir)
1289         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1290
1291         out,err = StringIO(), StringIO()
1292         rc = runner.runner(["debug", "dump-share", "--offsets",
1293                             filename],
1294                            stdout=out, stderr=err)
1295         output = out.getvalue()
1296         self.failUnlessEqual(rc, 0)
1297
1298         # we only upload a single file, so we can assert some things about
1299         # its size and shares.
1300         self.failUnless(("share filename: %s" % filename) in output)
1301         self.failUnless("size: %d\n" % len(self.data) in output)
1302         self.failUnless("num_segments: 1\n" in output)
1303         # segment_size is always a multiple of needed_shares
1304         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1305         self.failUnless("total_shares: 10\n" in output)
1306         # keys which are supposed to be present
1307         for key in ("size", "num_segments", "segment_size",
1308                     "needed_shares", "total_shares",
1309                     "codec_name", "codec_params", "tail_codec_params",
1310                     #"plaintext_hash", "plaintext_root_hash",
1311                     "crypttext_hash", "crypttext_root_hash",
1312                     "share_root_hash", "UEB_hash"):
1313             self.failUnless("%s: " % key in output, key)
1314         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1315
1316         # now use its storage index to find the other shares using the
1317         # 'find-shares' tool
1318         sharedir, shnum = os.path.split(filename)
1319         storagedir, storage_index_s = os.path.split(sharedir)
1320         out,err = StringIO(), StringIO()
1321         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1322         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1323         rc = runner.runner(cmd, stdout=out, stderr=err)
1324         self.failUnlessEqual(rc, 0)
1325         out.seek(0)
1326         sharefiles = [sfn.strip() for sfn in out.readlines()]
1327         self.failUnlessEqual(len(sharefiles), 10)
1328
1329         # also exercise the 'catalog-shares' tool
1330         out,err = StringIO(), StringIO()
1331         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1332         cmd = ["debug", "catalog-shares"] + nodedirs
1333         rc = runner.runner(cmd, stdout=out, stderr=err)
1334         self.failUnlessEqual(rc, 0)
1335         out.seek(0)
1336         descriptions = [sfn.strip() for sfn in out.readlines()]
1337         self.failUnlessEqual(len(descriptions), 30)
1338         matching = [line
1339                     for line in descriptions
1340                     if line.startswith("CHK %s " % storage_index_s)]
1341         self.failUnlessEqual(len(matching), 10)
1342
1343     def _test_control(self, res):
1344         # exercise the remote-control-the-client foolscap interfaces in
1345         # allmydata.control (mostly used for performance tests)
1346         c0 = self.clients[0]
1347         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1348         control_furl = open(control_furl_file, "r").read().strip()
1349         # it doesn't really matter which Tub we use to connect to the client,
1350         # so let's just use our IntroducerNode's
1351         d = self.introducer.tub.getReference(control_furl)
1352         d.addCallback(self._test_control2, control_furl_file)
1353         return d
1354     def _test_control2(self, rref, filename):
1355         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1356         downfile = os.path.join(self.basedir, "control.downfile")
1357         d.addCallback(lambda uri:
1358                       rref.callRemote("download_from_uri_to_file",
1359                                       uri, downfile))
1360         def _check(res):
1361             self.failUnlessEqual(res, downfile)
1362             data = open(downfile, "r").read()
1363             expected_data = open(filename, "r").read()
1364             self.failUnlessEqual(data, expected_data)
1365         d.addCallback(_check)
1366         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1367         if sys.platform == "linux2":
1368             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1369         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1370         return d
1371
1372     def _test_cli(self, res):
1373         # run various CLI commands (in a thread, since they use blocking
1374         # network calls)
1375
1376         private_uri = self._private_node.get_uri()
1377         client0_basedir = self.getdir("client0")
1378
1379         nodeargs = [
1380             "--node-directory", client0_basedir,
1381             ]
1382
1383         d = defer.succeed(None)
1384
1385         # for compatibility with earlier versions, private/root_dir.cap is
1386         # supposed to be treated as an alias named "tahoe:". Start by making
1387         # sure that works, before we add other aliases.
1388
1389         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1390         f = open(root_file, "w")
1391         f.write(private_uri)
1392         f.close()
1393
1394         def run(ignored, verb, *args, **kwargs):
1395             stdin = kwargs.get("stdin", "")
1396             newargs = [verb] + nodeargs + list(args)
1397             return self._run_cli(newargs, stdin=stdin)
1398
1399         def _check_ls((out,err), expected_children, unexpected_children=[]):
1400             self.failUnlessEqual(err, "")
1401             for s in expected_children:
1402                 self.failUnless(s in out, (s,out))
1403             for s in unexpected_children:
1404                 self.failIf(s in out, (s,out))
1405
1406         def _check_ls_root((out,err)):
1407             self.failUnless("personal" in out)
1408             self.failUnless("s2-ro" in out)
1409             self.failUnless("s2-rw" in out)
1410             self.failUnlessEqual(err, "")
1411
1412         # this should reference private_uri
1413         d.addCallback(run, "ls")
1414         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1415
1416         d.addCallback(run, "list-aliases")
1417         def _check_aliases_1((out,err)):
1418             self.failUnlessEqual(err, "")
1419             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1420         d.addCallback(_check_aliases_1)
1421
1422         # now that that's out of the way, remove root_dir.cap and work with
1423         # new files
1424         d.addCallback(lambda res: os.unlink(root_file))
1425         d.addCallback(run, "list-aliases")
1426         def _check_aliases_2((out,err)):
1427             self.failUnlessEqual(err, "")
1428             self.failUnlessEqual(out, "")
1429         d.addCallback(_check_aliases_2)
1430
1431         d.addCallback(run, "mkdir")
1432         def _got_dir( (out,err) ):
1433             self.failUnless(uri.from_string_dirnode(out.strip()))
1434             return out.strip()
1435         d.addCallback(_got_dir)
1436         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1437
1438         d.addCallback(run, "list-aliases")
1439         def _check_aliases_3((out,err)):
1440             self.failUnlessEqual(err, "")
1441             self.failUnless("tahoe: " in out)
1442         d.addCallback(_check_aliases_3)
1443
1444         def _check_empty_dir((out,err)):
1445             self.failUnlessEqual(out, "")
1446             self.failUnlessEqual(err, "")
1447         d.addCallback(run, "ls")
1448         d.addCallback(_check_empty_dir)
1449
1450         def _check_missing_dir((out,err)):
1451             # TODO: check that rc==2
1452             self.failUnlessEqual(out, "")
1453             self.failUnlessEqual(err, "No such file or directory\n")
1454         d.addCallback(run, "ls", "bogus")
1455         d.addCallback(_check_missing_dir)
1456
1457         files = []
1458         datas = []
1459         for i in range(10):
1460             fn = os.path.join(self.basedir, "file%d" % i)
1461             files.append(fn)
1462             data = "data to be uploaded: file%d\n" % i
1463             datas.append(data)
1464             open(fn,"wb").write(data)
1465
1466         def _check_stdout_against((out,err), filenum=None, data=None):
1467             self.failUnlessEqual(err, "")
1468             if filenum is not None:
1469                 self.failUnlessEqual(out, datas[filenum])
1470             if data is not None:
1471                 self.failUnlessEqual(out, data)
1472
1473         # test all both forms of put: from a file, and from stdin
1474         #  tahoe put bar FOO
1475         d.addCallback(run, "put", files[0], "tahoe-file0")
1476         def _put_out((out,err)):
1477             self.failUnless("URI:LIT:" in out, out)
1478             self.failUnless("201 Created" in err, err)
1479             uri0 = out.strip()
1480             return run(None, "get", uri0)
1481         d.addCallback(_put_out)
1482         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1483
1484         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1485         #  tahoe put bar tahoe:FOO
1486         d.addCallback(run, "put", files[2], "tahoe:file2")
1487         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1488         def _check_put_mutable((out,err)):
1489             self._mutable_file3_uri = out.strip()
1490         d.addCallback(_check_put_mutable)
1491         d.addCallback(run, "get", "tahoe:file3")
1492         d.addCallback(_check_stdout_against, 3)
1493
1494         #  tahoe put FOO
1495         STDIN_DATA = "This is the file to upload from stdin."
1496         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1497         #  tahoe put tahoe:FOO
1498         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1499                       stdin="Other file from stdin.")
1500
1501         d.addCallback(run, "ls")
1502         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1503                                   "tahoe-file-stdin", "from-stdin"])
1504         d.addCallback(run, "ls", "subdir")
1505         d.addCallback(_check_ls, ["tahoe-file1"])
1506
1507         # tahoe mkdir FOO
1508         d.addCallback(run, "mkdir", "subdir2")
1509         d.addCallback(run, "ls")
1510         # TODO: extract the URI, set an alias with it
1511         d.addCallback(_check_ls, ["subdir2"])
1512
1513         # tahoe get: (to stdin and to a file)
1514         d.addCallback(run, "get", "tahoe-file0")
1515         d.addCallback(_check_stdout_against, 0)
1516         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1517         d.addCallback(_check_stdout_against, 1)
1518         outfile0 = os.path.join(self.basedir, "outfile0")
1519         d.addCallback(run, "get", "file2", outfile0)
1520         def _check_outfile0((out,err)):
1521             data = open(outfile0,"rb").read()
1522             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1523         d.addCallback(_check_outfile0)
1524         outfile1 = os.path.join(self.basedir, "outfile0")
1525         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1526         def _check_outfile1((out,err)):
1527             data = open(outfile1,"rb").read()
1528             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1529         d.addCallback(_check_outfile1)
1530
1531         d.addCallback(run, "rm", "tahoe-file0")
1532         d.addCallback(run, "rm", "tahoe:file2")
1533         d.addCallback(run, "ls")
1534         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1535
1536         d.addCallback(run, "ls", "-l")
1537         def _check_ls_l((out,err)):
1538             lines = out.split("\n")
1539             for l in lines:
1540                 if "tahoe-file-stdin" in l:
1541                     self.failUnless(l.startswith("-r-- "), l)
1542                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1543                 if "file3" in l:
1544                     self.failUnless(l.startswith("-rw- "), l) # mutable
1545         d.addCallback(_check_ls_l)
1546
1547         d.addCallback(run, "ls", "--uri")
1548         def _check_ls_uri((out,err)):
1549             lines = out.split("\n")
1550             for l in lines:
1551                 if "file3" in l:
1552                     self.failUnless(self._mutable_file3_uri in l)
1553         d.addCallback(_check_ls_uri)
1554
1555         d.addCallback(run, "ls", "--readonly-uri")
1556         def _check_ls_rouri((out,err)):
1557             lines = out.split("\n")
1558             for l in lines:
1559                 if "file3" in l:
1560                     rw_uri = self._mutable_file3_uri
1561                     u = uri.from_string_mutable_filenode(rw_uri)
1562                     ro_uri = u.get_readonly().to_string()
1563                     self.failUnless(ro_uri in l)
1564         d.addCallback(_check_ls_rouri)
1565
1566
1567         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1568         d.addCallback(run, "ls")
1569         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1570
1571         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1572         d.addCallback(run, "ls")
1573         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1574
1575         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1576         d.addCallback(run, "ls")
1577         d.addCallback(_check_ls, ["file3", "file3-copy"])
1578         d.addCallback(run, "get", "tahoe:file3-copy")
1579         d.addCallback(_check_stdout_against, 3)
1580
1581         # copy from disk into tahoe
1582         d.addCallback(run, "cp", files[4], "tahoe:file4")
1583         d.addCallback(run, "ls")
1584         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1585         d.addCallback(run, "get", "tahoe:file4")
1586         d.addCallback(_check_stdout_against, 4)
1587
1588         # copy from tahoe into disk
1589         target_filename = os.path.join(self.basedir, "file-out")
1590         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1591         def _check_cp_out((out,err)):
1592             self.failUnless(os.path.exists(target_filename))
1593             got = open(target_filename,"rb").read()
1594             self.failUnlessEqual(got, datas[4])
1595         d.addCallback(_check_cp_out)
1596
1597         # copy from disk to disk (silly case)
1598         target2_filename = os.path.join(self.basedir, "file-out-copy")
1599         d.addCallback(run, "cp", target_filename, target2_filename)
1600         def _check_cp_out2((out,err)):
1601             self.failUnless(os.path.exists(target2_filename))
1602             got = open(target2_filename,"rb").read()
1603             self.failUnlessEqual(got, datas[4])
1604         d.addCallback(_check_cp_out2)
1605
1606         # copy from tahoe into disk, overwriting an existing file
1607         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1608         def _check_cp_out3((out,err)):
1609             self.failUnless(os.path.exists(target_filename))
1610             got = open(target_filename,"rb").read()
1611             self.failUnlessEqual(got, datas[3])
1612         d.addCallback(_check_cp_out3)
1613
1614         # copy from disk into tahoe, overwriting an existing immutable file
1615         d.addCallback(run, "cp", files[5], "tahoe:file4")
1616         d.addCallback(run, "ls")
1617         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1618         d.addCallback(run, "get", "tahoe:file4")
1619         d.addCallback(_check_stdout_against, 5)
1620
1621         # copy from disk into tahoe, overwriting an existing mutable file
1622         d.addCallback(run, "cp", files[5], "tahoe:file3")
1623         d.addCallback(run, "ls")
1624         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1625         d.addCallback(run, "get", "tahoe:file3")
1626         d.addCallback(_check_stdout_against, 5)
1627
1628         # recursive copy: setup
1629         dn = os.path.join(self.basedir, "dir1")
1630         os.makedirs(dn)
1631         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1632         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1633         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1634         sdn2 = os.path.join(dn, "subdir2")
1635         os.makedirs(sdn2)
1636         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1637         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1638
1639         # from disk into tahoe
1640         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1641         d.addCallback(run, "ls")
1642         d.addCallback(_check_ls, ["dir1"])
1643         d.addCallback(run, "ls", "dir1")
1644         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1645                       ["rfile4", "rfile5"])
1646         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1647         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1648                       ["rfile1", "rfile2", "rfile3"])
1649         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1650         d.addCallback(_check_stdout_against, data="rfile4")
1651
1652         # and back out again
1653         dn_copy = os.path.join(self.basedir, "dir1-copy")
1654         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1655         def _check_cp_r_out((out,err)):
1656             def _cmp(name):
1657                 old = open(os.path.join(dn, name), "rb").read()
1658                 newfn = os.path.join(dn_copy, name)
1659                 self.failUnless(os.path.exists(newfn))
1660                 new = open(newfn, "rb").read()
1661                 self.failUnlessEqual(old, new)
1662             _cmp("rfile1")
1663             _cmp("rfile2")
1664             _cmp("rfile3")
1665             _cmp(os.path.join("subdir2", "rfile4"))
1666             _cmp(os.path.join("subdir2", "rfile5"))
1667         d.addCallback(_check_cp_r_out)
1668
1669         # and copy it a second time, which ought to overwrite the same files
1670         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1671
1672         # and again, only writing filecaps
1673         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1674         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1675         def _check_capsonly((out,err)):
1676             # these should all be LITs
1677             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1678             y = uri.from_string_filenode(x)
1679             self.failUnlessEqual(y.data, "rfile4")
1680         d.addCallback(_check_capsonly)
1681
1682         # and tahoe-to-tahoe
1683         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1684         d.addCallback(run, "ls")
1685         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1686         d.addCallback(run, "ls", "dir1-copy")
1687         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1688                       ["rfile4", "rfile5"])
1689         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1690         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1691                       ["rfile1", "rfile2", "rfile3"])
1692         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1693         d.addCallback(_check_stdout_against, data="rfile4")
1694
1695         # and copy it a second time, which ought to overwrite the same files
1696         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1697
1698         # tahoe_ls doesn't currently handle the error correctly: it tries to
1699         # JSON-parse a traceback.
1700 ##         def _ls_missing(res):
1701 ##             argv = ["ls"] + nodeargs + ["bogus"]
1702 ##             return self._run_cli(argv)
1703 ##         d.addCallback(_ls_missing)
1704 ##         def _check_ls_missing((out,err)):
1705 ##             print "OUT", out
1706 ##             print "ERR", err
1707 ##             self.failUnlessEqual(err, "")
1708 ##         d.addCallback(_check_ls_missing)
1709
1710         return d
1711
1712     def _run_cli(self, argv, stdin=""):
1713         #print "CLI:", argv
1714         stdout, stderr = StringIO(), StringIO()
1715         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1716                                   stdin=StringIO(stdin),
1717                                   stdout=stdout, stderr=stderr)
1718         def _done(res):
1719             return stdout.getvalue(), stderr.getvalue()
1720         d.addCallback(_done)
1721         return d
1722
1723     def _test_checker(self, res):
1724         ut = upload.Data("too big to be literal" * 200, convergence=None)
1725         d = self._personal_node.add_file(u"big file", ut)
1726
1727         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1728         def _check_dirnode_results(r):
1729             self.failUnless(r.is_healthy())
1730         d.addCallback(_check_dirnode_results)
1731         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1732         d.addCallback(_check_dirnode_results)
1733
1734         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1735         def _got_chk_filenode(n):
1736             self.failUnless(isinstance(n, ImmutableFileNode))
1737             d = n.check(Monitor())
1738             def _check_filenode_results(r):
1739                 self.failUnless(r.is_healthy())
1740             d.addCallback(_check_filenode_results)
1741             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1742             d.addCallback(_check_filenode_results)
1743             return d
1744         d.addCallback(_got_chk_filenode)
1745
1746         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1747         def _got_lit_filenode(n):
1748             self.failUnless(isinstance(n, LiteralFileNode))
1749             d = n.check(Monitor())
1750             def _check_lit_filenode_results(r):
1751                 self.failUnlessEqual(r, None)
1752             d.addCallback(_check_lit_filenode_results)
1753             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1754             d.addCallback(_check_lit_filenode_results)
1755             return d
1756         d.addCallback(_got_lit_filenode)
1757         return d
1758