from base64 import b32encode
import os, sys, time, re, simplejson
from cStringIO import StringIO
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import download, filenode, offloaded, upload
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
     NoSuchChildError, NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap.api import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
     download_to_data

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""

class CountingDataUploadable(upload.Data):
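    """An Uploadable that counts how many bytes have been read from it, and
    that can fire a Deferred once the count passes a threshold. The resumable
    upload test below uses this to shut down the helper mid-upload."""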
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

class GrabEverythingConsumer:
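    """A minimal IConsumer that simply accumulates every byte written to it,
    used to exercise the streaming download interface."""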
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents += data

    def unregisterProducer(self):
        pass

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # This actually gets rounded up to 1025 to be a multiple of the
            # number of required shares (since we use 25 out of 100 FEC).
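            # (1024 is not a multiple of 25; the next multiple up is
            # 41 * 25 = 1025.)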
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do
            # all of the encoding work, and only get to save on the upload
            # part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            # return the Deferred, so the second upload finishes before the
            # downloads below start
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_shares(self, basedir):
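        """Walk basedir looking for stored share files, returning a list of
        (client_num, storage_index, filename, shnum) tuples, one per share
        found. Fails the current test if no shares are found at all."""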
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
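        """Unpack the SDMF share stored in `filename`, corrupt the one field
        named by `which` (either by flipping a bit or bumping a counter),
        then repack it and write it back in place."""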
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)

    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out, err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling is only noticed if share 0 happens to be
                # queried before an intact share (see the note above).
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
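            """Check that the key generator's pool now differs from its
            configured pool_size by exactly size_delta (which goes negative
            as keys are consumed)."""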
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses
        # the key generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

    def flip_bit(self, good):
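        """Return `good` with the low bit of its last byte flipped, e.g.
        flip_bit("abc") == "abb"."""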
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data", home, u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
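                # each size-files-histogram entry is (bucket min, bucket max,
                # count): the 23-byte literal file lands in the 11-31 bucket,
                # the 112-byte immutable file in the 101-316 bucket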
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
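        """Assert that `res` is a Failure wrapping `expected_failure` (and
        mentioning `substring`, if given). Intended to be attached with
        addBoth(); compare shouldFail2, which wraps the call itself."""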
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
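        """Invoke callable(*args, **kwargs) and assert that it fails with
        `expected_failure` (and that `substring`, if given, appears in the
        failure's string form). `which` labels the assertion in any error
        message."""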
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
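        """Send a POST to the given web-API path, hand-assembling a
        multipart/form-data body from the keyword `fields`. A tuple value is
        treated as (filename, contents) to simulate a file-upload field."""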
1070         if use_helper:
1071             url = self.helper_webish_url + urlpath
1072         else:
1073             url = self.webish_url + urlpath
1074         sepbase = "boogabooga"
1075         sep = "--" + sepbase
1076         form = []
1077         form.append(sep)
1078         form.append('Content-Disposition: form-data; name="_charset"')
1079         form.append('')
1080         form.append('UTF-8')
1081         form.append(sep)
1082         for name, value in fields.iteritems():
1083             if isinstance(value, tuple):
1084                 filename, value = value
1085                 form.append('Content-Disposition: form-data; name="%s"; '
1086                             'filename="%s"' % (name, filename.encode("utf-8")))
1087             else:
1088                 form.append('Content-Disposition: form-data; name="%s"' % name)
1089             form.append('')
1090             form.append(str(value))
1091             form.append(sep)
1092         form[-1] += "--"
1093         body = "\r\n".join(form) + "\r\n"
1094         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1095                    }
1096         return getPage(url, method="POST", postdata=body,
1097                        headers=headers, followRedirect=followRedirect)
1098
1099     def _test_web(self, res):
1100         base = self.webish_url
1101         public = "uri/" + self._root_directory_uri
1102         d = getPage(base)
1103         def _got_welcome(page):
1104             # XXX This test is oversensitive to formatting
1105             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1106             self.failUnless(expected in page,
1107                             "I didn't see the right 'connected storage servers'"
1108                             " message in: %s" % page
1109                             )
1110             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1111             self.failUnless(expected in page,
1112                             "I didn't see the right 'My nodeid' message "
1113                             "in: %s" % page)
1114             self.failUnless("Helper: 0 active uploads" in page)
1115         d.addCallback(_got_welcome)
1116         d.addCallback(self.log, "done with _got_welcome")
1117
1118         # get the welcome page from the node that uses the helper too
1119         d.addCallback(lambda res: getPage(self.helper_webish_url))
1120         def _got_welcome_helper(page):
1121             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1122                             page)
1123             self.failUnless("Not running helper" in page)
1124         d.addCallback(_got_welcome_helper)
1125
1126         d.addCallback(lambda res: getPage(base + public))
1127         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1128         def _got_subdir1(page):
1129             # there ought to be an href for our file
1130             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1131             self.failUnless(">mydata567</a>" in page)
1132         d.addCallback(_got_subdir1)
1133         d.addCallback(self.log, "done with _got_subdir1")
1134         d.addCallback(lambda res:
1135                       getPage(base + public + "/subdir1/mydata567"))
1136         def _got_data(page):
1137             self.failUnlessEqual(page, self.data)
1138         d.addCallback(_got_data)
1139
1140         # download from a URI embedded in a URL
1141         d.addCallback(self.log, "_get_from_uri")
1142         def _get_from_uri(res):
1143             return getPage(base + "uri/%s?filename=%s"
1144                            % (self.uri, "mydata567"))
1145         d.addCallback(_get_from_uri)
1146         def _got_from_uri(page):
1147             self.failUnlessEqual(page, self.data)
1148         d.addCallback(_got_from_uri)
1149
1150         # download from a URI embedded in a URL, second form
1151         d.addCallback(self.log, "_get_from_uri2")
1152         def _get_from_uri2(res):
1153             return getPage(base + "uri?uri=%s" % (self.uri,))
1154         d.addCallback(_get_from_uri2)
1155         d.addCallback(_got_from_uri)
1156
1157         # download from a bogus URI, make sure we get a reasonable error
1158         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1159         def _get_from_bogus_uri(res):
1160             d1 = getPage(base + "uri/%s?filename=%s"
1161                          % (self.mangle_uri(self.uri), "mydata567"))
1162             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1163                        "410")
1164             return d1
1165         d.addCallback(_get_from_bogus_uri)
1166         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1167
1168         # upload a file with PUT
1169         d.addCallback(self.log, "about to try PUT")
1170         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1171                                            "new.txt contents"))
1172         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1173         d.addCallback(self.failUnlessEqual, "new.txt contents")
1174         # and again with something large enough to use multiple segments,
1175         # and hopefully trigger pauseProducing too
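             # (assuming the default 128 KiB max segment size is in effect,
             # this 1.5MB file should span about a dozen segments)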
1176         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1177                                            "big" * 500000)) # 1.5MB
1178         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1179         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1180
1181         # can we replace files in place?
1182         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1183                                            "NEWER contents"))
1184         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1185         d.addCallback(self.failUnlessEqual, "NEWER contents")
1186
1187         # test unlinked POST
1188         d.addCallback(lambda res: self.POST("uri", t="upload",
1189                                             file=("new.txt", "data" * 10000)))
1190         # and again using the helper, which exercises different upload-status
1191         # display code
1192         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1193                                             file=("foo.txt", "data2" * 10000)))
1194
1195         # check that the status page exists
1196         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1197         def _got_status(res):
1198             # find an interesting upload and download to look at. LIT files
1199             # are not interesting.
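                 # each status object exposes get_counter(), a small serial
                 # number which also names the /status/{up,down,mapupdate,
                 # publish,retrieve}-NNN pages fetched below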
1200             for ds in self.clients[0].list_all_download_statuses():
1201                 if ds.get_size() > 200:
1202                     self._down_status = ds.get_counter()
1203             for us in self.clients[0].list_all_upload_statuses():
1204                 if us.get_size() > 200:
1205                     self._up_status = us.get_counter()
1206             rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1207             self._retrieve_status = rs.get_counter()
1208             ps = list(self.clients[0].list_all_publish_statuses())[0]
1209             self._publish_status = ps.get_counter()
1210             us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1211             self._update_status = us.get_counter()
1212
1213             # and that there are some upload- and download-status pages
1214             return self.GET("status/up-%d" % self._up_status)
1215         d.addCallback(_got_status)
1216         def _got_up(res):
1217             return self.GET("status/down-%d" % self._down_status)
1218         d.addCallback(_got_up)
1219         def _got_down(res):
1220             return self.GET("status/mapupdate-%d" % self._update_status)
1221         d.addCallback(_got_down)
1222         def _got_update(res):
1223             return self.GET("status/publish-%d" % self._publish_status)
1224         d.addCallback(_got_update)
1225         def _got_publish(res):
1226             return self.GET("status/retrieve-%d" % self._retrieve_status)
1227         d.addCallback(_got_publish)
1228
1229         # check that the helper status page exists
1230         d.addCallback(lambda res:
1231                       self.GET("helper_status", followRedirect=True))
1232         def _got_helper_status(res):
1233             self.failUnless("Bytes Fetched:" in res)
1234             # touch a couple of files in the helper's working directory to
1235             # exercise more code paths
1236             workdir = os.path.join(self.getdir("client0"), "helper")
1237             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1238             f = open(incfile, "wb")
1239             f.write("small file")
1240             f.close()
1241             then = time.time() - 86400*3
1242             now = time.time()
1243             os.utime(incfile, (now, then))
1244             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1245             f = open(encfile, "wb")
1246             f.write("less small file")
1247             f.close()
1248             os.utime(encfile, (now, then))
1249         d.addCallback(_got_helper_status)
1250         # and that the json form exists
1251         d.addCallback(lambda res:
1252                       self.GET("helper_status?t=json", followRedirect=True))
1253         def _got_helper_status_json(res):
1254             data = simplejson.loads(res)
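                 # the two spurious files created in _got_helper_status
                 # should be visible here: "small file" is 10 bytes
                 # (incoming) and "less small file" is 15 bytes (encoding);
                 # both mtimes were backdated ~3 days, so they also land in
                 # the *_old totals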
1255             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1256                                  1)
1257             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1258             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1259             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1260                                  10)
1261             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1262             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1263             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1264                                  15)
1265         d.addCallback(_got_helper_status_json)
1266
1267         # and check that client[3] (which uses a helper but does not run one
1268         # itself) doesn't explode when you ask for its status
1269         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1270         def _got_non_helper_status(res):
1271             self.failUnless("Upload and Download Status" in res)
1272         d.addCallback(_got_non_helper_status)
1273
1274         # or for helper status with t=json
1275         d.addCallback(lambda res:
1276                       getPage(self.helper_webish_url + "helper_status?t=json"))
1277         def _got_non_helper_status_json(res):
1278             data = simplejson.loads(res)
1279             self.failUnlessEqual(data, {})
1280         d.addCallback(_got_non_helper_status_json)
1281
1282         # see if the statistics page exists
1283         d.addCallback(lambda res: self.GET("statistics"))
1284         def _got_stats(res):
1285             self.failUnless("Node Statistics" in res)
1286             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1287         d.addCallback(_got_stats)
1288         d.addCallback(lambda res: self.GET("statistics?t=json"))
1289         def _got_stats_json(res):
1290             data = simplejson.loads(res)
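                 # the JSON form separates monotonic "counters" from
                 # point-in-time "stats" gauges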
1291             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1292             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1293         d.addCallback(_got_stats_json)
1294
1295         # TODO: mangle the second segment of a file, to test errors that
1296         # occur after we've already sent some good data, which uses a
1297         # different error path.
1298
1299         # TODO: download a URI with a form
1300         # TODO: create a directory by using a form
1301         # TODO: upload by using a form on the directory page
1302         #    url = base + "somedir/subdir1/freeform_post!!upload"
1303         # TODO: delete a file by using a button on the directory page
1304
1305         return d
1306
1307     def _test_runner(self, res):
1308         # exercise some of the diagnostic tools in runner.py
1309
1310         # find a share
1311         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1312             if "storage" not in dirpath:
1313                 continue
1314             if not filenames:
1315                 continue
1316             pieces = dirpath.split(os.sep)
1317             if (len(pieces) >= 4
1318                 and pieces[-4] == "storage"
1319                 and pieces[-3] == "shares"):
1320                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1321                 # are sharefiles here
1322                 filename = os.path.join(dirpath, filenames[0])
1323                 # peek at the magic to see if it is a chk share
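                     # (immutable shares begin with a 4-byte big-endian
                     # version number, currently 1; mutable shares use a
                     # longer magic string, so they are filtered out here)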
1324                 magic = open(filename, "rb").read(4)
1325                 if magic == '\x00\x00\x00\x01':
1326                     break
1327         else:
1328             self.fail("unable to find any uri_extension files in %s"
1329                       % self.basedir)
1330         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1331
1332         out,err = StringIO(), StringIO()
1333         rc = runner.runner(["debug", "dump-share", "--offsets",
1334                             filename],
1335                            stdout=out, stderr=err)
1336         output = out.getvalue()
1337         self.failUnlessEqual(rc, 0)
1338
1339         # we only upload a single file, so we can assert some things about
1340         # its size and shares.
1341         self.failUnless(("share filename: %s" % filename) in output)
1342         self.failUnless("size: %d\n" % len(self.data) in output)
1343         self.failUnless("num_segments: 1\n" in output)
1344         # segment_size is always a multiple of needed_shares
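         # (the test grid encodes 3-of-10, and next_multiple rounds up:
         # a hypothetical 100-byte file would report "segment_size: 102")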
1345         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1346         self.failUnless("total_shares: 10\n" in output)
1347         # keys which are supposed to be present
1348         for key in ("size", "num_segments", "segment_size",
1349                     "needed_shares", "total_shares",
1350                     "codec_name", "codec_params", "tail_codec_params",
1351                     #"plaintext_hash", "plaintext_root_hash",
1352                     "crypttext_hash", "crypttext_root_hash",
1353                     "share_root_hash", "UEB_hash"):
1354             self.failUnless("%s: " % key in output, key)
1355         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1356
1357         # now use its storage index to find the other shares using the
1358         # 'find-shares' tool
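         # share paths look like .../storage/shares/$START/$SINDEX/$SHNUM,
         # so two os.path.split calls recover the base32 storage index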
1359         sharedir, shnum = os.path.split(filename)
1360         storagedir, storage_index_s = os.path.split(sharedir)
1361         out,err = StringIO(), StringIO()
1362         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1363         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1364         rc = runner.runner(cmd, stdout=out, stderr=err)
1365         self.failUnlessEqual(rc, 0)
1366         out.seek(0)
1367         sharefiles = [sfn.strip() for sfn in out.readlines()]
1368         self.failUnlessEqual(len(sharefiles), 10)
1369
1370         # also exercise the 'catalog-shares' tool
1371         out,err = StringIO(), StringIO()
1372         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1373         cmd = ["debug", "catalog-shares"] + nodedirs
1374         rc = runner.runner(cmd, stdout=out, stderr=err)
1375         self.failUnlessEqual(rc, 0)
1376         out.seek(0)
1377         descriptions = [sfn.strip() for sfn in out.readlines()]
1378         self.failUnlessEqual(len(descriptions), 30)
1379         matching = [line
1380                     for line in descriptions
1381                     if line.startswith("CHK %s " % storage_index_s)]
1382         self.failUnlessEqual(len(matching), 10)
1383
1384     def _test_control(self, res):
1385         # exercise the remote-control-the-client foolscap interfaces in
1386         # allmydata.control (mostly used for performance tests)
1387         c0 = self.clients[0]
1388         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1389         control_furl = open(control_furl_file, "r").read().strip()
1390         # it doesn't really matter which Tub we use to connect to the client,
1391         # so let's just use our IntroducerNode's
1392         d = self.introducer.tub.getReference(control_furl)
1393         d.addCallback(self._test_control2, control_furl_file)
1394         return d
1395     def _test_control2(self, rref, filename):
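         # note: 'filename' is the control.furl file itself, reused here as
         # convenient upload fodder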
1396         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1397         downfile = os.path.join(self.basedir, "control.downfile")
1398         d.addCallback(lambda uri:
1399                       rref.callRemote("download_from_uri_to_file",
1400                                       uri, downfile))
1401         def _check(res):
1402             self.failUnlessEqual(res, downfile)
1403             data = open(downfile, "r").read()
1404             expected_data = open(filename, "r").read()
1405             self.failUnlessEqual(data, expected_data)
1406         d.addCallback(_check)
1407         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1408         if sys.platform == "linux2":
1409             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1410         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1411         return d
1412
1413     def _test_cli(self, res):
1414         # run various CLI commands (in a thread, since they use blocking
1415         # network calls)
1416
1417         private_uri = self._private_node.get_uri()
1418         some_uri = self._root_directory_uri
1419         client0_basedir = self.getdir("client0")
1420
1421         nodeargs = [
1422             "--node-directory", client0_basedir,
1423             ]
1424         TESTDATA = "I will not write the same thing over and over.\n" * 100
1425
1426         d = defer.succeed(None)
1427
1428         # for compatibility with earlier versions, private/root_dir.cap is
1429         # supposed to be treated as an alias named "tahoe:". Start by making
1430         # sure that works, before we add other aliases.
1431
1432         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1433         f = open(root_file, "w")
1434         f.write(private_uri)
1435         f.close()
1436
1437         def run(ignored, verb, *args, **kwargs):
1438             stdin = kwargs.get("stdin", "")
1439             newargs = [verb] + nodeargs + list(args)
1440             return self._run_cli(newargs, stdin=stdin)
1441
1442         def _check_ls((out,err), expected_children, unexpected_children=[]):
1443             self.failUnlessEqual(err, "")
1444             for s in expected_children:
1445                 self.failUnless(s in out, (s,out))
1446             for s in unexpected_children:
1447                 self.failIf(s in out, (s,out))
1448
1449         def _check_ls_root((out,err)):
1450             self.failUnless("personal" in out)
1451             self.failUnless("s2-ro" in out)
1452             self.failUnless("s2-rw" in out)
1453             self.failUnlessEqual(err, "")
1454
1455         # this should reference private_uri
1456         d.addCallback(run, "ls")
1457         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1458
1459         d.addCallback(run, "list-aliases")
1460         def _check_aliases_1((out,err)):
1461             self.failUnlessEqual(err, "")
1462             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1463         d.addCallback(_check_aliases_1)
1464
1465         # now that that's out of the way, remove root_dir.cap and work with
1466         # new files
1467         d.addCallback(lambda res: os.unlink(root_file))
1468         d.addCallback(run, "list-aliases")
1469         def _check_aliases_2((out,err)):
1470             self.failUnlessEqual(err, "")
1471             self.failUnlessEqual(out, "")
1472         d.addCallback(_check_aliases_2)
1473
1474         d.addCallback(run, "mkdir")
1475         def _got_dir( (out,err) ):
1476             self.failUnless(uri.from_string_dirnode(out.strip()))
1477             return out.strip()
1478         d.addCallback(_got_dir)
1479         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1480
1481         d.addCallback(run, "list-aliases")
1482         def _check_aliases_3((out,err)):
1483             self.failUnlessEqual(err, "")
1484             self.failUnless("tahoe: " in out)
1485         d.addCallback(_check_aliases_3)
1486
1487         def _check_empty_dir((out,err)):
1488             self.failUnlessEqual(out, "")
1489             self.failUnlessEqual(err, "")
1490         d.addCallback(run, "ls")
1491         d.addCallback(_check_empty_dir)
1492
1493         def _check_missing_dir((out,err)):
1494             # TODO: check that rc==2
1495             self.failUnlessEqual(out, "")
1496             self.failUnlessEqual(err, "No such file or directory\n")
1497         d.addCallback(run, "ls", "bogus")
1498         d.addCallback(_check_missing_dir)
1499
1500         files = []
1501         datas = []
1502         for i in range(10):
1503             fn = os.path.join(self.basedir, "file%d" % i)
1504             files.append(fn)
1505             data = "data to be uploaded: file%d\n" % i
1506             datas.append(data)
1507             open(fn,"wb").write(data)
1508
1509         def _check_stdout_against((out,err), filenum=None, data=None):
1510             self.failUnlessEqual(err, "")
1511             if filenum is not None:
1512                 self.failUnlessEqual(out, datas[filenum])
1513             if data is not None:
1514                 self.failUnlessEqual(out, data)
1515
1516         # test both forms of put: from a file, and from stdin
1517         #  tahoe put bar FOO
1518         d.addCallback(run, "put", files[0], "tahoe-file0")
1519         def _put_out((out,err)):
1520             self.failUnless("URI:LIT:" in out, out)
1521             self.failUnless("201 Created" in err, err)
1522             uri0 = out.strip()
1523             return run(None, "get", uri0)
1524         d.addCallback(_put_out)
1525         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1526
1527         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1528         #  tahoe put bar tahoe:FOO
1529         d.addCallback(run, "put", files[2], "tahoe:file2")
1530         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1531         def _check_put_mutable((out,err)):
1532             self._mutable_file3_uri = out.strip()
1533         d.addCallback(_check_put_mutable)
1534         d.addCallback(run, "get", "tahoe:file3")
1535         d.addCallback(_check_stdout_against, 3)
1536
1537         #  tahoe put FOO
1538         STDIN_DATA = "This is the file to upload from stdin."
1539         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1540         #  tahoe put tahoe:FOO
1541         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1542                       stdin="Other file from stdin.")
1543
1544         d.addCallback(run, "ls")
1545         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1546                                   "tahoe-file-stdin", "from-stdin"])
1547         d.addCallback(run, "ls", "subdir")
1548         d.addCallback(_check_ls, ["tahoe-file1"])
1549
1550         # tahoe mkdir FOO
1551         d.addCallback(run, "mkdir", "subdir2")
1552         d.addCallback(run, "ls")
1553         # TODO: extract the URI, set an alias with it
1554         d.addCallback(_check_ls, ["subdir2"])
1555
1556         # tahoe get: (to stdin and to a file)
1557         d.addCallback(run, "get", "tahoe-file0")
1558         d.addCallback(_check_stdout_against, 0)
1559         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1560         d.addCallback(_check_stdout_against, 1)
1561         outfile0 = os.path.join(self.basedir, "outfile0")
1562         d.addCallback(run, "get", "file2", outfile0)
1563         def _check_outfile0((out,err)):
1564             data = open(outfile0,"rb").read()
1565             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1566         d.addCallback(_check_outfile0)
1567         outfile1 = os.path.join(self.basedir, "outfile1")
1568         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1569         def _check_outfile1((out,err)):
1570             data = open(outfile1,"rb").read()
1571             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1572         d.addCallback(_check_outfile1)
1573
1574         d.addCallback(run, "rm", "tahoe-file0")
1575         d.addCallback(run, "rm", "tahoe:file2")
1576         d.addCallback(run, "ls")
1577         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1578
1579         d.addCallback(run, "ls", "-l")
1580         def _check_ls_l((out,err)):
1581             lines = out.split("\n")
1582             for l in lines:
1583                 if "tahoe-file-stdin" in l:
1584                     self.failUnless(l.startswith("-r-- "), l)
1585                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1586                 if "file3" in l:
1587                     self.failUnless(l.startswith("-rw- "), l) # mutable
1588         d.addCallback(_check_ls_l)
1589
1590         d.addCallback(run, "ls", "--uri")
1591         def _check_ls_uri((out,err)):
1592             lines = out.split("\n")
1593             for l in lines:
1594                 if "file3" in l:
1595                     self.failUnless(self._mutable_file3_uri in l)
1596         d.addCallback(_check_ls_uri)
1597
1598         d.addCallback(run, "ls", "--readonly-uri")
1599         def _check_ls_rouri((out,err)):
1600             lines = out.split("\n")
1601             for l in lines:
1602                 if "file3" in l:
1603                     rw_uri = self._mutable_file3_uri
1604                     u = uri.from_string_mutable_filenode(rw_uri)
1605                     ro_uri = u.get_readonly().to_string()
1606                     self.failUnless(ro_uri in l)
1607         d.addCallback(_check_ls_rouri)
1608
1609
1610         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1611         d.addCallback(run, "ls")
1612         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1613
1614         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1615         d.addCallback(run, "ls")
1616         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1617
1618         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1619         d.addCallback(run, "ls")
1620         d.addCallback(_check_ls, ["file3", "file3-copy"])
1621         d.addCallback(run, "get", "tahoe:file3-copy")
1622         d.addCallback(_check_stdout_against, 3)
1623
1624         # copy from disk into tahoe
1625         d.addCallback(run, "cp", files[4], "tahoe:file4")
1626         d.addCallback(run, "ls")
1627         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1628         d.addCallback(run, "get", "tahoe:file4")
1629         d.addCallback(_check_stdout_against, 4)
1630
1631         # copy from tahoe into disk
1632         target_filename = os.path.join(self.basedir, "file-out")
1633         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1634         def _check_cp_out((out,err)):
1635             self.failUnless(os.path.exists(target_filename))
1636             got = open(target_filename,"rb").read()
1637             self.failUnlessEqual(got, datas[4])
1638         d.addCallback(_check_cp_out)
1639
1640         # copy from disk to disk (silly case)
1641         target2_filename = os.path.join(self.basedir, "file-out-copy")
1642         d.addCallback(run, "cp", target_filename, target2_filename)
1643         def _check_cp_out2((out,err)):
1644             self.failUnless(os.path.exists(target2_filename))
1645             got = open(target2_filename,"rb").read()
1646             self.failUnlessEqual(got, datas[4])
1647         d.addCallback(_check_cp_out2)
1648
1649         # copy from tahoe into disk, overwriting an existing file
1650         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1651         def _check_cp_out3((out,err)):
1652             self.failUnless(os.path.exists(target_filename))
1653             got = open(target_filename,"rb").read()
1654             self.failUnlessEqual(got, datas[3])
1655         d.addCallback(_check_cp_out3)
1656
1657         # copy from disk into tahoe, overwriting an existing immutable file
1658         d.addCallback(run, "cp", files[5], "tahoe:file4")
1659         d.addCallback(run, "ls")
1660         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1661         d.addCallback(run, "get", "tahoe:file4")
1662         d.addCallback(_check_stdout_against, 5)
1663
1664         # copy from disk into tahoe, overwriting an existing mutable file
1665         d.addCallback(run, "cp", files[5], "tahoe:file3")
1666         d.addCallback(run, "ls")
1667         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1668         d.addCallback(run, "get", "tahoe:file3")
1669         d.addCallback(_check_stdout_against, 5)
1670
1671         # recursive copy: setup
1672         dn = os.path.join(self.basedir, "dir1")
1673         os.makedirs(dn)
1674         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1675         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1676         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1677         sdn2 = os.path.join(dn, "subdir2")
1678         os.makedirs(sdn2)
1679         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1680         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1681
1682         # from disk into tahoe
1683         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1684         d.addCallback(run, "ls")
1685         d.addCallback(_check_ls, ["dir1"])
1686         d.addCallback(run, "ls", "dir1")
1687         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1688                       ["rfile4", "rfile5"])
1689         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1690         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1691                       ["rfile1", "rfile2", "rfile3"])
1692         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1693         d.addCallback(_check_stdout_against, data="rfile4")
1694
1695         # and back out again
1696         dn_copy = os.path.join(self.basedir, "dir1-copy")
1697         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1698         def _check_cp_r_out((out,err)):
1699             def _cmp(name):
1700                 old = open(os.path.join(dn, name), "rb").read()
1701                 newfn = os.path.join(dn_copy, name)
1702                 self.failUnless(os.path.exists(newfn))
1703                 new = open(newfn, "rb").read()
1704                 self.failUnlessEqual(old, new)
1705             _cmp("rfile1")
1706             _cmp("rfile2")
1707             _cmp("rfile3")
1708             _cmp(os.path.join("subdir2", "rfile4"))
1709             _cmp(os.path.join("subdir2", "rfile5"))
1710         d.addCallback(_check_cp_r_out)
1711
1712         # and copy it a second time, which ought to overwrite the same files
1713         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1714
1715         # and again, only writing filecaps
1716         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1717         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1718         def _check_capsonly((out,err)):
1719             # these should all be LITs
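                 # (a LIT cap embeds the whole file body in the URI, so
                 # parsing the cap alone recovers the data)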
1720             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1721             y = uri.from_string_filenode(x)
1722             self.failUnlessEqual(y.data, "rfile4")
1723         d.addCallback(_check_capsonly)
1724
1725         # and tahoe-to-tahoe
1726         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1727         d.addCallback(run, "ls")
1728         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1729         d.addCallback(run, "ls", "dir1-copy")
1730         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1731                       ["rfile4", "rfile5"])
1732         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1733         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1734                       ["rfile1", "rfile2", "rfile3"])
1735         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1736         d.addCallback(_check_stdout_against, data="rfile4")
1737
1738         # and copy it a second time, which ought to overwrite the same files
1739         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1740
1741         # tahoe_ls doesn't currently handle the error correctly: it tries to
1742         # JSON-parse a traceback.
1743 ##         def _ls_missing(res):
1744 ##             argv = ["ls"] + nodeargs + ["bogus"]
1745 ##             return self._run_cli(argv)
1746 ##         d.addCallback(_ls_missing)
1747 ##         def _check_ls_missing((out,err)):
1748 ##             print "OUT", out
1749 ##             print "ERR", err
1750 ##             self.failUnlessEqual(err, "")
1751 ##         d.addCallback(_check_ls_missing)
1752
1753         return d
1754
1755     def _run_cli(self, argv, stdin=""):
1756         #print "CLI:", argv
1757         stdout, stderr = StringIO(), StringIO()
1758         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1759                                   stdin=StringIO(stdin),
1760                                   stdout=stdout, stderr=stderr)
1761         def _done(res):
1762             return stdout.getvalue(), stderr.getvalue()
1763         d.addCallback(_done)
1764         return d
1765
1766     def _test_checker(self, res):
1767         ut = upload.Data("too big to be literal" * 200, convergence=None)
1768         d = self._personal_node.add_file(u"big file", ut)
1769
1770         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1771         def _check_dirnode_results(r):
1772             self.failUnless(r.is_healthy())
1773         d.addCallback(_check_dirnode_results)
1774         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1775         d.addCallback(_check_dirnode_results)
1776
1777         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1778         def _got_chk_filenode(n):
1779             self.failUnless(isinstance(n, filenode.FileNode))
1780             d = n.check(Monitor())
1781             def _check_filenode_results(r):
1782                 self.failUnless(r.is_healthy())
1783             d.addCallback(_check_filenode_results)
1784             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1785             d.addCallback(_check_filenode_results)
1786             return d
1787         d.addCallback(_got_chk_filenode)
1788
1789         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1790         def _got_lit_filenode(n):
1791             self.failUnless(isinstance(n, filenode.LiteralFileNode))
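                 # LIT files have no shares to verify, so check() is
                 # expected to return None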
1792             d = n.check(Monitor())
1793             def _check_lit_filenode_results(r):
1794                 self.failUnlessEqual(r, None)
1795             d.addCallback(_check_lit_filenode_results)
1796             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1797             d.addCallback(_check_lit_filenode_results)
1798             return d
1799         d.addCallback(_got_lit_filenode)
1800         return d
1801