]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
423657db7cf2d26b0f8035cb057b18e8b5c617b0
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, offloaded, upload
15 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
16 from allmydata.util import idlib, mathutil
17 from allmydata.util import log, base32
18 from allmydata.scripts import runner
19 from allmydata.interfaces import IDirectoryNode, IFileNode, \
20      NoSuchChildError, NoSharesError
21 from allmydata.monitor import Monitor
22 from allmydata.mutable.common import NotMutableError
23 from allmydata.mutable import layout as mutable_layout
24 from foolscap.api import DeadReferenceError
25 from twisted.python.failure import Failure
26 from twisted.web.client import getPage
27 from twisted.web.error import Error
28
29 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
30      download_to_data
31
32 LARGE_DATA = """
33 This is some data to publish to the virtual drive, which needs to be large
34 enough to not fit inside a LIT uri.
35 """
36
class CountingDataUploadable(upload.Data):
    """An upload.Data that counts how many bytes have been read from it,
    and can fire a Deferred once a byte threshold is crossed, so tests
    can interrupt an upload mid-flight."""
    bytes_read = 0          # cumulative bytes handed out by read()
    interrupt_after = None  # byte threshold; None disables interruption
    interrupt_after_d = None  # Deferred fired (once) when threshold is passed

    def read(self, length):
        # Track cumulative reads, and fire interrupt_after_d exactly once
        # the first time the running total exceeds the threshold.
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
49
class GrabEverythingConsumer:
    """A minimal IConsumer that accumulates everything written to it in
    the .contents string, for later comparison by the test."""
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        # only streaming (push) producers are supported here
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents = self.contents + data

    def unregisterProducer(self):
        # nothing to release; the accumulated bytes remain in .contents
        pass
