from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import download, filenode, offloaded, upload
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap.api import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
     download_to_data

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
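
# Usage sketch (mirrors _upload_resumable below): fire a Deferred after
# roughly 5kB has been read, so the test can bounce a node mid-upload:
#   u1 = CountingDataUploadable(DATA, convergence=convergence)
#   u1.interrupt_after = 5000
#   u1.interrupt_after_d = defer.Deferred()
#   u1.interrupt_after_d.addCallback(lambda res: self.bounce_client(0))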

class GrabEverythingConsumer:
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents += data

    def unregisterProducer(self):
        pass
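
# Usage sketch (as exercised in _test_upload_and_download below): wrap it in
# a download.ConsumerAdapter and pass that to downloader.download():
#   consumer = GrabEverythingConsumer()
#   ct = download.ConsumerAdapter(consumer)
#   d = downloader.download(some_uri, ct)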

class SystemTest(SystemTestMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_index("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # The 1024 actually gets rounded up to 1025 to be a multiple of
            # the number of required shares (since we use 25 out of 100 FEC).
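            # (Worked example, as a sanity check: with k=25 required shares,
            # the next multiple of 25 at or above 1024 is 25*41 = 1025, so
            # each full segment is 1025 bytes and the 4000-byte file spans
            # three full segments plus a 925-byte tail.)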
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
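            # (Background, as a rough sketch: with convergent encryption the
            # AES key is derived from a hash of the convergence secret plus
            # the encoding parameters and the file's contents, so uploading
            # identical data lands on the same storage index and the shares
            # already on the grid can be reused.)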
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
            return d1
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares
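
    # Example (hypothetical path, purely for illustration): a walk that
    # visits
    #   .../client0/storage/shares/aa/aabbccddeeffgg/7
    # appends (0, si_a2b("aabbccddeeffgg"), <that path>, 7): the client
    # number comes from the "client0" component, the storage index is
    # decoded from the directory name, and the share number from the
    # file name.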

    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may or may not be noticed, depending upon
                # which server we query first (see the initial_query_count
                # note above).
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up
            # the segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)
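        # (Each mutable file or dirnode created below consumes one
        # pre-generated key from the key generator's pool, so the pool
        # shrinks by one per creation: hence the 0/-1/-2/-3 deltas.)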

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # talks to the key generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d

    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
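        # e.g. flip_bit("abc") == "abb": only the low bit of the final
        # byte is inverted, which is enough to invalidate any key or hash.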

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d
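
    # Usage sketch (taken from _check_publish_private above): expect a
    # NoSuchChildError whose message mentions the missing child name:
    #   d = self.shouldFail2(NoSuchChildError, "get(missing)", "missing",
    #                        dirnode.get, u"missing")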

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)
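
    # For reference, the body built above for POST("uri", t="upload") looks
    # roughly like this on the wire (CRLF line endings; file-valued fields
    # additionally carry a filename parameter):
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   upload
    #   --boogabooga--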
1080
1081     def _test_web(self, res):
1082         base = self.webish_url
1083         public = "uri/" + self._root_directory_uri
1084         d = getPage(base)
1085         def _got_welcome(page):
1086             # XXX This test is oversensitive to formatting
1087             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1088             self.failUnless(expected in page,
1089                             "I didn't see the right 'connected storage servers'"
1090                             " message in: %s" % page
1091                             )
1092             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1093             self.failUnless(expected in page,
1094                             "I didn't see the right 'My nodeid' message "
1095                             "in: %s" % page)
1096             self.failUnless("Helper: 0 active uploads" in page)
1097         d.addCallback(_got_welcome)
1098         d.addCallback(self.log, "done with _got_welcome")
1099
1100         # get the welcome page from the node that uses the helper too
1101         d.addCallback(lambda res: getPage(self.helper_webish_url))
1102         def _got_welcome_helper(page):
1103             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1104                             page)
1105             self.failUnless("Not running helper" in page)
1106         d.addCallback(_got_welcome_helper)
1107
1108         d.addCallback(lambda res: getPage(base + public))
1109         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1110         def _got_subdir1(page):
1111             # there ought to be an href for our file
1112             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1113             self.failUnless(">mydata567</a>" in page)
1114         d.addCallback(_got_subdir1)
1115         d.addCallback(self.log, "done with _got_subdir1")
1116         d.addCallback(lambda res:
1117                       getPage(base + public + "/subdir1/mydata567"))
1118         def _got_data(page):
1119             self.failUnlessEqual(page, self.data)
1120         d.addCallback(_got_data)
1121
1122         # download from a URI embedded in a URL
1123         d.addCallback(self.log, "_get_from_uri")
1124         def _get_from_uri(res):
1125             return getPage(base + "uri/%s?filename=%s"
1126                            % (self.uri, "mydata567"))
1127         d.addCallback(_get_from_uri)
1128         def _got_from_uri(page):
1129             self.failUnlessEqual(page, self.data)
1130         d.addCallback(_got_from_uri)
1131
1132         # download from a URI embedded in a URL, second form
1133         d.addCallback(self.log, "_get_from_uri2")
1134         def _get_from_uri2(res):
1135             return getPage(base + "uri?uri=%s" % (self.uri,))
1136         d.addCallback(_get_from_uri2)
1137         d.addCallback(_got_from_uri)
1138
1139         # download from a bogus URI, make sure we get a reasonable error
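             # (the mangled URI names a file for which no shares exist; the
             # webapi reports an unrecoverable file as "410 Gone")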
1140         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1141         def _get_from_bogus_uri(res):
1142             d1 = getPage(base + "uri/%s?filename=%s"
1143                          % (self.mangle_uri(self.uri), "mydata567"))
1144             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1145                        "410")
1146             return d1
1147         d.addCallback(_get_from_bogus_uri)
1148         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1149
1150         # upload a file with PUT
1151         d.addCallback(self.log, "about to try PUT")
1152         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1153                                            "new.txt contents"))
1154         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1155         d.addCallback(self.failUnlessEqual, "new.txt contents")
1156         # and again with something large enough to use multiple segments,
1157         # and hopefully trigger pauseProducing too
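             # (at the default max segment size of 128KiB, 1.5MB is about a
             # dozen segments)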
1158         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1159                                            "big" * 500000)) # 1.5MB
1160         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1161         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1162
1163         # can we replace files in place?
1164         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1165                                            "NEWER contents"))
1166         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1167         d.addCallback(self.failUnlessEqual, "NEWER contents")
1168
1169         # test unlinked POST
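             # (an unlinked upload creates a file without adding it to any
             # directory; the new cap comes back in the HTTP response body)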
1170         d.addCallback(lambda res: self.POST("uri", t="upload",
1171                                             file=("new.txt", "data" * 10000)))
1172         # and again using the helper, which exercises different upload-status
1173         # display code
1174         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1175                                             file=("foo.txt", "data2" * 10000)))
1176
1177         # check that the status page exists
1178         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1179         def _got_status(res):
1180             # find an interesting upload and download to look at. LIT files
1181             # are not interesting.
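                 # (a LIT cap embeds the whole file in the URI itself, so LIT
                 # transfers touch no servers and have no status worth showing)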
1182             h = self.clients[0].get_history()
1183             for ds in h.list_all_download_statuses():
1184                 if ds.get_size() > 200:
1185                     self._down_status = ds.get_counter()
1186             for us in h.list_all_upload_statuses():
1187                 if us.get_size() > 200:
1188                     self._up_status = us.get_counter()
1189             rs = list(h.list_all_retrieve_statuses())[0]
1190             self._retrieve_status = rs.get_counter()
1191             ps = list(h.list_all_publish_statuses())[0]
1192             self._publish_status = ps.get_counter()
1193             mus = list(h.list_all_mapupdate_statuses())[0]
1194             self._update_status = mus.get_counter()
1195
1196             # and that there are some upload- and download-status pages
1197             return self.GET("status/up-%d" % self._up_status)
1198         d.addCallback(_got_status)
1199         def _got_up(res):
1200             return self.GET("status/down-%d" % self._down_status)
1201         d.addCallback(_got_up)
1202         def _got_down(res):
1203             return self.GET("status/mapupdate-%d" % self._update_status)
1204         d.addCallback(_got_down)
1205         def _got_update(res):
1206             return self.GET("status/publish-%d" % self._publish_status)
1207         d.addCallback(_got_update)
1208         def _got_publish(res):
1209             return self.GET("status/retrieve-%d" % self._retrieve_status)
1210         d.addCallback(_got_publish)
1211
1212         # check that the helper status page exists
1213         d.addCallback(lambda res:
1214                       self.GET("helper_status", followRedirect=True))
1215         def _got_helper_status(res):
1216             self.failUnless("Bytes Fetched:" in res)
1217             # touch a couple of files in the helper's working directory to
1218             # exercise more code paths
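                 # both files are backdated three days so they land in the
                 # helper's "old" size accounting, which feeds the *_old
                 # counters checked in the JSON form below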
1219             workdir = os.path.join(self.getdir("client0"), "helper")
1220             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1221             f = open(incfile, "wb")
1222             f.write("small file")
1223             f.close()
1224             then = time.time() - 86400*3
1225             now = time.time()
1226             os.utime(incfile, (now, then))
1227             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1228             f = open(encfile, "wb")
1229             f.write("less small file")
1230             f.close()
1231             os.utime(encfile, (now, then))
1232         d.addCallback(_got_helper_status)
1233         # and that the json form exists
1234         d.addCallback(lambda res:
1235                       self.GET("helper_status?t=json", followRedirect=True))
1236         def _got_helper_status_json(res):
1237             data = simplejson.loads(res)
1238             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1239                                  1)
1240             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1241             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1242             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1243                                  10)
1244             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1245             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1246             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1247                                  15)
1248         d.addCallback(_got_helper_status_json)
1249
1250         # and check that client[3] (which uses a helper but does not run one
1251         # itself) doesn't explode when you ask for its status
1252         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1253         def _got_non_helper_status(res):
1254             self.failUnless("Upload and Download Status" in res)
1255         d.addCallback(_got_non_helper_status)
1256
1257         # or for helper status with t=json
1258         d.addCallback(lambda res:
1259                       getPage(self.helper_webish_url + "helper_status?t=json"))
1260         def _got_non_helper_status_json(res):
1261             data = simplejson.loads(res)
1262             self.failUnlessEqual(data, {})
1263         d.addCallback(_got_non_helper_status_json)
1264
1265         # see if the statistics page exists
1266         d.addCallback(lambda res: self.GET("statistics"))
1267         def _got_stats(res):
1268             self.failUnless("Node Statistics" in res)
1269             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1270         d.addCallback(_got_stats)
1271         d.addCallback(lambda res: self.GET("statistics?t=json"))
1272         def _got_stats_json(res):
1273             data = simplejson.loads(res)
1274             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1275             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1276         d.addCallback(_got_stats_json)
1277
1278         # TODO: mangle the second segment of a file, to test errors that
1279         # occur after we've already sent some good data, which uses a
1280         # different error path.
1281
1282         # TODO: download a URI with a form
1283         # TODO: create a directory by using a form
1284         # TODO: upload by using a form on the directory page
1285         #    url = base + "somedir/subdir1/freeform_post!!upload"
1286         # TODO: delete a file by using a button on the directory page
1287
1288         return d
1289
1290     def _test_runner(self, res):
1291         # exercise some of the diagnostic tools in runner.py
1292
1293         # find a share
1294         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1295             if "storage" not in dirpath:
1296                 continue
1297             if not filenames:
1298                 continue
1299             pieces = dirpath.split(os.sep)
1300             if (len(pieces) >= 4
1301                 and pieces[-4] == "storage"
1302                 and pieces[-3] == "shares"):
1303                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1304                 # are sharefiles here
1305                 filename = os.path.join(dirpath, filenames[0])
1306                 # peek at the magic to see if it is a chk share
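                     # (immutable share files begin with a 4-byte big-endian
                     # version field, currently 1)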
1307                 magic = open(filename, "rb").read(4)
1308                 if magic == '\x00\x00\x00\x01':
1309                     break
1310         else:
1311             self.fail("unable to find any CHK share files in %s"
1312                       % self.basedir)
1313         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1314
1315         out,err = StringIO(), StringIO()
1316         rc = runner.runner(["debug", "dump-share", "--offsets",
1317                             filename],
1318                            stdout=out, stderr=err)
1319         output = out.getvalue()
1320         self.failUnlessEqual(rc, 0)
1321
1322         # we only upload a single file, so we can assert some things about
1323         # its size and shares.
1324         self.failUnless(("share filename: %s" % filename) in output)
1325         self.failUnless("size: %d\n" % len(self.data) in output)
1326         self.failUnless("num_segments: 1\n" in output)
1327         # segment_size is always a multiple of needed_shares
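             # (FEC splits each segment into needed_shares equal pieces, so
             # the segment is padded up to a multiple of k, which is 3 here)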
1328         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1329         self.failUnless("total_shares: 10\n" in output)
1330         # keys which are supposed to be present
1331         for key in ("size", "num_segments", "segment_size",
1332                     "needed_shares", "total_shares",
1333                     "codec_name", "codec_params", "tail_codec_params",
1334                     #"plaintext_hash", "plaintext_root_hash",
1335                     "crypttext_hash", "crypttext_root_hash",
1336                     "share_root_hash", "UEB_hash"):
1337             self.failUnless("%s: " % key in output, key)
1338         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1339
1340         # now use its storage index to find the other shares using the
1341         # 'find-shares' tool
1342         sharedir, shnum = os.path.split(filename)
1343         storagedir, storage_index_s = os.path.split(sharedir)
1344         out,err = StringIO(), StringIO()
1345         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1346         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1347         rc = runner.runner(cmd, stdout=out, stderr=err)
1348         self.failUnlessEqual(rc, 0)
1349         out.seek(0)
1350         sharefiles = [sfn.strip() for sfn in out.readlines()]
1351         self.failUnlessEqual(len(sharefiles), 10)
1352
1353         # also exercise the 'catalog-shares' tool
1354         out,err = StringIO(), StringIO()
1355         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1356         cmd = ["debug", "catalog-shares"] + nodedirs
1357         rc = runner.runner(cmd, stdout=out, stderr=err)
1358         self.failUnlessEqual(rc, 0)
1359         out.seek(0)
1360         descriptions = [sfn.strip() for sfn in out.readlines()]
1361         self.failUnlessEqual(len(descriptions), 30)
1362         matching = [line
1363                     for line in descriptions
1364                     if line.startswith("CHK %s " % storage_index_s)]
1365         self.failUnlessEqual(len(matching), 10)
1366
1367     def _test_control(self, res):
1368         # exercise the remote-control-the-client foolscap interfaces in
1369         # allmydata.control (mostly used for performance tests)
1370         c0 = self.clients[0]
1371         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1372         control_furl = open(control_furl_file, "r").read().strip()
1373         # it doesn't really matter which Tub we use to connect to the client,
1374         # so let's just use our IntroducerNode's
1375         d = self.introducer.tub.getReference(control_furl)
1376         d.addCallback(self._test_control2, control_furl_file)
1377         return d
1378     def _test_control2(self, rref, filename):
1379         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1380         downfile = os.path.join(self.basedir, "control.downfile")
1381         d.addCallback(lambda uri:
1382                       rref.callRemote("download_from_uri_to_file",
1383                                       uri, downfile))
1384         def _check(res):
1385             self.failUnlessEqual(res, downfile)
1386             data = open(downfile, "r").read()
1387             expected_data = open(filename, "r").read()
1388             self.failUnlessEqual(data, expected_data)
1389         d.addCallback(_check)
1390         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1391         if sys.platform == "linux2":
1392             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1393         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1394         return d
1395
1396     def _test_cli(self, res):
1397         # run various CLI commands (in a thread, since they use blocking
1398         # network calls)
1399
1400         private_uri = self._private_node.get_uri()
1401         some_uri = self._root_directory_uri
1402         client0_basedir = self.getdir("client0")
1403
1404         nodeargs = [
1405             "--node-directory", client0_basedir,
1406             ]
1407         TESTDATA = "I will not write the same thing over and over.\n" * 100
1408
1409         d = defer.succeed(None)
1410
1411         # for compatibility with earlier versions, private/root_dir.cap is
1412         # supposed to be treated as an alias named "tahoe:". Start by making
1413         # sure that works, before we add other aliases.
1414
1415         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1416         f = open(root_file, "w")
1417         f.write(private_uri)
1418         f.close()
1419
1420         def run(ignored, verb, *args, **kwargs):
1421             stdin = kwargs.get("stdin", "")
1422             newargs = [verb] + nodeargs + list(args)
1423             return self._run_cli(newargs, stdin=stdin)
1424
1425         def _check_ls((out,err), expected_children, unexpected_children=[]):
1426             self.failUnlessEqual(err, "")
1427             for s in expected_children:
1428                 self.failUnless(s in out, (s,out))
1429             for s in unexpected_children:
1430                 self.failIf(s in out, (s,out))
1431
1432         def _check_ls_root((out,err)):
1433             self.failUnless("personal" in out)
1434             self.failUnless("s2-ro" in out)
1435             self.failUnless("s2-rw" in out)
1436             self.failUnlessEqual(err, "")
1437
1438         # this should reference private_uri
1439         d.addCallback(run, "ls")
1440         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1441
1442         d.addCallback(run, "list-aliases")
1443         def _check_aliases_1((out,err)):
1444             self.failUnlessEqual(err, "")
1445             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1446         d.addCallback(_check_aliases_1)
1447
1448         # now that that's out of the way, remove root_dir.cap and work with
1449         # new files
1450         d.addCallback(lambda res: os.unlink(root_file))
1451         d.addCallback(run, "list-aliases")
1452         def _check_aliases_2((out,err)):
1453             self.failUnlessEqual(err, "")
1454             self.failUnlessEqual(out, "")
1455         d.addCallback(_check_aliases_2)
1456
1457         d.addCallback(run, "mkdir")
1458         def _got_dir( (out,err) ):
1459             self.failUnless(uri.from_string_dirnode(out.strip()))
1460             return out.strip()
1461         d.addCallback(_got_dir)
1462         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1463
1464         d.addCallback(run, "list-aliases")
1465         def _check_aliases_3((out,err)):
1466             self.failUnlessEqual(err, "")
1467             self.failUnless("tahoe: " in out)
1468         d.addCallback(_check_aliases_3)
1469
1470         def _check_empty_dir((out,err)):
1471             self.failUnlessEqual(out, "")
1472             self.failUnlessEqual(err, "")
1473         d.addCallback(run, "ls")
1474         d.addCallback(_check_empty_dir)
1475
1476         def _check_missing_dir((out,err)):
1477             # TODO: check that rc==2
1478             self.failUnlessEqual(out, "")
1479             self.failUnlessEqual(err, "No such file or directory\n")
1480         d.addCallback(run, "ls", "bogus")
1481         d.addCallback(_check_missing_dir)
1482
1483         files = []
1484         datas = []
1485         for i in range(10):
1486             fn = os.path.join(self.basedir, "file%d" % i)
1487             files.append(fn)
1488             data = "data to be uploaded: file%d\n" % i
1489             datas.append(data)
1490             open(fn,"wb").write(data)
1491
1492         def _check_stdout_against((out,err), filenum=None, data=None):
1493             self.failUnlessEqual(err, "")
1494             if filenum is not None:
1495                 self.failUnlessEqual(out, datas[filenum])
1496             if data is not None:
1497                 self.failUnlessEqual(out, data)
1498
1499         # test both forms of put: from a file, and from stdin
1500         #  tahoe put bar FOO
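             # (file0 is small enough to come back as a LIT cap, with the
             # data packed directly into the URI, hence the URI:LIT check)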
1501         d.addCallback(run, "put", files[0], "tahoe-file0")
1502         def _put_out((out,err)):
1503             self.failUnless("URI:LIT:" in out, out)
1504             self.failUnless("201 Created" in err, err)
1505             uri0 = out.strip()
1506             return run(None, "get", uri0)
1507         d.addCallback(_put_out)
1508         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1509
1510         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1511         #  tahoe put bar tahoe:FOO
1512         d.addCallback(run, "put", files[2], "tahoe:file2")
1513         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
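             # --mutable stores file3 as a mutable (SSK) file; the write cap
             # it prints is captured here and reused later by the --uri and
             # --readonly-uri listings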
1514         def _check_put_mutable((out,err)):
1515             self._mutable_file3_uri = out.strip()
1516         d.addCallback(_check_put_mutable)
1517         d.addCallback(run, "get", "tahoe:file3")
1518         d.addCallback(_check_stdout_against, 3)
1519
1520         #  tahoe put FOO
1521         STDIN_DATA = "This is the file to upload from stdin."
1522         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1523         #  tahoe put tahoe:FOO
1524         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1525                       stdin="Other file from stdin.")
1526
1527         d.addCallback(run, "ls")
1528         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1529                                   "tahoe-file-stdin", "from-stdin"])
1530         d.addCallback(run, "ls", "subdir")
1531         d.addCallback(_check_ls, ["tahoe-file1"])
1532
1533         # tahoe mkdir FOO
1534         d.addCallback(run, "mkdir", "subdir2")
1535         d.addCallback(run, "ls")
1536         # TODO: extract the URI, set an alias with it
1537         d.addCallback(_check_ls, ["subdir2"])
1538
1539         # tahoe get: (to stdout and to a file)
1540         d.addCallback(run, "get", "tahoe-file0")
1541         d.addCallback(_check_stdout_against, 0)
1542         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1543         d.addCallback(_check_stdout_against, 1)
1544         outfile0 = os.path.join(self.basedir, "outfile0")
1545         d.addCallback(run, "get", "file2", outfile0)
1546         def _check_outfile0((out,err)):
1547             data = open(outfile0,"rb").read()
1548             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1549         d.addCallback(_check_outfile0)
1550         outfile1 = os.path.join(self.basedir, "outfile1")
1551         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1552         def _check_outfile1((out,err)):
1553             data = open(outfile1,"rb").read()
1554             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1555         d.addCallback(_check_outfile1)
1556
1557         d.addCallback(run, "rm", "tahoe-file0")
1558         d.addCallback(run, "rm", "tahoe:file2")
1559         d.addCallback(run, "ls")
1560         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1561
1562         d.addCallback(run, "ls", "-l")
1563         def _check_ls_l((out,err)):
1564             lines = out.split("\n")
1565             for l in lines:
1566                 if "tahoe-file-stdin" in l:
1567                     self.failUnless(l.startswith("-r-- "), l)
1568                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1569                 if "file3" in l:
1570                     self.failUnless(l.startswith("-rw- "), l) # mutable
1571         d.addCallback(_check_ls_l)
1572
1573         d.addCallback(run, "ls", "--uri")
1574         def _check_ls_uri((out,err)):
1575             lines = out.split("\n")
1576             for l in lines:
1577                 if "file3" in l:
1578                     self.failUnless(self._mutable_file3_uri in l)
1579         d.addCallback(_check_ls_uri)
1580
1581         d.addCallback(run, "ls", "--readonly-uri")
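             # a mutable write cap can always be diminished to its read-only
             # form; --readonly-uri should print exactly that diminished cap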
1582         def _check_ls_rouri((out,err)):
1583             lines = out.split("\n")
1584             for l in lines:
1585                 if "file3" in l:
1586                     rw_uri = self._mutable_file3_uri
1587                     u = uri.from_string_mutable_filenode(rw_uri)
1588                     ro_uri = u.get_readonly().to_string()
1589                     self.failUnless(ro_uri in l)
1590         d.addCallback(_check_ls_rouri)
1591
1592
1593         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1594         d.addCallback(run, "ls")
1595         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1596
1597         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1598         d.addCallback(run, "ls")
1599         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1600
1601         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1602         d.addCallback(run, "ls")
1603         d.addCallback(_check_ls, ["file3", "file3-copy"])
1604         d.addCallback(run, "get", "tahoe:file3-copy")
1605         d.addCallback(_check_stdout_against, 3)
1606
1607         # copy from disk into tahoe
1608         d.addCallback(run, "cp", files[4], "tahoe:file4")
1609         d.addCallback(run, "ls")
1610         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1611         d.addCallback(run, "get", "tahoe:file4")
1612         d.addCallback(_check_stdout_against, 4)
1613
1614         # copy from tahoe to disk
1615         target_filename = os.path.join(self.basedir, "file-out")
1616         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1617         def _check_cp_out((out,err)):
1618             self.failUnless(os.path.exists(target_filename))
1619             got = open(target_filename,"rb").read()
1620             self.failUnlessEqual(got, datas[4])
1621         d.addCallback(_check_cp_out)
1622
1623         # copy from disk to disk (silly case)
1624         target2_filename = os.path.join(self.basedir, "file-out-copy")
1625         d.addCallback(run, "cp", target_filename, target2_filename)
1626         def _check_cp_out2((out,err)):
1627             self.failUnless(os.path.exists(target2_filename))
1628             got = open(target2_filename,"rb").read()
1629             self.failUnlessEqual(got, datas[4])
1630         d.addCallback(_check_cp_out2)
1631
1632         # copy from tahoe to disk, overwriting an existing file
1633         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1634         def _check_cp_out3((out,err)):
1635             self.failUnless(os.path.exists(target_filename))
1636             got = open(target_filename,"rb").read()
1637             self.failUnlessEqual(got, datas[3])
1638         d.addCallback(_check_cp_out3)
1639
1640         # copy from disk into tahoe, overwriting an existing immutable file
1641         d.addCallback(run, "cp", files[5], "tahoe:file4")
1642         d.addCallback(run, "ls")
1643         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1644         d.addCallback(run, "get", "tahoe:file4")
1645         d.addCallback(_check_stdout_against, 5)
1646
1647         # copy from disk into tahoe, overwriting an existing mutable file
1648         d.addCallback(run, "cp", files[5], "tahoe:file3")
1649         d.addCallback(run, "ls")
1650         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1651         d.addCallback(run, "get", "tahoe:file3")
1652         d.addCallback(_check_stdout_against, 5)
1653
1654         # recursive copy: setup
1655         dn = os.path.join(self.basedir, "dir1")
1656         os.makedirs(dn)
1657         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1658         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1659         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1660         sdn2 = os.path.join(dn, "subdir2")
1661         os.makedirs(sdn2)
1662         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1663         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1664
1665         # from disk into tahoe
1666         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1667         d.addCallback(run, "ls")
1668         d.addCallback(_check_ls, ["dir1"])
1669         d.addCallback(run, "ls", "dir1")
1670         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1671                       ["rfile4", "rfile5"])
1672         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1673         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1674                       ["rfile1", "rfile2", "rfile3"])
1675         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1676         d.addCallback(_check_stdout_against, data="rfile4")
1677
1678         # and back out again
1679         dn_copy = os.path.join(self.basedir, "dir1-copy")
1680         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1681         def _check_cp_r_out((out,err)):
1682             def _cmp(name):
1683                 old = open(os.path.join(dn, name), "rb").read()
1684                 newfn = os.path.join(dn_copy, name)
1685                 self.failUnless(os.path.exists(newfn))
1686                 new = open(newfn, "rb").read()
1687                 self.failUnlessEqual(old, new)
1688             _cmp("rfile1")
1689             _cmp("rfile2")
1690             _cmp("rfile3")
1691             _cmp(os.path.join("subdir2", "rfile4"))
1692             _cmp(os.path.join("subdir2", "rfile5"))
1693         d.addCallback(_check_cp_r_out)
1694
1695         # and copy it a second time, which ought to overwrite the same files
1696         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1697
1698         # and again, only writing filecaps
1699         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1700         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1701         def _check_capsonly((out,err)):
1702             # these should all be LITs
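                 # (a LIT cap carries the plaintext inside the cap string, so
                 # parsing the copied cap recovers the data without a network
                 # download)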
1703             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1704             y = uri.from_string_filenode(x)
1705             self.failUnlessEqual(y.data, "rfile4")
1706         d.addCallback(_check_capsonly)
1707
1708         # and tahoe-to-tahoe
1709         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1710         d.addCallback(run, "ls")
1711         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1712         d.addCallback(run, "ls", "dir1-copy")
1713         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1714                       ["rfile4", "rfile5"])
1715         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1716         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1717                       ["rfile1", "rfile2", "rfile3"])
1718         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1719         d.addCallback(_check_stdout_against, data="rfile4")
1720
1721         # and copy it a second time, which ought to overwrite the same files
1722         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1723
1724         # tahoe_ls doesn't currently handle the error correctly: it tries to
1725         # JSON-parse a traceback.
1726 ##         def _ls_missing(res):
1727 ##             argv = ["ls"] + nodeargs + ["bogus"]
1728 ##             return self._run_cli(argv)
1729 ##         d.addCallback(_ls_missing)
1730 ##         def _check_ls_missing((out,err)):
1731 ##             print "OUT", out
1732 ##             print "ERR", err
1733 ##             self.failUnlessEqual(err, "")
1734 ##         d.addCallback(_check_ls_missing)
1735
1736         return d
1737
1738     def _run_cli(self, argv, stdin=""):
1739         #print "CLI:", argv
1740         stdout, stderr = StringIO(), StringIO()
1741         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1742                                   stdin=StringIO(stdin),
1743                                   stdout=stdout, stderr=stderr)
1744         def _done(res):
1745             return stdout.getvalue(), stderr.getvalue()
1746         d.addCallback(_done)
1747         return d
1748
1749     def _test_checker(self, res):
1750         ut = upload.Data("too big to be literal" * 200, convergence=None)
1751         d = self._personal_node.add_file(u"big file", ut)
1752
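             # check() only queries servers for share presence;
             # check(verify=True) also downloads every share and verifies
             # its integrity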
1753         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1754         def _check_dirnode_results(r):
1755             self.failUnless(r.is_healthy())
1756         d.addCallback(_check_dirnode_results)
1757         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1758         d.addCallback(_check_dirnode_results)
1759
1760         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1761         def _got_chk_filenode(n):
1762             self.failUnless(isinstance(n, filenode.FileNode))
1763             d = n.check(Monitor())
1764             def _check_filenode_results(r):
1765                 self.failUnless(r.is_healthy())
1766             d.addCallback(_check_filenode_results)
1767             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1768             d.addCallback(_check_filenode_results)
1769             return d
1770         d.addCallback(_got_chk_filenode)
1771
1772         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1773         def _got_lit_filenode(n):
1774             self.failUnless(isinstance(n, filenode.LiteralFileNode))
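                 # LIT files have no shares on any server, so there is nothing
                 # to check; check() is expected to return None for them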
1775             d = n.check(Monitor())
1776             def _check_lit_filenode_results(r):
1777                 self.failUnlessEqual(r, None)
1778             d.addCallback(_check_lit_filenode_results)
1779             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1780             d.addCallback(_check_lit_filenode_results)
1781             return d
1782         d.addCallback(_got_lit_filenode)
1783         return d
1784