1 from base64 import b32encode
2 import os, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, filenode, offloaded, upload
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.scripts import runner
18 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
19      NoSuchChildError, NoSharesError
20 from allmydata.monitor import Monitor
21 from allmydata.mutable.common import NotMutableError
22 from allmydata.mutable import layout as mutable_layout
23 from foolscap.api import DeadReferenceError
24 from twisted.python.failure import Failure
25 from twisted.web.client import getPage
26 from twisted.web.error import Error
27
28 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
29      download_to_data
30
31 LARGE_DATA = """
32 This is some data to publish to the virtual drive, which needs to be large
33 enough to not fit inside a LIT uri.
34 """
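# (Editorial note: a LIT URI embeds the file's entire contents inside the URI
# itself, which Tahoe only does for very small files, so a string of this size
# is enough to force a real share-based upload.)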
35
36 class CountingDataUploadable(upload.Data):
37     bytes_read = 0
38     interrupt_after = None
39     interrupt_after_d = None
40
41     def read(self, length):
42         self.bytes_read += length
43         if self.interrupt_after is not None:
44             if self.bytes_read > self.interrupt_after:
45                 self.interrupt_after = None
46                 self.interrupt_after_d.callback(self)
47         return upload.Data.read(self, length)
48
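# Editorial sketch, not invoked by any test: how the interrupt hook above is
# meant to be wired up. The 5000-byte threshold and the log callback are
# illustrative only; the real usage appears in _upload_resumable() further down.
def _example_interrupting_uploadable():
    u = CountingDataUploadable("x" * 10000, convergence=None)
    u.interrupt_after = 5000          # fire once ~5kB have been read
    u.interrupt_after_d = defer.Deferred()
    u.interrupt_after_d.addCallback(lambda uploadable:
                                    log.msg("interrupt threshold crossed"))
    return u
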
49 class GrabEverythingConsumer:
50     implements(IConsumer)
51
52     def __init__(self):
53         self.contents = ""
54
55     def registerProducer(self, producer, streaming):
56         assert streaming
57         assert IPushProducer.providedBy(producer)
58
59     def write(self, data):
60         self.contents += data
61
62     def unregisterProducer(self):
63         pass
64
65 class SystemTest(SystemTestMixin, unittest.TestCase):
66     timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.
67
68     def test_connections(self):
69         self.basedir = "system/SystemTest/test_connections"
70         d = self.set_up_nodes()
71         self.extra_node = None
72         d.addCallback(lambda res: self.add_extra_node(self.numclients))
73         def _check(extra_node):
74             self.extra_node = extra_node
75             for c in self.clients:
76                 all_peerids = c.get_storage_broker().get_all_serverids()
77                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
78                 sb = c.storage_broker
79                 permuted_peers = sb.get_servers_for_index("a")
80                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
81
82         d.addCallback(_check)
83         def _shutdown_extra_node(res):
84             if self.extra_node:
85                 return self.extra_node.stopService()
86             return res
87         d.addBoth(_shutdown_extra_node)
88         return d
89     # test_connections is subsumed by test_upload_and_download, and takes
90     # quite a while to run on a slow machine (because of all the TLS
91     # connections that must be established). If we ever rework the introducer
92     # code to such an extent that we're not sure if it works anymore, we can
93     # reinstate this test until it does.
94     del test_connections
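    # (The `del` above removes the method from the class namespace, so trial
    # never collects or runs it; the definition is kept so it can easily be
    # reinstated.)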
95
96     def test_upload_and_download_random_key(self):
97         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
98         return self._test_upload_and_download(convergence=None)
99
100     def test_upload_and_download_convergent(self):
101         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
102         return self._test_upload_and_download(convergence="some convergence string")
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = c.get_storage_broker().get_all_serverids()
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 sb = c.storage_broker
114                 permuted_peers = sb.get_servers_for_index("a")
115                 self.failUnlessEqual(len(permuted_peers), self.numclients)
116         d.addCallback(_check_connections)
117
118         def _do_upload(res):
119             log.msg("UPLOADING")
120             u = self.clients[0].getServiceNamed("uploader")
121             self.uploader = u
122             # we crank the max segsize down to 1024b for the duration of this
123             # test, so we can exercise multiple segments. It is important
124             # that this is not a multiple of the segment size, so that the
125             # tail segment is not the same length as the others. This actually
126             # gets rounded up to 1025 to be a multiple of the number of
127             # required shares (since we use 25 out of 100 FEC).
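            # (Editorial illustration of that rounding, assuming k=25 as stated
            # above: 1024 is not a multiple of 25, so it is rounded up to the
            # next multiple, ((1024 + 24) // 25) * 25 == 1025.)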
128             up = upload.Data(DATA, convergence=convergence)
129             up.max_segment_size = 1024
130             d1 = u.upload(up)
131             return d1
132         d.addCallback(_do_upload)
133         def _upload_done(results):
134             theuri = results.uri
135             log.msg("upload finished: uri is %s" % (theuri,))
136             self.uri = theuri
137             assert isinstance(self.uri, str), self.uri
138             dl = self.clients[1].getServiceNamed("downloader")
139             self.downloader = dl
140         d.addCallback(_upload_done)
141
142         def _upload_again(res):
143             # Upload again. If using convergent encryption then this ought to be
144             # short-circuited; however, with the way we currently generate URIs
145             # (i.e. because they include the roothash), we have to do all of the
146             # encoding work, and only get to save on the upload part.
147             log.msg("UPLOADING AGAIN")
148             up = upload.Data(DATA, convergence=convergence)
149             up.max_segment_size = 1024
150             return self.uploader.upload(up)
151         d.addCallback(_upload_again)
152
153         def _download_to_data(res):
154             log.msg("DOWNLOADING")
155             return self.downloader.download_to_data(self.uri)
156         d.addCallback(_download_to_data)
157         def _download_to_data_done(data):
158             log.msg("download finished")
159             self.failUnlessEqual(data, DATA)
160         d.addCallback(_download_to_data_done)
161
162         target_filename = os.path.join(self.basedir, "download.target")
163         def _download_to_filename(res):
164             return self.downloader.download_to_filename(self.uri,
165                                                         target_filename)
166         d.addCallback(_download_to_filename)
167         def _download_to_filename_done(res):
168             newdata = open(target_filename, "rb").read()
169             self.failUnlessEqual(newdata, DATA)
170         d.addCallback(_download_to_filename_done)
171
172         target_filename2 = os.path.join(self.basedir, "download.target2")
173         def _download_to_filehandle(res):
174             fh = open(target_filename2, "wb")
175             return self.downloader.download_to_filehandle(self.uri, fh)
176         d.addCallback(_download_to_filehandle)
177         def _download_to_filehandle_done(fh):
178             fh.close()
179             newdata = open(target_filename2, "rb").read()
180             self.failUnlessEqual(newdata, DATA)
181         d.addCallback(_download_to_filehandle_done)
182
183         consumer = GrabEverythingConsumer()
184         ct = download.ConsumerAdapter(consumer)
185         d.addCallback(lambda res:
186                       self.downloader.download(self.uri, ct))
187         def _download_to_consumer_done(ign):
188             self.failUnlessEqual(consumer.contents, DATA)
189         d.addCallback(_download_to_consumer_done)
190
191         def _test_read(res):
192             n = self.clients[1].create_node_from_uri(self.uri)
193             d = download_to_data(n)
194             def _read_done(data):
195                 self.failUnlessEqual(data, DATA)
196             d.addCallback(_read_done)
197             d.addCallback(lambda ign:
198                           n.read(MemoryConsumer(), offset=1, size=4))
199             def _read_portion_done(mc):
200                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
201             d.addCallback(_read_portion_done)
202             d.addCallback(lambda ign:
203                           n.read(MemoryConsumer(), offset=2, size=None))
204             def _read_tail_done(mc):
205                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
206             d.addCallback(_read_tail_done)
207             d.addCallback(lambda ign:
208                           n.read(MemoryConsumer(), size=len(DATA)+1000))
209             def _read_too_much(mc):
210                 self.failUnlessEqual("".join(mc.chunks), DATA)
211             d.addCallback(_read_too_much)
212
213             return d
214         d.addCallback(_test_read)
215
216         def _test_bad_read(res):
217             bad_u = uri.from_string_filenode(self.uri)
218             bad_u.key = self.flip_bit(bad_u.key)
219             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
220             # this should cause an error during download
221
222             d = self.shouldFail2(NoSharesError, "'download bad node'",
223                                  None,
224                                  bad_n.read, MemoryConsumer(), offset=2)
225             return d
226         d.addCallback(_test_bad_read)
227
228         def _download_nonexistent_uri(res):
229             baduri = self.mangle_uri(self.uri)
230             log.msg("about to download non-existent URI", level=log.UNUSUAL,
231                     facility="tahoe.tests")
232             d1 = self.downloader.download_to_data(baduri)
233             def _baduri_should_fail(res):
234                 log.msg("finished downloading non-existent URI",
235                         level=log.UNUSUAL, facility="tahoe.tests")
236                 self.failUnless(isinstance(res, Failure))
237                 self.failUnless(res.check(NoSharesError),
238                                 "expected NoSharesError, got %s" % res)
239             d1.addBoth(_baduri_should_fail)
240             return d1
241         d.addCallback(_download_nonexistent_uri)
242
243         # add a new node, which doesn't accept shares, and only uses the
244         # helper for upload.
245         d.addCallback(lambda res: self.add_extra_node(self.numclients,
246                                                       self.helper_furl,
247                                                       add_to_sparent=True))
248         def _added(extra_node):
249             self.extra_node = extra_node
250         d.addCallback(_added)
251
252         HELPER_DATA = "Data that needs help to upload" * 1000
253         def _upload_with_helper(res):
254             u = upload.Data(HELPER_DATA, convergence=convergence)
255             d = self.extra_node.upload(u)
256             def _uploaded(results):
257                 uri = results.uri
258                 return self.downloader.download_to_data(uri)
259             d.addCallback(_uploaded)
260             def _check(newdata):
261                 self.failUnlessEqual(newdata, HELPER_DATA)
262             d.addCallback(_check)
263             return d
264         d.addCallback(_upload_with_helper)
265
266         def _upload_duplicate_with_helper(res):
267             u = upload.Data(HELPER_DATA, convergence=convergence)
268             u.debug_stash_RemoteEncryptedUploadable = True
269             d = self.extra_node.upload(u)
270             def _uploaded(results):
271                 uri = results.uri
272                 return self.downloader.download_to_data(uri)
273             d.addCallback(_uploaded)
274             def _check(newdata):
275                 self.failUnlessEqual(newdata, HELPER_DATA)
276                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
277                             "uploadable started uploading, should have been avoided")
278             d.addCallback(_check)
279             return d
280         if convergence is not None:
281             d.addCallback(_upload_duplicate_with_helper)
282
283         def _upload_resumable(res):
284             DATA = "Data that needs help to upload and gets interrupted" * 1000
285             u1 = CountingDataUploadable(DATA, convergence=convergence)
286             u2 = CountingDataUploadable(DATA, convergence=convergence)
287
288             # we interrupt the connection after about 5kB by shutting down
289             # the helper, then restarting it.
290             u1.interrupt_after = 5000
291             u1.interrupt_after_d = defer.Deferred()
292             u1.interrupt_after_d.addCallback(lambda res:
293                                              self.bounce_client(0))
294
295             # sneak into the helper and reduce its chunk size, so that our
296             # debug_interrupt will sever the connection on about the fifth
297             # chunk fetched. This makes sure that we've started to write the
298             # new shares before we abandon them, which exercises the
299             # abort/delete-partial-share code. TODO: find a cleaner way to do
300             # this. I know that this will affect later uses of the helper in
301             # this same test run, but I'm not currently worried about it.
302             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
303
304             d = self.extra_node.upload(u1)
305
306             def _should_not_finish(res):
307                 self.fail("interrupted upload should have failed, not finished"
308                           " with result %s" % (res,))
309             def _interrupted(f):
310                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
311
312                 # make sure we actually interrupted it before finishing the
313                 # file
314                 self.failUnless(u1.bytes_read < len(DATA),
315                                 "read %d out of %d total" % (u1.bytes_read,
316                                                              len(DATA)))
317
318                 log.msg("waiting for reconnect", level=log.NOISY,
319                         facility="tahoe.test.test_system")
320                 # now, we need to give the nodes a chance to notice that this
321                 # connection has gone away. When this happens, the storage
322                 # servers will be told to abort their uploads, removing the
323                 # partial shares. Unfortunately this involves TCP messages
324                 # going through the loopback interface, and we can't easily
325                 # predict how long that will take. If it were all local, we
326                 # could use fireEventually() to stall. Since we don't have
327                 # the right introduction hooks, the best we can do is use a
328                 # fixed delay. TODO: this is fragile.
329                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
330                 return u1.interrupt_after_d
331             d.addCallbacks(_should_not_finish, _interrupted)
332
333             def _disconnected(res):
334                 # check to make sure the storage servers aren't still hanging
335                 # on to the partial share: their incoming/ directories should
336                 # now be empty.
337                 log.msg("disconnected", level=log.NOISY,
338                         facility="tahoe.test.test_system")
339                 for i in range(self.numclients):
340                     incdir = os.path.join(self.getdir("client%d" % i),
341                                           "storage", "shares", "incoming")
342                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
343             d.addCallback(_disconnected)
344
345             # then we need to give the reconnector a chance to
346             # reestablish the connection to the helper.
347             d.addCallback(lambda res:
348                           log.msg("wait_for_connections", level=log.NOISY,
349                                   facility="tahoe.test.test_system"))
350             d.addCallback(lambda res: self.wait_for_connections())
351
352
353             d.addCallback(lambda res:
354                           log.msg("uploading again", level=log.NOISY,
355                                   facility="tahoe.test.test_system"))
356             d.addCallback(lambda res: self.extra_node.upload(u2))
357
358             def _uploaded(results):
359                 uri = results.uri
360                 log.msg("Second upload complete", level=log.NOISY,
361                         facility="tahoe.test.test_system")
362
363                 # this is really bytes received rather than sent, but it's
364                 # convenient and basically measures the same thing
365                 bytes_sent = results.ciphertext_fetched
366
367                 # We currently don't support resumption of upload if the data is
368                 # encrypted with a random key.  (Because that would require us
369                 # to store the key locally and re-use it on the next upload of
370                 # this file, which isn't a bad thing to do, but we currently
371                 # don't do it.)
372                 if convergence is not None:
373                     # Make sure we did not have to read the whole file the
374                     # second time around.
375                     self.failUnless(bytes_sent < len(DATA),
376                                 "resumption didn't save us any work:"
377                                 " read %d bytes out of %d total" %
378                                 (bytes_sent, len(DATA)))
379                 else:
380                     # Make sure we did have to read the whole file the second
381                     # time around -- because the one that we partially uploaded
382                     # earlier was encrypted with a different random key.
383                     self.failIf(bytes_sent < len(DATA),
384                                 "resumption saved us some work even though we were using random keys:"
385                                 " read %d bytes out of %d total" %
386                                 (bytes_sent, len(DATA)))
387                 return self.downloader.download_to_data(uri)
388             d.addCallback(_uploaded)
389
390             def _check(newdata):
391                 self.failUnlessEqual(newdata, DATA)
392                 # If using convergent encryption, then also check that the
393                 # helper has removed the temp file from its directories.
394                 if convergence is not None:
395                     basedir = os.path.join(self.getdir("client0"), "helper")
396                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
397                     self.failUnlessEqual(files, [])
398                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
399                     self.failUnlessEqual(files, [])
400             d.addCallback(_check)
401             return d
402         d.addCallback(_upload_resumable)
403
404         def _grab_stats(ignored):
405             # the StatsProvider doesn't normally publish a FURL:
406             # instead it passes a live reference to the StatsGatherer
407             # (if and when it connects). To exercise the remote stats
408             # interface, we manually publish client0's StatsProvider
409             # and use client1 to query it.
410             sp = self.clients[0].stats_provider
411             sp_furl = self.clients[0].tub.registerReference(sp)
412             d = self.clients[1].tub.getReference(sp_furl)
413             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
414             def _got_stats(stats):
415                 #print "STATS"
416                 #from pprint import pprint
417                 #pprint(stats)
418                 s = stats["stats"]
419                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
420                 c = stats["counters"]
421                 self.failUnless("storage_server.allocate" in c)
422             d.addCallback(_got_stats)
423             return d
424         d.addCallback(_grab_stats)
425
426         return d
427
428     def _find_shares(self, basedir):
429         shares = []
430         for (dirpath, dirnames, filenames) in os.walk(basedir):
431             if "storage" not in dirpath:
432                 continue
433             if not filenames:
434                 continue
435             pieces = dirpath.split(os.sep)
436             if (len(pieces) >= 5
437                 and pieces[-4] == "storage"
438                 and pieces[-3] == "shares"):
439                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
440                 # are sharefiles here
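                # (Editorial example with hypothetical values: dirpath might be
                # ".../client3/storage/shares/aa/<base32-storage-index>", so
                # pieces[-5] is the "clientN" directory, pieces[-2] the $START
                # prefix, and pieces[-1] the base32 storage index.)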
441                 assert pieces[-5].startswith("client")
442                 client_num = int(pieces[-5][-1])
443                 storage_index_s = pieces[-1]
444                 storage_index = si_a2b(storage_index_s)
445                 for sharename in filenames:
446                     shnum = int(sharename)
447                     filename = os.path.join(dirpath, sharename)
448                     data = (client_num, storage_index, filename, shnum)
449                     shares.append(data)
450         if not shares:
451             self.fail("unable to find any share files in %s" % basedir)
452         return shares
453
454     def _corrupt_mutable_share(self, filename, which):
455         msf = MutableShareFile(filename)
456         datav = msf.readv([ (0, 1000000) ])
457         final_share = datav[0]
458         assert len(final_share) < 1000000 # ought to be truncated
459         pieces = mutable_layout.unpack_share(final_share)
460         (seqnum, root_hash, IV, k, N, segsize, datalen,
461          verification_key, signature, share_hash_chain, block_hash_tree,
462          share_data, enc_privkey) = pieces
463
464         if which == "seqnum":
465             seqnum = seqnum + 15
466         elif which == "R":
467             root_hash = self.flip_bit(root_hash)
468         elif which == "IV":
469             IV = self.flip_bit(IV)
470         elif which == "segsize":
471             segsize = segsize + 15
472         elif which == "pubkey":
473             verification_key = self.flip_bit(verification_key)
474         elif which == "signature":
475             signature = self.flip_bit(signature)
476         elif which == "share_hash_chain":
477             nodenum = share_hash_chain.keys()[0]
478             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
479         elif which == "block_hash_tree":
480             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
481         elif which == "share_data":
482             share_data = self.flip_bit(share_data)
483         elif which == "encprivkey":
484             enc_privkey = self.flip_bit(enc_privkey)
485
486         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
487                                             segsize, datalen)
488         final_share = mutable_layout.pack_share(prefix,
489                                                 verification_key,
490                                                 signature,
491                                                 share_hash_chain,
492                                                 block_hash_tree,
493                                                 share_data,
494                                                 enc_privkey)
495         msf.writev( [(0, final_share)], None)
496
497
498     def test_mutable(self):
499         self.basedir = "system/SystemTest/test_mutable"
500         DATA = "initial contents go here."  # 25 bytes % 3 != 0
501         NEWDATA = "new contents yay"
502         NEWERDATA = "this is getting old"
503
504         d = self.set_up_nodes(use_key_generator=True)
505
506         def _create_mutable(res):
507             c = self.clients[0]
508             log.msg("starting create_mutable_file")
509             d1 = c.create_mutable_file(DATA)
510             def _done(res):
511                 log.msg("DONE: %s" % (res,))
512                 self._mutable_node_1 = res
513                 uri = res.get_uri()
514             d1.addCallback(_done)
515             return d1
516         d.addCallback(_create_mutable)
517
518         def _test_debug(res):
519             # find a share. It is important to run this while there is only
520             # one slot in the grid.
521             shares = self._find_shares(self.basedir)
522             (client_num, storage_index, filename, shnum) = shares[0]
523             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
524                     % filename)
525             log.msg(" for clients[%d]" % client_num)
526
527             out,err = StringIO(), StringIO()
528             rc = runner.runner(["debug", "dump-share", "--offsets",
529                                 filename],
530                                stdout=out, stderr=err)
531             output = out.getvalue()
532             self.failUnlessEqual(rc, 0)
533             try:
534                 self.failUnless("Mutable slot found:\n" in output)
535                 self.failUnless("share_type: SDMF\n" in output)
536                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
537                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
538                 self.failUnless(" num_extra_leases: 0\n" in output)
539                 # the pubkey size can vary by a byte, so the container might
540                 # be a bit larger on some runs.
541                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
542                 self.failUnless(m)
543                 container_size = int(m.group(1))
544                 self.failUnless(2037 <= container_size <= 2049, container_size)
545                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
546                 self.failUnless(m)
547                 data_length = int(m.group(1))
548                 self.failUnless(2037 <= data_length <= 2049, data_length)
549                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
550                                 in output)
551                 self.failUnless(" SDMF contents:\n" in output)
552                 self.failUnless("  seqnum: 1\n" in output)
553                 self.failUnless("  required_shares: 3\n" in output)
554                 self.failUnless("  total_shares: 10\n" in output)
555                 self.failUnless("  segsize: 27\n" in output, (output, filename))
556                 self.failUnless("  datalen: 25\n" in output)
557                 # the exact share_hash_chain nodes depend upon the sharenum,
558                 # and are more of a hassle to compute than I want to deal with
559                 # right now
560                 self.failUnless("  share_hash_chain: " in output)
561                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
562                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
563                             base32.b2a(storage_index))
564                 self.failUnless(expected in output)
565             except unittest.FailTest:
566                 print
567                 print "dump-share output was:"
568                 print output
569                 raise
570         d.addCallback(_test_debug)
571
572         # test retrieval
573
574         # first, let's see if we can use the existing node to retrieve the
575         # contents. This allows it to use the cached pubkey and maybe the
576         # latest-known sharemap.
577
578         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
579         def _check_download_1(res):
580             self.failUnlessEqual(res, DATA)
581             # now we see if we can retrieve the data from a new node,
582             # constructed using the URI of the original one. We do this test
583             # on the same client that uploaded the data.
584             uri = self._mutable_node_1.get_uri()
585             log.msg("starting retrieve1")
586             newnode = self.clients[0].create_node_from_uri(uri)
587             newnode_2 = self.clients[0].create_node_from_uri(uri)
588             self.failUnlessIdentical(newnode, newnode_2)
589             return newnode.download_best_version()
590         d.addCallback(_check_download_1)
591
592         def _check_download_2(res):
593             self.failUnlessEqual(res, DATA)
594             # same thing, but with a different client
595             uri = self._mutable_node_1.get_uri()
596             newnode = self.clients[1].create_node_from_uri(uri)
597             log.msg("starting retrieve2")
598             d1 = newnode.download_best_version()
599             d1.addCallback(lambda res: (res, newnode))
600             return d1
601         d.addCallback(_check_download_2)
602
603         def _check_download_3((res, newnode)):
604             self.failUnlessEqual(res, DATA)
605             # replace the data
606             log.msg("starting replace1")
607             d1 = newnode.overwrite(NEWDATA)
608             d1.addCallback(lambda res: newnode.download_best_version())
609             return d1
610         d.addCallback(_check_download_3)
611
612         def _check_download_4(res):
613             self.failUnlessEqual(res, NEWDATA)
614             # now create an even newer node and replace the data on it. This
615             # new node has never been used for download before.
616             uri = self._mutable_node_1.get_uri()
617             newnode1 = self.clients[2].create_node_from_uri(uri)
618             newnode2 = self.clients[3].create_node_from_uri(uri)
619             self._newnode3 = self.clients[3].create_node_from_uri(uri)
620             log.msg("starting replace2")
621             d1 = newnode1.overwrite(NEWERDATA)
622             d1.addCallback(lambda res: newnode2.download_best_version())
623             return d1
624         d.addCallback(_check_download_4)
625
626         def _check_download_5(res):
627             log.msg("finished replace2")
628             self.failUnlessEqual(res, NEWERDATA)
629         d.addCallback(_check_download_5)
630
631         def _corrupt_shares(res):
632             # run around and flip bits in all but k of the shares, to test
633             # the hash checks
634             shares = self._find_shares(self.basedir)
635             ## sort by share number
636             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
637             where = dict([ (shnum, filename)
638                            for (client_num, storage_index, filename, shnum)
639                            in shares ])
640             assert len(where) == 10 # this test is designed for 3-of-10
641             for shnum, filename in where.items():
642                 # shares 7,8,9 are left alone. read will check
643                 # (share_hash_chain, block_hash_tree, share_data). New
644                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
645                 # segsize, signature).
646                 if shnum == 0:
647                     # read: this will trigger "pubkey doesn't match
648                     # fingerprint".
649                     self._corrupt_mutable_share(filename, "pubkey")
650                     self._corrupt_mutable_share(filename, "encprivkey")
651                 elif shnum == 1:
652                     # triggers "signature is invalid"
653                     self._corrupt_mutable_share(filename, "seqnum")
654                 elif shnum == 2:
655                     # triggers "signature is invalid"
656                     self._corrupt_mutable_share(filename, "R")
657                 elif shnum == 3:
658                     # triggers "signature is invalid"
659                     self._corrupt_mutable_share(filename, "segsize")
660                 elif shnum == 4:
661                     self._corrupt_mutable_share(filename, "share_hash_chain")
662                 elif shnum == 5:
663                     self._corrupt_mutable_share(filename, "block_hash_tree")
664                 elif shnum == 6:
665                     self._corrupt_mutable_share(filename, "share_data")
666                 # other things to corrupt: IV, signature
667                 # 7,8,9 are left alone
668
669                 # note that initial_query_count=5 means that we'll hit the
670                 # first 5 servers in effectively random order (based upon
671                 # response time), so we won't necessarily ever get a "pubkey
672                 # doesn't match fingerprint" error (if we hit shnum>=1 before
673                 # shnum=0, we pull the pubkey from there). To get repeatable
674                 # specific failures, we need to set initial_query_count=1,
675                 # but of course that will change the sequencing behavior of
676                 # the retrieval process. TODO: find a reasonable way to make
677                 # this a parameter, probably when we expand this test to test
678                 # for one failure mode at a time.
679
680                 # when we retrieve this, we should get three signature
681                 # failures (where we've mangled seqnum, R, and segsize). The
682                 # pubkey mangling (shnum 0) may never be noticed at all, for the reason described above.
683         d.addCallback(_corrupt_shares)
684
685         d.addCallback(lambda res: self._newnode3.download_best_version())
686         d.addCallback(_check_download_5)
687
688         def _check_empty_file(res):
689             # make sure we can create empty files; this usually screws up the
690             # segsize math
691             d1 = self.clients[2].create_mutable_file("")
692             d1.addCallback(lambda newnode: newnode.download_best_version())
693             d1.addCallback(lambda res: self.failUnlessEqual("", res))
694             return d1
695         d.addCallback(_check_empty_file)
696
697         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
698         def _created_dirnode(dnode):
699             log.msg("_created_dirnode(%s)" % (dnode,))
700             d1 = dnode.list()
701             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
702             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
703             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
704             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
705             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
706             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
707             d1.addCallback(lambda res: dnode.build_manifest().when_done())
708             d1.addCallback(lambda res:
709                            self.failUnlessEqual(len(res["manifest"]), 1))
710             return d1
711         d.addCallback(_created_dirnode)
712
713         def wait_for_c3_kg_conn():
714             return self.clients[3]._key_generator is not None
715         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
716
717         def check_kg_poolsize(junk, size_delta):
718             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
719                                  self.key_generator_svc.key_generator.pool_size + size_delta)
720
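        # (Editorial note: each mutable file or directory created below consumes
        # one key from the generator's pool, so size_delta counts how many keys
        # have been drawn relative to the configured pool_size, before any refill.)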
721         d.addCallback(check_kg_poolsize, 0)
722         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
723         d.addCallback(check_kg_poolsize, -1)
724         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
725         d.addCallback(check_kg_poolsize, -2)
726         # use_helper induces use of clients[3], which is the client that uses the key generator
727         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
728         d.addCallback(check_kg_poolsize, -3)
729
730         return d
731
732     def flip_bit(self, good):
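        # XORs the low bit of the last byte; e.g. flip_bit("abc") == "abb".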
733         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
734
735     def mangle_uri(self, gooduri):
736         # change the key, which changes the storage index, which means we'll
737         # be asking about the wrong file, so nobody will have any shares
738         u = IFileURI(gooduri)
739         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
740                             uri_extension_hash=u.uri_extension_hash,
741                             needed_shares=u.needed_shares,
742                             total_shares=u.total_shares,
743                             size=u.size)
744         return u2.to_string()
745
746     # TODO: add a test which mangles the uri_extension_hash instead, and
747     # should fail due to not being able to get a valid uri_extension block.
748     # Also a test which sneakily mangles the uri_extension block to change
749     # some of the validation data, so it will fail in the post-download phase
750     # when the file's crypttext integrity check fails. Do the same thing for
751     # the key, which should cause the download to fail the post-download
752     # plaintext_hash check.
753
754     def test_vdrive(self):
755         self.basedir = "system/SystemTest/test_vdrive"
756         self.data = LARGE_DATA
757         d = self.set_up_nodes(use_stats_gatherer=True)
758         d.addCallback(self._test_introweb)
759         d.addCallback(self.log, "starting publish")
760         d.addCallback(self._do_publish1)
761         d.addCallback(self._test_runner)
762         d.addCallback(self._do_publish2)
763         # at this point, we have the following filesystem (where "R" denotes
764         # self._root_directory_uri):
765         # R
766         # R/subdir1
767         # R/subdir1/mydata567
768         # R/subdir1/subdir2/
769         # R/subdir1/subdir2/mydata992
770
771         d.addCallback(lambda res: self.bounce_client(0))
772         d.addCallback(self.log, "bounced client0")
773
774         d.addCallback(self._check_publish1)
775         d.addCallback(self.log, "did _check_publish1")
776         d.addCallback(self._check_publish2)
777         d.addCallback(self.log, "did _check_publish2")
778         d.addCallback(self._do_publish_private)
779         d.addCallback(self.log, "did _do_publish_private")
780         # now we also have (where "P" denotes a new dir):
781         #  P/personal/sekrit data
782         #  P/s2-rw -> /subdir1/subdir2/
783         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
784         d.addCallback(self._check_publish_private)
785         d.addCallback(self.log, "did _check_publish_private")
786         d.addCallback(self._test_web)
787         d.addCallback(self._test_control)
788         d.addCallback(self._test_cli)
789         # P now has four top-level children:
790         # P/personal/sekrit data
791         # P/s2-ro/
792         # P/s2-rw/
793         # P/test_put/  (empty)
794         d.addCallback(self._test_checker)
795         return d
796
797     def _test_introweb(self, res):
798         d = getPage(self.introweb_url, method="GET", followRedirect=True)
799         def _check(res):
800             try:
801                 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
802                                 in res)
803                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
804                 self.failUnless("Subscription Summary: storage: 5" in res)
805             except unittest.FailTest:
806                 print
807                 print "GET %s output was:" % self.introweb_url
808                 print res
809                 raise
810         d.addCallback(_check)
811         d.addCallback(lambda res:
812                       getPage(self.introweb_url + "?t=json",
813                               method="GET", followRedirect=True))
814         def _check_json(res):
815             data = simplejson.loads(res)
816             try:
817                 self.failUnlessEqual(data["subscription_summary"],
818                                      {"storage": 5})
819                 self.failUnlessEqual(data["announcement_summary"],
820                                      {"storage": 5, "stub_client": 5})
821                 self.failUnlessEqual(data["announcement_distinct_hosts"],
822                                      {"storage": 1, "stub_client": 1})
823             except unittest.FailTest:
824                 print
825                 print "GET %s?t=json output was:" % self.introweb_url
826                 print res
827                 raise
828         d.addCallback(_check_json)
829         return d
830
831     def _do_publish1(self, res):
832         ut = upload.Data(self.data, convergence=None)
833         c0 = self.clients[0]
834         d = c0.create_empty_dirnode()
835         def _made_root(new_dirnode):
836             self._root_directory_uri = new_dirnode.get_uri()
837             return c0.create_node_from_uri(self._root_directory_uri)
838         d.addCallback(_made_root)
839         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
840         def _made_subdir1(subdir1_node):
841             self._subdir1_node = subdir1_node
842             d1 = subdir1_node.add_file(u"mydata567", ut)
843             d1.addCallback(self.log, "publish finished")
844             def _stash_uri(filenode):
845                 self.uri = filenode.get_uri()
846                 assert isinstance(self.uri, str), (self.uri, filenode)
847             d1.addCallback(_stash_uri)
848             return d1
849         d.addCallback(_made_subdir1)
850         return d
851
852     def _do_publish2(self, res):
853         ut = upload.Data(self.data, convergence=None)
854         d = self._subdir1_node.create_empty_directory(u"subdir2")
855         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
856         return d
857
858     def log(self, res, *args, **kwargs):
859         # print "MSG: %s  RES: %s" % (msg, args)
860         log.msg(*args, **kwargs)
861         return res
862
863     def _do_publish_private(self, res):
864         self.smalldata = "sssh, very secret stuff"
865         ut = upload.Data(self.smalldata, convergence=None)
866         d = self.clients[0].create_empty_dirnode()
867         d.addCallback(self.log, "GOT private directory")
868         def _got_new_dir(privnode):
869             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
870             d1 = privnode.create_empty_directory(u"personal")
871             d1.addCallback(self.log, "made P/personal")
872             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
873             d1.addCallback(self.log, "made P/personal/sekrit data")
874             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
875             def _got_s2(s2node):
876                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
877                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
878                 return d2
879             d1.addCallback(_got_s2)
880             d1.addCallback(lambda res: privnode)
881             return d1
882         d.addCallback(_got_new_dir)
883         return d
884
885     def _check_publish1(self, res):
886         # this one uses the iterative API
887         c1 = self.clients[1]
888         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
889         d.addCallback(self.log, "check_publish1 got /")
890         d.addCallback(lambda root: root.get(u"subdir1"))
891         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
892         d.addCallback(lambda filenode: filenode.download_to_data())
893         d.addCallback(self.log, "get finished")
894         def _get_done(data):
895             self.failUnlessEqual(data, self.data)
896         d.addCallback(_get_done)
897         return d
898
899     def _check_publish2(self, res):
900         # this one uses the path-based API
901         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
902         d = rootnode.get_child_at_path(u"subdir1")
903         d.addCallback(lambda dirnode:
904                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
905         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
906         d.addCallback(lambda filenode: filenode.download_to_data())
907         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
908
909         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
910         def _got_filenode(filenode):
911             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
912             assert fnode == filenode
913         d.addCallback(_got_filenode)
914         return d
915
916     def _check_publish_private(self, resnode):
917         # this one uses the path-based API
918         self._private_node = resnode
919
920         d = self._private_node.get_child_at_path(u"personal")
921         def _got_personal(personal):
922             self._personal_node = personal
923             return personal
924         d.addCallback(_got_personal)
925
926         d.addCallback(lambda dirnode:
927                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
928         def get_path(path):
929             return self._private_node.get_child_at_path(path)
930
931         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
932         d.addCallback(lambda filenode: filenode.download_to_data())
933         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
934         d.addCallback(lambda res: get_path(u"s2-rw"))
935         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
936         d.addCallback(lambda res: get_path(u"s2-ro"))
937         def _got_s2ro(dirnode):
938             self.failUnless(dirnode.is_mutable(), dirnode)
939             self.failUnless(dirnode.is_readonly(), dirnode)
940             d1 = defer.succeed(None)
941             d1.addCallback(lambda res: dirnode.list())
942             d1.addCallback(self.log, "dirnode.list")
943
944             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
945
946             d1.addCallback(self.log, "doing add_file(ro)")
947             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
948             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
949
950             d1.addCallback(self.log, "doing get(ro)")
951             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
952             d1.addCallback(lambda filenode:
953                            self.failUnless(IFileNode.providedBy(filenode)))
954
955             d1.addCallback(self.log, "doing delete(ro)")
956             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
957
958             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
959
960             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
961
962             personal = self._personal_node
963             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
964
965             d1.addCallback(self.log, "doing move_child_to(ro)2")
966             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
967
968             d1.addCallback(self.log, "finished with _got_s2ro")
969             return d1
970         d.addCallback(_got_s2ro)
971         def _got_home(dummy):
972             home = self._private_node
973             personal = self._personal_node
974             d1 = defer.succeed(None)
975             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
976             d1.addCallback(lambda res:
977                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
978
979             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
980             d1.addCallback(lambda res:
981                            home.move_child_to(u"sekrit", home, u"sekrit data"))
982
983             d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
984             d1.addCallback(lambda res:
985                            home.move_child_to(u"sekrit data", personal))
986
987             d1.addCallback(lambda res: home.build_manifest().when_done())
988             d1.addCallback(self.log, "manifest")
989             #  five items:
990             # P/
991             # P/personal/
992             # P/personal/sekrit data
993             # P/s2-rw  (same as P/s2-ro)
994             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
995             d1.addCallback(lambda res:
996                            self.failUnlessEqual(len(res["manifest"]), 5))
997             d1.addCallback(lambda res: home.start_deep_stats().when_done())
998             def _check_stats(stats):
999                 expected = {"count-immutable-files": 1,
1000                             "count-mutable-files": 0,
1001                             "count-literal-files": 1,
1002                             "count-files": 2,
1003                             "count-directories": 3,
1004                             "size-immutable-files": 112,
1005                             "size-literal-files": 23,
1006                             #"size-directories": 616, # varies
1007                             #"largest-directory": 616,
1008                             "largest-directory-children": 3,
1009                             "largest-immutable-file": 112,
1010                             }
1011                 for k,v in expected.iteritems():
1012                     self.failUnlessEqual(stats[k], v,
1013                                          "stats[%s] was %s, not %s" %
1014                                          (k, stats[k], v))
1015                 self.failUnless(stats["size-directories"] > 1300,
1016                                 stats["size-directories"])
1017                 self.failUnless(stats["largest-directory"] > 800,
1018                                 stats["largest-directory"])
1019                 self.failUnlessEqual(stats["size-files-histogram"],
1020                                      [ (11, 31, 1), (101, 316, 1) ])
1021             d1.addCallback(_check_stats)
1022             return d1
1023         d.addCallback(_got_home)
1024         return d
1025
1026     def shouldFail(self, res, expected_failure, which, substring=None):
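        # Meant to be attached with addBoth(): if `res` is a Failure it must
        # wrap `expected_failure` (and mention `substring`, if given); a
        # successful result means the expected error never happened.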
1027         if isinstance(res, Failure):
1028             res.trap(expected_failure)
1029             if substring:
1030                 self.failUnless(substring in str(res),
1031                                 "substring '%s' not in '%s'"
1032                                 % (substring, str(res)))
1033         else:
1034             self.fail("%s was supposed to raise %s, not get '%s'" %
1035                       (which, expected_failure, res))
1036
1037     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
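        # Like shouldFail, but invokes `callable(*args, **kwargs)` itself via
        # maybeDeferred and checks the outcome, returning the Deferred for the
        # caller to chain on.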
1038         assert substring is None or isinstance(substring, str)
1039         d = defer.maybeDeferred(callable, *args, **kwargs)
1040         def done(res):
1041             if isinstance(res, Failure):
1042                 res.trap(expected_failure)
1043                 if substring:
1044                     self.failUnless(substring in str(res),
1045                                     "substring '%s' not in '%s'"
1046                                     % (substring, str(res)))
1047             else:
1048                 self.fail("%s was supposed to raise %s, not get '%s'" %
1049                           (which, expected_failure, res))
1050         d.addBoth(done)
1051         return d
1052
1053     def PUT(self, urlpath, data):
1054         url = self.webish_url + urlpath
1055         return getPage(url, method="PUT", postdata=data)
1056
1057     def GET(self, urlpath, followRedirect=False):
1058         url = self.webish_url + urlpath
1059         return getPage(url, method="GET", followRedirect=followRedirect)
1060
1061     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1062         if use_helper:
1063             url = self.helper_webish_url + urlpath
1064         else:
1065             url = self.webish_url + urlpath
1066         sepbase = "boogabooga"
1067         sep = "--" + sepbase
1068         form = []
1069         form.append(sep)
1070         form.append('Content-Disposition: form-data; name="_charset"')
1071         form.append('')
1072         form.append('UTF-8')
1073         form.append(sep)
1074         for name, value in fields.iteritems():
1075             if isinstance(value, tuple):
1076                 filename, value = value
1077                 form.append('Content-Disposition: form-data; name="%s"; '
1078                             'filename="%s"' % (name, filename.encode("utf-8")))
1079             else:
1080                 form.append('Content-Disposition: form-data; name="%s"' % name)
1081             form.append('')
1082             form.append(str(value))
1083             form.append(sep)
1084         form[-1] += "--"
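        # (Editorial illustration: with a single field t="upload", the joined
        # body below looks like this, with CRLF line endings:
        #   --boogabooga
        #   Content-Disposition: form-data; name="_charset"
        #
        #   UTF-8
        #   --boogabooga
        #   Content-Disposition: form-data; name="t"
        #
        #   upload
        #   --boogabooga--
        # which is standard multipart/form-data framing.)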
1085         body = "\r\n".join(form) + "\r\n"
1086         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1087                    }
1088         return getPage(url, method="POST", postdata=body,
1089                        headers=headers, followRedirect=followRedirect)
1090
1091     def _test_web(self, res):
1092         base = self.webish_url
1093         public = "uri/" + self._root_directory_uri
1094         d = getPage(base)
1095         def _got_welcome(page):
1096             # XXX This test is oversensitive to formatting
1097             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1098             self.failUnless(expected in page,
1099                             "I didn't see the right 'connected storage servers'"
1100                             " message in: %s" % page
1101                             )
1102             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1103             self.failUnless(expected in page,
1104                             "I didn't see the right 'My nodeid' message "
1105                             "in: %s" % page)
1106             self.failUnless("Helper: 0 active uploads" in page)
1107         d.addCallback(_got_welcome)
1108         d.addCallback(self.log, "done with _got_welcome")
1109
1110         # get the welcome page from the node that uses the helper too
1111         d.addCallback(lambda res: getPage(self.helper_webish_url))
1112         def _got_welcome_helper(page):
1113             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1114                             page)
1115             self.failUnless("Not running helper" in page)
1116         d.addCallback(_got_welcome_helper)
1117
1118         d.addCallback(lambda res: getPage(base + public))
1119         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1120         def _got_subdir1(page):
1121             # there ought to be an href for our file
1122             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1123             self.failUnless(">mydata567</a>" in page)
1124         d.addCallback(_got_subdir1)
1125         d.addCallback(self.log, "done with _got_subdir1")
1126         d.addCallback(lambda res:
1127                       getPage(base + public + "/subdir1/mydata567"))
1128         def _got_data(page):
1129             self.failUnlessEqual(page, self.data)
1130         d.addCallback(_got_data)
1131
1132         # download from a URI embedded in a URL
1133         d.addCallback(self.log, "_get_from_uri")
1134         def _get_from_uri(res):
1135             return getPage(base + "uri/%s?filename=%s"
1136                            % (self.uri, "mydata567"))
1137         d.addCallback(_get_from_uri)
1138         def _got_from_uri(page):
1139             self.failUnlessEqual(page, self.data)
1140         d.addCallback(_got_from_uri)
1141
1142         # download from a URI embedded in a URL, second form
1143         d.addCallback(self.log, "_get_from_uri2")
1144         def _get_from_uri2(res):
1145             return getPage(base + "uri?uri=%s" % (self.uri,))
1146         d.addCallback(_get_from_uri2)
1147         d.addCallback(_got_from_uri)
1148
1149         # download from a bogus URI, make sure we get a reasonable error
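        # (the gateway reports a file whose shares cannot be found as
        # "410 Gone", which is what the assertion below expects)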
1150         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1151         def _get_from_bogus_uri(res):
1152             d1 = getPage(base + "uri/%s?filename=%s"
1153                          % (self.mangle_uri(self.uri), "mydata567"))
1154             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1155                        "410")
1156             return d1
1157         d.addCallback(_get_from_bogus_uri)
1158         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1159
1160         # upload a file with PUT
1161         d.addCallback(self.log, "about to try PUT")
1162         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1163                                            "new.txt contents"))
1164         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1165         d.addCallback(self.failUnlessEqual, "new.txt contents")
1166         # and again with something large enough to use multiple segments,
1167         # and hopefully trigger pauseProducing too
1168         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1169                                            "big" * 500000)) # 1.5MB
1170         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1171         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1172
1173         # can we replace files in place?
1174         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1175                                            "NEWER contents"))
1176         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1177         d.addCallback(self.failUnlessEqual, "NEWER contents")
1178
1179         # test unlinked POST
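        # ("unlinked" = the uploaded file is not attached to any directory;
        # the web-API just returns its cap)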
1180         d.addCallback(lambda res: self.POST("uri", t="upload",
1181                                             file=("new.txt", "data" * 10000)))
1182         # and again using the helper, which exercises different upload-status
1183         # display code
1184         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1185                                             file=("foo.txt", "data2" * 10000)))
1186
1187         # check that the status page exists
1188         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1189         def _got_status(res):
1190             # find an interesting upload and download to look at. LIT files
1191             # are not interesting.
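            # (a LIT file's data lives entirely inside its URI, so any
            # status entry bigger than 200 bytes must be a real CHK
            # upload/download)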
1192             for ds in self.clients[0].list_all_download_statuses():
1193                 if ds.get_size() > 200:
1194                     self._down_status = ds.get_counter()
1195             for us in self.clients[0].list_all_upload_statuses():
1196                 if us.get_size() > 200:
1197                     self._up_status = us.get_counter()
1198             rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1199             self._retrieve_status = rs.get_counter()
1200             ps = list(self.clients[0].list_all_publish_statuses())[0]
1201             self._publish_status = ps.get_counter()
1202             mus = list(self.clients[0].list_all_mapupdate_statuses())[0]
1203             self._update_status = mus.get_counter()
1204
1205             # and that there are some upload- and download- status pages
1206             return self.GET("status/up-%d" % self._up_status)
1207         d.addCallback(_got_status)
1208         def _got_up(res):
1209             return self.GET("status/down-%d" % self._down_status)
1210         d.addCallback(_got_up)
1211         def _got_down(res):
1212             return self.GET("status/mapupdate-%d" % self._update_status)
1213         d.addCallback(_got_down)
1214         def _got_update(res):
1215             return self.GET("status/publish-%d" % self._publish_status)
1216         d.addCallback(_got_update)
1217         def _got_publish(res):
1218             return self.GET("status/retrieve-%d" % self._retrieve_status)
1219         d.addCallback(_got_publish)
1220
1221         # check that the helper status page exists
1222         d.addCallback(lambda res:
1223                       self.GET("helper_status", followRedirect=True))
1224         def _got_helper_status(res):
1225             self.failUnless("Bytes Fetched:" in res)
1226             # touch a couple of files in the helper's working directory to
1227             # exercise more code paths
1228             workdir = os.path.join(self.getdir("client0"), "helper")
1229             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1230             f = open(incfile, "wb")
1231             f.write("small file")
1232             f.close()
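            # back-date the mtime (atime=now, mtime=three days ago) so these
            # files land in the helper's *_old counters checked below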
1233             then = time.time() - 86400*3
1234             now = time.time()
1235             os.utime(incfile, (now, then))
1236             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1237             f = open(encfile, "wb")
1238             f.write("less small file")
1239             f.close()
1240             os.utime(encfile, (now, then))
1241         d.addCallback(_got_helper_status)
1242         # and that the json form exists
1243         d.addCallback(lambda res:
1244                       self.GET("helper_status?t=json", followRedirect=True))
1245         def _got_helper_status_json(res):
1246             data = simplejson.loads(res)
1247             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1248                                  1)
1249             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1250             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1251             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1252                                  10)
1253             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1254             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1255             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1256                                  15)
1257         d.addCallback(_got_helper_status_json)
1258
1259         # and check that client[3] (which uses a helper but does not run one
1260         # itself) doesn't explode when you ask for its status
1261         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1262         def _got_non_helper_status(res):
1263             self.failUnless("Upload and Download Status" in res)
1264         d.addCallback(_got_non_helper_status)
1265
1266         # nor when you ask it for helper status with t=json (which should just be an empty dict)
1267         d.addCallback(lambda res:
1268                       getPage(self.helper_webish_url + "helper_status?t=json"))
1269         def _got_non_helper_status_json(res):
1270             data = simplejson.loads(res)
1271             self.failUnlessEqual(data, {})
1272         d.addCallback(_got_non_helper_status_json)
1273
1274         # see if the statistics page exists
1275         d.addCallback(lambda res: self.GET("statistics"))
1276         def _got_stats(res):
1277             self.failUnless("Node Statistics" in res)
1278             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1279         d.addCallback(_got_stats)
1280         d.addCallback(lambda res: self.GET("statistics?t=json"))
1281         def _got_stats_json(res):
1282             data = simplejson.loads(res)
1283             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1284             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1285         d.addCallback(_got_stats_json)
1286
1287         # TODO: mangle the second segment of a file, to test errors that
1288         # occur after we've already sent some good data, which uses a
1289         # different error path.
1290
1291         # TODO: download a URI with a form
1292         # TODO: create a directory by using a form
1293         # TODO: upload by using a form on the directory page
1294         #    url = base + "somedir/subdir1/freeform_post!!upload"
1295         # TODO: delete a file by using a button on the directory page
1296
1297         return d
1298
1299     def _test_runner(self, res):
1300         # exercise some of the diagnostic tools in runner.py
1301
1302         # find a share
1303         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1304             if "storage" not in dirpath:
1305                 continue
1306             if not filenames:
1307                 continue
1308             pieces = dirpath.split(os.sep)
1309             if (len(pieces) >= 4
1310                 and pieces[-4] == "storage"
1311                 and pieces[-3] == "shares"):
1312                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1313                 # are sharefiles here
1314                 filename = os.path.join(dirpath, filenames[0])
1315                 # peek at the magic to see if it is a chk share
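                # (immutable share files start with a 4-byte big-endian
                # version field, currently 1, while mutable shares begin
                # with a different magic string, so they get skipped here)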
1316                 magic = open(filename, "rb").read(4)
1317                 if magic == '\x00\x00\x00\x01':
1318                     break
1319         else:
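            # this 'else' belongs to the for-loop: it runs only if the walk
            # finished without 'break', i.e. no immutable share was found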
1320             self.fail("unable to find any uri_extension files in %s"
1321                       % self.basedir)
1322         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1323
1324         out,err = StringIO(), StringIO()
1325         rc = runner.runner(["debug", "dump-share", "--offsets",
1326                             filename],
1327                            stdout=out, stderr=err)
1328         output = out.getvalue()
1329         self.failUnlessEqual(rc, 0)
1330
1331         # we only upload a single file, so we can assert some things about
1332         # its size and shares.
1333         self.failUnless(("share filename: %s" % filename) in output)
1334         self.failUnless("size: %d\n" % len(self.data) in output)
1335         self.failUnless("num_segments: 1\n" in output)
1336         # segment_size is always a multiple of needed_shares
1337         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1338         self.failUnless("total_shares: 10\n" in output)
1339         # keys which are supposed to be present
1340         for key in ("size", "num_segments", "segment_size",
1341                     "needed_shares", "total_shares",
1342                     "codec_name", "codec_params", "tail_codec_params",
1343                     #"plaintext_hash", "plaintext_root_hash",
1344                     "crypttext_hash", "crypttext_root_hash",
1345                     "share_root_hash", "UEB_hash"):
1346             self.failUnless("%s: " % key in output, key)
1347         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1348
1349         # now use its storage index to find the other shares using the
1350         # 'find-shares' tool
1351         sharedir, shnum = os.path.split(filename)
1352         storagedir, storage_index_s = os.path.split(sharedir)
1353         out,err = StringIO(), StringIO()
1354         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1355         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1356         rc = runner.runner(cmd, stdout=out, stderr=err)
1357         self.failUnlessEqual(rc, 0)
1358         out.seek(0)
1359         sharefiles = [sfn.strip() for sfn in out.readlines()]
1360         self.failUnlessEqual(len(sharefiles), 10)
1361
1362         # also exercise the 'catalog-shares' tool
1363         out,err = StringIO(), StringIO()
1364         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1365         cmd = ["debug", "catalog-shares"] + nodedirs
1366         rc = runner.runner(cmd, stdout=out, stderr=err)
1367         self.failUnlessEqual(rc, 0)
1368         out.seek(0)
1369         descriptions = [sfn.strip() for sfn in out.readlines()]
1370         self.failUnlessEqual(len(descriptions), 30)
1371         matching = [line
1372                     for line in descriptions
1373                     if line.startswith("CHK %s " % storage_index_s)]
1374         self.failUnlessEqual(len(matching), 10)
1375
1376     def _test_control(self, res):
1377         # exercise the remote-control-the-client foolscap interfaces in
1378         # allmydata.control (mostly used for performance tests)
1379         c0 = self.clients[0]
1380         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1381         control_furl = open(control_furl_file, "r").read().strip()
1382         # it doesn't really matter which Tub we use to connect to the client,
1383         # so let's just use our IntroducerNode's
1384         d = self.introducer.tub.getReference(control_furl)
1385         d.addCallback(self._test_control2, control_furl_file)
1386         return d
1387     def _test_control2(self, rref, filename):
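        # round-trip a file through the control port: upload a local file
        # (the control.furl file itself, passed in as 'filename'), download
        # it again to control.downfile, and compare the two byte-for-byte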
1388         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1389         downfile = os.path.join(self.basedir, "control.downfile")
1390         d.addCallback(lambda uri:
1391                       rref.callRemote("download_from_uri_to_file",
1392                                       uri, downfile))
1393         def _check(res):
1394             self.failUnlessEqual(res, downfile)
1395             data = open(downfile, "r").read()
1396             expected_data = open(filename, "r").read()
1397             self.failUnlessEqual(data, expected_data)
1398         d.addCallback(_check)
1399         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1400         if sys.platform == "linux2":
1401             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1402         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1403         return d
1404
1405     def _test_cli(self, res):
1406         # run various CLI commands (in a thread, since they use blocking
1407         # network calls)
1408
1409         private_uri = self._private_node.get_uri()
1410         some_uri = self._root_directory_uri
1411         client0_basedir = self.getdir("client0")
1412
1413         nodeargs = [
1414             "--node-directory", client0_basedir,
1415             ]
1416         TESTDATA = "I will not write the same thing over and over.\n" * 100
1417
1418         d = defer.succeed(None)
1419
1420         # for compatibility with earlier versions, private/root_dir.cap is
1421         # supposed to be treated as an alias named "tahoe:". Start by making
1422         # sure that works, before we add other aliases.
1423
1424         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1425         f = open(root_file, "w")
1426         f.write(private_uri)
1427         f.close()
1428
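        # helper: run "tahoe <verb> --node-directory <client0 basedir> ..."
        # through _run_cli; the leading 'ignored' argument lets it be chained
        # directly as a Deferred callback, and it fires with (stdout, stderr)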
1429         def run(ignored, verb, *args, **kwargs):
1430             stdin = kwargs.get("stdin", "")
1431             newargs = [verb] + nodeargs + list(args)
1432             return self._run_cli(newargs, stdin=stdin)
1433
1434         def _check_ls((out,err), expected_children, unexpected_children=[]):
1435             self.failUnlessEqual(err, "")
1436             for s in expected_children:
1437                 self.failUnless(s in out, (s,out))
1438             for s in unexpected_children:
1439                 self.failIf(s in out, (s,out))
1440
1441         def _check_ls_root((out,err)):
1442             self.failUnless("personal" in out)
1443             self.failUnless("s2-ro" in out)
1444             self.failUnless("s2-rw" in out)
1445             self.failUnlessEqual(err, "")
1446
1447         # this should reference private_uri
1448         d.addCallback(run, "ls")
1449         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1450
1451         d.addCallback(run, "list-aliases")
1452         def _check_aliases_1((out,err)):
1453             self.failUnlessEqual(err, "")
1454             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1455         d.addCallback(_check_aliases_1)
1456
1457         # now that that's out of the way, remove root_dir.cap and work with
1458         # new files
1459         d.addCallback(lambda res: os.unlink(root_file))
1460         d.addCallback(run, "list-aliases")
1461         def _check_aliases_2((out,err)):
1462             self.failUnlessEqual(err, "")
1463             self.failUnlessEqual(out, "")
1464         d.addCallback(_check_aliases_2)
1465
1466         d.addCallback(run, "mkdir")
1467         def _got_dir( (out,err) ):
1468             self.failUnless(uri.from_string_dirnode(out.strip()))
1469             return out.strip()
1470         d.addCallback(_got_dir)
1471         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1472
1473         d.addCallback(run, "list-aliases")
1474         def _check_aliases_3((out,err)):
1475             self.failUnlessEqual(err, "")
1476             self.failUnless("tahoe: " in out)
1477         d.addCallback(_check_aliases_3)
1478
1479         def _check_empty_dir((out,err)):
1480             self.failUnlessEqual(out, "")
1481             self.failUnlessEqual(err, "")
1482         d.addCallback(run, "ls")
1483         d.addCallback(_check_empty_dir)
1484
1485         def _check_missing_dir((out,err)):
1486             # TODO: check that rc==2
1487             self.failUnlessEqual(out, "")
1488             self.failUnlessEqual(err, "No such file or directory\n")
1489         d.addCallback(run, "ls", "bogus")
1490         d.addCallback(_check_missing_dir)
1491
1492         files = []
1493         datas = []
1494         for i in range(10):
1495             fn = os.path.join(self.basedir, "file%d" % i)
1496             files.append(fn)
1497             data = "data to be uploaded: file%d\n" % i
1498             datas.append(data)
1499             open(fn,"wb").write(data)
1500
1501         def _check_stdout_against((out,err), filenum=None, data=None):
1502             self.failUnlessEqual(err, "")
1503             if filenum is not None:
1504                 self.failUnlessEqual(out, datas[filenum])
1505             if data is not None:
1506                 self.failUnlessEqual(out, data)
1507
1508         # test both forms of put: from a file, and from stdin
1509         #  tahoe put bar FOO
1510         d.addCallback(run, "put", files[0], "tahoe-file0")
1511         def _put_out((out,err)):
1512             self.failUnless("URI:LIT:" in out, out)
1513             self.failUnless("201 Created" in err, err)
1514             uri0 = out.strip()
1515             return run(None, "get", uri0)
1516         d.addCallback(_put_out)
1517         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1518
1519         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1520         #  tahoe put bar tahoe:FOO
1521         d.addCallback(run, "put", files[2], "tahoe:file2")
1522         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1523         def _check_put_mutable((out,err)):
1524             self._mutable_file3_uri = out.strip()
1525         d.addCallback(_check_put_mutable)
1526         d.addCallback(run, "get", "tahoe:file3")
1527         d.addCallback(_check_stdout_against, 3)
1528
1529         #  tahoe put FOO
1530         STDIN_DATA = "This is the file to upload from stdin."
1531         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1532         #  tahoe put tahoe:FOO
1533         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1534                       stdin="Other file from stdin.")
1535
1536         d.addCallback(run, "ls")
1537         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1538                                   "tahoe-file-stdin", "from-stdin"])
1539         d.addCallback(run, "ls", "subdir")
1540         d.addCallback(_check_ls, ["tahoe-file1"])
1541
1542         # tahoe mkdir FOO
1543         d.addCallback(run, "mkdir", "subdir2")
1544         d.addCallback(run, "ls")
1545         # TODO: extract the URI, set an alias with it
1546         d.addCallback(_check_ls, ["subdir2"])
1547
1548         # tahoe get: (to stdin and to a file)
1549         d.addCallback(run, "get", "tahoe-file0")
1550         d.addCallback(_check_stdout_against, 0)
1551         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1552         d.addCallback(_check_stdout_against, 1)
1553         outfile0 = os.path.join(self.basedir, "outfile0")
1554         d.addCallback(run, "get", "file2", outfile0)
1555         def _check_outfile0((out,err)):
1556             data = open(outfile0,"rb").read()
1557             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1558         d.addCallback(_check_outfile0)
1559         outfile1 = os.path.join(self.basedir, "outfile1")
1560         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1561         def _check_outfile1((out,err)):
1562             data = open(outfile1,"rb").read()
1563             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1564         d.addCallback(_check_outfile1)
1565
1566         d.addCallback(run, "rm", "tahoe-file0")
1567         d.addCallback(run, "rm", "tahoe:file2")
1568         d.addCallback(run, "ls")
1569         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1570
1571         d.addCallback(run, "ls", "-l")
1572         def _check_ls_l((out,err)):
1573             lines = out.split("\n")
1574             for l in lines:
1575                 if "tahoe-file-stdin" in l:
1576                     self.failUnless(l.startswith("-r-- "), l)
1577                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1578                 if "file3" in l:
1579                     self.failUnless(l.startswith("-rw- "), l) # mutable
1580         d.addCallback(_check_ls_l)
1581
1582         d.addCallback(run, "ls", "--uri")
1583         def _check_ls_uri((out,err)):
1584             lines = out.split("\n")
1585             for l in lines:
1586                 if "file3" in l:
1587                     self.failUnless(self._mutable_file3_uri in l)
1588         d.addCallback(_check_ls_uri)
1589
1590         d.addCallback(run, "ls", "--readonly-uri")
1591         def _check_ls_rouri((out,err)):
1592             lines = out.split("\n")
1593             for l in lines:
1594                 if "file3" in l:
1595                     rw_uri = self._mutable_file3_uri
1596                     u = uri.from_string_mutable_filenode(rw_uri)
1597                     ro_uri = u.get_readonly().to_string()
1598                     self.failUnless(ro_uri in l)
1599         d.addCallback(_check_ls_rouri)
1600
1601
1602         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1603         d.addCallback(run, "ls")
1604         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1605
1606         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1607         d.addCallback(run, "ls")
1608         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1609
1610         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1611         d.addCallback(run, "ls")
1612         d.addCallback(_check_ls, ["file3", "file3-copy"])
1613         d.addCallback(run, "get", "tahoe:file3-copy")
1614         d.addCallback(_check_stdout_against, 3)
1615
1616         # copy from disk into tahoe
1617         d.addCallback(run, "cp", files[4], "tahoe:file4")
1618         d.addCallback(run, "ls")
1619         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1620         d.addCallback(run, "get", "tahoe:file4")
1621         d.addCallback(_check_stdout_against, 4)
1622
1623         # copy from tahoe into disk
1624         target_filename = os.path.join(self.basedir, "file-out")
1625         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1626         def _check_cp_out((out,err)):
1627             self.failUnless(os.path.exists(target_filename))
1628             got = open(target_filename,"rb").read()
1629             self.failUnlessEqual(got, datas[4])
1630         d.addCallback(_check_cp_out)
1631
1632         # copy from disk to disk (silly case)
1633         target2_filename = os.path.join(self.basedir, "file-out-copy")
1634         d.addCallback(run, "cp", target_filename, target2_filename)
1635         def _check_cp_out2((out,err)):
1636             self.failUnless(os.path.exists(target2_filename))
1637             got = open(target2_filename,"rb").read()
1638             self.failUnlessEqual(got, datas[4])
1639         d.addCallback(_check_cp_out2)
1640
1641         # copy from tahoe into disk, overwriting an existing file
1642         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1643         def _check_cp_out3((out,err)):
1644             self.failUnless(os.path.exists(target_filename))
1645             got = open(target_filename,"rb").read()
1646             self.failUnlessEqual(got, datas[3])
1647         d.addCallback(_check_cp_out3)
1648
1649         # copy from disk into tahoe, overwriting an existing immutable file
1650         d.addCallback(run, "cp", files[5], "tahoe:file4")
1651         d.addCallback(run, "ls")
1652         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1653         d.addCallback(run, "get", "tahoe:file4")
1654         d.addCallback(_check_stdout_against, 5)
1655
1656         # copy from disk into tahoe, overwriting an existing mutable file
1657         d.addCallback(run, "cp", files[5], "tahoe:file3")
1658         d.addCallback(run, "ls")
1659         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1660         d.addCallback(run, "get", "tahoe:file3")
1661         d.addCallback(_check_stdout_against, 5)
1662
1663         # recursive copy: setup
1664         dn = os.path.join(self.basedir, "dir1")
1665         os.makedirs(dn)
1666         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1667         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1668         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1669         sdn2 = os.path.join(dn, "subdir2")
1670         os.makedirs(sdn2)
1671         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1672         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1673
1674         # from disk into tahoe
1675         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1676         d.addCallback(run, "ls")
1677         d.addCallback(_check_ls, ["dir1"])
1678         d.addCallback(run, "ls", "dir1")
1679         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1680                       ["rfile4", "rfile5"])
1681         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1682         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1683                       ["rfile1", "rfile2", "rfile3"])
1684         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1685         d.addCallback(_check_stdout_against, data="rfile4")
1686
1687         # and back out again
1688         dn_copy = os.path.join(self.basedir, "dir1-copy")
1689         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1690         def _check_cp_r_out((out,err)):
1691             def _cmp(name):
1692                 old = open(os.path.join(dn, name), "rb").read()
1693                 newfn = os.path.join(dn_copy, name)
1694                 self.failUnless(os.path.exists(newfn))
1695                 new = open(newfn, "rb").read()
1696                 self.failUnlessEqual(old, new)
1697             _cmp("rfile1")
1698             _cmp("rfile2")
1699             _cmp("rfile3")
1700             _cmp(os.path.join("subdir2", "rfile4"))
1701             _cmp(os.path.join("subdir2", "rfile5"))
1702         d.addCallback(_check_cp_r_out)
1703
1704         # and copy it a second time, which ought to overwrite the same files
1705         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1706
1707         # and again, only writing filecaps
1708         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1709         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1710         def _check_capsonly((out,err)):
1711             # these should all be LITs
1712             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1713             y = uri.from_string_filenode(x)
1714             self.failUnlessEqual(y.data, "rfile4")
1715         d.addCallback(_check_capsonly)
1716
1717         # and tahoe-to-tahoe
1718         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1719         d.addCallback(run, "ls")
1720         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1721         d.addCallback(run, "ls", "dir1-copy")
1722         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1723                       ["rfile4", "rfile5"])
1724         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1725         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1726                       ["rfile1", "rfile2", "rfile3"])
1727         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1728         d.addCallback(_check_stdout_against, data="rfile4")
1729
1730         # and copy it a second time, which ought to overwrite the same files
1731         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1732
1733         # tahoe_ls doesn't currently handle the error correctly: it tries to
1734         # JSON-parse a traceback.
1735 ##         def _ls_missing(res):
1736 ##             argv = ["ls"] + nodeargs + ["bogus"]
1737 ##             return self._run_cli(argv)
1738 ##         d.addCallback(_ls_missing)
1739 ##         def _check_ls_missing((out,err)):
1740 ##             print "OUT", out
1741 ##             print "ERR", err
1742 ##             self.failUnlessEqual(err, "")
1743 ##         d.addCallback(_check_ls_missing)
1744
1745         return d
1746
1747     def _run_cli(self, argv, stdin=""):
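        # run the CLI in a worker thread (it makes blocking network calls),
        # with StringIO stand-ins for stdin/stdout/stderr, and fire the
        # returned Deferred with the captured (stdout, stderr) text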
1748         #print "CLI:", argv
1749         stdout, stderr = StringIO(), StringIO()
1750         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1751                                   stdin=StringIO(stdin),
1752                                   stdout=stdout, stderr=stderr)
1753         def _done(res):
1754             return stdout.getvalue(), stderr.getvalue()
1755         d.addCallback(_done)
1756         return d
1757
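    # _test_checker exercises file checking (with and without verify=True)
    # on a dirnode, a CHK file node, and a LIT file node; a LIT node has no
    # shares to check, so its check() is expected to return None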
1758     def _test_checker(self, res):
1759         ut = upload.Data("too big to be literal" * 200, convergence=None)
1760         d = self._personal_node.add_file(u"big file", ut)
1761
1762         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1763         def _check_dirnode_results(r):
1764             self.failUnless(r.is_healthy())
1765         d.addCallback(_check_dirnode_results)
1766         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1767         d.addCallback(_check_dirnode_results)
1768
1769         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1770         def _got_chk_filenode(n):
1771             self.failUnless(isinstance(n, filenode.FileNode))
1772             d = n.check(Monitor())
1773             def _check_filenode_results(r):
1774                 self.failUnless(r.is_healthy())
1775             d.addCallback(_check_filenode_results)
1776             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1777             d.addCallback(_check_filenode_results)
1778             return d
1779         d.addCallback(_got_chk_filenode)
1780
1781         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1782         def _got_lit_filenode(n):
1783             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1784             d = n.check(Monitor())
1785             def _check_lit_filenode_results(r):
1786                 self.failUnlessEqual(r, None)
1787             d.addCallback(_check_lit_filenode_results)
1788             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1789             d.addCallback(_check_lit_filenode_results)
1790             return d
1791         d.addCallback(_got_lit_filenode)
1792         return d
1793