65
66 class SystemTest(SystemTestMixin, unittest.TestCase):
67     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
68
69     def test_connections(self):
70         self.basedir = "system/SystemTest/test_connections"
71         d = self.set_up_nodes()
72         self.extra_node = None
73         d.addCallback(lambda res: self.add_extra_node(self.numclients))
74         def _check(extra_node):
75             self.extra_node = extra_node
76             for c in self.clients:
77                 all_peerids = c.get_storage_broker().get_all_serverids()
78                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
79                 sb = c.storage_broker
80                 permuted_peers = sb.get_servers_for_index("a")
81                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
82
83         d.addCallback(_check)
84         def _shutdown_extra_node(res):
85             if self.extra_node:
86                 return self.extra_node.stopService()
87             return res
88         d.addBoth(_shutdown_extra_node)
89         return d
90     # test_connections is subsumed by test_upload_and_download, and takes
91     # quite a while to run on a slow machine (because of all the TLS
92     # connections that must be established). If we ever rework the introducer
93     # code to such an extent that we're not sure if it works anymore, we can
94     # reinstate this test until it does.
95     del test_connections
96
97     def test_upload_and_download_random_key(self):
98         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
99         return self._test_upload_and_download(convergence=None)
100
101     def test_upload_and_download_convergent(self):
102         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
103         return self._test_upload_and_download(convergence="some convergence string")
104
105     def _test_upload_and_download(self, convergence):
106         # we use 4000 bytes of data, which will result in about 400k written
107         # to disk among all our simulated nodes
108         DATA = "Some data to upload\n" * 200
109         d = self.set_up_nodes()
110         def _check_connections(res):
111             for c in self.clients:
112                 all_peerids = c.get_storage_broker().get_all_serverids()
113                 self.failUnlessEqual(len(all_peerids), self.numclients)
114                 sb = c.storage_broker
115                 permuted_peers = sb.get_servers_for_index("a")
116                 self.failUnlessEqual(len(permuted_peers), self.numclients)
117         d.addCallback(_check_connections)
118
119         def _do_upload(res):
120             log.msg("UPLOADING")
121             u = self.clients[0].getServiceNamed("uploader")
122             self.uploader = u
123             # we crank the max segsize down to 1024b for the duration of this
124             # test, so we can exercise multiple segments. It is important
125             # that this is not a multiple of the segment size, so that the
126             # tail segment is not the same length as the others. This actualy
127             # gets rounded up to 1025 to be a multiple of the number of
128             # required shares (since we use 25 out of 100 FEC).
129             up = upload.Data(DATA, convergence=convergence)
130             up.max_segment_size = 1024
131             d1 = u.upload(up)
132             return d1
133         d.addCallback(_do_upload)
134         def _upload_done(results):
135             theuri = results.uri
136             log.msg("upload finished: uri is %s" % (theuri,))
137             self.uri = theuri
138             assert isinstance(self.uri, str), self.uri
139             self.cap = uri.from_string(self.uri)
140             self.downloader = self.clients[1].downloader
141         d.addCallback(_upload_done)
142
143         def _upload_again(res):
144             # Upload again. If using convergent encryption then this ought to be
145             # short-circuited, however with the way we currently generate URIs
146             # (i.e. because they include the roothash), we have to do all of the
147             # encoding work, and only get to save on the upload part.
148             log.msg("UPLOADING AGAIN")
149             up = upload.Data(DATA, convergence=convergence)
150             up.max_segment_size = 1024
151             d1 = self.uploader.upload(up)
152         d.addCallback(_upload_again)
153
154         def _download_to_data(res):
155             log.msg("DOWNLOADING")
156             return self.downloader.download_to_data(self.cap)
157         d.addCallback(_download_to_data)
158         def _download_to_data_done(data):
159             log.msg("download finished")
160             self.failUnlessEqual(data, DATA)
161         d.addCallback(_download_to_data_done)
162
163         target_filename = os.path.join(self.basedir, "download.target")
164         def _download_to_filename(res):
165             return self.downloader.download_to_filename(self.cap,
166                                                         target_filename)
167         d.addCallback(_download_to_filename)
168         def _download_to_filename_done(res):
169             newdata = open(target_filename, "rb").read()
170             self.failUnlessEqual(newdata, DATA)
171         d.addCallback(_download_to_filename_done)
172
173         target_filename2 = os.path.join(self.basedir, "download.target2")
174         def _download_to_filehandle(res):
175             fh = open(target_filename2, "wb")
176             return self.downloader.download_to_filehandle(self.cap, fh)
177         d.addCallback(_download_to_filehandle)
178         def _download_to_filehandle_done(fh):
179             fh.close()
180             newdata = open(target_filename2, "rb").read()
181             self.failUnlessEqual(newdata, DATA)
182         d.addCallback(_download_to_filehandle_done)
183
184         consumer = GrabEverythingConsumer()
185         ct = download.ConsumerAdapter(consumer)
186         d.addCallback(lambda res:
187                       self.downloader.download(self.cap, ct))
188         def _download_to_consumer_done(ign):
189             self.failUnlessEqual(consumer.contents, DATA)
190         d.addCallback(_download_to_consumer_done)
191
192         def _test_read(res):
193             n = self.clients[1].create_node_from_uri(self.uri)
194             d = download_to_data(n)
195             def _read_done(data):
196                 self.failUnlessEqual(data, DATA)
197             d.addCallback(_read_done)
198             d.addCallback(lambda ign:
199                           n.read(MemoryConsumer(), offset=1, size=4))
200             def _read_portion_done(mc):
201                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
202             d.addCallback(_read_portion_done)
203             d.addCallback(lambda ign:
204                           n.read(MemoryConsumer(), offset=2, size=None))
205             def _read_tail_done(mc):
206                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
207             d.addCallback(_read_tail_done)
208             d.addCallback(lambda ign:
209                           n.read(MemoryConsumer(), size=len(DATA)+1000))
210             def _read_too_much(mc):
211                 self.failUnlessEqual("".join(mc.chunks), DATA)
212             d.addCallback(_read_too_much)
213
214             return d
215         d.addCallback(_test_read)
216
217         def _test_bad_read(res):
218             bad_u = uri.from_string_filenode(self.uri)
219             bad_u.key = self.flip_bit(bad_u.key)
220             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
221             # this should cause an error during download
222
223             d = self.shouldFail2(NoSharesError, "'download bad node'",
224                                  None,
225                                  bad_n.read, MemoryConsumer(), offset=2)
226             return d
227         d.addCallback(_test_bad_read)
228
229         def _download_nonexistent_uri(res):
230             baduri = self.mangle_uri(self.uri)
231             log.msg("about to download non-existent URI", level=log.UNUSUAL,
232                     facility="tahoe.tests")
233             d1 = self.downloader.download_to_data(uri.from_string(baduri))
234             def _baduri_should_fail(res):
235                 log.msg("finished downloading non-existend URI",
236                         level=log.UNUSUAL, facility="tahoe.tests")
237                 self.failUnless(isinstance(res, Failure))
238                 self.failUnless(res.check(NoSharesError),
239                                 "expected NoSharesError, got %s" % res)
240             d1.addBoth(_baduri_should_fail)
241             return d1
242         d.addCallback(_download_nonexistent_uri)
243
244         # add a new node, which doesn't accept shares, and only uses the
245         # helper for upload.
246         d.addCallback(lambda res: self.add_extra_node(self.numclients,
247                                                       self.helper_furl,
248                                                       add_to_sparent=True))
249         def _added(extra_node):
250             self.extra_node = extra_node
251         d.addCallback(_added)
252
253         HELPER_DATA = "Data that needs help to upload" * 1000
254         def _upload_with_helper(res):
255             u = upload.Data(HELPER_DATA, convergence=convergence)
256             d = self.extra_node.upload(u)
257             def _uploaded(results):
258                 cap = uri.from_string(results.uri)
259                 return self.downloader.download_to_data(cap)
260             d.addCallback(_uploaded)
261             def _check(newdata):
262                 self.failUnlessEqual(newdata, HELPER_DATA)
263             d.addCallback(_check)
264             return d
265         d.addCallback(_upload_with_helper)
266
267         def _upload_duplicate_with_helper(res):
268             u = upload.Data(HELPER_DATA, convergence=convergence)
269             u.debug_stash_RemoteEncryptedUploadable = True
270             d = self.extra_node.upload(u)
271             def _uploaded(results):
272                 cap = uri.from_string(results.uri)
273                 return self.downloader.download_to_data(cap)
274             d.addCallback(_uploaded)
275             def _check(newdata):
276                 self.failUnlessEqual(newdata, HELPER_DATA)
277                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
278                             "uploadable started uploading, should have been avoided")
279             d.addCallback(_check)
280             return d
281         if convergence is not None:
282             d.addCallback(_upload_duplicate_with_helper)
283
284         def _upload_resumable(res):
285             DATA = "Data that needs help to upload and gets interrupted" * 1000
286             u1 = CountingDataUploadable(DATA, convergence=convergence)
287             u2 = CountingDataUploadable(DATA, convergence=convergence)
288
289             # we interrupt the connection after about 5kB by shutting down
290             # the helper, then restartingit.
291             u1.interrupt_after = 5000
292             u1.interrupt_after_d = defer.Deferred()
293             u1.interrupt_after_d.addCallback(lambda res:
294                                              self.bounce_client(0))
295
296             # sneak into the helper and reduce its chunk size, so that our
297             # debug_interrupt will sever the connection on about the fifth
298             # chunk fetched. This makes sure that we've started to write the
299             # new shares before we abandon them, which exercises the
300             # abort/delete-partial-share code. TODO: find a cleaner way to do
301             # this. I know that this will affect later uses of the helper in
302             # this same test run, but I'm not currently worried about it.
303             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
304
305             d = self.extra_node.upload(u1)
306
307             def _should_not_finish(res):
308                 self.fail("interrupted upload should have failed, not finished"
309                           " with result %s" % (res,))
310             def _interrupted(f):
311                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
312
313                 # make sure we actually interrupted it before finishing the
314                 # file
315                 self.failUnless(u1.bytes_read < len(DATA),
316                                 "read %d out of %d total" % (u1.bytes_read,
317                                                              len(DATA)))
318
319                 log.msg("waiting for reconnect", level=log.NOISY,
320                         facility="tahoe.test.test_system")
321                 # now, we need to give the nodes a chance to notice that this
322                 # connection has gone away. When this happens, the storage
323                 # servers will be told to abort their uploads, removing the
324                 # partial shares. Unfortunately this involves TCP messages
325                 # going through the loopback interface, and we can't easily
326                 # predict how long that will take. If it were all local, we
327                 # could use fireEventually() to stall. Since we don't have
328                 # the right introduction hooks, the best we can do is use a
329                 # fixed delay. TODO: this is fragile.
330                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
331                 return u1.interrupt_after_d
332             d.addCallbacks(_should_not_finish, _interrupted)
333
334             def _disconnected(res):
335                 # check to make sure the storage servers aren't still hanging
336                 # on to the partial share: their incoming/ directories should
337                 # now be empty.
338                 log.msg("disconnected", level=log.NOISY,
339                         facility="tahoe.test.test_system")
340                 for i in range(self.numclients):
341                     incdir = os.path.join(self.getdir("client%d" % i),
342                                           "storage", "shares", "incoming")
343                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
344             d.addCallback(_disconnected)
345
346             # then we need to give the reconnector a chance to
347             # reestablish the connection to the helper.
348             d.addCallback(lambda res:
349                           log.msg("wait_for_connections", level=log.NOISY,
350                                   facility="tahoe.test.test_system"))
351             d.addCallback(lambda res: self.wait_for_connections())
352
353
354             d.addCallback(lambda res:
355                           log.msg("uploading again", level=log.NOISY,
356                                   facility="tahoe.test.test_system"))
357             d.addCallback(lambda res: self.extra_node.upload(u2))
358
359             def _uploaded(results):
360                 cap = uri.from_string(results.uri)
361                 log.msg("Second upload complete", level=log.NOISY,
362                         facility="tahoe.test.test_system")
363
364                 # this is really bytes received rather than sent, but it's
365                 # convenient and basically measures the same thing
366                 bytes_sent = results.ciphertext_fetched
367
368                 # We currently don't support resumption of upload if the data is
369                 # encrypted with a random key.  (Because that would require us
370                 # to store the key locally and re-use it on the next upload of
371                 # this file, which isn't a bad thing to do, but we currently
372                 # don't do it.)
373                 if convergence is not None:
374                     # Make sure we did not have to read the whole file the
375                     # second time around .
376                     self.failUnless(bytes_sent < len(DATA),
377                                 "resumption didn't save us any work:"
378                                 " read %d bytes out of %d total" %
379                                 (bytes_sent, len(DATA)))
380                 else:
381                     # Make sure we did have to read the whole file the second
382                     # time around -- because the one that we partially uploaded
383                     # earlier was encrypted with a different random key.
384                     self.failIf(bytes_sent < len(DATA),
385                                 "resumption saved us some work even though we were using random keys:"
386                                 " read %d bytes out of %d total" %
387                                 (bytes_sent, len(DATA)))
388                 return self.downloader.download_to_data(cap)
389             d.addCallback(_uploaded)
390
391             def _check(newdata):
392                 self.failUnlessEqual(newdata, DATA)
393                 # If using convergent encryption, then also check that the
394                 # helper has removed the temp file from its directories.
395                 if convergence is not None:
396                     basedir = os.path.join(self.getdir("client0"), "helper")
397                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
398                     self.failUnlessEqual(files, [])
399                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
400                     self.failUnlessEqual(files, [])
401             d.addCallback(_check)
402             return d
403         d.addCallback(_upload_resumable)
404
405         def _grab_stats(ignored):
406             # the StatsProvider doesn't normally publish a FURL:
407             # instead it passes a live reference to the StatsGatherer
408             # (if and when it connects). To exercise the remote stats
409             # interface, we manually publish client0's StatsProvider
410             # and use client1 to query it.
411             sp = self.clients[0].stats_provider
412             sp_furl = self.clients[0].tub.registerReference(sp)
413             d = self.clients[1].tub.getReference(sp_furl)
414             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
415             def _got_stats(stats):
416                 #print "STATS"
417                 #from pprint import pprint
418                 #pprint(stats)
419                 s = stats["stats"]
420                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
421                 c = stats["counters"]
422                 self.failUnless("storage_server.allocate" in c)
423             d.addCallback(_got_stats)
424             return d
425         d.addCallback(_grab_stats)
426
427         return d
428
429     def _find_shares(self, basedir):
430         shares = []
431         for (dirpath, dirnames, filenames) in os.walk(basedir):
432             if "storage" not in dirpath:
433                 continue
434             if not filenames:
435                 continue
436             pieces = dirpath.split(os.sep)
437             if (len(pieces) >= 5
438                 and pieces[-4] == "storage"
439                 and pieces[-3] == "shares"):
440                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
441                 # are sharefiles here
442                 assert pieces[-5].startswith("client")
443                 client_num = int(pieces[-5][-1])
444                 storage_index_s = pieces[-1]
445                 storage_index = si_a2b(storage_index_s)
446                 for sharename in filenames:
447                     shnum = int(sharename)
448                     filename = os.path.join(dirpath, sharename)
449                     data = (client_num, storage_index, filename, shnum)
450                     shares.append(data)
451         if not shares:
452             self.fail("unable to find any share files in %s" % basedir)
453         return shares
454
455     def _corrupt_mutable_share(self, filename, which):
456         msf = MutableShareFile(filename)
457         datav = msf.readv([ (0, 1000000) ])
458         final_share = datav[0]
459         assert len(final_share) < 1000000 # ought to be truncated
460         pieces = mutable_layout.unpack_share(final_share)
461         (seqnum, root_hash, IV, k, N, segsize, datalen,
462          verification_key, signature, share_hash_chain, block_hash_tree,
463          share_data, enc_privkey) = pieces
464
465         if which == "seqnum":
466             seqnum = seqnum + 15
467         elif which == "R":
468             root_hash = self.flip_bit(root_hash)
469         elif which == "IV":
470             IV = self.flip_bit(IV)
471         elif which == "segsize":
472             segsize = segsize + 15
473         elif which == "pubkey":
474             verification_key = self.flip_bit(verification_key)
475         elif which == "signature":
476             signature = self.flip_bit(signature)
477         elif which == "share_hash_chain":
478             nodenum = share_hash_chain.keys()[0]
479             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
480         elif which == "block_hash_tree":
481             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
482         elif which == "share_data":
483             share_data = self.flip_bit(share_data)
484         elif which == "encprivkey":
485             enc_privkey = self.flip_bit(enc_privkey)
486
487         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
488                                             segsize, datalen)
489         final_share = mutable_layout.pack_share(prefix,
490                                                 verification_key,
491                                                 signature,
492                                                 share_hash_chain,
493                                                 block_hash_tree,
494                                                 share_data,
495                                                 enc_privkey)
496         msf.writev( [(0, final_share)], None)
497
498
499     def test_mutable(self):
500         self.basedir = "system/SystemTest/test_mutable"
501         DATA = "initial contents go here."  # 25 bytes % 3 != 0
502         NEWDATA = "new contents yay"
503         NEWERDATA = "this is getting old"
504
505         d = self.set_up_nodes(use_key_generator=True)
506
507         def _create_mutable(res):
508             c = self.clients[0]
509             log.msg("starting create_mutable_file")
510             d1 = c.create_mutable_file(DATA)
511             def _done(res):
512                 log.msg("DONE: %s" % (res,))
513                 self._mutable_node_1 = res
514                 uri = res.get_uri()
515             d1.addCallback(_done)
516             return d1
517         d.addCallback(_create_mutable)
518
519         def _test_debug(res):
520             # find a share. It is important to run this while there is only
521             # one slot in the grid.
522             shares = self._find_shares(self.basedir)
523             (client_num, storage_index, filename, shnum) = shares[0]
524             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
525                     % filename)
526             log.msg(" for clients[%d]" % client_num)
527
528             out,err = StringIO(), StringIO()
529             rc = runner.runner(["debug", "dump-share", "--offsets",
530                                 filename],
531                                stdout=out, stderr=err)
532             output = out.getvalue()
533             self.failUnlessEqual(rc, 0)
534             try:
535                 self.failUnless("Mutable slot found:\n" in output)
536                 self.failUnless("share_type: SDMF\n" in output)
537                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
538                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
539                 self.failUnless(" num_extra_leases: 0\n" in output)
540                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
541                                 in output)
542                 self.failUnless(" SDMF contents:\n" in output)
543                 self.failUnless("  seqnum: 1\n" in output)
544                 self.failUnless("  required_shares: 3\n" in output)
545                 self.failUnless("  total_shares: 10\n" in output)
546                 self.failUnless("  segsize: 27\n" in output, (output, filename))
547                 self.failUnless("  datalen: 25\n" in output)
548                 # the exact share_hash_chain nodes depends upon the sharenum,
549                 # and is more of a hassle to compute than I want to deal with
550                 # now
551                 self.failUnless("  share_hash_chain: " in output)
552                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
553                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
554                             base32.b2a(storage_index))
555                 self.failUnless(expected in output)
556             except unittest.FailTest:
557                 print
558                 print "dump-share output was:"
559                 print output
560                 raise
561         d.addCallback(_test_debug)
562
563         # test retrieval
564
565         # first, let's see if we can use the existing node to retrieve the
566         # contents. This allows it to use the cached pubkey and maybe the
567         # latest-known sharemap.
568
569         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
570         def _check_download_1(res):
571             self.failUnlessEqual(res, DATA)
572             # now we see if we can retrieve the data from a new node,
573             # constructed using the URI of the original one. We do this test
574             # on the same client that uploaded the data.
575             uri = self._mutable_node_1.get_uri()
576             log.msg("starting retrieve1")
577             newnode = self.clients[0].create_node_from_uri(uri)
578             newnode_2 = self.clients[0].create_node_from_uri(uri)
579             self.failUnlessIdentical(newnode, newnode_2)
580             return newnode.download_best_version()
581         d.addCallback(_check_download_1)
582
583         def _check_download_2(res):
584             self.failUnlessEqual(res, DATA)
585             # same thing, but with a different client
586             uri = self._mutable_node_1.get_uri()
587             newnode = self.clients[1].create_node_from_uri(uri)
588             log.msg("starting retrieve2")
589             d1 = newnode.download_best_version()
590             d1.addCallback(lambda res: (res, newnode))
591             return d1
592         d.addCallback(_check_download_2)
593
594         def _check_download_3((res, newnode)):
595             self.failUnlessEqual(res, DATA)
596             # replace the data
597             log.msg("starting replace1")
598             d1 = newnode.overwrite(NEWDATA)
599             d1.addCallback(lambda res: newnode.download_best_version())
600             return d1
601         d.addCallback(_check_download_3)
602
603         def _check_download_4(res):
604             self.failUnlessEqual(res, NEWDATA)
605             # now create an even newer node and replace the data on it. This
606             # new node has never been used for download before.
607             uri = self._mutable_node_1.get_uri()
608             newnode1 = self.clients[2].create_node_from_uri(uri)
609             newnode2 = self.clients[3].create_node_from_uri(uri)
610             self._newnode3 = self.clients[3].create_node_from_uri(uri)
611             log.msg("starting replace2")
612             d1 = newnode1.overwrite(NEWERDATA)
613             d1.addCallback(lambda res: newnode2.download_best_version())
614             return d1
615         d.addCallback(_check_download_4)
616
617         def _check_download_5(res):
618             log.msg("finished replace2")
619             self.failUnlessEqual(res, NEWERDATA)
620         d.addCallback(_check_download_5)
621
622         def _corrupt_shares(res):
623             # run around and flip bits in all but k of the shares, to test
624             # the hash checks
625             shares = self._find_shares(self.basedir)
626             ## sort by share number
627             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
628             where = dict([ (shnum, filename)
629                            for (client_num, storage_index, filename, shnum)
630                            in shares ])
631             assert len(where) == 10 # this test is designed for 3-of-10
632             for shnum, filename in where.items():
633                 # shares 7,8,9 are left alone. read will check
634                 # (share_hash_chain, block_hash_tree, share_data). New
635                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
636                 # segsize, signature).
637                 if shnum == 0:
638                     # read: this will trigger "pubkey doesn't match
639                     # fingerprint".
640                     self._corrupt_mutable_share(filename, "pubkey")
641                     self._corrupt_mutable_share(filename, "encprivkey")
642                 elif shnum == 1:
643                     # triggers "signature is invalid"
644                     self._corrupt_mutable_share(filename, "seqnum")
645                 elif shnum == 2:
646                     # triggers "signature is invalid"
647                     self._corrupt_mutable_share(filename, "R")
648                 elif shnum == 3:
649                     # triggers "signature is invalid"
650                     self._corrupt_mutable_share(filename, "segsize")
651                 elif shnum == 4:
652                     self._corrupt_mutable_share(filename, "share_hash_chain")
653                 elif shnum == 5:
654                     self._corrupt_mutable_share(filename, "block_hash_tree")
655                 elif shnum == 6:
656                     self._corrupt_mutable_share(filename, "share_data")
657                 # other things to correct: IV, signature
658                 # 7,8,9 are left alone
659
660                 # note that initial_query_count=5 means that we'll hit the
661                 # first 5 servers in effectively random order (based upon
662                 # response time), so we won't necessarily ever get a "pubkey
663                 # doesn't match fingerprint" error (if we hit shnum>=1 before
664                 # shnum=0, we pull the pubkey from there). To get repeatable
665                 # specific failures, we need to set initial_query_count=1,
666                 # but of course that will change the sequencing behavior of
667                 # the retrieval process. TODO: find a reasonable way to make
668                 # this a parameter, probably when we expand this test to test
669                 # for one failure mode at a time.
670
671                 # when we retrieve this, we should get three signature
672                 # failures (where we've mangled seqnum, R, and segsize). The
673                 # pubkey mangling
674         d.addCallback(_corrupt_shares)
675
676         d.addCallback(lambda res: self._newnode3.download_best_version())
677         d.addCallback(_check_download_5)
678
679         def _check_empty_file(res):
680             # make sure we can create empty files, this usually screws up the
681             # segsize math
682             d1 = self.clients[2].create_mutable_file("")
683             d1.addCallback(lambda newnode: newnode.download_best_version())
684             d1.addCallback(lambda res: self.failUnlessEqual("", res))
685             return d1
686         d.addCallback(_check_empty_file)
687
688         d.addCallback(lambda res: self.clients[0].create_dirnode())
689         def _created_dirnode(dnode):
690             log.msg("_created_dirnode(%s)" % (dnode,))
691             d1 = dnode.list()
692             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
693             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
694             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
695             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
696             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
697             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
698             d1.addCallback(lambda res: dnode.build_manifest().when_done())
699             d1.addCallback(lambda res:
700                            self.failUnlessEqual(len(res["manifest"]), 1))
701             return d1
702         d.addCallback(_created_dirnode)
703
704         def wait_for_c3_kg_conn():
705             return self.clients[3]._key_generator is not None
706         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
707
708         def check_kg_poolsize(junk, size_delta):
709             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
710                                  self.key_generator_svc.key_generator.pool_size + size_delta)
711
712         d.addCallback(check_kg_poolsize, 0)
713         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
714         d.addCallback(check_kg_poolsize, -1)
715         d.addCallback(lambda junk: self.clients[3].create_dirnode())
716         d.addCallback(check_kg_poolsize, -2)
717         # use_helper induces use of clients[3], which is the using-key_gen client
718         d.addCallback(lambda junk:
719                       self.POST("uri?t=mkdir&name=george", use_helper=True))
720         d.addCallback(check_kg_poolsize, -3)
721
722         return d
723
724     def flip_bit(self, good):
725         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
726
727     def mangle_uri(self, gooduri):
728         # change the key, which changes the storage index, which means we'll
729         # be asking about the wrong file, so nobody will have any shares
730         u = uri.from_string(gooduri)
731         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
732                             uri_extension_hash=u.uri_extension_hash,
733                             needed_shares=u.needed_shares,
734                             total_shares=u.total_shares,
735                             size=u.size)
736         return u2.to_string()
737
738     # TODO: add a test which mangles the uri_extension_hash instead, and
739     # should fail due to not being able to get a valid uri_extension block.
740     # Also a test which sneakily mangles the uri_extension block to change
741     # some of the validation data, so it will fail in the post-download phase
742     # when the file's crypttext integrity check fails. Do the same thing for
743     # the key, which should cause the download to fail the post-download
744     # plaintext_hash check.
745
746     def test_vdrive(self):
747         self.basedir = "system/SystemTest/test_vdrive"
748         self.data = LARGE_DATA
749         d = self.set_up_nodes(use_stats_gatherer=True)
750         d.addCallback(self._test_introweb)
751         d.addCallback(self.log, "starting publish")
752         d.addCallback(self._do_publish1)
753         d.addCallback(self._test_runner)
754         d.addCallback(self._do_publish2)
755         # at this point, we have the following filesystem (where "R" denotes
756         # self._root_directory_uri):
757         # R
758         # R/subdir1
759         # R/subdir1/mydata567
760         # R/subdir1/subdir2/
761         # R/subdir1/subdir2/mydata992
762
763         d.addCallback(lambda res: self.bounce_client(0))
764         d.addCallback(self.log, "bounced client0")
765
766         d.addCallback(self._check_publish1)
767         d.addCallback(self.log, "did _check_publish1")
768         d.addCallback(self._check_publish2)
769         d.addCallback(self.log, "did _check_publish2")
770         d.addCallback(self._do_publish_private)
771         d.addCallback(self.log, "did _do_publish_private")
772         # now we also have (where "P" denotes a new dir):
773         #  P/personal/sekrit data
774         #  P/s2-rw -> /subdir1/subdir2/
775         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
776         d.addCallback(self._check_publish_private)
777         d.addCallback(self.log, "did _check_publish_private")
778         d.addCallback(self._test_web)
779         d.addCallback(self._test_control)
780         d.addCallback(self._test_cli)
781         # P now has four top-level children:
782         # P/personal/sekrit data
783         # P/s2-ro/
784         # P/s2-rw/
785         # P/test_put/  (empty)
786         d.addCallback(self._test_checker)
787         return d
788
    def _test_introweb(self, res):
        """Fetch the introducer's web status page, in both HTML and JSON
        form, and verify it reports the announcements and subscriptions
        expected from this grid's five clients."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                # the welcome page advertises the node version and
                # summarizes announcements/subscriptions seen so far
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the page body so failures are easier to diagnose
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            # same checks against the machine-readable JSON form
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                # all five announcers share one host here, so the
                # distinct-host counts are 1
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
822
823     def _do_publish1(self, res):
824         ut = upload.Data(self.data, convergence=None)
825         c0 = self.clients[0]
826         d = c0.create_dirnode()
827         def _made_root(new_dirnode):
828             self._root_directory_uri = new_dirnode.get_uri()
829             return c0.create_node_from_uri(self._root_directory_uri)
830         d.addCallback(_made_root)
831         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
832         def _made_subdir1(subdir1_node):
833             self._subdir1_node = subdir1_node
834             d1 = subdir1_node.add_file(u"mydata567", ut)
835             d1.addCallback(self.log, "publish finished")
836             def _stash_uri(filenode):
837                 self.uri = filenode.get_uri()
838                 assert isinstance(self.uri, str), (self.uri, filenode)
839             d1.addCallback(_stash_uri)
840             return d1
841         d.addCallback(_made_subdir1)
842         return d
843
844     def _do_publish2(self, res):
845         ut = upload.Data(self.data, convergence=None)
846         d = self._subdir1_node.create_subdirectory(u"subdir2")
847         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
848         return d
849
850     def log(self, res, *args, **kwargs):
851         # print "MSG: %s  RES: %s" % (msg, args)
852         log.msg(*args, **kwargs)
853         return res
854
855     def _do_publish_private(self, res):
856         self.smalldata = "sssh, very secret stuff"
857         ut = upload.Data(self.smalldata, convergence=None)
858         d = self.clients[0].create_dirnode()
859         d.addCallback(self.log, "GOT private directory")
860         def _got_new_dir(privnode):
861             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
862             d1 = privnode.create_subdirectory(u"personal")
863             d1.addCallback(self.log, "made P/personal")
864             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
865             d1.addCallback(self.log, "made P/personal/sekrit data")
866             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
867             def _got_s2(s2node):
868                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
869                                       s2node.get_readonly_uri())
870                 d2.addCallback(lambda node:
871                                privnode.set_uri(u"s2-ro",
872                                                 s2node.get_readonly_uri(),
873                                                 s2node.get_readonly_uri()))
874                 return d2
875             d1.addCallback(_got_s2)
876             d1.addCallback(lambda res: privnode)
877             return d1
878         d.addCallback(_got_new_dir)
879         return d
880
881     def _check_publish1(self, res):
882         # this one uses the iterative API
883         c1 = self.clients[1]
884         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
885         d.addCallback(self.log, "check_publish1 got /")
886         d.addCallback(lambda root: root.get(u"subdir1"))
887         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
888         d.addCallback(lambda filenode: filenode.download_to_data())
889         d.addCallback(self.log, "get finished")
890         def _get_done(data):
891             self.failUnlessEqual(data, self.data)
892         d.addCallback(_get_done)
893         return d
894
895     def _check_publish2(self, res):
896         # this one uses the path-based API
897         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
898         d = rootnode.get_child_at_path(u"subdir1")
899         d.addCallback(lambda dirnode:
900                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
901         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
902         d.addCallback(lambda filenode: filenode.download_to_data())
903         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
904
905         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
906         def _got_filenode(filenode):
907             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
908             assert fnode == filenode
909         d.addCallback(_got_filenode)
910         return d
911
    def _check_publish_private(self, resnode):
        """Verify the private tree P (created by _do_publish_private):
        read back the file contents, confirm s2-rw is mutable and s2-ro
        is read-only, check that every mutating operation on the
        read-only node fails, exercise move_child_to, and finally check
        the manifest and deep-stats numbers.

        *resnode* is the private dirnode P; it is stashed on self for
        the web/CLI tests that run later.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # remember P/personal for the cross-directory move tests below
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            # small helper: resolve *path* relative to P
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only link: still a mutable dirnode, but with only
            # the read cap, so every write operation must raise
            # NotMutableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            # read operations must still work on the read-only node
            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            # a missing child raises NoSuchChildError, not NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            # moves must fail in both directions when the read-only node
            # is involved (as source and as destination)
            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # now exercise move_child_to on the writable nodes: move the
            # file around (rename across dirs, rename in place, move
            # back keeping its name), ending where it started
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                # exact counts/sizes for the fixed tree; directory sizes
                # vary with encoding details, so only bound them below
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
1021
1022     def shouldFail(self, res, expected_failure, which, substring=None):
1023         if isinstance(res, Failure):
1024             res.trap(expected_failure)
1025             if substring:
1026                 self.failUnless(substring in str(res),
1027                                 "substring '%s' not in '%s'"
1028                                 % (substring, str(res)))
1029         else:
1030             self.fail("%s was supposed to raise %s, not get '%s'" %
1031                       (which, expected_failure, res))
1032
1033     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1034         assert substring is None or isinstance(substring, str)
1035         d = defer.maybeDeferred(callable, *args, **kwargs)
1036         def done(res):
1037             if isinstance(res, Failure):
1038                 res.trap(expected_failure)
1039                 if substring:
1040                     self.failUnless(substring in str(res),
1041                                     "substring '%s' not in '%s'"
1042                                     % (substring, str(res)))
1043             else:
1044                 self.fail("%s was supposed to raise %s, not get '%s'" %
1045                           (which, expected_failure, res))
1046         d.addBoth(done)
1047         return d
1048
1049     def PUT(self, urlpath, data):
1050         url = self.webish_url + urlpath
1051         return getPage(url, method="PUT", postdata=data)
1052
1053     def GET(self, urlpath, followRedirect=False):
1054         url = self.webish_url + urlpath
1055         return getPage(url, method="GET", followRedirect=followRedirect)
1056
1057     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1058         sepbase = "boogabooga"
1059         sep = "--" + sepbase
1060         form = []
1061         form.append(sep)
1062         form.append('Content-Disposition: form-data; name="_charset"')
1063         form.append('')
1064         form.append('UTF-8')
1065         form.append(sep)
1066         for name, value in fields.iteritems():
1067             if isinstance(value, tuple):
1068                 filename, value = value
1069                 form.append('Content-Disposition: form-data; name="%s"; '
1070                             'filename="%s"' % (name, filename.encode("utf-8")))
1071             else:
1072                 form.append('Content-Disposition: form-data; name="%s"' % name)
1073             form.append('')
1074             form.append(str(value))
1075             form.append(sep)
1076         form[-1] += "--"
1077         body = ""
1078         headers = {}
1079         if fields:
1080             body = "\r\n".join(form) + "\r\n"
1081             headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
1082         return self.POST2(urlpath, body, headers, followRedirect, use_helper)
1083
1084     def POST2(self, urlpath, body="", headers={}, followRedirect=False,
1085               use_helper=False):
1086         if use_helper:
1087             url = self.helper_webish_url + urlpath
1088         else:
1089             url = self.webish_url + urlpath
1090         return getPage(url, method="POST", postdata=body, headers=headers,
1091                        followRedirect=followRedirect)
1092
1093     def _test_web(self, res):
1094         base = self.webish_url
1095         public = "uri/" + self._root_directory_uri
1096         d = getPage(base)
1097         def _got_welcome(page):
1098             # XXX This test is oversensitive to formatting
1099             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1100             self.failUnless(expected in page,
1101                             "I didn't see the right 'connected storage servers'"
1102                             " message in: %s" % page
1103                             )
1104             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1105             self.failUnless(expected in page,
1106                             "I didn't see the right 'My nodeid' message "
1107                             "in: %s" % page)
1108             self.failUnless("Helper: 0 active uploads" in page)
1109         d.addCallback(_got_welcome)
1110         d.addCallback(self.log, "done with _got_welcome")
1111
1112         # get the welcome page from the node that uses the helper too
1113         d.addCallback(lambda res: getPage(self.helper_webish_url))
1114         def _got_welcome_helper(page):
1115             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1116                             page)
1117             self.failUnless("Not running helper" in page)
1118         d.addCallback(_got_welcome_helper)
1119
1120         d.addCallback(lambda res: getPage(base + public))
1121         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1122         def _got_subdir1(page):
1123             # there ought to be an href for our file
1124             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1125             self.failUnless(">mydata567</a>" in page)
1126         d.addCallback(_got_subdir1)
1127         d.addCallback(self.log, "done with _got_subdir1")
1128         d.addCallback(lambda res:
1129                       getPage(base + public + "/subdir1/mydata567"))
1130         def _got_data(page):
1131             self.failUnlessEqual(page, self.data)
1132         d.addCallback(_got_data)
1133
1134         # download from a URI embedded in a URL
1135         d.addCallback(self.log, "_get_from_uri")
1136         def _get_from_uri(res):
1137             return getPage(base + "uri/%s?filename=%s"
1138                            % (self.uri, "mydata567"))
1139         d.addCallback(_get_from_uri)
1140         def _got_from_uri(page):
1141             self.failUnlessEqual(page, self.data)
1142         d.addCallback(_got_from_uri)
1143
1144         # download from a URI embedded in a URL, second form
1145         d.addCallback(self.log, "_get_from_uri2")
1146         def _get_from_uri2(res):
1147             return getPage(base + "uri?uri=%s" % (self.uri,))
1148         d.addCallback(_get_from_uri2)
1149         d.addCallback(_got_from_uri)
1150
1151         # download from a bogus URI, make sure we get a reasonable error
1152         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1153         def _get_from_bogus_uri(res):
1154             d1 = getPage(base + "uri/%s?filename=%s"
1155                          % (self.mangle_uri(self.uri), "mydata567"))
1156             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1157                        "410")
1158             return d1
1159         d.addCallback(_get_from_bogus_uri)
1160         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1161
1162         # upload a file with PUT
1163         d.addCallback(self.log, "about to try PUT")
1164         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1165                                            "new.txt contents"))
1166         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1167         d.addCallback(self.failUnlessEqual, "new.txt contents")
1168         # and again with something large enough to use multiple segments,
1169         # and hopefully trigger pauseProducing too
1170         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1171                                            "big" * 500000)) # 1.5MB
1172         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1173         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1174
1175         # can we replace files in place?
1176         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1177                                            "NEWER contents"))
1178         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1179         d.addCallback(self.failUnlessEqual, "NEWER contents")
1180
1181         # test unlinked POST
1182         d.addCallback(lambda res: self.POST("uri", t="upload",
1183                                             file=("new.txt", "data" * 10000)))
1184         # and again using the helper, which exercises different upload-status
1185         # display code
1186         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1187                                             file=("foo.txt", "data2" * 10000)))
1188
1189         # check that the status page exists
1190         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1191         def _got_status(res):
1192             # find an interesting upload and download to look at. LIT files
1193             # are not interesting.
1194             h = self.clients[0].get_history()
1195             for ds in h.list_all_download_statuses():
1196                 if ds.get_size() > 200:
1197                     self._down_status = ds.get_counter()
1198             for us in h.list_all_upload_statuses():
1199                 if us.get_size() > 200:
1200                     self._up_status = us.get_counter()
1201             rs = list(h.list_all_retrieve_statuses())[0]
1202             self._retrieve_status = rs.get_counter()
1203             ps = list(h.list_all_publish_statuses())[0]
1204             self._publish_status = ps.get_counter()
1205             us = list(h.list_all_mapupdate_statuses())[0]
1206             self._update_status = us.get_counter()
1207
1208             # and that there are some upload- and download- status pages
1209             return self.GET("status/up-%d" % self._up_status)
1210         d.addCallback(_got_status)
1211         def _got_up(res):
1212             return self.GET("status/down-%d" % self._down_status)
1213         d.addCallback(_got_up)
1214         def _got_down(res):
1215             return self.GET("status/mapupdate-%d" % self._update_status)
1216         d.addCallback(_got_down)
1217         def _got_update(res):
1218             return self.GET("status/publish-%d" % self._publish_status)
1219         d.addCallback(_got_update)
1220         def _got_publish(res):
1221             return self.GET("status/retrieve-%d" % self._retrieve_status)
1222         d.addCallback(_got_publish)
1223
1224         # check that the helper status page exists
1225         d.addCallback(lambda res:
1226                       self.GET("helper_status", followRedirect=True))
1227         def _got_helper_status(res):
1228             self.failUnless("Bytes Fetched:" in res)
1229             # touch a couple of files in the helper's working directory to
1230             # exercise more code paths
1231             workdir = os.path.join(self.getdir("client0"), "helper")
1232             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1233             f = open(incfile, "wb")
1234             f.write("small file")
1235             f.close()
1236             then = time.time() - 86400*3
1237             now = time.time()
1238             os.utime(incfile, (now, then))
1239             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1240             f = open(encfile, "wb")
1241             f.write("less small file")
1242             f.close()
1243             os.utime(encfile, (now, then))
1244         d.addCallback(_got_helper_status)
1245         # and that the json form exists
1246         d.addCallback(lambda res:
1247                       self.GET("helper_status?t=json", followRedirect=True))
1248         def _got_helper_status_json(res):
1249             data = simplejson.loads(res)
1250             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1251                                  1)
1252             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1253             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1254             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1255                                  10)
1256             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1257             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1258             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1259                                  15)
1260         d.addCallback(_got_helper_status_json)
1261
1262         # and check that client[3] (which uses a helper but does not run one
1263         # itself) doesn't explode when you ask for its status
1264         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1265         def _got_non_helper_status(res):
1266             self.failUnless("Upload and Download Status" in res)
1267         d.addCallback(_got_non_helper_status)
1268
1269         # or for helper status with t=json
1270         d.addCallback(lambda res:
1271                       getPage(self.helper_webish_url + "helper_status?t=json"))
1272         def _got_non_helper_status_json(res):
1273             data = simplejson.loads(res)
1274             self.failUnlessEqual(data, {})
1275         d.addCallback(_got_non_helper_status_json)
1276
1277         # see if the statistics page exists
1278         d.addCallback(lambda res: self.GET("statistics"))
1279         def _got_stats(res):
1280             self.failUnless("Node Statistics" in res)
1281             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1282         d.addCallback(_got_stats)
1283         d.addCallback(lambda res: self.GET("statistics?t=json"))
1284         def _got_stats_json(res):
1285             data = simplejson.loads(res)
1286             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1287             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1288         d.addCallback(_got_stats_json)
1289
1290         # TODO: mangle the second segment of a file, to test errors that
1291         # occur after we've already sent some good data, which uses a
1292         # different error path.
1293
1294         # TODO: download a URI with a form
1295         # TODO: create a directory by using a form
1296         # TODO: upload by using a form on the directory page
1297         #    url = base + "somedir/subdir1/freeform_post!!upload"
1298         # TODO: delete a file by using a button on the directory page
1299
1300         return d
1301
1302     def _test_runner(self, res):
1303         # exercise some of the diagnostic tools in runner.py
1304
1305         # find a share
1306         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1307             if "storage" not in dirpath:
1308                 continue
1309             if not filenames:
1310                 continue
1311             pieces = dirpath.split(os.sep)
1312             if (len(pieces) >= 4
1313                 and pieces[-4] == "storage"
1314                 and pieces[-3] == "shares"):
1315                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1316                 # are sharefiles here
1317                 filename = os.path.join(dirpath, filenames[0])
1318                 # peek at the magic to see if it is a chk share
1319                 magic = open(filename, "rb").read(4)
1320                 if magic == '\x00\x00\x00\x01':
1321                     break
1322         else:
1323             self.fail("unable to find any uri_extension files in %s"
1324                       % self.basedir)
1325         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1326
1327         out,err = StringIO(), StringIO()
1328         rc = runner.runner(["debug", "dump-share", "--offsets",
1329                             filename],
1330                            stdout=out, stderr=err)
1331         output = out.getvalue()
1332         self.failUnlessEqual(rc, 0)
1333
1334         # we only upload a single file, so we can assert some things about
1335         # its size and shares.
1336         self.failUnless(("share filename: %s" % filename) in output)
1337         self.failUnless("size: %d\n" % len(self.data) in output)
1338         self.failUnless("num_segments: 1\n" in output)
1339         # segment_size is always a multiple of needed_shares
1340         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1341         self.failUnless("total_shares: 10\n" in output)
1342         # keys which are supposed to be present
1343         for key in ("size", "num_segments", "segment_size",
1344                     "needed_shares", "total_shares",
1345                     "codec_name", "codec_params", "tail_codec_params",
1346                     #"plaintext_hash", "plaintext_root_hash",
1347                     "crypttext_hash", "crypttext_root_hash",
1348                     "share_root_hash", "UEB_hash"):
1349             self.failUnless("%s: " % key in output, key)
1350         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1351
1352         # now use its storage index to find the other shares using the
1353         # 'find-shares' tool
1354         sharedir, shnum = os.path.split(filename)
1355         storagedir, storage_index_s = os.path.split(sharedir)
1356         out,err = StringIO(), StringIO()
1357         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1358         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1359         rc = runner.runner(cmd, stdout=out, stderr=err)
1360         self.failUnlessEqual(rc, 0)
1361         out.seek(0)
1362         sharefiles = [sfn.strip() for sfn in out.readlines()]
1363         self.failUnlessEqual(len(sharefiles), 10)
1364
1365         # also exercise the 'catalog-shares' tool
1366         out,err = StringIO(), StringIO()
1367         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1368         cmd = ["debug", "catalog-shares"] + nodedirs
1369         rc = runner.runner(cmd, stdout=out, stderr=err)
1370         self.failUnlessEqual(rc, 0)
1371         out.seek(0)
1372         descriptions = [sfn.strip() for sfn in out.readlines()]
1373         self.failUnlessEqual(len(descriptions), 30)
1374         matching = [line
1375                     for line in descriptions
1376                     if line.startswith("CHK %s " % storage_index_s)]
1377         self.failUnlessEqual(len(matching), 10)
1378
1379     def _test_control(self, res):
1380         # exercise the remote-control-the-client foolscap interfaces in
1381         # allmydata.control (mostly used for performance tests)
1382         c0 = self.clients[0]
1383         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1384         control_furl = open(control_furl_file, "r").read().strip()
1385         # it doesn't really matter which Tub we use to connect to the client,
1386         # so let's just use our IntroducerNode's
1387         d = self.introducer.tub.getReference(control_furl)
1388         d.addCallback(self._test_control2, control_furl_file)
1389         return d
1390     def _test_control2(self, rref, filename):
1391         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1392         downfile = os.path.join(self.basedir, "control.downfile")
1393         d.addCallback(lambda uri:
1394                       rref.callRemote("download_from_uri_to_file",
1395                                       uri, downfile))
1396         def _check(res):
1397             self.failUnlessEqual(res, downfile)
1398             data = open(downfile, "r").read()
1399             expected_data = open(filename, "r").read()
1400             self.failUnlessEqual(data, expected_data)
1401         d.addCallback(_check)
1402         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1403         if sys.platform == "linux2":
1404             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1405         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1406         return d
1407
1408     def _test_cli(self, res):
1409         # run various CLI commands (in a thread, since they use blocking
1410         # network calls)
1411
1412         private_uri = self._private_node.get_uri()
1413         some_uri = self._root_directory_uri
1414         client0_basedir = self.getdir("client0")
1415
1416         nodeargs = [
1417             "--node-directory", client0_basedir,
1418             ]
1419         TESTDATA = "I will not write the same thing over and over.\n" * 100
1420
1421         d = defer.succeed(None)
1422
1423         # for compatibility with earlier versions, private/root_dir.cap is
1424         # supposed to be treated as an alias named "tahoe:". Start by making
1425         # sure that works, before we add other aliases.
1426
1427         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1428         f = open(root_file, "w")
1429         f.write(private_uri)
1430         f.close()
1431
1432         def run(ignored, verb, *args, **kwargs):
1433             stdin = kwargs.get("stdin", "")
1434             newargs = [verb] + nodeargs + list(args)
1435             return self._run_cli(newargs, stdin=stdin)
1436
1437         def _check_ls((out,err), expected_children, unexpected_children=[]):
1438             self.failUnlessEqual(err, "")
1439             for s in expected_children:
1440                 self.failUnless(s in out, (s,out))
1441             for s in unexpected_children:
1442                 self.failIf(s in out, (s,out))
1443
1444         def _check_ls_root((out,err)):
1445             self.failUnless("personal" in out)
1446             self.failUnless("s2-ro" in out)
1447             self.failUnless("s2-rw" in out)
1448             self.failUnlessEqual(err, "")
1449
1450         # this should reference private_uri
1451         d.addCallback(run, "ls")
1452         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1453
1454         d.addCallback(run, "list-aliases")
1455         def _check_aliases_1((out,err)):
1456             self.failUnlessEqual(err, "")
1457             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1458         d.addCallback(_check_aliases_1)
1459
1460         # now that that's out of the way, remove root_dir.cap and work with
1461         # new files
1462         d.addCallback(lambda res: os.unlink(root_file))
1463         d.addCallback(run, "list-aliases")
1464         def _check_aliases_2((out,err)):
1465             self.failUnlessEqual(err, "")
1466             self.failUnlessEqual(out, "")
1467         d.addCallback(_check_aliases_2)
1468
1469         d.addCallback(run, "mkdir")
1470         def _got_dir( (out,err) ):
1471             self.failUnless(uri.from_string_dirnode(out.strip()))
1472             return out.strip()
1473         d.addCallback(_got_dir)
1474         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1475
1476         d.addCallback(run, "list-aliases")
1477         def _check_aliases_3((out,err)):
1478             self.failUnlessEqual(err, "")
1479             self.failUnless("tahoe: " in out)
1480         d.addCallback(_check_aliases_3)
1481
1482         def _check_empty_dir((out,err)):
1483             self.failUnlessEqual(out, "")
1484             self.failUnlessEqual(err, "")
1485         d.addCallback(run, "ls")
1486         d.addCallback(_check_empty_dir)
1487
1488         def _check_missing_dir((out,err)):
1489             # TODO: check that rc==2
1490             self.failUnlessEqual(out, "")
1491             self.failUnlessEqual(err, "No such file or directory\n")
1492         d.addCallback(run, "ls", "bogus")
1493         d.addCallback(_check_missing_dir)
1494
1495         files = []
1496         datas = []
1497         for i in range(10):
1498             fn = os.path.join(self.basedir, "file%d" % i)
1499             files.append(fn)
1500             data = "data to be uploaded: file%d\n" % i
1501             datas.append(data)
1502             open(fn,"wb").write(data)
1503
1504         def _check_stdout_against((out,err), filenum=None, data=None):
1505             self.failUnlessEqual(err, "")
1506             if filenum is not None:
1507                 self.failUnlessEqual(out, datas[filenum])
1508             if data is not None:
1509                 self.failUnlessEqual(out, data)
1510
1511         # test all both forms of put: from a file, and from stdin
1512         #  tahoe put bar FOO
1513         d.addCallback(run, "put", files[0], "tahoe-file0")
1514         def _put_out((out,err)):
1515             self.failUnless("URI:LIT:" in out, out)
1516             self.failUnless("201 Created" in err, err)
1517             uri0 = out.strip()
1518             return run(None, "get", uri0)
1519         d.addCallback(_put_out)
1520         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1521
1522         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1523         #  tahoe put bar tahoe:FOO
1524         d.addCallback(run, "put", files[2], "tahoe:file2")
1525         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1526         def _check_put_mutable((out,err)):
1527             self._mutable_file3_uri = out.strip()
1528         d.addCallback(_check_put_mutable)
1529         d.addCallback(run, "get", "tahoe:file3")
1530         d.addCallback(_check_stdout_against, 3)
1531
1532         #  tahoe put FOO
1533         STDIN_DATA = "This is the file to upload from stdin."
1534         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1535         #  tahoe put tahoe:FOO
1536         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1537                       stdin="Other file from stdin.")
1538
1539         d.addCallback(run, "ls")
1540         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1541                                   "tahoe-file-stdin", "from-stdin"])
1542         d.addCallback(run, "ls", "subdir")
1543         d.addCallback(_check_ls, ["tahoe-file1"])
1544
1545         # tahoe mkdir FOO
1546         d.addCallback(run, "mkdir", "subdir2")
1547         d.addCallback(run, "ls")
1548         # TODO: extract the URI, set an alias with it
1549         d.addCallback(_check_ls, ["subdir2"])
1550
1551         # tahoe get: (to stdin and to a file)
1552         d.addCallback(run, "get", "tahoe-file0")
1553         d.addCallback(_check_stdout_against, 0)
1554         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1555         d.addCallback(_check_stdout_against, 1)
1556         outfile0 = os.path.join(self.basedir, "outfile0")
1557         d.addCallback(run, "get", "file2", outfile0)
1558         def _check_outfile0((out,err)):
1559             data = open(outfile0,"rb").read()
1560             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1561         d.addCallback(_check_outfile0)
1562         outfile1 = os.path.join(self.basedir, "outfile0")
1563         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1564         def _check_outfile1((out,err)):
1565             data = open(outfile1,"rb").read()
1566             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1567         d.addCallback(_check_outfile1)
1568
1569         d.addCallback(run, "rm", "tahoe-file0")
1570         d.addCallback(run, "rm", "tahoe:file2")
1571         d.addCallback(run, "ls")
1572         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1573
1574         d.addCallback(run, "ls", "-l")
1575         def _check_ls_l((out,err)):
1576             lines = out.split("\n")
1577             for l in lines:
1578                 if "tahoe-file-stdin" in l:
1579                     self.failUnless(l.startswith("-r-- "), l)
1580                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1581                 if "file3" in l:
1582                     self.failUnless(l.startswith("-rw- "), l) # mutable
1583         d.addCallback(_check_ls_l)
1584
1585         d.addCallback(run, "ls", "--uri")
1586         def _check_ls_uri((out,err)):
1587             lines = out.split("\n")
1588             for l in lines:
1589                 if "file3" in l:
1590                     self.failUnless(self._mutable_file3_uri in l)
1591         d.addCallback(_check_ls_uri)
1592
1593         d.addCallback(run, "ls", "--readonly-uri")
1594         def _check_ls_rouri((out,err)):
1595             lines = out.split("\n")
1596             for l in lines:
1597                 if "file3" in l:
1598                     rw_uri = self._mutable_file3_uri
1599                     u = uri.from_string_mutable_filenode(rw_uri)
1600                     ro_uri = u.get_readonly().to_string()
1601                     self.failUnless(ro_uri in l)
1602         d.addCallback(_check_ls_rouri)
1603
1604
1605         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1606         d.addCallback(run, "ls")
1607         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1608
1609         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1610         d.addCallback(run, "ls")
1611         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1612
1613         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1614         d.addCallback(run, "ls")
1615         d.addCallback(_check_ls, ["file3", "file3-copy"])
1616         d.addCallback(run, "get", "tahoe:file3-copy")
1617         d.addCallback(_check_stdout_against, 3)
1618
1619         # copy from disk into tahoe
1620         d.addCallback(run, "cp", files[4], "tahoe:file4")
1621         d.addCallback(run, "ls")
1622         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1623         d.addCallback(run, "get", "tahoe:file4")
1624         d.addCallback(_check_stdout_against, 4)
1625
1626         # copy from tahoe into disk
1627         target_filename = os.path.join(self.basedir, "file-out")
1628         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1629         def _check_cp_out((out,err)):
1630             self.failUnless(os.path.exists(target_filename))
1631             got = open(target_filename,"rb").read()
1632             self.failUnlessEqual(got, datas[4])
1633         d.addCallback(_check_cp_out)
1634
1635         # copy from disk to disk (silly case)
1636         target2_filename = os.path.join(self.basedir, "file-out-copy")
1637         d.addCallback(run, "cp", target_filename, target2_filename)
1638         def _check_cp_out2((out,err)):
1639             self.failUnless(os.path.exists(target2_filename))
1640             got = open(target2_filename,"rb").read()
1641             self.failUnlessEqual(got, datas[4])
1642         d.addCallback(_check_cp_out2)
1643
1644         # copy from tahoe into disk, overwriting an existing file
1645         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1646         def _check_cp_out3((out,err)):
1647             self.failUnless(os.path.exists(target_filename))
1648             got = open(target_filename,"rb").read()
1649             self.failUnlessEqual(got, datas[3])
1650         d.addCallback(_check_cp_out3)
1651
1652         # copy from disk into tahoe, overwriting an existing immutable file
1653         d.addCallback(run, "cp", files[5], "tahoe:file4")
1654         d.addCallback(run, "ls")
1655         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1656         d.addCallback(run, "get", "tahoe:file4")
1657         d.addCallback(_check_stdout_against, 5)
1658
1659         # copy from disk into tahoe, overwriting an existing mutable file
1660         d.addCallback(run, "cp", files[5], "tahoe:file3")
1661         d.addCallback(run, "ls")
1662         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1663         d.addCallback(run, "get", "tahoe:file3")
1664         d.addCallback(_check_stdout_against, 5)
1665
1666         # recursive copy: setup
1667         dn = os.path.join(self.basedir, "dir1")
1668         os.makedirs(dn)
1669         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1670         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1671         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1672         sdn2 = os.path.join(dn, "subdir2")
1673         os.makedirs(sdn2)
1674         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1675         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1676
1677         # from disk into tahoe
1678         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1679         d.addCallback(run, "ls")
1680         d.addCallback(_check_ls, ["dir1"])
1681         d.addCallback(run, "ls", "dir1")
1682         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1683                       ["rfile4", "rfile5"])
1684         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1685         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1686                       ["rfile1", "rfile2", "rfile3"])
1687         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1688         d.addCallback(_check_stdout_against, data="rfile4")
1689
1690         # and back out again
1691         dn_copy = os.path.join(self.basedir, "dir1-copy")
1692         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1693         def _check_cp_r_out((out,err)):
1694             def _cmp(name):
1695                 old = open(os.path.join(dn, name), "rb").read()
1696                 newfn = os.path.join(dn_copy, name)
1697                 self.failUnless(os.path.exists(newfn))
1698                 new = open(newfn, "rb").read()
1699                 self.failUnlessEqual(old, new)
1700             _cmp("rfile1")
1701             _cmp("rfile2")
1702             _cmp("rfile3")
1703             _cmp(os.path.join("subdir2", "rfile4"))
1704             _cmp(os.path.join("subdir2", "rfile5"))
1705         d.addCallback(_check_cp_r_out)
1706
1707         # and copy it a second time, which ought to overwrite the same files
1708         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1709
1710         # and again, only writing filecaps
1711         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1712         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1713         def _check_capsonly((out,err)):
1714             # these should all be LITs
1715             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1716             y = uri.from_string_filenode(x)
1717             self.failUnlessEqual(y.data, "rfile4")
1718         d.addCallback(_check_capsonly)
1719
1720         # and tahoe-to-tahoe
1721         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1722         d.addCallback(run, "ls")
1723         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1724         d.addCallback(run, "ls", "dir1-copy")
1725         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1726                       ["rfile4", "rfile5"])
1727         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1728         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1729                       ["rfile1", "rfile2", "rfile3"])
1730         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1731         d.addCallback(_check_stdout_against, data="rfile4")
1732
1733         # and copy it a second time, which ought to overwrite the same files
1734         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1735
1736         # tahoe_ls doesn't currently handle the error correctly: it tries to
1737         # JSON-parse a traceback.
1738 ##         def _ls_missing(res):
1739 ##             argv = ["ls"] + nodeargs + ["bogus"]
1740 ##             return self._run_cli(argv)
1741 ##         d.addCallback(_ls_missing)
1742 ##         def _check_ls_missing((out,err)):
1743 ##             print "OUT", out
1744 ##             print "ERR", err
1745 ##             self.failUnlessEqual(err, "")
1746 ##         d.addCallback(_check_ls_missing)
1747
1748         return d
1749
1750     def _run_cli(self, argv, stdin=""):
1751         #print "CLI:", argv
1752         stdout, stderr = StringIO(), StringIO()
1753         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1754                                   stdin=StringIO(stdin),
1755                                   stdout=stdout, stderr=stderr)
1756         def _done(res):
1757             return stdout.getvalue(), stderr.getvalue()
1758         d.addCallback(_done)
1759         return d
1760
1761     def _test_checker(self, res):
1762         ut = upload.Data("too big to be literal" * 200, convergence=None)
1763         d = self._personal_node.add_file(u"big file", ut)
1764
1765         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1766         def _check_dirnode_results(r):
1767             self.failUnless(r.is_healthy())
1768         d.addCallback(_check_dirnode_results)
1769         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1770         d.addCallback(_check_dirnode_results)
1771
1772         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1773         def _got_chk_filenode(n):
1774             self.failUnless(isinstance(n, ImmutableFileNode))
1775             d = n.check(Monitor())
1776             def _check_filenode_results(r):
1777                 self.failUnless(r.is_healthy())
1778             d.addCallback(_check_filenode_results)
1779             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1780             d.addCallback(_check_filenode_results)
1781             return d
1782         d.addCallback(_got_chk_filenode)
1783
1784         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1785         def _got_lit_filenode(n):
1786             self.failUnless(isinstance(n, LiteralFileNode))
1787             d = n.check(Monitor())
1788             def _check_lit_filenode_results(r):
1789                 self.failUnlessEqual(r, None)
1790             d.addCallback(_check_lit_filenode_results)
1791             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1792             d.addCallback(_check_lit_filenode_results)
1793             return d
1794         d.addCallback(_got_lit_filenode)
1795         return d
1796