1 from base64 import b32encode
2 import os, sys, time, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, filenode, offloaded, upload
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.scripts import runner
18 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
19      NoSuchChildError, NoSharesError
20 from allmydata.monitor import Monitor
21 from allmydata.mutable.common import NotMutableError
22 from allmydata.mutable import layout as mutable_layout
23 from foolscap.api import DeadReferenceError
24 from twisted.python.failure import Failure
25 from twisted.web.client import getPage
26 from twisted.web.error import Error
27
28 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
29      download_to_data
30
31 LARGE_DATA = """
32 This is some data to publish to the virtual drive, which needs to be large
33 enough to not fit inside a LIT uri.
34 """
35
36 class CountingDataUploadable(upload.Data):
37     bytes_read = 0
38     interrupt_after = None
39     interrupt_after_d = None
40
41     def read(self, length):
42         self.bytes_read += length
43         if self.interrupt_after is not None:
44             if self.bytes_read > self.interrupt_after:
45                 self.interrupt_after = None
46                 self.interrupt_after_d.callback(self)
47         return upload.Data.read(self, length)
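    # Typical usage (a sketch mirroring _upload_resumable below): set
    # interrupt_after to a byte count and interrupt_after_d to a Deferred
    # whose callback performs the interruption, e.g.:
    #   u1 = CountingDataUploadable(DATA, convergence=None)
    #   u1.interrupt_after = 5000
    #   u1.interrupt_after_d = defer.Deferred()
    #   u1.interrupt_after_d.addCallback(lambda res: self.bounce_client(0))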
48
49 class GrabEverythingConsumer:
50     implements(IConsumer)
51
52     def __init__(self):
53         self.contents = ""
54
55     def registerProducer(self, producer, streaming):
56         assert streaming
57         assert IPushProducer.providedBy(producer)
58
59     def write(self, data):
60         self.contents += data
61
62     def unregisterProducer(self):
63         pass
64
65 class SystemTest(SystemTestMixin, unittest.TestCase):
66     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
67
68     def test_connections(self):
69         self.basedir = "system/SystemTest/test_connections"
70         d = self.set_up_nodes()
71         self.extra_node = None
72         d.addCallback(lambda res: self.add_extra_node(self.numclients))
73         def _check(extra_node):
74             self.extra_node = extra_node
75             for c in self.clients:
76                 all_peerids = c.get_storage_broker().get_all_serverids()
77                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
78                 sb = c.storage_broker
79                 permuted_peers = sb.get_servers_for_index("a")
80                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
81
82         d.addCallback(_check)
83         def _shutdown_extra_node(res):
84             if self.extra_node:
85                 return self.extra_node.stopService()
86             return res
87         d.addBoth(_shutdown_extra_node)
88         return d
89     # test_connections is subsumed by test_upload_and_download, and takes
90     # quite a while to run on a slow machine (because of all the TLS
91     # connections that must be established). If we ever rework the introducer
92     # code to such an extent that we're not sure if it works anymore, we can
93     # reinstate this test until it does.
94     del test_connections
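    # (deleting the name from the class body means trial never collects the
    # method, so the test is disabled without removing its code)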
95
96     def test_upload_and_download_random_key(self):
97         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
98         return self._test_upload_and_download(convergence=None)
99
100     def test_upload_and_download_convergent(self):
101         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
102         return self._test_upload_and_download(convergence="some convergence string")
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = c.get_storage_broker().get_all_serverids()
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 sb = c.storage_broker
114                 permuted_peers = sb.get_servers_for_index("a")
115                 self.failUnlessEqual(len(permuted_peers), self.numclients)
116         d.addCallback(_check_connections)
117
118         def _do_upload(res):
119             log.msg("UPLOADING")
120             u = self.clients[0].getServiceNamed("uploader")
121             self.uploader = u
122             # we crank the max segsize down to 1024b for the duration of this
123             # test, so we can exercise multiple segments. It is important
124             # that the data length is not a multiple of the segment size, so
125             # that the tail segment is not the same length as the others. This
126             # actually gets rounded up to 1025 to be a multiple of the number
127             # of required shares (since we use 25 out of 100 FEC).
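            # (a worked check of that rounding: 1024 is not a multiple of 25,
            # and the next multiple of 25 above it is 25 * 41 = 1025)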
128             up = upload.Data(DATA, convergence=convergence)
129             up.max_segment_size = 1024
130             d1 = u.upload(up)
131             return d1
132         d.addCallback(_do_upload)
133         def _upload_done(results):
134             theuri = results.uri
135             log.msg("upload finished: uri is %s" % (theuri,))
136             self.uri = theuri
137             assert isinstance(self.uri, str), self.uri
138             self.cap = uri.from_string(self.uri)
139             self.downloader = self.clients[1].downloader
140         d.addCallback(_upload_done)
141
142         def _upload_again(res):
143             # Upload again. If using convergent encryption then this ought to be
144             # short-circuited; however, with the way we currently generate URIs
145             # (i.e. because they include the roothash), we have to do all of the
146             # encoding work, and only get to save on the upload part.
147             log.msg("UPLOADING AGAIN")
148             up = upload.Data(DATA, convergence=convergence)
149             up.max_segment_size = 1024
150             return self.uploader.upload(up)
151         d.addCallback(_upload_again)
152
153         def _download_to_data(res):
154             log.msg("DOWNLOADING")
155             return self.downloader.download_to_data(self.cap)
156         d.addCallback(_download_to_data)
157         def _download_to_data_done(data):
158             log.msg("download finished")
159             self.failUnlessEqual(data, DATA)
160         d.addCallback(_download_to_data_done)
161
162         target_filename = os.path.join(self.basedir, "download.target")
163         def _download_to_filename(res):
164             return self.downloader.download_to_filename(self.cap,
165                                                         target_filename)
166         d.addCallback(_download_to_filename)
167         def _download_to_filename_done(res):
168             newdata = open(target_filename, "rb").read()
169             self.failUnlessEqual(newdata, DATA)
170         d.addCallback(_download_to_filename_done)
171
172         target_filename2 = os.path.join(self.basedir, "download.target2")
173         def _download_to_filehandle(res):
174             fh = open(target_filename2, "wb")
175             return self.downloader.download_to_filehandle(self.cap, fh)
176         d.addCallback(_download_to_filehandle)
177         def _download_to_filehandle_done(fh):
178             fh.close()
179             newdata = open(target_filename2, "rb").read()
180             self.failUnlessEqual(newdata, DATA)
181         d.addCallback(_download_to_filehandle_done)
182
183         consumer = GrabEverythingConsumer()
184         ct = download.ConsumerAdapter(consumer)
185         d.addCallback(lambda res:
186                       self.downloader.download(self.cap, ct))
187         def _download_to_consumer_done(ign):
188             self.failUnlessEqual(consumer.contents, DATA)
189         d.addCallback(_download_to_consumer_done)
190
191         def _test_read(res):
192             n = self.clients[1].create_node_from_uri(self.uri)
193             d = download_to_data(n)
194             def _read_done(data):
195                 self.failUnlessEqual(data, DATA)
196             d.addCallback(_read_done)
197             d.addCallback(lambda ign:
198                           n.read(MemoryConsumer(), offset=1, size=4))
199             def _read_portion_done(mc):
200                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
201             d.addCallback(_read_portion_done)
202             d.addCallback(lambda ign:
203                           n.read(MemoryConsumer(), offset=2, size=None))
204             def _read_tail_done(mc):
205                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
206             d.addCallback(_read_tail_done)
207             d.addCallback(lambda ign:
208                           n.read(MemoryConsumer(), size=len(DATA)+1000))
209             def _read_too_much(mc):
210                 self.failUnlessEqual("".join(mc.chunks), DATA)
211             d.addCallback(_read_too_much)
212
213             return d
214         d.addCallback(_test_read)
215
216         def _test_bad_read(res):
217             bad_u = uri.from_string_filenode(self.uri)
218             bad_u.key = self.flip_bit(bad_u.key)
219             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
220             # this should cause an error during download
221
222             d = self.shouldFail2(NoSharesError, "'download bad node'",
223                                  None,
224                                  bad_n.read, MemoryConsumer(), offset=2)
225             return d
226         d.addCallback(_test_bad_read)
227
228         def _download_nonexistent_uri(res):
229             baduri = self.mangle_uri(self.uri)
230             log.msg("about to download non-existent URI", level=log.UNUSUAL,
231                     facility="tahoe.tests")
232             d1 = self.downloader.download_to_data(uri.from_string(baduri))
233             def _baduri_should_fail(res):
234                 log.msg("finished downloading non-existent URI",
235                         level=log.UNUSUAL, facility="tahoe.tests")
236                 self.failUnless(isinstance(res, Failure))
237                 self.failUnless(res.check(NoSharesError),
238                                 "expected NoSharesError, got %s" % res)
239             d1.addBoth(_baduri_should_fail)
240             return d1
241         d.addCallback(_download_nonexistent_uri)
242
243         # add a new node, which doesn't accept shares, and only uses the
244         # helper for upload.
245         d.addCallback(lambda res: self.add_extra_node(self.numclients,
246                                                       self.helper_furl,
247                                                       add_to_sparent=True))
248         def _added(extra_node):
249             self.extra_node = extra_node
250         d.addCallback(_added)
251
252         HELPER_DATA = "Data that needs help to upload" * 1000
253         def _upload_with_helper(res):
254             u = upload.Data(HELPER_DATA, convergence=convergence)
255             d = self.extra_node.upload(u)
256             def _uploaded(results):
257                 cap = uri.from_string(results.uri)
258                 return self.downloader.download_to_data(cap)
259             d.addCallback(_uploaded)
260             def _check(newdata):
261                 self.failUnlessEqual(newdata, HELPER_DATA)
262             d.addCallback(_check)
263             return d
264         d.addCallback(_upload_with_helper)
265
266         def _upload_duplicate_with_helper(res):
267             u = upload.Data(HELPER_DATA, convergence=convergence)
268             u.debug_stash_RemoteEncryptedUploadable = True
269             d = self.extra_node.upload(u)
270             def _uploaded(results):
271                 cap = uri.from_string(results.uri)
272                 return self.downloader.download_to_data(cap)
273             d.addCallback(_uploaded)
274             def _check(newdata):
275                 self.failUnlessEqual(newdata, HELPER_DATA)
276                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
277                             "uploadable started uploading, should have been avoided")
278             d.addCallback(_check)
279             return d
280         if convergence is not None:
281             d.addCallback(_upload_duplicate_with_helper)
282
283         def _upload_resumable(res):
284             DATA = "Data that needs help to upload and gets interrupted" * 1000
285             u1 = CountingDataUploadable(DATA, convergence=convergence)
286             u2 = CountingDataUploadable(DATA, convergence=convergence)
287
288             # we interrupt the connection after about 5kB by shutting down
289             # the helper, then restarting it.
290             u1.interrupt_after = 5000
291             u1.interrupt_after_d = defer.Deferred()
292             u1.interrupt_after_d.addCallback(lambda res:
293                                              self.bounce_client(0))
294
295             # sneak into the helper and reduce its chunk size, so that our
296             # debug_interrupt will sever the connection on about the fifth
297             # chunk fetched. This makes sure that we've started to write the
298             # new shares before we abandon them, which exercises the
299             # abort/delete-partial-share code. TODO: find a cleaner way to do
300             # this. I know that this will affect later uses of the helper in
301             # this same test run, but I'm not currently worried about it.
302             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
303
304             d = self.extra_node.upload(u1)
305
306             def _should_not_finish(res):
307                 self.fail("interrupted upload should have failed, not finished"
308                           " with result %s" % (res,))
309             def _interrupted(f):
310                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
311
312                 # make sure we actually interrupted it before finishing the
313                 # file
314                 self.failUnless(u1.bytes_read < len(DATA),
315                                 "read %d out of %d total" % (u1.bytes_read,
316                                                              len(DATA)))
317
318                 log.msg("waiting for reconnect", level=log.NOISY,
319                         facility="tahoe.test.test_system")
320                 # now, we need to give the nodes a chance to notice that this
321                 # connection has gone away. When this happens, the storage
322                 # servers will be told to abort their uploads, removing the
323                 # partial shares. Unfortunately this involves TCP messages
324                 # going through the loopback interface, and we can't easily
325                 # predict how long that will take. If it were all local, we
326                 # could use fireEventually() to stall. Since we don't have
327                 # the right introduction hooks, the best we can do is use a
328                 # fixed delay. TODO: this is fragile.
329                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
330                 return u1.interrupt_after_d
331             d.addCallbacks(_should_not_finish, _interrupted)
332
333             def _disconnected(res):
334                 # check to make sure the storage servers aren't still hanging
335                 # on to the partial share: their incoming/ directories should
336                 # now be empty.
337                 log.msg("disconnected", level=log.NOISY,
338                         facility="tahoe.test.test_system")
339                 for i in range(self.numclients):
340                     incdir = os.path.join(self.getdir("client%d" % i),
341                                           "storage", "shares", "incoming")
342                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
343             d.addCallback(_disconnected)
344
345             # then we need to give the reconnector a chance to
346             # reestablish the connection to the helper.
347             d.addCallback(lambda res:
348                           log.msg("wait_for_connections", level=log.NOISY,
349                                   facility="tahoe.test.test_system"))
350             d.addCallback(lambda res: self.wait_for_connections())
351
352
353             d.addCallback(lambda res:
354                           log.msg("uploading again", level=log.NOISY,
355                                   facility="tahoe.test.test_system"))
356             d.addCallback(lambda res: self.extra_node.upload(u2))
357
358             def _uploaded(results):
359                 cap = uri.from_string(results.uri)
360                 log.msg("Second upload complete", level=log.NOISY,
361                         facility="tahoe.test.test_system")
362
363                 # this is really bytes received rather than sent, but it's
364                 # convenient and basically measures the same thing
365                 bytes_sent = results.ciphertext_fetched
366
367                 # We currently don't support resumption of upload if the data is
368                 # encrypted with a random key.  (Because that would require us
369                 # to store the key locally and re-use it on the next upload of
370                 # this file, which isn't a bad thing to do, but we currently
371                 # don't do it.)
372                 if convergence is not None:
373                     # Make sure we did not have to read the whole file the
374                     # second time around.
375                     self.failUnless(bytes_sent < len(DATA),
376                                 "resumption didn't save us any work:"
377                                 " read %d bytes out of %d total" %
378                                 (bytes_sent, len(DATA)))
379                 else:
380                     # Make sure we did have to read the whole file the second
381                     # time around -- because the one that we partially uploaded
382                     # earlier was encrypted with a different random key.
383                     self.failIf(bytes_sent < len(DATA),
384                                 "resumption saved us some work even though we were using random keys:"
385                                 " read %d bytes out of %d total" %
386                                 (bytes_sent, len(DATA)))
387                 return self.downloader.download_to_data(cap)
388             d.addCallback(_uploaded)
389
390             def _check(newdata):
391                 self.failUnlessEqual(newdata, DATA)
392                 # If using convergent encryption, then also check that the
393                 # helper has removed the temp file from its directories.
394                 if convergence is not None:
395                     basedir = os.path.join(self.getdir("client0"), "helper")
396                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
397                     self.failUnlessEqual(files, [])
398                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
399                     self.failUnlessEqual(files, [])
400             d.addCallback(_check)
401             return d
402         d.addCallback(_upload_resumable)
403
404         def _grab_stats(ignored):
405             # the StatsProvider doesn't normally publish a FURL:
406             # instead it passes a live reference to the StatsGatherer
407             # (if and when it connects). To exercise the remote stats
408             # interface, we manually publish client0's StatsProvider
409             # and use client1 to query it.
410             sp = self.clients[0].stats_provider
411             sp_furl = self.clients[0].tub.registerReference(sp)
412             d = self.clients[1].tub.getReference(sp_furl)
413             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
414             def _got_stats(stats):
415                 #print "STATS"
416                 #from pprint import pprint
417                 #pprint(stats)
418                 s = stats["stats"]
419                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
420                 c = stats["counters"]
421                 self.failUnless("storage_server.allocate" in c)
422             d.addCallback(_got_stats)
423             return d
424         d.addCallback(_grab_stats)
425
426         return d
427
428     def _find_shares(self, basedir):
429         shares = []
430         for (dirpath, dirnames, filenames) in os.walk(basedir):
431             if "storage" not in dirpath:
432                 continue
433             if not filenames:
434                 continue
435             pieces = dirpath.split(os.sep)
436             if (len(pieces) >= 5
437                 and pieces[-4] == "storage"
438                 and pieces[-3] == "shares"):
439                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
440                 # are sharefiles here
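                # (for illustration, think of a path like
                # .../client0/storage/shares/<2-char prefix>/<storage index>/<shnum>)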
441                 assert pieces[-5].startswith("client")
442                 client_num = int(pieces[-5][-1])
443                 storage_index_s = pieces[-1]
444                 storage_index = si_a2b(storage_index_s)
445                 for sharename in filenames:
446                     shnum = int(sharename)
447                     filename = os.path.join(dirpath, sharename)
448                     data = (client_num, storage_index, filename, shnum)
449                     shares.append(data)
450         if not shares:
451             self.fail("unable to find any share files in %s" % basedir)
452         return shares
453
454     def _corrupt_mutable_share(self, filename, which):
455         msf = MutableShareFile(filename)
456         datav = msf.readv([ (0, 1000000) ])
457         final_share = datav[0]
458         assert len(final_share) < 1000000 # ought to be truncated
459         pieces = mutable_layout.unpack_share(final_share)
460         (seqnum, root_hash, IV, k, N, segsize, datalen,
461          verification_key, signature, share_hash_chain, block_hash_tree,
462          share_data, enc_privkey) = pieces
463
464         if which == "seqnum":
465             seqnum = seqnum + 15
466         elif which == "R":
467             root_hash = self.flip_bit(root_hash)
468         elif which == "IV":
469             IV = self.flip_bit(IV)
470         elif which == "segsize":
471             segsize = segsize + 15
472         elif which == "pubkey":
473             verification_key = self.flip_bit(verification_key)
474         elif which == "signature":
475             signature = self.flip_bit(signature)
476         elif which == "share_hash_chain":
477             nodenum = share_hash_chain.keys()[0]
478             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
479         elif which == "block_hash_tree":
480             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
481         elif which == "share_data":
482             share_data = self.flip_bit(share_data)
483         elif which == "encprivkey":
484             enc_privkey = self.flip_bit(enc_privkey)
485
486         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
487                                             segsize, datalen)
488         final_share = mutable_layout.pack_share(prefix,
489                                                 verification_key,
490                                                 signature,
491                                                 share_hash_chain,
492                                                 block_hash_tree,
493                                                 share_data,
494                                                 enc_privkey)
495         msf.writev( [(0, final_share)], None)
496
497
498     def test_mutable(self):
499         self.basedir = "system/SystemTest/test_mutable"
500         DATA = "initial contents go here."  # 25 bytes % 3 != 0
501         NEWDATA = "new contents yay"
502         NEWERDATA = "this is getting old"
503
504         d = self.set_up_nodes(use_key_generator=True)
505
506         def _create_mutable(res):
507             c = self.clients[0]
508             log.msg("starting create_mutable_file")
509             d1 = c.create_mutable_file(DATA)
510             def _done(res):
511                 log.msg("DONE: %s" % (res,))
512                 self._mutable_node_1 = res
513                 uri = res.get_uri()
514             d1.addCallback(_done)
515             return d1
516         d.addCallback(_create_mutable)
517
518         def _test_debug(res):
519             # find a share. It is important to run this while there is only
520             # one slot in the grid.
521             shares = self._find_shares(self.basedir)
522             (client_num, storage_index, filename, shnum) = shares[0]
523             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
524                     % filename)
525             log.msg(" for clients[%d]" % client_num)
526
527             out,err = StringIO(), StringIO()
528             rc = runner.runner(["debug", "dump-share", "--offsets",
529                                 filename],
530                                stdout=out, stderr=err)
531             output = out.getvalue()
532             self.failUnlessEqual(rc, 0)
533             try:
534                 self.failUnless("Mutable slot found:\n" in output)
535                 self.failUnless("share_type: SDMF\n" in output)
536                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
537                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
538                 self.failUnless(" num_extra_leases: 0\n" in output)
539                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
540                                 in output)
541                 self.failUnless(" SDMF contents:\n" in output)
542                 self.failUnless("  seqnum: 1\n" in output)
543                 self.failUnless("  required_shares: 3\n" in output)
544                 self.failUnless("  total_shares: 10\n" in output)
545                 self.failUnless("  segsize: 27\n" in output, (output, filename))
546                 self.failUnless("  datalen: 25\n" in output)
547                 # the exact share_hash_chain nodes depends upon the sharenum,
548                 # and is more of a hassle to compute than I want to deal with
549                 # now
550                 self.failUnless("  share_hash_chain: " in output)
551                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
552                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
553                             base32.b2a(storage_index))
554                 self.failUnless(expected in output)
555             except unittest.FailTest:
556                 print
557                 print "dump-share output was:"
558                 print output
559                 raise
560         d.addCallback(_test_debug)
561
562         # test retrieval
563
564         # first, let's see if we can use the existing node to retrieve the
565         # contents. This allows it to use the cached pubkey and maybe the
566         # latest-known sharemap.
567
568         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
569         def _check_download_1(res):
570             self.failUnlessEqual(res, DATA)
571             # now we see if we can retrieve the data from a new node,
572             # constructed using the URI of the original one. We do this test
573             # on the same client that uploaded the data.
574             uri = self._mutable_node_1.get_uri()
575             log.msg("starting retrieve1")
576             newnode = self.clients[0].create_node_from_uri(uri)
577             newnode_2 = self.clients[0].create_node_from_uri(uri)
578             self.failUnlessIdentical(newnode, newnode_2)
579             return newnode.download_best_version()
580         d.addCallback(_check_download_1)
581
582         def _check_download_2(res):
583             self.failUnlessEqual(res, DATA)
584             # same thing, but with a different client
585             uri = self._mutable_node_1.get_uri()
586             newnode = self.clients[1].create_node_from_uri(uri)
587             log.msg("starting retrieve2")
588             d1 = newnode.download_best_version()
589             d1.addCallback(lambda res: (res, newnode))
590             return d1
591         d.addCallback(_check_download_2)
592
593         def _check_download_3((res, newnode)):
594             self.failUnlessEqual(res, DATA)
595             # replace the data
596             log.msg("starting replace1")
597             d1 = newnode.overwrite(NEWDATA)
598             d1.addCallback(lambda res: newnode.download_best_version())
599             return d1
600         d.addCallback(_check_download_3)
601
602         def _check_download_4(res):
603             self.failUnlessEqual(res, NEWDATA)
604             # now create an even newer node and replace the data on it. This
605             # new node has never been used for download before.
606             uri = self._mutable_node_1.get_uri()
607             newnode1 = self.clients[2].create_node_from_uri(uri)
608             newnode2 = self.clients[3].create_node_from_uri(uri)
609             self._newnode3 = self.clients[3].create_node_from_uri(uri)
610             log.msg("starting replace2")
611             d1 = newnode1.overwrite(NEWERDATA)
612             d1.addCallback(lambda res: newnode2.download_best_version())
613             return d1
614         d.addCallback(_check_download_4)
615
616         def _check_download_5(res):
617             log.msg("finished replace2")
618             self.failUnlessEqual(res, NEWERDATA)
619         d.addCallback(_check_download_5)
620
621         def _corrupt_shares(res):
622             # run around and flip bits in all but k of the shares, to test
623             # the hash checks
624             shares = self._find_shares(self.basedir)
625             ## sort by share number
626             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
627             where = dict([ (shnum, filename)
628                            for (client_num, storage_index, filename, shnum)
629                            in shares ])
630             assert len(where) == 10 # this test is designed for 3-of-10
631             for shnum, filename in where.items():
632                 # shares 7,8,9 are left alone. read will check
633                 # (share_hash_chain, block_hash_tree, share_data). New
634                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
635                 # segsize, signature).
636                 if shnum == 0:
637                     # read: this will trigger "pubkey doesn't match
638                     # fingerprint".
639                     self._corrupt_mutable_share(filename, "pubkey")
640                     self._corrupt_mutable_share(filename, "encprivkey")
641                 elif shnum == 1:
642                     # triggers "signature is invalid"
643                     self._corrupt_mutable_share(filename, "seqnum")
644                 elif shnum == 2:
645                     # triggers "signature is invalid"
646                     self._corrupt_mutable_share(filename, "R")
647                 elif shnum == 3:
648                     # triggers "signature is invalid"
649                     self._corrupt_mutable_share(filename, "segsize")
650                 elif shnum == 4:
651                     self._corrupt_mutable_share(filename, "share_hash_chain")
652                 elif shnum == 5:
653                     self._corrupt_mutable_share(filename, "block_hash_tree")
654                 elif shnum == 6:
655                     self._corrupt_mutable_share(filename, "share_data")
656                 # other things to corrupt: IV, signature
657                 # 7,8,9 are left alone
658
659                 # note that initial_query_count=5 means that we'll hit the
660                 # first 5 servers in effectively random order (based upon
661                 # response time), so we won't necessarily ever get a "pubkey
662                 # doesn't match fingerprint" error (if we hit shnum>=1 before
663                 # shnum=0, we pull the pubkey from there). To get repeatable
664                 # specific failures, we need to set initial_query_count=1,
665                 # but of course that will change the sequencing behavior of
666                 # the retrieval process. TODO: find a reasonable way to make
667                 # this a parameter, probably when we expand this test to test
668                 # for one failure mode at a time.
669
670                 # when we retrieve this, we should get three signature
671                 # failures (where we've mangled seqnum, R, and segsize). The
672                 # pubkey mangling is only noticed if shnum 0 is fetched first.
673         d.addCallback(_corrupt_shares)
674
675         d.addCallback(lambda res: self._newnode3.download_best_version())
676         d.addCallback(_check_download_5)
677
678         def _check_empty_file(res):
679             # make sure we can create empty files; this usually screws up the
680             # segsize math
681             d1 = self.clients[2].create_mutable_file("")
682             d1.addCallback(lambda newnode: newnode.download_best_version())
683             d1.addCallback(lambda res: self.failUnlessEqual("", res))
684             return d1
685         d.addCallback(_check_empty_file)
686
687         d.addCallback(lambda res: self.clients[0].create_dirnode())
688         def _created_dirnode(dnode):
689             log.msg("_created_dirnode(%s)" % (dnode,))
690             d1 = dnode.list()
691             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
692             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
693             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
694             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
695             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
696             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
697             d1.addCallback(lambda res: dnode.build_manifest().when_done())
698             d1.addCallback(lambda res:
699                            self.failUnlessEqual(len(res["manifest"]), 1))
700             return d1
701         d.addCallback(_created_dirnode)
702
703         def wait_for_c3_kg_conn():
704             return self.clients[3]._key_generator is not None
705         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
706
707         def check_kg_poolsize(junk, size_delta):
708             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
709                                  self.key_generator_svc.key_generator.pool_size + size_delta)
710
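        # each mutable-file or dirnode creation below consumes one key from
        # the generator's pool, so the expected size_delta drops by one per
        # creation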
711         d.addCallback(check_kg_poolsize, 0)
712         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
713         d.addCallback(check_kg_poolsize, -1)
714         d.addCallback(lambda junk: self.clients[3].create_dirnode())
715         d.addCallback(check_kg_poolsize, -2)
716         # use_helper induces use of clients[3], which is the client that uses the key generator
717         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
718         d.addCallback(check_kg_poolsize, -3)
719
720         return d
721
722     def flip_bit(self, good):
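        # flip the low bit of the final byte: e.g. flip_bit("abc") == "abb",
        # since ord("c") ^ 0x01 == ord("b")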
723         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
724
725     def mangle_uri(self, gooduri):
726         # change the key, which changes the storage index, which means we'll
727         # be asking about the wrong file, so nobody will have any shares
728         u = IFileURI(gooduri)
729         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
730                             uri_extension_hash=u.uri_extension_hash,
731                             needed_shares=u.needed_shares,
732                             total_shares=u.total_shares,
733                             size=u.size)
734         return u2.to_string()
735
736     # TODO: add a test which mangles the uri_extension_hash instead, and
737     # should fail due to not being able to get a valid uri_extension block.
738     # Also a test which sneakily mangles the uri_extension block to change
739     # some of the validation data, so it will fail in the post-download phase
740     # when the file's crypttext integrity check fails. Do the same thing for
741     # the key, which should cause the download to fail the post-download
742     # plaintext_hash check.
743
744     def test_vdrive(self):
745         self.basedir = "system/SystemTest/test_vdrive"
746         self.data = LARGE_DATA
747         d = self.set_up_nodes(use_stats_gatherer=True)
748         d.addCallback(self._test_introweb)
749         d.addCallback(self.log, "starting publish")
750         d.addCallback(self._do_publish1)
751         d.addCallback(self._test_runner)
752         d.addCallback(self._do_publish2)
753         # at this point, we have the following filesystem (where "R" denotes
754         # self._root_directory_uri):
755         # R
756         # R/subdir1
757         # R/subdir1/mydata567
758         # R/subdir1/subdir2/
759         # R/subdir1/subdir2/mydata992
760
761         d.addCallback(lambda res: self.bounce_client(0))
762         d.addCallback(self.log, "bounced client0")
763
764         d.addCallback(self._check_publish1)
765         d.addCallback(self.log, "did _check_publish1")
766         d.addCallback(self._check_publish2)
767         d.addCallback(self.log, "did _check_publish2")
768         d.addCallback(self._do_publish_private)
769         d.addCallback(self.log, "did _do_publish_private")
770         # now we also have (where "P" denotes a new dir):
771         #  P/personal/sekrit data
772         #  P/s2-rw -> /subdir1/subdir2/
773         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
774         d.addCallback(self._check_publish_private)
775         d.addCallback(self.log, "did _check_publish_private")
776         d.addCallback(self._test_web)
777         d.addCallback(self._test_control)
778         d.addCallback(self._test_cli)
779         # P now has four top-level children:
780         # P/personal/sekrit data
781         # P/s2-ro/
782         # P/s2-rw/
783         # P/test_put/  (empty)
784         d.addCallback(self._test_checker)
785         return d
786
787     def _test_introweb(self, res):
788         d = getPage(self.introweb_url, method="GET", followRedirect=True)
789         def _check(res):
790             try:
791                 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
792                                 in res)
793                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
794                 self.failUnless("Subscription Summary: storage: 5" in res)
795             except unittest.FailTest:
796                 print
797                 print "GET %s output was:" % self.introweb_url
798                 print res
799                 raise
800         d.addCallback(_check)
801         d.addCallback(lambda res:
802                       getPage(self.introweb_url + "?t=json",
803                               method="GET", followRedirect=True))
804         def _check_json(res):
805             data = simplejson.loads(res)
806             try:
807                 self.failUnlessEqual(data["subscription_summary"],
808                                      {"storage": 5})
809                 self.failUnlessEqual(data["announcement_summary"],
810                                      {"storage": 5, "stub_client": 5})
811                 self.failUnlessEqual(data["announcement_distinct_hosts"],
812                                      {"storage": 1, "stub_client": 1})
813             except unittest.FailTest:
814                 print
815                 print "GET %s?t=json output was:" % self.introweb_url
816                 print res
817                 raise
818         d.addCallback(_check_json)
819         return d
820
821     def _do_publish1(self, res):
822         ut = upload.Data(self.data, convergence=None)
823         c0 = self.clients[0]
824         d = c0.create_dirnode()
825         def _made_root(new_dirnode):
826             self._root_directory_uri = new_dirnode.get_uri()
827             return c0.create_node_from_uri(self._root_directory_uri)
828         d.addCallback(_made_root)
829         d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
830         def _made_subdir1(subdir1_node):
831             self._subdir1_node = subdir1_node
832             d1 = subdir1_node.add_file(u"mydata567", ut)
833             d1.addCallback(self.log, "publish finished")
834             def _stash_uri(filenode):
835                 self.uri = filenode.get_uri()
836                 assert isinstance(self.uri, str), (self.uri, filenode)
837             d1.addCallback(_stash_uri)
838             return d1
839         d.addCallback(_made_subdir1)
840         return d
841
842     def _do_publish2(self, res):
843         ut = upload.Data(self.data, convergence=None)
844         d = self._subdir1_node.create_subdirectory(u"subdir2")
845         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
846         return d
847
848     def log(self, res, *args, **kwargs):
849         # print "MSG: %s  RES: %s" % (msg, args)
850         log.msg(*args, **kwargs)
851         return res
852
853     def _do_publish_private(self, res):
854         self.smalldata = "sssh, very secret stuff"
855         ut = upload.Data(self.smalldata, convergence=None)
856         d = self.clients[0].create_dirnode()
857         d.addCallback(self.log, "GOT private directory")
858         def _got_new_dir(privnode):
859             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
860             d1 = privnode.create_subdirectory(u"personal")
861             d1.addCallback(self.log, "made P/personal")
862             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
863             d1.addCallback(self.log, "made P/personal/sekrit data")
864             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
865             def _got_s2(s2node):
866                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
867                                       s2node.get_readonly_uri())
868                 d2.addCallback(lambda node:
869                                privnode.set_uri(u"s2-ro",
870                                                 s2node.get_readonly_uri(),
871                                                 s2node.get_readonly_uri()))
872                 return d2
873             d1.addCallback(_got_s2)
874             d1.addCallback(lambda res: privnode)
875             return d1
876         d.addCallback(_got_new_dir)
877         return d
878
879     def _check_publish1(self, res):
880         # this one uses the iterative API
881         c1 = self.clients[1]
882         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
883         d.addCallback(self.log, "check_publish1 got /")
884         d.addCallback(lambda root: root.get(u"subdir1"))
885         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
886         d.addCallback(lambda filenode: filenode.download_to_data())
887         d.addCallback(self.log, "get finished")
888         def _get_done(data):
889             self.failUnlessEqual(data, self.data)
890         d.addCallback(_get_done)
891         return d
892
893     def _check_publish2(self, res):
894         # this one uses the path-based API
895         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
896         d = rootnode.get_child_at_path(u"subdir1")
897         d.addCallback(lambda dirnode:
898                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
899         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
900         d.addCallback(lambda filenode: filenode.download_to_data())
901         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
902
903         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
904         def _got_filenode(filenode):
905             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
906             assert fnode == filenode
907         d.addCallback(_got_filenode)
908         return d
909
910     def _check_publish_private(self, resnode):
911         # this one uses the path-based API
912         self._private_node = resnode
913
914         d = self._private_node.get_child_at_path(u"personal")
915         def _got_personal(personal):
916             self._personal_node = personal
917             return personal
918         d.addCallback(_got_personal)
919
920         d.addCallback(lambda dirnode:
921                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
922         def get_path(path):
923             return self._private_node.get_child_at_path(path)
924
925         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
926         d.addCallback(lambda filenode: filenode.download_to_data())
927         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
928         d.addCallback(lambda res: get_path(u"s2-rw"))
929         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
930         d.addCallback(lambda res: get_path(u"s2-ro"))
931         def _got_s2ro(dirnode):
932             self.failUnless(dirnode.is_mutable(), dirnode)
933             self.failUnless(dirnode.is_readonly(), dirnode)
934             d1 = defer.succeed(None)
935             d1.addCallback(lambda res: dirnode.list())
936             d1.addCallback(self.log, "dirnode.list")
937
938             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
939
940             d1.addCallback(self.log, "doing add_file(ro)")
941             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
942             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
943
944             d1.addCallback(self.log, "doing get(ro)")
945             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
946             d1.addCallback(lambda filenode:
947                            self.failUnless(IFileNode.providedBy(filenode)))
948
949             d1.addCallback(self.log, "doing delete(ro)")
950             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
951
952             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
953
954             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
955
956             personal = self._personal_node
957             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
958
959             d1.addCallback(self.log, "doing move_child_to(ro)2")
960             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
961
962             d1.addCallback(self.log, "finished with _got_s2ro")
963             return d1
964         d.addCallback(_got_s2ro)
965         def _got_home(dummy):
966             home = self._private_node
967             personal = self._personal_node
968             d1 = defer.succeed(None)
969             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
970             d1.addCallback(lambda res:
971                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
972
973             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
974             d1.addCallback(lambda res:
975                            home.move_child_to(u"sekrit", home, u"sekrit data"))
976
977             d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
978             d1.addCallback(lambda res:
979                            home.move_child_to(u"sekrit data", personal))
980
981             d1.addCallback(lambda res: home.build_manifest().when_done())
982             d1.addCallback(self.log, "manifest")
983             #  five items:
984             # P/
985             # P/personal/
986             # P/personal/sekrit data
987             # P/s2-rw  (same as P/s2-ro)
988             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
989             d1.addCallback(lambda res:
990                            self.failUnlessEqual(len(res["manifest"]), 5))
991             d1.addCallback(lambda res: home.start_deep_stats().when_done())
992             def _check_stats(stats):
993                 expected = {"count-immutable-files": 1,
994                             "count-mutable-files": 0,
995                             "count-literal-files": 1,
996                             "count-files": 2,
997                             "count-directories": 3,
998                             "size-immutable-files": 112,
999                             "size-literal-files": 23,
1000                             #"size-directories": 616, # varies
1001                             #"largest-directory": 616,
1002                             "largest-directory-children": 3,
1003                             "largest-immutable-file": 112,
1004                             }
1005                 for k,v in expected.iteritems():
1006                     self.failUnlessEqual(stats[k], v,
1007                                          "stats[%s] was %s, not %s" %
1008                                          (k, stats[k], v))
1009                 self.failUnless(stats["size-directories"] > 1300,
1010                                 stats["size-directories"])
1011                 self.failUnless(stats["largest-directory"] > 800,
1012                                 stats["largest-directory"])
1013                 self.failUnlessEqual(stats["size-files-histogram"],
1014                                      [ (11, 31, 1), (101, 316, 1) ])
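                # (each histogram bucket is (min, max, count): the 23-byte
                # literal file lands in 11..31 and the 112-byte immutable
                # file in 101..316)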
1015             d1.addCallback(_check_stats)
1016             return d1
1017         d.addCallback(_got_home)
1018         return d
1019
1020     def shouldFail(self, res, expected_failure, which, substring=None):
1021         if isinstance(res, Failure):
1022             res.trap(expected_failure)
1023             if substring:
1024                 self.failUnless(substring in str(res),
1025                                 "substring '%s' not in '%s'"
1026                                 % (substring, str(res)))
1027         else:
1028             self.fail("%s was supposed to raise %s, not get '%s'" %
1029                       (which, expected_failure, res))
1030
1031     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1032         assert substring is None or isinstance(substring, str)
1033         d = defer.maybeDeferred(callable, *args, **kwargs)
1034         def done(res):
1035             if isinstance(res, Failure):
1036                 res.trap(expected_failure)
1037                 if substring:
1038                     self.failUnless(substring in str(res),
1039                                     "substring '%s' not in '%s'"
1040                                     % (substring, str(res)))
1041             else:
1042                 self.fail("%s was supposed to raise %s, not get '%s'" %
1043                           (which, expected_failure, res))
1044         d.addBoth(done)
1045         return d
1046
1047     def PUT(self, urlpath, data):
1048         url = self.webish_url + urlpath
1049         return getPage(url, method="PUT", postdata=data)
1050
1051     def GET(self, urlpath, followRedirect=False):
1052         url = self.webish_url + urlpath
1053         return getPage(url, method="GET", followRedirect=followRedirect)
1054
1055     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1056         if use_helper:
1057             url = self.helper_webish_url + urlpath
1058         else:
1059             url = self.webish_url + urlpath
1060         sepbase = "boogabooga"
1061         sep = "--" + sepbase
1062         form = []
1063         form.append(sep)
1064         form.append('Content-Disposition: form-data; name="_charset"')
1065         form.append('')
1066         form.append('UTF-8')
1067         form.append(sep)
1068         for name, value in fields.iteritems():
1069             if isinstance(value, tuple):
1070                 filename, value = value
1071                 form.append('Content-Disposition: form-data; name="%s"; '
1072                             'filename="%s"' % (name, filename.encode("utf-8")))
1073             else:
1074                 form.append('Content-Disposition: form-data; name="%s"' % name)
1075             form.append('')
1076             form.append(str(value))
1077             form.append(sep)
1078         form[-1] += "--"
1079         body = "\r\n".join(form) + "\r\n"
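        # the assembled body is ordinary multipart/form-data, roughly:
        #   --boogabooga
        #   Content-Disposition: form-data; name="_charset"
        #
        #   UTF-8
        #   --boogabooga
        #   Content-Disposition: form-data; name="t"
        #
        #   upload
        #   --boogabooga--
        # (the "t"/"upload" field here is just an illustrative example)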
1080         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1081                    }
1082         return getPage(url, method="POST", postdata=body,
1083                        headers=headers, followRedirect=followRedirect)
1084
1085     def _test_web(self, res):
1086         base = self.webish_url
1087         public = "uri/" + self._root_directory_uri
1088         d = getPage(base)
1089         def _got_welcome(page):
1090             # XXX This test is oversensitive to formatting
1091             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1092             self.failUnless(expected in page,
1093                             "I didn't see the right 'connected storage servers'"
1094                             " message in: %s" % page
1095                             )
1096             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1097             self.failUnless(expected in page,
1098                             "I didn't see the right 'My nodeid' message "
1099                             "in: %s" % page)
1100             self.failUnless("Helper: 0 active uploads" in page)
1101         d.addCallback(_got_welcome)
1102         d.addCallback(self.log, "done with _got_welcome")
1103
1104         # get the welcome page from the node that uses the helper too
1105         d.addCallback(lambda res: getPage(self.helper_webish_url))
1106         def _got_welcome_helper(page):
1107             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1108                             page)
1109             self.failUnless("Not running helper" in page)
1110         d.addCallback(_got_welcome_helper)
1111
1112         d.addCallback(lambda res: getPage(base + public))
1113         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1114         def _got_subdir1(page):
1115             # there ought to be an href for our file
1116             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1117             self.failUnless(">mydata567</a>" in page)
1118         d.addCallback(_got_subdir1)
1119         d.addCallback(self.log, "done with _got_subdir1")
1120         d.addCallback(lambda res:
1121                       getPage(base + public + "/subdir1/mydata567"))
1122         def _got_data(page):
1123             self.failUnlessEqual(page, self.data)
1124         d.addCallback(_got_data)
1125
1126         # download from a URI embedded in a URL
1127         d.addCallback(self.log, "_get_from_uri")
1128         def _get_from_uri(res):
1129             return getPage(base + "uri/%s?filename=%s"
1130                            % (self.uri, "mydata567"))
1131         d.addCallback(_get_from_uri)
1132         def _got_from_uri(page):
1133             self.failUnlessEqual(page, self.data)
1134         d.addCallback(_got_from_uri)
1135
1136         # download from a URI embedded in a URL, second form
1137         d.addCallback(self.log, "_get_from_uri2")
1138         def _get_from_uri2(res):
1139             return getPage(base + "uri?uri=%s" % (self.uri,))
1140         d.addCallback(_get_from_uri2)
1141         d.addCallback(_got_from_uri)
1142
1143         # download from a bogus URI, make sure we get a reasonable error
1144         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
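             # mangle_uri() (a helper method on this class) produces a URI whose
             # storage index matches no uploaded shares, so the web gateway should
             # report the download failure as 410 (Gone), which shouldFail checks.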
1145         def _get_from_bogus_uri(res):
1146             d1 = getPage(base + "uri/%s?filename=%s"
1147                          % (self.mangle_uri(self.uri), "mydata567"))
1148             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1149                        "410")
1150             return d1
1151         d.addCallback(_get_from_bogus_uri)
1152         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1153
1154         # upload a file with PUT
1155         d.addCallback(self.log, "about to try PUT")
1156         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1157                                            "new.txt contents"))
1158         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1159         d.addCallback(self.failUnlessEqual, "new.txt contents")
1160         # and again with something large enough to use multiple segments,
1161         # and hopefully trigger pauseProducing too
1162         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1163                                            "big" * 500000)) # 1.5MB
1164         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1165         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
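             # (1.5MB spans many segments at the default 128 KiB segment size, so
             # the streaming download should get throttled and exercise
             # pauseProducing/resumeProducing at least once)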
1166
1167         # can we replace files in place?
1168         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1169                                            "NEWER contents"))
1170         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1171         d.addCallback(self.failUnlessEqual, "NEWER contents")
1172
1173         # test unlinked POST
1174         d.addCallback(lambda res: self.POST("uri", t="upload",
1175                                             file=("new.txt", "data" * 10000)))
1176         # and again using the helper, which exercises different upload-status
1177         # display code
1178         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1179                                             file=("foo.txt", "data2" * 10000)))
1180
1181         # check that the status page exists
1182         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1183         def _got_status(res):
1184             # find an interesting upload and download to look at. LIT files
1185             # are not interesting.
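                 # (the >200-byte filter below skips LIT-sized files, whose
                 # contents live entirely in the URI)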
1186             h = self.clients[0].get_history()
1187             for ds in h.list_all_download_statuses():
1188                 if ds.get_size() > 200:
1189                     self._down_status = ds.get_counter()
1190             for us in h.list_all_upload_statuses():
1191                 if us.get_size() > 200:
1192                     self._up_status = us.get_counter()
1193             rs = list(h.list_all_retrieve_statuses())[0]
1194             self._retrieve_status = rs.get_counter()
1195             ps = list(h.list_all_publish_statuses())[0]
1196             self._publish_status = ps.get_counter()
1197             us = list(h.list_all_mapupdate_statuses())[0]
1198             self._update_status = us.get_counter()
1199
1200             # and that there are some upload- and download- status pages
1201             return self.GET("status/up-%d" % self._up_status)
1202         d.addCallback(_got_status)
1203         def _got_up(res):
1204             return self.GET("status/down-%d" % self._down_status)
1205         d.addCallback(_got_up)
1206         def _got_down(res):
1207             return self.GET("status/mapupdate-%d" % self._update_status)
1208         d.addCallback(_got_down)
1209         def _got_update(res):
1210             return self.GET("status/publish-%d" % self._publish_status)
1211         d.addCallback(_got_update)
1212         def _got_publish(res):
1213             return self.GET("status/retrieve-%d" % self._retrieve_status)
1214         d.addCallback(_got_publish)
1215
1216         # check that the helper status page exists
1217         d.addCallback(lambda res:
1218                       self.GET("helper_status", followRedirect=True))
1219         def _got_helper_status(res):
1220             self.failUnless("Bytes Fetched:" in res)
1221             # touch a couple of files in the helper's working directory to
1222             # exercise more code paths
1223             workdir = os.path.join(self.getdir("client0"), "helper")
1224             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1225             f = open(incfile, "wb")
1226             f.write("small file")
1227             f.close()
1228             then = time.time() - 86400*3
1229             now = time.time()
1230             os.utime(incfile, (now, then))
1231             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1232             f = open(encfile, "wb")
1233             f.write("less small file")
1234             f.close()
1235             os.utime(encfile, (now, then))
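                 # the sizes and ages of these two spurious files are what the
                 # t=json helper status fetched below is expected to report:
                 # "small file" is 10 bytes in CHK_incoming, "less small file" is
                 # 15 bytes in CHK_encoding, and both mtimes are three days old,
                 # so presumably they also count toward the *_size_old totals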
1236         d.addCallback(_got_helper_status)
1237         # and that the json form exists
1238         d.addCallback(lambda res:
1239                       self.GET("helper_status?t=json", followRedirect=True))
1240         def _got_helper_status_json(res):
1241             data = simplejson.loads(res)
1242             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1243                                  1)
1244             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1245             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1246             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1247                                  10)
1248             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1249             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1250             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1251                                  15)
1252         d.addCallback(_got_helper_status_json)
1253
1254         # and check that client[3] (which uses a helper but does not run one
1255         # itself) doesn't explode when you ask for its status
1256         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1257         def _got_non_helper_status(res):
1258             self.failUnless("Upload and Download Status" in res)
1259         d.addCallback(_got_non_helper_status)
1260
1261         # or for helper status with t=json
1262         d.addCallback(lambda res:
1263                       getPage(self.helper_webish_url + "helper_status?t=json"))
1264         def _got_non_helper_status_json(res):
1265             data = simplejson.loads(res)
1266             self.failUnlessEqual(data, {})
1267         d.addCallback(_got_non_helper_status_json)
1268
1269         # see if the statistics page exists
1270         d.addCallback(lambda res: self.GET("statistics"))
1271         def _got_stats(res):
1272             self.failUnless("Node Statistics" in res)
1273             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1274         d.addCallback(_got_stats)
1275         d.addCallback(lambda res: self.GET("statistics?t=json"))
1276         def _got_stats_json(res):
1277             data = simplejson.loads(res)
1278             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1279             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1280         d.addCallback(_got_stats_json)
1281
1282         # TODO: mangle the second segment of a file, to test errors that
1283         # occur after we've already sent some good data, which uses a
1284         # different error path.
1285
1286         # TODO: download a URI with a form
1287         # TODO: create a directory by using a form
1288         # TODO: upload by using a form on the directory page
1289         #    url = base + "somedir/subdir1/freeform_post!!upload"
1290         # TODO: delete a file by using a button on the directory page
1291
1292         return d
1293
1294     def _test_runner(self, res):
1295         # exercise some of the diagnostic tools in runner.py
1296
1297         # find a share
1298         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1299             if "storage" not in dirpath:
1300                 continue
1301             if not filenames:
1302                 continue
1303             pieces = dirpath.split(os.sep)
1304             if (len(pieces) >= 4
1305                 and pieces[-4] == "storage"
1306                 and pieces[-3] == "shares"):
1307                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1308                 # are sharefiles here
1309                 filename = os.path.join(dirpath, filenames[0])
1310                 # peek at the magic to see if it is a chk share
1311                 magic = open(filename, "rb").read(4)
1312                 if magic == '\x00\x00\x00\x01':
1313                     break
1314         else:
1315             self.fail("unable to find any uri_extension files in %s"
1316                       % self.basedir)
1317         log.msg("test_system.SystemTest._test_runner using %s" % filename)
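         # (the loop above relies on Python's for/else: the self.fail() call runs
         # only if the walk finished without ever hitting 'break', i.e. no CHK
         # share was found. The 4-byte big-endian 1 is the version field that
         # starts an immutable (CHK) share file; mutable share files begin with an
         # ASCII magic string instead, so they are skipped here.)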
1318
1319         out,err = StringIO(), StringIO()
1320         rc = runner.runner(["debug", "dump-share", "--offsets",
1321                             filename],
1322                            stdout=out, stderr=err)
1323         output = out.getvalue()
1324         self.failUnlessEqual(rc, 0)
1325
1326         # we only upload a single file, so we can assert some things about
1327         # its size and shares.
1328         self.failUnless(("share filename: %s" % filename) in output)
1329         self.failUnless("size: %d\n" % len(self.data) in output)
1330         self.failUnless("num_segments: 1\n" in output)
1331         # segment_size is always a multiple of needed_shares
1332         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1333         self.failUnless("total_shares: 10\n" in output)
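         # (these test nodes use the default 3-of-10 encoding: needed_shares=3,
         # which explains the round-up to a multiple of 3 above, and
         # total_shares=10)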
1334         # keys which are supposed to be present
1335         for key in ("size", "num_segments", "segment_size",
1336                     "needed_shares", "total_shares",
1337                     "codec_name", "codec_params", "tail_codec_params",
1338                     #"plaintext_hash", "plaintext_root_hash",
1339                     "crypttext_hash", "crypttext_root_hash",
1340                     "share_root_hash", "UEB_hash"):
1341             self.failUnless("%s: " % key in output, key)
1342         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1343
1344         # now use its storage index to find the other shares using the
1345         # 'find-shares' tool
1346         sharedir, shnum = os.path.split(filename)
1347         storagedir, storage_index_s = os.path.split(sharedir)
1348         out,err = StringIO(), StringIO()
1349         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1350         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1351         rc = runner.runner(cmd, stdout=out, stderr=err)
1352         self.failUnlessEqual(rc, 0)
1353         out.seek(0)
1354         sharefiles = [sfn.strip() for sfn in out.readlines()]
1355         self.failUnlessEqual(len(sharefiles), 10)
1356
1357         # also exercise the 'catalog-shares' tool
1358         out,err = StringIO(), StringIO()
1359         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1360         cmd = ["debug", "catalog-shares"] + nodedirs
1361         rc = runner.runner(cmd, stdout=out, stderr=err)
1362         self.failUnlessEqual(rc, 0)
1363         out.seek(0)
1364         descriptions = [sfn.strip() for sfn in out.readlines()]
1365         self.failUnlessEqual(len(descriptions), 30)
1366         matching = [line
1367                     for line in descriptions
1368                     if line.startswith("CHK %s " % storage_index_s)]
1369         self.failUnlessEqual(len(matching), 10)
1370
1371     def _test_control(self, res):
1372         # exercise the remote-control-the-client foolscap interfaces in
1373         # allmydata.control (mostly used for performance tests)
1374         c0 = self.clients[0]
1375         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1376         control_furl = open(control_furl_file, "r").read().strip()
1377         # it doesn't really matter which Tub we use to connect to the client,
1378         # so let's just use our IntroducerNode's
1379         d = self.introducer.tub.getReference(control_furl)
1380         d.addCallback(self._test_control2, control_furl_file)
1381         return d
1382     def _test_control2(self, rref, filename):
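         # note: 'filename' here is the path of client0's control.furl file; any
         # local file would do, since the test just round-trips it through
         # upload_from_file_to_uri / download_from_uri_to_file and compares bytes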
1383         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1384         downfile = os.path.join(self.basedir, "control.downfile")
1385         d.addCallback(lambda uri:
1386                       rref.callRemote("download_from_uri_to_file",
1387                                       uri, downfile))
1388         def _check(res):
1389             self.failUnlessEqual(res, downfile)
1390             data = open(downfile, "r").read()
1391             expected_data = open(filename, "r").read()
1392             self.failUnlessEqual(data, expected_data)
1393         d.addCallback(_check)
1394         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1395         if sys.platform == "linux2":
1396             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1397         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1398         return d
1399
1400     def _test_cli(self, res):
1401         # run various CLI commands (in a thread, since they use blocking
1402         # network calls)
1403
1404         private_uri = self._private_node.get_uri()
1405         some_uri = self._root_directory_uri
1406         client0_basedir = self.getdir("client0")
1407
1408         nodeargs = [
1409             "--node-directory", client0_basedir,
1410             ]
1411         TESTDATA = "I will not write the same thing over and over.\n" * 100
1412
1413         d = defer.succeed(None)
1414
1415         # for compatibility with earlier versions, private/root_dir.cap is
1416         # supposed to be treated as an alias named "tahoe:". Start by making
1417         # sure that works, before we add other aliases.
1418
1419         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1420         f = open(root_file, "w")
1421         f.write(private_uri)
1422         f.close()
1423
1424         def run(ignored, verb, *args, **kwargs):
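             # convenience wrapper: prepend the CLI verb and the shared
             # --node-directory arguments, then run the command in a thread via
             # _run_cli(); the returned Deferred fires with an (out, err) pair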
1425             stdin = kwargs.get("stdin", "")
1426             newargs = [verb] + nodeargs + list(args)
1427             return self._run_cli(newargs, stdin=stdin)
1428
1429         def _check_ls((out,err), expected_children, unexpected_children=[]):
1430             self.failUnlessEqual(err, "")
1431             for s in expected_children:
1432                 self.failUnless(s in out, (s,out))
1433             for s in unexpected_children:
1434                 self.failIf(s in out, (s,out))
1435
1436         def _check_ls_root((out,err)):
1437             self.failUnless("personal" in out)
1438             self.failUnless("s2-ro" in out)
1439             self.failUnless("s2-rw" in out)
1440             self.failUnlessEqual(err, "")
1441
1442         # this should reference private_uri
1443         d.addCallback(run, "ls")
1444         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1445
1446         d.addCallback(run, "list-aliases")
1447         def _check_aliases_1((out,err)):
1448             self.failUnlessEqual(err, "")
1449             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1450         d.addCallback(_check_aliases_1)
1451
1452         # now that that's out of the way, remove root_dir.cap and work with
1453         # new files
1454         d.addCallback(lambda res: os.unlink(root_file))
1455         d.addCallback(run, "list-aliases")
1456         def _check_aliases_2((out,err)):
1457             self.failUnlessEqual(err, "")
1458             self.failUnlessEqual(out, "")
1459         d.addCallback(_check_aliases_2)
1460
1461         d.addCallback(run, "mkdir")
1462         def _got_dir((out,err)):
1463             self.failUnless(uri.from_string_dirnode(out.strip()))
1464             return out.strip()
1465         d.addCallback(_got_dir)
1466         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1467
1468         d.addCallback(run, "list-aliases")
1469         def _check_aliases_3((out,err)):
1470             self.failUnlessEqual(err, "")
1471             self.failUnless("tahoe: " in out)
1472         d.addCallback(_check_aliases_3)
1473
1474         def _check_empty_dir((out,err)):
1475             self.failUnlessEqual(out, "")
1476             self.failUnlessEqual(err, "")
1477         d.addCallback(run, "ls")
1478         d.addCallback(_check_empty_dir)
1479
1480         def _check_missing_dir((out,err)):
1481             # TODO: check that rc==2
1482             self.failUnlessEqual(out, "")
1483             self.failUnlessEqual(err, "No such file or directory\n")
1484         d.addCallback(run, "ls", "bogus")
1485         d.addCallback(_check_missing_dir)
1486
1487         files = []
1488         datas = []
1489         for i in range(10):
1490             fn = os.path.join(self.basedir, "file%d" % i)
1491             files.append(fn)
1492             data = "data to be uploaded: file%d\n" % i
1493             datas.append(data)
1494             open(fn,"wb").write(data)
1495
1496         def _check_stdout_against((out,err), filenum=None, data=None):
1497             self.failUnlessEqual(err, "")
1498             if filenum is not None:
1499                 self.failUnlessEqual(out, datas[filenum])
1500             if data is not None:
1501                 self.failUnlessEqual(out, data)
1502
1503         # test both forms of put: from a file, and from stdin
1504         #  tahoe put bar FOO
1505         d.addCallback(run, "put", files[0], "tahoe-file0")
1506         def _put_out((out,err)):
1507             self.failUnless("URI:LIT:" in out, out)
1508             self.failUnless("201 Created" in err, err)
1509             uri0 = out.strip()
1510             return run(None, "get", uri0)
1511         d.addCallback(_put_out)
1512         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1513
1514         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1515         #  tahoe put bar tahoe:FOO
1516         d.addCallback(run, "put", files[2], "tahoe:file2")
1517         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1518         def _check_put_mutable((out,err)):
1519             self._mutable_file3_uri = out.strip()
1520         d.addCallback(_check_put_mutable)
1521         d.addCallback(run, "get", "tahoe:file3")
1522         d.addCallback(_check_stdout_against, 3)
1523
1524         #  tahoe put FOO
1525         STDIN_DATA = "This is the file to upload from stdin."
1526         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1527         #  tahoe put tahoe:FOO
1528         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1529                       stdin="Other file from stdin.")
1530
1531         d.addCallback(run, "ls")
1532         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1533                                   "tahoe-file-stdin", "from-stdin"])
1534         d.addCallback(run, "ls", "subdir")
1535         d.addCallback(_check_ls, ["tahoe-file1"])
1536
1537         # tahoe mkdir FOO
1538         d.addCallback(run, "mkdir", "subdir2")
1539         d.addCallback(run, "ls")
1540         # TODO: extract the URI, set an alias with it
1541         d.addCallback(_check_ls, ["subdir2"])
1542
1543         # tahoe get: (to stdin and to a file)
1544         d.addCallback(run, "get", "tahoe-file0")
1545         d.addCallback(_check_stdout_against, 0)
1546         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1547         d.addCallback(_check_stdout_against, 1)
1548         outfile0 = os.path.join(self.basedir, "outfile0")
1549         d.addCallback(run, "get", "file2", outfile0)
1550         def _check_outfile0((out,err)):
1551             data = open(outfile0,"rb").read()
1552             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1553         d.addCallback(_check_outfile0)
1554         outfile1 = os.path.join(self.basedir, "outfile1")
1555         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1556         def _check_outfile1((out,err)):
1557             data = open(outfile1,"rb").read()
1558             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1559         d.addCallback(_check_outfile1)
1560
1561         d.addCallback(run, "rm", "tahoe-file0")
1562         d.addCallback(run, "rm", "tahoe:file2")
1563         d.addCallback(run, "ls")
1564         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1565
1566         d.addCallback(run, "ls", "-l")
1567         def _check_ls_l((out,err)):
1568             lines = out.split("\n")
1569             for l in lines:
1570                 if "tahoe-file-stdin" in l:
1571                     self.failUnless(l.startswith("-r-- "), l)
1572                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1573                 if "file3" in l:
1574                     self.failUnless(l.startswith("-rw- "), l) # mutable
1575         d.addCallback(_check_ls_l)
1576
1577         d.addCallback(run, "ls", "--uri")
1578         def _check_ls_uri((out,err)):
1579             lines = out.split("\n")
1580             for l in lines:
1581                 if "file3" in l:
1582                     self.failUnless(self._mutable_file3_uri in l)
1583         d.addCallback(_check_ls_uri)
1584
1585         d.addCallback(run, "ls", "--readonly-uri")
1586         def _check_ls_rouri((out,err)):
1587             lines = out.split("\n")
1588             for l in lines:
1589                 if "file3" in l:
1590                     rw_uri = self._mutable_file3_uri
1591                     u = uri.from_string_mutable_filenode(rw_uri)
1592                     ro_uri = u.get_readonly().to_string()
1593                     self.failUnless(ro_uri in l)
1594         d.addCallback(_check_ls_rouri)
1595
1596
1597         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1598         d.addCallback(run, "ls")
1599         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1600
1601         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1602         d.addCallback(run, "ls")
1603         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1604
1605         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1606         d.addCallback(run, "ls")
1607         d.addCallback(_check_ls, ["file3", "file3-copy"])
1608         d.addCallback(run, "get", "tahoe:file3-copy")
1609         d.addCallback(_check_stdout_against, 3)
1610
1611         # copy from disk into tahoe
1612         d.addCallback(run, "cp", files[4], "tahoe:file4")
1613         d.addCallback(run, "ls")
1614         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1615         d.addCallback(run, "get", "tahoe:file4")
1616         d.addCallback(_check_stdout_against, 4)
1617
1618         # copy from tahoe into disk
1619         target_filename = os.path.join(self.basedir, "file-out")
1620         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1621         def _check_cp_out((out,err)):
1622             self.failUnless(os.path.exists(target_filename))
1623             got = open(target_filename,"rb").read()
1624             self.failUnlessEqual(got, datas[4])
1625         d.addCallback(_check_cp_out)
1626
1627         # copy from disk to disk (silly case)
1628         target2_filename = os.path.join(self.basedir, "file-out-copy")
1629         d.addCallback(run, "cp", target_filename, target2_filename)
1630         def _check_cp_out2((out,err)):
1631             self.failUnless(os.path.exists(target2_filename))
1632             got = open(target2_filename,"rb").read()
1633             self.failUnlessEqual(got, datas[4])
1634         d.addCallback(_check_cp_out2)
1635
1636         # copy from tahoe into disk, overwriting an existing file
1637         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1638         def _check_cp_out3((out,err)):
1639             self.failUnless(os.path.exists(target_filename))
1640             got = open(target_filename,"rb").read()
1641             self.failUnlessEqual(got, datas[3])
1642         d.addCallback(_check_cp_out3)
1643
1644         # copy from disk into tahoe, overwriting an existing immutable file
1645         d.addCallback(run, "cp", files[5], "tahoe:file4")
1646         d.addCallback(run, "ls")
1647         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1648         d.addCallback(run, "get", "tahoe:file4")
1649         d.addCallback(_check_stdout_against, 5)
1650
1651         # copy from disk into tahoe, overwriting an existing mutable file
1652         d.addCallback(run, "cp", files[5], "tahoe:file3")
1653         d.addCallback(run, "ls")
1654         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1655         d.addCallback(run, "get", "tahoe:file3")
1656         d.addCallback(_check_stdout_against, 5)
1657
1658         # recursive copy: setup
1659         dn = os.path.join(self.basedir, "dir1")
1660         os.makedirs(dn)
1661         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1662         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1663         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1664         sdn2 = os.path.join(dn, "subdir2")
1665         os.makedirs(sdn2)
1666         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1667         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1668
1669         # from disk into tahoe
1670         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1671         d.addCallback(run, "ls")
1672         d.addCallback(_check_ls, ["dir1"])
1673         d.addCallback(run, "ls", "dir1")
1674         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1675                       ["rfile4", "rfile5"])
1676         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1677         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1678                       ["rfile1", "rfile2", "rfile3"])
1679         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1680         d.addCallback(_check_stdout_against, data="rfile4")
1681
1682         # and back out again
1683         dn_copy = os.path.join(self.basedir, "dir1-copy")
1684         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1685         def _check_cp_r_out((out,err)):
1686             def _cmp(name):
1687                 old = open(os.path.join(dn, name), "rb").read()
1688                 newfn = os.path.join(dn_copy, name)
1689                 self.failUnless(os.path.exists(newfn))
1690                 new = open(newfn, "rb").read()
1691                 self.failUnlessEqual(old, new)
1692             _cmp("rfile1")
1693             _cmp("rfile2")
1694             _cmp("rfile3")
1695             _cmp(os.path.join("subdir2", "rfile4"))
1696             _cmp(os.path.join("subdir2", "rfile5"))
1697         d.addCallback(_check_cp_r_out)
1698
1699         # and copy it a second time, which ought to overwrite the same files
1700         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1701
1702         # and again, only writing filecaps
1703         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1704         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
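         # --caps-only writes each file's cap (URI) in place of its contents;
         # these files are small enough that the caps are LIT URIs, which embed
         # the data directly, so the check below can recover "rfile4" without a
         # download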
1705         def _check_capsonly((out,err)):
1706             # these should all be LITs
1707             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1708             y = uri.from_string_filenode(x)
1709             self.failUnlessEqual(y.data, "rfile4")
1710         d.addCallback(_check_capsonly)
1711
1712         # and tahoe-to-tahoe
1713         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1714         d.addCallback(run, "ls")
1715         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1716         d.addCallback(run, "ls", "dir1-copy")
1717         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1718                       ["rfile4", "rfile5"])
1719         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1720         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1721                       ["rfile1", "rfile2", "rfile3"])
1722         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1723         d.addCallback(_check_stdout_against, data="rfile4")
1724
1725         # and copy it a second time, which ought to overwrite the same files
1726         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1727
1728         # tahoe_ls doesn't currently handle the error correctly: it tries to
1729         # JSON-parse a traceback.
1730 ##         def _ls_missing(res):
1731 ##             argv = ["ls"] + nodeargs + ["bogus"]
1732 ##             return self._run_cli(argv)
1733 ##         d.addCallback(_ls_missing)
1734 ##         def _check_ls_missing((out,err)):
1735 ##             print "OUT", out
1736 ##             print "ERR", err
1737 ##             self.failUnlessEqual(err, "")
1738 ##         d.addCallback(_check_ls_missing)
1739
1740         return d
1741
1742     def _run_cli(self, argv, stdin=""):
1743         #print "CLI:", argv
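         # stdout/stderr are captured in StringIO buffers and handed back as an
         # (out, err) tuple once the command (run in a worker thread) completes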
1744         stdout, stderr = StringIO(), StringIO()
1745         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1746                                   stdin=StringIO(stdin),
1747                                   stdout=stdout, stderr=stderr)
1748         def _done(res):
1749             return stdout.getvalue(), stderr.getvalue()
1750         d.addCallback(_done)
1751         return d
1752
1753     def _test_checker(self, res):
1754         ut = upload.Data("too big to be literal" * 200, convergence=None)
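         # "too big to be literal" * 200 is roughly 4KB, well over the LIT
         # threshold, so "big file" becomes a real CHK file with shares to check.
         # The LIT case is covered below via the small u"sekrit data" child
         # (created earlier in this system test), whose check() these assertions
         # expect to return None.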
1755         d = self._personal_node.add_file(u"big file", ut)
1756
1757         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1758         def _check_dirnode_results(r):
1759             self.failUnless(r.is_healthy())
1760         d.addCallback(_check_dirnode_results)
1761         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1762         d.addCallback(_check_dirnode_results)
1763
1764         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1765         def _got_chk_filenode(n):
1766             self.failUnless(isinstance(n, filenode.FileNode))
1767             d = n.check(Monitor())
1768             def _check_filenode_results(r):
1769                 self.failUnless(r.is_healthy())
1770             d.addCallback(_check_filenode_results)
1771             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1772             d.addCallback(_check_filenode_results)
1773             return d
1774         d.addCallback(_got_chk_filenode)
1775
1776         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1777         def _got_lit_filenode(n):
1778             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1779             d = n.check(Monitor())
1780             def _check_lit_filenode_results(r):
1781                 self.failUnlessEqual(r, None)
1782             d.addCallback(_check_lit_filenode_results)
1783             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1784             d.addCallback(_check_lit_filenode_results)
1785             return d
1786         d.addCallback(_got_lit_filenode)
1787         return d
1788