src/allmydata/test/test_system.py
1 from base64 import b32encode
2 import os, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from zope.interface import implements
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.internet.interfaces import IConsumer, IPushProducer
10 import allmydata
11 from allmydata import uri
12 from allmydata.storage.mutable import MutableShareFile
13 from allmydata.storage.server import si_a2b
14 from allmydata.immutable import download, filenode, offloaded, upload
15 from allmydata.util import idlib, mathutil
16 from allmydata.util import log, base32
17 from allmydata.scripts import runner
18 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
19      NoSuchChildError, NotEnoughSharesError
20 from allmydata.monitor import Monitor
21 from allmydata.mutable.common import NotMutableError
22 from allmydata.mutable import layout as mutable_layout
23 from foolscap.api import DeadReferenceError
24 from twisted.python.failure import Failure
25 from twisted.web.client import getPage
26 from twisted.web.error import Error
27
28 from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
29      download_to_data
30
31 LARGE_DATA = """
32 This is some data to publish to the virtual drive, which needs to be large
33 enough to not fit inside a LIT uri.
34 """
35
36 class CountingDataUploadable(upload.Data):
37     bytes_read = 0
38     interrupt_after = None
39     interrupt_after_d = None
40
41     def read(self, length):
42         self.bytes_read += length
43         if self.interrupt_after is not None:
44             if self.bytes_read > self.interrupt_after:
45                 self.interrupt_after = None
46                 self.interrupt_after_d.callback(self)
47         return upload.Data.read(self, length)
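    # A minimal sketch of how this uploadable is driven (see _upload_resumable
    # below): set interrupt_after to a byte count and interrupt_after_d to a
    # Deferred; once read() has consumed that many bytes, the Deferred fires
    # (there it is wired to bounce the helper, simulating an interrupted upload).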
48
49 class GrabEverythingConsumer:
50     implements(IConsumer)
51
52     def __init__(self):
53         self.contents = ""
54
55     def registerProducer(self, producer, streaming):
56         assert streaming
57         assert IPushProducer.providedBy(producer)
58
59     def write(self, data):
60         self.contents += data
61
62     def unregisterProducer(self):
63         pass
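    # This is a bare-bones IConsumer: the download machinery calls
    # registerProducer(), then write() repeatedly, then unregisterProducer(),
    # and everything written is accumulated in self.contents so the tests can
    # compare it against the original plaintext.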
64
65 class SystemTest(SystemTestMixin, unittest.TestCase):
66     timeout = 960 # It takes longer than 480 seconds on Francois's arm box.
67
68     def test_connections(self):
69         self.basedir = "system/SystemTest/test_connections"
70         d = self.set_up_nodes()
71         self.extra_node = None
72         d.addCallback(lambda res: self.add_extra_node(self.numclients))
73         def _check(extra_node):
74             self.extra_node = extra_node
75             for c in self.clients:
76                 all_peerids = list(c.get_storage_broker().get_all_serverids())
77                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
78                 sb = c.storage_broker
79                 permuted_peers = list(sb.get_servers("a"))
80                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
81
82         d.addCallback(_check)
83         def _shutdown_extra_node(res):
84             if self.extra_node:
85                 return self.extra_node.stopService()
86             return res
87         d.addBoth(_shutdown_extra_node)
88         return d
89     # test_connections is subsumed by test_upload_and_download, and takes
90     # quite a while to run on a slow machine (because of all the TLS
91     # connections that must be established). If we ever rework the introducer
92     # code to such an extent that we're not sure if it works anymore, we can
93     # reinstate this test until it does.
94     del test_connections
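    # (deleting the attribute above removes the method from the class, so
    # trial never collects it as a test case)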
95
96     def test_upload_and_download_random_key(self):
97         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
98         return self._test_upload_and_download(convergence=None)
99
100     def test_upload_and_download_convergent(self):
101         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
102         return self._test_upload_and_download(convergence="some convergence string")
103
104     def _test_upload_and_download(self, convergence):
105         # we use 4000 bytes of data, which will result in about 400k written
106         # to disk among all our simulated nodes
107         DATA = "Some data to upload\n" * 200
108         d = self.set_up_nodes()
109         def _check_connections(res):
110             for c in self.clients:
111                 all_peerids = list(c.get_storage_broker().get_all_serverids())
112                 self.failUnlessEqual(len(all_peerids), self.numclients)
113                 sb = c.storage_broker
114                 permuted_peers = list(sb.get_servers("a"))
115                 self.failUnlessEqual(len(permuted_peers), self.numclients)
116         d.addCallback(_check_connections)
117
118         def _do_upload(res):
119             log.msg("UPLOADING")
120             u = self.clients[0].getServiceNamed("uploader")
121             self.uploader = u
122             # we crank the max segsize down to 1024b for the duration of this
123             # test, so we can exercise multiple segments. It is important
124             # that this is not a multiple of the segment size, so that the
125             # tail segment is not the same length as the others. This actually
126             # gets rounded up to 1025 to be a multiple of the number of
127             # required shares (since we use 25 out of 100 FEC).
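            # (Illustration of that rounding, given the 25-of-100 encoding
            # mentioned above: 1024 is not a multiple of k=25, so the
            # effective segment size becomes 25 * 41 == 1025.)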
128             up = upload.Data(DATA, convergence=convergence)
129             up.max_segment_size = 1024
130             d1 = u.upload(up)
131             return d1
132         d.addCallback(_do_upload)
133         def _upload_done(results):
134             theuri = results.uri
135             log.msg("upload finished: uri is %s" % (theuri,))
136             self.uri = theuri
137             assert isinstance(self.uri, str), self.uri
138             dl = self.clients[1].getServiceNamed("downloader")
139             self.downloader = dl
140         d.addCallback(_upload_done)
141
142         def _upload_again(res):
143             # Upload again. If using convergent encryption then this ought to be
144             # short-circuited, however with the way we currently generate URIs
145             # (i.e. because they include the roothash), we have to do all of the
146             # encoding work, and only get to save on the upload part.
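            # (Rough sketch of why convergence allows short-circuiting: the
            # per-file key is derived from the convergence secret plus the
            # encoding parameters plus the plaintext -- approximately
            # key ~ hash(convergence_secret + params + plaintext) -- so
            # uploading identical bytes with the same secret reproduces the
            # same storage index. This is the idea, not the exact derivation.)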
147             log.msg("UPLOADING AGAIN")
148             up = upload.Data(DATA, convergence=convergence)
149             up.max_segment_size = 1024
150             return self.uploader.upload(up)
151         d.addCallback(_upload_again)
152
153         def _download_to_data(res):
154             log.msg("DOWNLOADING")
155             return self.downloader.download_to_data(self.uri)
156         d.addCallback(_download_to_data)
157         def _download_to_data_done(data):
158             log.msg("download finished")
159             self.failUnlessEqual(data, DATA)
160         d.addCallback(_download_to_data_done)
161
162         target_filename = os.path.join(self.basedir, "download.target")
163         def _download_to_filename(res):
164             return self.downloader.download_to_filename(self.uri,
165                                                         target_filename)
166         d.addCallback(_download_to_filename)
167         def _download_to_filename_done(res):
168             newdata = open(target_filename, "rb").read()
169             self.failUnlessEqual(newdata, DATA)
170         d.addCallback(_download_to_filename_done)
171
172         target_filename2 = os.path.join(self.basedir, "download.target2")
173         def _download_to_filehandle(res):
174             fh = open(target_filename2, "wb")
175             return self.downloader.download_to_filehandle(self.uri, fh)
176         d.addCallback(_download_to_filehandle)
177         def _download_to_filehandle_done(fh):
178             fh.close()
179             newdata = open(target_filename2, "rb").read()
180             self.failUnlessEqual(newdata, DATA)
181         d.addCallback(_download_to_filehandle_done)
182
183         consumer = GrabEverythingConsumer()
184         ct = download.ConsumerAdapter(consumer)
185         d.addCallback(lambda res:
186                       self.downloader.download(self.uri, ct))
187         def _download_to_consumer_done(ign):
188             self.failUnlessEqual(consumer.contents, DATA)
189         d.addCallback(_download_to_consumer_done)
190
191         def _test_read(res):
192             n = self.clients[1].create_node_from_uri(self.uri)
193             d = download_to_data(n)
194             def _read_done(data):
195                 self.failUnlessEqual(data, DATA)
196             d.addCallback(_read_done)
197             d.addCallback(lambda ign:
198                           n.read(MemoryConsumer(), offset=1, size=4))
199             def _read_portion_done(mc):
200                 self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
201             d.addCallback(_read_portion_done)
202             d.addCallback(lambda ign:
203                           n.read(MemoryConsumer(), offset=2, size=None))
204             def _read_tail_done(mc):
205                 self.failUnlessEqual("".join(mc.chunks), DATA[2:])
206             d.addCallback(_read_tail_done)
207             d.addCallback(lambda ign:
208                           n.read(MemoryConsumer(), size=len(DATA)+1000))
209             def _read_too_much(mc):
210                 self.failUnlessEqual("".join(mc.chunks), DATA)
211             d.addCallback(_read_too_much)
212
213             return d
214         d.addCallback(_test_read)
215
216         def _test_bad_read(res):
217             bad_u = uri.from_string_filenode(self.uri)
218             bad_u.key = self.flip_bit(bad_u.key)
219             bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
220             # this should cause an error during download
221
222             d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
223                                  None,
224                                  bad_n.read, MemoryConsumer(), offset=2)
225             return d
226         d.addCallback(_test_bad_read)
227
228         def _download_nonexistent_uri(res):
229             baduri = self.mangle_uri(self.uri)
230             log.msg("about to download non-existent URI", level=log.UNUSUAL,
231                     facility="tahoe.tests")
232             d1 = self.downloader.download_to_data(baduri)
233             def _baduri_should_fail(res):
234                 log.msg("finished downloading non-existent URI",
235                         level=log.UNUSUAL, facility="tahoe.tests")
236                 self.failUnless(isinstance(res, Failure))
237                 self.failUnless(res.check(NotEnoughSharesError),
238                                 "expected NotEnoughSharesError, got %s" % res)
239                 # TODO: files that have zero peers should get a special kind
240                 # of NotEnoughSharesError, which can be used to suggest that
241                 # the URI might be wrong or that they've never uploaded the
242                 # file in the first place.
243             d1.addBoth(_baduri_should_fail)
244             return d1
245         d.addCallback(_download_nonexistent_uri)
246
247         # add a new node, which doesn't accept shares, and only uses the
248         # helper for upload.
249         d.addCallback(lambda res: self.add_extra_node(self.numclients,
250                                                       self.helper_furl,
251                                                       add_to_sparent=True))
252         def _added(extra_node):
253             self.extra_node = extra_node
254         d.addCallback(_added)
255
256         HELPER_DATA = "Data that needs help to upload" * 1000
257         def _upload_with_helper(res):
258             u = upload.Data(HELPER_DATA, convergence=convergence)
259             d = self.extra_node.upload(u)
260             def _uploaded(results):
261                 uri = results.uri
262                 return self.downloader.download_to_data(uri)
263             d.addCallback(_uploaded)
264             def _check(newdata):
265                 self.failUnlessEqual(newdata, HELPER_DATA)
266             d.addCallback(_check)
267             return d
268         d.addCallback(_upload_with_helper)
269
270         def _upload_duplicate_with_helper(res):
271             u = upload.Data(HELPER_DATA, convergence=convergence)
272             u.debug_stash_RemoteEncryptedUploadable = True
273             d = self.extra_node.upload(u)
274             def _uploaded(results):
275                 uri = results.uri
276                 return self.downloader.download_to_data(uri)
277             d.addCallback(_uploaded)
278             def _check(newdata):
279                 self.failUnlessEqual(newdata, HELPER_DATA)
280                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
281                             "uploadable started uploading, should have been avoided")
282             d.addCallback(_check)
283             return d
284         if convergence is not None:
285             d.addCallback(_upload_duplicate_with_helper)
286
287         def _upload_resumable(res):
288             DATA = "Data that needs help to upload and gets interrupted" * 1000
289             u1 = CountingDataUploadable(DATA, convergence=convergence)
290             u2 = CountingDataUploadable(DATA, convergence=convergence)
291
292             # we interrupt the connection after about 5kB by shutting down
293             # the helper, then restarting it.
294             u1.interrupt_after = 5000
295             u1.interrupt_after_d = defer.Deferred()
296             u1.interrupt_after_d.addCallback(lambda res:
297                                              self.bounce_client(0))
298
299             # sneak into the helper and reduce its chunk size, so that our
300             # debug_interrupt will sever the connection on about the fifth
301             # chunk fetched. This makes sure that we've started to write the
302             # new shares before we abandon them, which exercises the
303             # abort/delete-partial-share code. TODO: find a cleaner way to do
304             # this. I know that this will affect later uses of the helper in
305             # this same test run, but I'm not currently worried about it.
306             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
307
308             d = self.extra_node.upload(u1)
309
310             def _should_not_finish(res):
311                 self.fail("interrupted upload should have failed, not finished"
312                           " with result %s" % (res,))
313             def _interrupted(f):
314                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
315
316                 # make sure we actually interrupted it before finishing the
317                 # file
318                 self.failUnless(u1.bytes_read < len(DATA),
319                                 "read %d out of %d total" % (u1.bytes_read,
320                                                              len(DATA)))
321
322                 log.msg("waiting for reconnect", level=log.NOISY,
323                         facility="tahoe.test.test_system")
324                 # now, we need to give the nodes a chance to notice that this
325                 # connection has gone away. When this happens, the storage
326                 # servers will be told to abort their uploads, removing the
327                 # partial shares. Unfortunately this involves TCP messages
328                 # going through the loopback interface, and we can't easily
329                 # predict how long that will take. If it were all local, we
330                 # could use fireEventually() to stall. Since we don't have
331                 # the right introduction hooks, the best we can do is use a
332                 # fixed delay. TODO: this is fragile.
333                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
334                 return u1.interrupt_after_d
335             d.addCallbacks(_should_not_finish, _interrupted)
336
337             def _disconnected(res):
338                 # check to make sure the storage servers aren't still hanging
339                 # on to the partial share: their incoming/ directories should
340                 # now be empty.
341                 log.msg("disconnected", level=log.NOISY,
342                         facility="tahoe.test.test_system")
343                 for i in range(self.numclients):
344                     incdir = os.path.join(self.getdir("client%d" % i),
345                                           "storage", "shares", "incoming")
346                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
347             d.addCallback(_disconnected)
348
349             # then we need to give the reconnector a chance to
350             # reestablish the connection to the helper.
351             d.addCallback(lambda res:
352                           log.msg("wait_for_connections", level=log.NOISY,
353                                   facility="tahoe.test.test_system"))
354             d.addCallback(lambda res: self.wait_for_connections())
355
356
357             d.addCallback(lambda res:
358                           log.msg("uploading again", level=log.NOISY,
359                                   facility="tahoe.test.test_system"))
360             d.addCallback(lambda res: self.extra_node.upload(u2))
361
362             def _uploaded(results):
363                 uri = results.uri
364                 log.msg("Second upload complete", level=log.NOISY,
365                         facility="tahoe.test.test_system")
366
367                 # this is really bytes received rather than sent, but it's
368                 # convenient and basically measures the same thing
369                 bytes_sent = results.ciphertext_fetched
370
371                 # We currently don't support resumption of upload if the data is
372                 # encrypted with a random key.  (Because that would require us
373                 # to store the key locally and re-use it on the next upload of
374                 # this file, which isn't a bad thing to do, but we currently
375                 # don't do it.)
376                 if convergence is not None:
377                     # Make sure we did not have to read the whole file the
378                     # second time around.
379                     self.failUnless(bytes_sent < len(DATA),
380                                 "resumption didn't save us any work:"
381                                 " read %d bytes out of %d total" %
382                                 (bytes_sent, len(DATA)))
383                 else:
384                     # Make sure we did have to read the whole file the second
385                     # time around -- because the one that we partially uploaded
386                     # earlier was encrypted with a different random key.
387                     self.failIf(bytes_sent < len(DATA),
388                                 "resumption saved us some work even though we were using random keys:"
389                                 " read %d bytes out of %d total" %
390                                 (bytes_sent, len(DATA)))
391                 return self.downloader.download_to_data(uri)
392             d.addCallback(_uploaded)
393
394             def _check(newdata):
395                 self.failUnlessEqual(newdata, DATA)
396                 # If using convergent encryption, then also check that the
397                 # helper has removed the temp file from its directories.
398                 if convergence is not None:
399                     basedir = os.path.join(self.getdir("client0"), "helper")
400                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
401                     self.failUnlessEqual(files, [])
402                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
403                     self.failUnlessEqual(files, [])
404             d.addCallback(_check)
405             return d
406         d.addCallback(_upload_resumable)
407
408         def _grab_stats(ignored):
409             # the StatsProvider doesn't normally publish a FURL:
410             # instead it passes a live reference to the StatsGatherer
411             # (if and when it connects). To exercise the remote stats
412             # interface, we manually publish client0's StatsProvider
413             # and use client1 to query it.
414             sp = self.clients[0].stats_provider
415             sp_furl = self.clients[0].tub.registerReference(sp)
416             d = self.clients[1].tub.getReference(sp_furl)
417             d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
418             def _got_stats(stats):
419                 #print "STATS"
420                 #from pprint import pprint
421                 #pprint(stats)
422                 s = stats["stats"]
423                 self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
424                 c = stats["counters"]
425                 self.failUnless("storage_server.allocate" in c)
426             d.addCallback(_got_stats)
427             return d
428         d.addCallback(_grab_stats)
429
430         return d
431
432     def _find_shares(self, basedir):
433         shares = []
434         for (dirpath, dirnames, filenames) in os.walk(basedir):
435             if "storage" not in dirpath:
436                 continue
437             if not filenames:
438                 continue
439             pieces = dirpath.split(os.sep)
440             if (len(pieces) >= 5
441                 and pieces[-4] == "storage"
442                 and pieces[-3] == "shares"):
443                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
444                 # are sharefiles here
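                # (for example, a path along the lines of
                #   .../client0/storage/shares/xx/<base32-storage-index>/
                # where each file inside is named after its share number;
                # the "xx" prefix directory shown here is illustrative)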
445                 assert pieces[-5].startswith("client")
446                 client_num = int(pieces[-5][-1])
447                 storage_index_s = pieces[-1]
448                 storage_index = si_a2b(storage_index_s)
449                 for sharename in filenames:
450                     shnum = int(sharename)
451                     filename = os.path.join(dirpath, sharename)
452                     data = (client_num, storage_index, filename, shnum)
453                     shares.append(data)
454         if not shares:
455             self.fail("unable to find any share files in %s" % basedir)
456         return shares
457
458     def _corrupt_mutable_share(self, filename, which):
459         msf = MutableShareFile(filename)
460         datav = msf.readv([ (0, 1000000) ])
461         final_share = datav[0]
462         assert len(final_share) < 1000000 # ought to be truncated
463         pieces = mutable_layout.unpack_share(final_share)
464         (seqnum, root_hash, IV, k, N, segsize, datalen,
465          verification_key, signature, share_hash_chain, block_hash_tree,
466          share_data, enc_privkey) = pieces
467
468         if which == "seqnum":
469             seqnum = seqnum + 15
470         elif which == "R":
471             root_hash = self.flip_bit(root_hash)
472         elif which == "IV":
473             IV = self.flip_bit(IV)
474         elif which == "segsize":
475             segsize = segsize + 15
476         elif which == "pubkey":
477             verification_key = self.flip_bit(verification_key)
478         elif which == "signature":
479             signature = self.flip_bit(signature)
480         elif which == "share_hash_chain":
481             nodenum = share_hash_chain.keys()[0]
482             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
483         elif which == "block_hash_tree":
484             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
485         elif which == "share_data":
486             share_data = self.flip_bit(share_data)
487         elif which == "encprivkey":
488             enc_privkey = self.flip_bit(enc_privkey)
489
490         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
491                                             segsize, datalen)
492         final_share = mutable_layout.pack_share(prefix,
493                                                 verification_key,
494                                                 signature,
495                                                 share_hash_chain,
496                                                 block_hash_tree,
497                                                 share_data,
498                                                 enc_privkey)
499         msf.writev( [(0, final_share)], None)
500
501
502     def test_mutable(self):
503         self.basedir = "system/SystemTest/test_mutable"
504         DATA = "initial contents go here."  # 25 bytes % 3 != 0
505         NEWDATA = "new contents yay"
506         NEWERDATA = "this is getting old"
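        # (With the default 3-of-10 mutable encoding, the 25-byte DATA gets a
        # segment size rounded up to the next multiple of k=3, which is why
        # the dump-share check below expects "segsize: 27" and "datalen: 25".)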
507
508         d = self.set_up_nodes(use_key_generator=True)
509
510         def _create_mutable(res):
511             c = self.clients[0]
512             log.msg("starting create_mutable_file")
513             d1 = c.create_mutable_file(DATA)
514             def _done(res):
515                 log.msg("DONE: %s" % (res,))
516                 self._mutable_node_1 = res
517                 uri = res.get_uri()
518             d1.addCallback(_done)
519             return d1
520         d.addCallback(_create_mutable)
521
522         def _test_debug(res):
523             # find a share. It is important to run this while there is only
524             # one slot in the grid.
525             shares = self._find_shares(self.basedir)
526             (client_num, storage_index, filename, shnum) = shares[0]
527             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
528                     % filename)
529             log.msg(" for clients[%d]" % client_num)
530
531             out,err = StringIO(), StringIO()
532             rc = runner.runner(["debug", "dump-share", "--offsets",
533                                 filename],
534                                stdout=out, stderr=err)
535             output = out.getvalue()
536             self.failUnlessEqual(rc, 0)
537             try:
538                 self.failUnless("Mutable slot found:\n" in output)
539                 self.failUnless("share_type: SDMF\n" in output)
540                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
541                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
542                 self.failUnless(" num_extra_leases: 0\n" in output)
543                 # the pubkey size can vary by a byte, so the container might
544                 # be a bit larger on some runs.
545                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
546                 self.failUnless(m)
547                 container_size = int(m.group(1))
548                 self.failUnless(2037 <= container_size <= 2049, container_size)
549                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
550                 self.failUnless(m)
551                 data_length = int(m.group(1))
552                 self.failUnless(2037 <= data_length <= 2049, data_length)
553                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
554                                 in output)
555                 self.failUnless(" SDMF contents:\n" in output)
556                 self.failUnless("  seqnum: 1\n" in output)
557                 self.failUnless("  required_shares: 3\n" in output)
558                 self.failUnless("  total_shares: 10\n" in output)
559                 self.failUnless("  segsize: 27\n" in output, (output, filename))
560                 self.failUnless("  datalen: 25\n" in output)
561                 # the exact share_hash_chain nodes depends upon the sharenum,
562                 # and is more of a hassle to compute than I want to deal with
563                 # now
564                 self.failUnless("  share_hash_chain: " in output)
565                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
566                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
567                             base32.b2a(storage_index))
568                 self.failUnless(expected in output)
569             except unittest.FailTest:
570                 print
571                 print "dump-share output was:"
572                 print output
573                 raise
574         d.addCallback(_test_debug)
575
576         # test retrieval
577
578         # first, let's see if we can use the existing node to retrieve the
579         # contents. This allows it to use the cached pubkey and maybe the
580         # latest-known sharemap.
581
582         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
583         def _check_download_1(res):
584             self.failUnlessEqual(res, DATA)
585             # now we see if we can retrieve the data from a new node,
586             # constructed using the URI of the original one. We do this test
587             # on the same client that uploaded the data.
588             uri = self._mutable_node_1.get_uri()
589             log.msg("starting retrieve1")
590             newnode = self.clients[0].create_node_from_uri(uri)
591             newnode_2 = self.clients[0].create_node_from_uri(uri)
592             self.failUnlessIdentical(newnode, newnode_2)
593             return newnode.download_best_version()
594         d.addCallback(_check_download_1)
595
596         def _check_download_2(res):
597             self.failUnlessEqual(res, DATA)
598             # same thing, but with a different client
599             uri = self._mutable_node_1.get_uri()
600             newnode = self.clients[1].create_node_from_uri(uri)
601             log.msg("starting retrieve2")
602             d1 = newnode.download_best_version()
603             d1.addCallback(lambda res: (res, newnode))
604             return d1
605         d.addCallback(_check_download_2)
606
607         def _check_download_3((res, newnode)):
608             self.failUnlessEqual(res, DATA)
609             # replace the data
610             log.msg("starting replace1")
611             d1 = newnode.overwrite(NEWDATA)
612             d1.addCallback(lambda res: newnode.download_best_version())
613             return d1
614         d.addCallback(_check_download_3)
615
616         def _check_download_4(res):
617             self.failUnlessEqual(res, NEWDATA)
618             # now create an even newer node and replace the data on it. This
619             # new node has never been used for download before.
620             uri = self._mutable_node_1.get_uri()
621             newnode1 = self.clients[2].create_node_from_uri(uri)
622             newnode2 = self.clients[3].create_node_from_uri(uri)
623             self._newnode3 = self.clients[3].create_node_from_uri(uri)
624             log.msg("starting replace2")
625             d1 = newnode1.overwrite(NEWERDATA)
626             d1.addCallback(lambda res: newnode2.download_best_version())
627             return d1
628         d.addCallback(_check_download_4)
629
630         def _check_download_5(res):
631             log.msg("finished replace2")
632             self.failUnlessEqual(res, NEWERDATA)
633         d.addCallback(_check_download_5)
634
635         def _corrupt_shares(res):
636             # run around and flip bits in all but k of the shares, to test
637             # the hash checks
638             shares = self._find_shares(self.basedir)
639             ## sort by share number
640             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
641             where = dict([ (shnum, filename)
642                            for (client_num, storage_index, filename, shnum)
643                            in shares ])
644             assert len(where) == 10 # this test is designed for 3-of-10
645             for shnum, filename in where.items():
646                 # shares 7,8,9 are left alone. read will check
647                 # (share_hash_chain, block_hash_tree, share_data). New
648                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
649                 # segsize, signature).
650                 if shnum == 0:
651                     # read: this will trigger "pubkey doesn't match
652                     # fingerprint".
653                     self._corrupt_mutable_share(filename, "pubkey")
654                     self._corrupt_mutable_share(filename, "encprivkey")
655                 elif shnum == 1:
656                     # triggers "signature is invalid"
657                     self._corrupt_mutable_share(filename, "seqnum")
658                 elif shnum == 2:
659                     # triggers "signature is invalid"
660                     self._corrupt_mutable_share(filename, "R")
661                 elif shnum == 3:
662                     # triggers "signature is invalid"
663                     self._corrupt_mutable_share(filename, "segsize")
664                 elif shnum == 4:
665                     self._corrupt_mutable_share(filename, "share_hash_chain")
666                 elif shnum == 5:
667                     self._corrupt_mutable_share(filename, "block_hash_tree")
668                 elif shnum == 6:
669                     self._corrupt_mutable_share(filename, "share_data")
670                 # other things to corrupt: IV, signature
671                 # 7,8,9 are left alone
672
673                 # note that initial_query_count=5 means that we'll hit the
674                 # first 5 servers in effectively random order (based upon
675                 # response time), so we won't necessarily ever get a "pubkey
676                 # doesn't match fingerprint" error (if we hit shnum>=1 before
677                 # shnum=0, we pull the pubkey from there). To get repeatable
678                 # specific failures, we need to set initial_query_count=1,
679                 # but of course that will change the sequencing behavior of
680                 # the retrieval process. TODO: find a reasonable way to make
681                 # this a parameter, probably when we expand this test to test
682                 # for one failure mode at a time.
683
684                 # when we retrieve this, we should get three signature
685                 # failures (where we've mangled seqnum, R, and segsize). The
686                 # pubkey mangling on shnum 0 may or may not be noticed, depending on which servers respond first (see the initial_query_count note above).
687         d.addCallback(_corrupt_shares)
688
689         d.addCallback(lambda res: self._newnode3.download_best_version())
690         d.addCallback(_check_download_5)
691
692         def _check_empty_file(res):
693             # make sure we can create empty files; this usually screws up the
694             # segsize math
695             d1 = self.clients[2].create_mutable_file("")
696             d1.addCallback(lambda newnode: newnode.download_best_version())
697             d1.addCallback(lambda res: self.failUnlessEqual("", res))
698             return d1
699         d.addCallback(_check_empty_file)
700
701         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
702         def _created_dirnode(dnode):
703             log.msg("_created_dirnode(%s)" % (dnode,))
704             d1 = dnode.list()
705             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
706             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
707             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
708             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
709             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
710             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
711             d1.addCallback(lambda res: dnode.build_manifest().when_done())
712             d1.addCallback(lambda res:
713                            self.failUnlessEqual(len(res["manifest"]), 1))
714             return d1
715         d.addCallback(_created_dirnode)
716
717         def wait_for_c3_kg_conn():
718             return self.clients[3]._key_generator is not None
719         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
720
721         def check_kg_poolsize(junk, size_delta):
722             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
723                                  self.key_generator_svc.key_generator.pool_size + size_delta)
724
725         d.addCallback(check_kg_poolsize, 0)
726         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
727         d.addCallback(check_kg_poolsize, -1)
728         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
729         d.addCallback(check_kg_poolsize, -2)
730         # use_helper induces use of clients[3], which is the client that uses the key generator
731         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
732         d.addCallback(check_kg_poolsize, -3)
733
734         return d
735
736     def flip_bit(self, good):
737         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
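        # e.g. flip_bit("abc") == "abb": only the low bit of the final byte is
        # toggled, which is enough to invalidate a key, hash, or signature.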
738
739     def mangle_uri(self, gooduri):
740         # change the key, which changes the storage index, which means we'll
741         # be asking about the wrong file, so nobody will have any shares
742         u = IFileURI(gooduri)
743         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
744                             uri_extension_hash=u.uri_extension_hash,
745                             needed_shares=u.needed_shares,
746                             total_shares=u.total_shares,
747                             size=u.size)
748         return u2.to_string()
749
750     # TODO: add a test which mangles the uri_extension_hash instead, and
751     # should fail due to not being able to get a valid uri_extension block.
752     # Also a test which sneakily mangles the uri_extension block to change
753     # some of the validation data, so it will fail in the post-download phase
754     # when the file's crypttext integrity check fails. Do the same thing for
755     # the key, which should cause the download to fail the post-download
756     # plaintext_hash check.
757
758     def test_vdrive(self):
759         self.basedir = "system/SystemTest/test_vdrive"
760         self.data = LARGE_DATA
761         d = self.set_up_nodes(use_stats_gatherer=True)
762         d.addCallback(self._test_introweb)
763         d.addCallback(self.log, "starting publish")
764         d.addCallback(self._do_publish1)
765         d.addCallback(self._test_runner)
766         d.addCallback(self._do_publish2)
767         # at this point, we have the following filesystem (where "R" denotes
768         # self._root_directory_uri):
769         # R
770         # R/subdir1
771         # R/subdir1/mydata567
772         # R/subdir1/subdir2/
773         # R/subdir1/subdir2/mydata992
774
775         d.addCallback(lambda res: self.bounce_client(0))
776         d.addCallback(self.log, "bounced client0")
777
778         d.addCallback(self._check_publish1)
779         d.addCallback(self.log, "did _check_publish1")
780         d.addCallback(self._check_publish2)
781         d.addCallback(self.log, "did _check_publish2")
782         d.addCallback(self._do_publish_private)
783         d.addCallback(self.log, "did _do_publish_private")
784         # now we also have (where "P" denotes a new dir):
785         #  P/personal/sekrit data
786         #  P/s2-rw -> /subdir1/subdir2/
787         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
788         d.addCallback(self._check_publish_private)
789         d.addCallback(self.log, "did _check_publish_private")
790         d.addCallback(self._test_web)
791         d.addCallback(self._test_control)
792         d.addCallback(self._test_cli)
793         # P now has four top-level children:
794         # P/personal/sekrit data
795         # P/s2-ro/
796         # P/s2-rw/
797         # P/test_put/  (empty)
798         d.addCallback(self._test_checker)
799         return d
800
801     def _test_introweb(self, res):
802         d = getPage(self.introweb_url, method="GET", followRedirect=True)
803         def _check(res):
804             try:
805                 self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
806                                 in res)
807                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
808                 self.failUnless("Subscription Summary: storage: 5" in res)
809             except unittest.FailTest:
810                 print
811                 print "GET %s output was:" % self.introweb_url
812                 print res
813                 raise
814         d.addCallback(_check)
815         d.addCallback(lambda res:
816                       getPage(self.introweb_url + "?t=json",
817                               method="GET", followRedirect=True))
818         def _check_json(res):
819             data = simplejson.loads(res)
820             try:
821                 self.failUnlessEqual(data["subscription_summary"],
822                                      {"storage": 5})
823                 self.failUnlessEqual(data["announcement_summary"],
824                                      {"storage": 5, "stub_client": 5})
825                 self.failUnlessEqual(data["announcement_distinct_hosts"],
826                                      {"storage": 1, "stub_client": 1})
827             except unittest.FailTest:
828                 print
829                 print "GET %s?t=json output was:" % self.introweb_url
830                 print res
831                 raise
832         d.addCallback(_check_json)
833         return d
834
835     def _do_publish1(self, res):
836         ut = upload.Data(self.data, convergence=None)
837         c0 = self.clients[0]
838         d = c0.create_empty_dirnode()
839         def _made_root(new_dirnode):
840             self._root_directory_uri = new_dirnode.get_uri()
841             return c0.create_node_from_uri(self._root_directory_uri)
842         d.addCallback(_made_root)
843         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
844         def _made_subdir1(subdir1_node):
845             self._subdir1_node = subdir1_node
846             d1 = subdir1_node.add_file(u"mydata567", ut)
847             d1.addCallback(self.log, "publish finished")
848             def _stash_uri(filenode):
849                 self.uri = filenode.get_uri()
850                 assert isinstance(self.uri, str), (self.uri, filenode)
851             d1.addCallback(_stash_uri)
852             return d1
853         d.addCallback(_made_subdir1)
854         return d
855
856     def _do_publish2(self, res):
857         ut = upload.Data(self.data, convergence=None)
858         d = self._subdir1_node.create_empty_directory(u"subdir2")
859         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
860         return d
861
862     def log(self, res, *args, **kwargs):
863         # print "MSG: %s  RES: %s" % (msg, args)
864         log.msg(*args, **kwargs)
865         return res
866
867     def _do_publish_private(self, res):
868         self.smalldata = "sssh, very secret stuff"
869         ut = upload.Data(self.smalldata, convergence=None)
870         d = self.clients[0].create_empty_dirnode()
871         d.addCallback(self.log, "GOT private directory")
872         def _got_new_dir(privnode):
873             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
874             d1 = privnode.create_empty_directory(u"personal")
875             d1.addCallback(self.log, "made P/personal")
876             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
877             d1.addCallback(self.log, "made P/personal/sekrit data")
878             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
879             def _got_s2(s2node):
880                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
881                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
882                 return d2
883             d1.addCallback(_got_s2)
884             d1.addCallback(lambda res: privnode)
885             return d1
886         d.addCallback(_got_new_dir)
887         return d
888
889     def _check_publish1(self, res):
890         # this one uses the iterative API
891         c1 = self.clients[1]
892         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
893         d.addCallback(self.log, "check_publish1 got /")
894         d.addCallback(lambda root: root.get(u"subdir1"))
895         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
896         d.addCallback(lambda filenode: filenode.download_to_data())
897         d.addCallback(self.log, "get finished")
898         def _get_done(data):
899             self.failUnlessEqual(data, self.data)
900         d.addCallback(_get_done)
901         return d
902
903     def _check_publish2(self, res):
904         # this one uses the path-based API
905         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
906         d = rootnode.get_child_at_path(u"subdir1")
907         d.addCallback(lambda dirnode:
908                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
909         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
910         d.addCallback(lambda filenode: filenode.download_to_data())
911         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
912
913         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
914         def _got_filenode(filenode):
915             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
916             assert fnode == filenode
917         d.addCallback(_got_filenode)
918         return d
919
920     def _check_publish_private(self, resnode):
921         # this one uses the path-based API
922         self._private_node = resnode
923
924         d = self._private_node.get_child_at_path(u"personal")
925         def _got_personal(personal):
926             self._personal_node = personal
927             return personal
928         d.addCallback(_got_personal)
929
930         d.addCallback(lambda dirnode:
931                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
932         def get_path(path):
933             return self._private_node.get_child_at_path(path)
934
935         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
936         d.addCallback(lambda filenode: filenode.download_to_data())
937         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
938         d.addCallback(lambda res: get_path(u"s2-rw"))
939         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
940         d.addCallback(lambda res: get_path(u"s2-ro"))
941         def _got_s2ro(dirnode):
942             self.failUnless(dirnode.is_mutable(), dirnode)
943             self.failUnless(dirnode.is_readonly(), dirnode)
944             d1 = defer.succeed(None)
945             d1.addCallback(lambda res: dirnode.list())
946             d1.addCallback(self.log, "dirnode.list")
947
948             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
949
950             d1.addCallback(self.log, "doing add_file(ro)")
951             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
952             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
953
954             d1.addCallback(self.log, "doing get(ro)")
955             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
956             d1.addCallback(lambda filenode:
957                            self.failUnless(IFileNode.providedBy(filenode)))
958
959             d1.addCallback(self.log, "doing delete(ro)")
960             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
961
962             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
963
964             d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
965
966             personal = self._personal_node
967             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
968
969             d1.addCallback(self.log, "doing move_child_to(ro)2")
970             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
971
972             d1.addCallback(self.log, "finished with _got_s2ro")
973             return d1
974         d.addCallback(_got_s2ro)
975         def _got_home(dummy):
976             home = self._private_node
977             personal = self._personal_node
978             d1 = defer.succeed(None)
979             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
980             d1.addCallback(lambda res:
981                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
982
983             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
984             d1.addCallback(lambda res:
985                            home.move_child_to(u"sekrit", home, u"sekrit data"))
986
987             d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
988             d1.addCallback(lambda res:
989                            home.move_child_to(u"sekrit data", personal))
990
991             d1.addCallback(lambda res: home.build_manifest().when_done())
992             d1.addCallback(self.log, "manifest")
993             #  five items:
994             # P/
995             # P/personal/
996             # P/personal/sekrit data
997             # P/s2-rw  (same as P/s2-ro)
998             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
999             d1.addCallback(lambda res:
1000                            self.failUnlessEqual(len(res["manifest"]), 5))
1001             d1.addCallback(lambda res: home.start_deep_stats().when_done())
1002             def _check_stats(stats):
1003                 expected = {"count-immutable-files": 1,
1004                             "count-mutable-files": 0,
1005                             "count-literal-files": 1,
1006                             "count-files": 2,
1007                             "count-directories": 3,
1008                             "size-immutable-files": 112,
1009                             "size-literal-files": 23,
1010                             #"size-directories": 616, # varies
1011                             #"largest-directory": 616,
1012                             "largest-directory-children": 3,
1013                             "largest-immutable-file": 112,
1014                             }
1015                 for k,v in expected.iteritems():
1016                     self.failUnlessEqual(stats[k], v,
1017                                          "stats[%s] was %s, not %s" %
1018                                          (k, stats[k], v))
1019                 self.failUnless(stats["size-directories"] > 1300,
1020                                 stats["size-directories"])
1021                 self.failUnless(stats["largest-directory"] > 800,
1022                                 stats["largest-directory"])
1023                 self.failUnlessEqual(stats["size-files-histogram"],
1024                                      [ (11, 31, 1), (101, 316, 1) ])
1025             d1.addCallback(_check_stats)
1026             return d1
1027         d.addCallback(_got_home)
1028         return d
1029
1030     def shouldFail(self, res, expected_failure, which, substring=None):
1031         if isinstance(res, Failure):
1032             res.trap(expected_failure)
1033             if substring:
1034                 self.failUnless(substring in str(res),
1035                                 "substring '%s' not in '%s'"
1036                                 % (substring, str(res)))
1037         else:
1038             self.fail("%s was supposed to raise %s, not get '%s'" %
1039                       (which, expected_failure, res))
1040
1041     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1042         assert substring is None or isinstance(substring, str)
1043         d = defer.maybeDeferred(callable, *args, **kwargs)
1044         def done(res):
1045             if isinstance(res, Failure):
1046                 res.trap(expected_failure)
1047                 if substring:
1048                     self.failUnless(substring in str(res),
1049                                     "substring '%s' not in '%s'"
1050                                     % (substring, str(res)))
1051             else:
1052                 self.fail("%s was supposed to raise %s, not get '%s'" %
1053                           (which, expected_failure, res))
1054         d.addBoth(done)
1055         return d
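    # Typical usage, as seen in the directory tests below:
    #   self.shouldFail2(NotMutableError, "mkdir(nope)", None,
    #                    dirnode.create_empty_directory, u"nope")
    # i.e. the callable and its arguments are passed in, and the resulting
    # Failure must wrap expected_failure (and contain substring, if given).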
1056
1057     def PUT(self, urlpath, data):
1058         url = self.webish_url + urlpath
1059         return getPage(url, method="PUT", postdata=data)
1060
1061     def GET(self, urlpath, followRedirect=False):
1062         url = self.webish_url + urlpath
1063         return getPage(url, method="GET", followRedirect=followRedirect)
1064
1065     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1066         if use_helper:
1067             url = self.helper_webish_url + urlpath
1068         else:
1069             url = self.webish_url + urlpath
1070         sepbase = "boogabooga"
1071         sep = "--" + sepbase
1072         form = []
1073         form.append(sep)
1074         form.append('Content-Disposition: form-data; name="_charset"')
1075         form.append('')
1076         form.append('UTF-8')
1077         form.append(sep)
1078         for name, value in fields.iteritems():
1079             if isinstance(value, tuple):
1080                 filename, value = value
1081                 form.append('Content-Disposition: form-data; name="%s"; '
1082                             'filename="%s"' % (name, filename.encode("utf-8")))
1083             else:
1084                 form.append('Content-Disposition: form-data; name="%s"' % name)
1085             form.append('')
1086             form.append(str(value))
1087             form.append(sep)
1088         form[-1] += "--"
1089         body = "\r\n".join(form) + "\r\n"
1090         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1091                    }
1092         return getPage(url, method="POST", postdata=body,
1093                        headers=headers, followRedirect=followRedirect)
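    # The hand-rolled multipart body built above looks roughly like this
    # (field name/value shown are illustrative; cf. the POST("uri", t="mkdir",
    # ...) call in test_mutable):
    #   --boogabooga\r\n
    #   Content-Disposition: form-data; name="_charset"\r\n
    #   \r\n
    #   UTF-8\r\n
    #   --boogabooga\r\n
    #   Content-Disposition: form-data; name="t"\r\n
    #   \r\n
    #   mkdir\r\n
    #   --boogabooga--\r\n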
1094
1095     def _test_web(self, res):
1096         base = self.webish_url
1097         public = "uri/" + self._root_directory_uri
1098         d = getPage(base)
1099         def _got_welcome(page):
1100             # XXX This test is oversensitive to formatting
1101             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1102             self.failUnless(expected in page,
1103                             "I didn't see the right 'connected storage servers'"
1104                             " message in: %s" % page
1105                             )
1106             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1107             self.failUnless(expected in page,
1108                             "I didn't see the right 'My nodeid' message "
1109                             "in: %s" % page)
1110             self.failUnless("Helper: 0 active uploads" in page)
1111         d.addCallback(_got_welcome)
1112         d.addCallback(self.log, "done with _got_welcome")
1113
1114         # get the welcome page from the node that uses the helper too
1115         d.addCallback(lambda res: getPage(self.helper_webish_url))
1116         def _got_welcome_helper(page):
1117             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1118                             page)
1119             self.failUnless("Not running helper" in page)
1120         d.addCallback(_got_welcome_helper)
1121
1122         d.addCallback(lambda res: getPage(base + public))
1123         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1124         def _got_subdir1(page):
1125             # there ought to be an href for our file
1126             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1127             self.failUnless(">mydata567</a>" in page)
1128         d.addCallback(_got_subdir1)
1129         d.addCallback(self.log, "done with _got_subdir1")
1130         d.addCallback(lambda res:
1131                       getPage(base + public + "/subdir1/mydata567"))
1132         def _got_data(page):
1133             self.failUnlessEqual(page, self.data)
1134         d.addCallback(_got_data)
1135
1136         # download from a URI embedded in a URL
1137         d.addCallback(self.log, "_get_from_uri")
1138         def _get_from_uri(res):
1139             return getPage(base + "uri/%s?filename=%s"
1140                            % (self.uri, "mydata567"))
1141         d.addCallback(_get_from_uri)
1142         def _got_from_uri(page):
1143             self.failUnlessEqual(page, self.data)
1144         d.addCallback(_got_from_uri)
1145
1146         # download from a URI embedded in a URL, second form
1147         d.addCallback(self.log, "_get_from_uri2")
1148         def _get_from_uri2(res):
1149             return getPage(base + "uri?uri=%s" % (self.uri,))
1150         d.addCallback(_get_from_uri2)
1151         d.addCallback(_got_from_uri)
1152
1153         # download from a bogus URI, make sure we get a reasonable error
1154         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1155         def _get_from_bogus_uri(res):
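                 # the mangled URI points at shares that don't exist, so the
                 # webapi should answer with a 410 (Gone) error rather than
                 # a traceback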
1156             d1 = getPage(base + "uri/%s?filename=%s"
1157                          % (self.mangle_uri(self.uri), "mydata567"))
1158             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1159                        "410")
1160             return d1
1161         d.addCallback(_get_from_bogus_uri)
1162         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1163
1164         # upload a file with PUT
1165         d.addCallback(self.log, "about to try PUT")
1166         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1167                                            "new.txt contents"))
1168         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1169         d.addCallback(self.failUnlessEqual, "new.txt contents")
1170         # and again with something large enough to use multiple segments,
1171         # and hopefully trigger pauseProducing too
1172         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1173                                            "big" * 500000)) # 1.5MB
1174         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1175         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1176
1177         # can we replace files in place?
1178         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1179                                            "NEWER contents"))
1180         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1181         d.addCallback(self.failUnlessEqual, "NEWER contents")
1182
1183         # test unlinked POST
1184         d.addCallback(lambda res: self.POST("uri", t="upload",
1185                                             file=("new.txt", "data" * 10000)))
1186         # and again using the helper, which exercises different upload-status
1187         # display code
1188         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1189                                             file=("foo.txt", "data2" * 10000)))
1190
1191         # check that the status page exists
1192         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1193         def _got_status(res):
1194             # find an interesting upload and download to look at. LIT files
1195             # are not interesting.
1196             for ds in self.clients[0].list_all_download_statuses():
1197                 if ds.get_size() > 200:
1198                     self._down_status = ds.get_counter()
1199             for us in self.clients[0].list_all_upload_statuses():
1200                 if us.get_size() > 200:
1201                     self._up_status = us.get_counter()
1202             rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1203             self._retrieve_status = rs.get_counter()
1204             ps = list(self.clients[0].list_all_publish_statuses())[0]
1205             self._publish_status = ps.get_counter()
1206             us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1207             self._update_status = us.get_counter()
1208
1209             # and that there are some upload- and download-status pages
1210             return self.GET("status/up-%d" % self._up_status)
1211         d.addCallback(_got_status)
1212         def _got_up(res):
1213             return self.GET("status/down-%d" % self._down_status)
1214         d.addCallback(_got_up)
1215         def _got_down(res):
1216             return self.GET("status/mapupdate-%d" % self._update_status)
1217         d.addCallback(_got_down)
1218         def _got_update(res):
1219             return self.GET("status/publish-%d" % self._publish_status)
1220         d.addCallback(_got_update)
1221         def _got_publish(res):
1222             return self.GET("status/retrieve-%d" % self._retrieve_status)
1223         d.addCallback(_got_publish)
1224
1225         # check that the helper status page exists
1226         d.addCallback(lambda res:
1227                       self.GET("helper_status", followRedirect=True))
1228         def _got_helper_status(res):
1229             self.failUnless("Bytes Fetched:" in res)
1230             # touch a couple of files in the helper's working directory to
1231             # exercise more code paths
1232             workdir = os.path.join(self.getdir("client0"), "helper")
1233             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1234             f = open(incfile, "wb")
1235             f.write("small file")
1236             f.close()
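                 # backdate the mtimes by three days so the helper status
                 # page also exercises its "old" counters (incoming_size_old
                 # and encoding_size_old, checked below)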
1237             then = time.time() - 86400*3
1238             now = time.time()
1239             os.utime(incfile, (now, then))
1240             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1241             f = open(encfile, "wb")
1242             f.write("less small file")
1243             f.close()
1244             os.utime(encfile, (now, then))
1245         d.addCallback(_got_helper_status)
1246         # and that the json form exists
1247         d.addCallback(lambda res:
1248                       self.GET("helper_status?t=json", followRedirect=True))
1249         def _got_helper_status_json(res):
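                 # the two spurious files created above ("small file" = 10
                 # bytes, "less small file" = 15 bytes) should be reflected
                 # in the helper's JSON counters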
1250             data = simplejson.loads(res)
1251             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1252                                  1)
1253             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1254             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1255             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1256                                  10)
1257             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1258             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1259             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1260                                  15)
1261         d.addCallback(_got_helper_status_json)
1262
1263         # and check that client[3] (which uses a helper but does not run one
1264         # itself) doesn't explode when you ask for its status
1265         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1266         def _got_non_helper_status(res):
1267             self.failUnless("Upload and Download Status" in res)
1268         d.addCallback(_got_non_helper_status)
1269
1270         # or for helper status with t=json
1271         d.addCallback(lambda res:
1272                       getPage(self.helper_webish_url + "helper_status?t=json"))
1273         def _got_non_helper_status_json(res):
1274             data = simplejson.loads(res)
1275             self.failUnlessEqual(data, {})
1276         d.addCallback(_got_non_helper_status_json)
1277
1278         # see if the statistics page exists
1279         d.addCallback(lambda res: self.GET("statistics"))
1280         def _got_stats(res):
1281             self.failUnless("Node Statistics" in res)
1282             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1283         d.addCallback(_got_stats)
1284         d.addCallback(lambda res: self.GET("statistics?t=json"))
1285         def _got_stats_json(res):
1286             data = simplejson.loads(res)
1287             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1288             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1289         d.addCallback(_got_stats_json)
1290
1291         # TODO: mangle the second segment of a file, to test errors that
1292         # occur after we've already sent some good data, which uses a
1293         # different error path.
1294
1295         # TODO: download a URI with a form
1296         # TODO: create a directory by using a form
1297         # TODO: upload by using a form on the directory page
1298         #    url = base + "somedir/subdir1/freeform_post!!upload"
1299         # TODO: delete a file by using a button on the directory page
1300
1301         return d
1302
1303     def _test_runner(self, res):
1304         # exercise some of the diagnostic tools in runner.py
1305
1306         # find a share
1307         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1308             if "storage" not in dirpath:
1309                 continue
1310             if not filenames:
1311                 continue
1312             pieces = dirpath.split(os.sep)
1313             if (len(pieces) >= 4
1314                 and pieces[-4] == "storage"
1315                 and pieces[-3] == "shares"):
1316                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1317                 # are sharefiles here
1318                 filename = os.path.join(dirpath, filenames[0])
1319                 # peek at the magic to see if it is a chk share
1320                 magic = open(filename, "rb").read(4)
1321                 if magic == '\x00\x00\x00\x01':
1322                     break
1323         else:
1324             self.fail("unable to find any uri_extension files in %s"
1325                       % self.basedir)
1326         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1327
1328         out,err = StringIO(), StringIO()
1329         rc = runner.runner(["debug", "dump-share", "--offsets",
1330                             filename],
1331                            stdout=out, stderr=err)
1332         output = out.getvalue()
1333         self.failUnlessEqual(rc, 0)
1334
1335         # we only upload a single file, so we can assert some things about
1336         # its size and shares.
1337         self.failUnless(("share filename: %s" % filename) in output)
1338         self.failUnless("size: %d\n" % len(self.data) in output)
1339         self.failUnless("num_segments: 1\n" in output)
1340         # segment_size is always a multiple of needed_shares
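             # (needed_shares defaults to 3 in the standard 3-of-10 encoding,
             # hence next_multiple(len(self.data), 3))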
1341         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1342         self.failUnless("total_shares: 10\n" in output)
1343         # keys which are supposed to be present
1344         for key in ("size", "num_segments", "segment_size",
1345                     "needed_shares", "total_shares",
1346                     "codec_name", "codec_params", "tail_codec_params",
1347                     #"plaintext_hash", "plaintext_root_hash",
1348                     "crypttext_hash", "crypttext_root_hash",
1349                     "share_root_hash", "UEB_hash"):
1350             self.failUnless("%s: " % key in output, key)
1351         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1352
1353         # now use its storage index to find the other shares using the
1354         # 'find-shares' tool
1355         sharedir, shnum = os.path.split(filename)
1356         storagedir, storage_index_s = os.path.split(sharedir)
1357         out,err = StringIO(), StringIO()
1358         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1359         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1360         rc = runner.runner(cmd, stdout=out, stderr=err)
1361         self.failUnlessEqual(rc, 0)
1362         out.seek(0)
1363         sharefiles = [sfn.strip() for sfn in out.readlines()]
1364         self.failUnlessEqual(len(sharefiles), 10)
1365
1366         # also exercise the 'catalog-shares' tool
1367         out,err = StringIO(), StringIO()
1368         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1369         cmd = ["debug", "catalog-shares"] + nodedirs
1370         rc = runner.runner(cmd, stdout=out, stderr=err)
1371         self.failUnlessEqual(rc, 0)
1372         out.seek(0)
1373         descriptions = [sfn.strip() for sfn in out.readlines()]
1374         self.failUnlessEqual(len(descriptions), 30)
1375         matching = [line
1376                     for line in descriptions
1377                     if line.startswith("CHK %s " % storage_index_s)]
1378         self.failUnlessEqual(len(matching), 10)
1379
1380     def _test_control(self, res):
1381         # exercise the remote-control-the-client foolscap interfaces in
1382         # allmydata.control (mostly used for performance tests)
1383         c0 = self.clients[0]
1384         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1385         control_furl = open(control_furl_file, "r").read().strip()
1386         # it doesn't really matter which Tub we use to connect to the client,
1387         # so let's just use our IntroducerNode's
1388         d = self.introducer.tub.getReference(control_furl)
1389         d.addCallback(self._test_control2, control_furl_file)
1390         return d
1391     def _test_control2(self, rref, filename):
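             # round-trip a file through the control interface: upload the
             # control.furl file itself (any small local file would do), then
             # download it again and compare the two copies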
1392         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1393         downfile = os.path.join(self.basedir, "control.downfile")
1394         d.addCallback(lambda uri:
1395                       rref.callRemote("download_from_uri_to_file",
1396                                       uri, downfile))
1397         def _check(res):
1398             self.failUnlessEqual(res, downfile)
1399             data = open(downfile, "r").read()
1400             expected_data = open(filename, "r").read()
1401             self.failUnlessEqual(data, expected_data)
1402         d.addCallback(_check)
1403         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1404         if sys.platform == "linux2":
1405             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1406         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1407         return d
1408
1409     def _test_cli(self, res):
1410         # run various CLI commands (in a thread, since they use blocking
1411         # network calls)
1412
1413         private_uri = self._private_node.get_uri()
1414         some_uri = self._root_directory_uri
1415         client0_basedir = self.getdir("client0")
1416
1417         nodeargs = [
1418             "--node-directory", client0_basedir,
1419             ]
1420         TESTDATA = "I will not write the same thing over and over.\n" * 100
1421
1422         d = defer.succeed(None)
1423
1424         # for compatibility with earlier versions, private/root_dir.cap is
1425         # supposed to be treated as an alias named "tahoe:". Start by making
1426         # sure that works, before we add other aliases.
1427
1428         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1429         f = open(root_file, "w")
1430         f.write(private_uri)
1431         f.close()
1432
1433         def run(ignored, verb, *args, **kwargs):
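                 # prepend the verb and the --node-directory arguments, then
                 # hand the whole command line to _run_cli (which runs the
                 # blocking CLI code in a thread)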
1434             stdin = kwargs.get("stdin", "")
1435             newargs = [verb] + nodeargs + list(args)
1436             return self._run_cli(newargs, stdin=stdin)
1437
1438         def _check_ls((out,err), expected_children, unexpected_children=[]):
1439             self.failUnlessEqual(err, "")
1440             for s in expected_children:
1441                 self.failUnless(s in out, (s,out))
1442             for s in unexpected_children:
1443                 self.failIf(s in out, (s,out))
1444
1445         def _check_ls_root((out,err)):
1446             self.failUnless("personal" in out)
1447             self.failUnless("s2-ro" in out)
1448             self.failUnless("s2-rw" in out)
1449             self.failUnlessEqual(err, "")
1450
1451         # this should reference private_uri
1452         d.addCallback(run, "ls")
1453         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1454
1455         d.addCallback(run, "list-aliases")
1456         def _check_aliases_1((out,err)):
1457             self.failUnlessEqual(err, "")
1458             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1459         d.addCallback(_check_aliases_1)
1460
1461         # now that that's out of the way, remove root_dir.cap and work with
1462         # new files
1463         d.addCallback(lambda res: os.unlink(root_file))
1464         d.addCallback(run, "list-aliases")
1465         def _check_aliases_2((out,err)):
1466             self.failUnlessEqual(err, "")
1467             self.failUnlessEqual(out, "")
1468         d.addCallback(_check_aliases_2)
1469
1470         d.addCallback(run, "mkdir")
1471         def _got_dir((out,err)):
1472             self.failUnless(uri.from_string_dirnode(out.strip()))
1473             return out.strip()
1474         d.addCallback(_got_dir)
1475         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1476
1477         d.addCallback(run, "list-aliases")
1478         def _check_aliases_3((out,err)):
1479             self.failUnlessEqual(err, "")
1480             self.failUnless("tahoe: " in out)
1481         d.addCallback(_check_aliases_3)
1482
1483         def _check_empty_dir((out,err)):
1484             self.failUnlessEqual(out, "")
1485             self.failUnlessEqual(err, "")
1486         d.addCallback(run, "ls")
1487         d.addCallback(_check_empty_dir)
1488
1489         def _check_missing_dir((out,err)):
1490             # TODO: check that rc==2
1491             self.failUnlessEqual(out, "")
1492             self.failUnlessEqual(err, "No such file or directory\n")
1493         d.addCallback(run, "ls", "bogus")
1494         d.addCallback(_check_missing_dir)
1495
1496         files = []
1497         datas = []
1498         for i in range(10):
1499             fn = os.path.join(self.basedir, "file%d" % i)
1500             files.append(fn)
1501             data = "data to be uploaded: file%d\n" % i
1502             datas.append(data)
1503             open(fn,"wb").write(data)
1504
1505         def _check_stdout_against((out,err), filenum=None, data=None):
1506             self.failUnlessEqual(err, "")
1507             if filenum is not None:
1508                 self.failUnlessEqual(out, datas[filenum])
1509             if data is not None:
1510                 self.failUnlessEqual(out, data)
1511
1512         # test both forms of put: from a file, and from stdin
1513         #  tahoe put bar FOO
1514         d.addCallback(run, "put", files[0], "tahoe-file0")
1515         def _put_out((out,err)):
1516             self.failUnless("URI:LIT:" in out, out)
1517             self.failUnless("201 Created" in err, err)
1518             uri0 = out.strip()
1519             return run(None, "get", uri0)
1520         d.addCallback(_put_out)
1521         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1522
1523         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1524         #  tahoe put bar tahoe:FOO
1525         d.addCallback(run, "put", files[2], "tahoe:file2")
1526         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1527         def _check_put_mutable((out,err)):
1528             self._mutable_file3_uri = out.strip()
1529         d.addCallback(_check_put_mutable)
1530         d.addCallback(run, "get", "tahoe:file3")
1531         d.addCallback(_check_stdout_against, 3)
1532
1533         #  tahoe put FOO
1534         STDIN_DATA = "This is the file to upload from stdin."
1535         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1536         #  tahoe put tahoe:FOO
1537         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1538                       stdin="Other file from stdin.")
1539
1540         d.addCallback(run, "ls")
1541         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1542                                   "tahoe-file-stdin", "from-stdin"])
1543         d.addCallback(run, "ls", "subdir")
1544         d.addCallback(_check_ls, ["tahoe-file1"])
1545
1546         # tahoe mkdir FOO
1547         d.addCallback(run, "mkdir", "subdir2")
1548         d.addCallback(run, "ls")
1549         # TODO: extract the URI, set an alias with it
1550         d.addCallback(_check_ls, ["subdir2"])
1551
1552         # tahoe get: (to stdin and to a file)
1553         d.addCallback(run, "get", "tahoe-file0")
1554         d.addCallback(_check_stdout_against, 0)
1555         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1556         d.addCallback(_check_stdout_against, 1)
1557         outfile0 = os.path.join(self.basedir, "outfile0")
1558         d.addCallback(run, "get", "file2", outfile0)
1559         def _check_outfile0((out,err)):
1560             data = open(outfile0,"rb").read()
1561             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1562         d.addCallback(_check_outfile0)
1563         outfile1 = os.path.join(self.basedir, "outfile1")
1564         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1565         def _check_outfile1((out,err)):
1566             data = open(outfile1,"rb").read()
1567             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1568         d.addCallback(_check_outfile1)
1569
1570         d.addCallback(run, "rm", "tahoe-file0")
1571         d.addCallback(run, "rm", "tahoe:file2")
1572         d.addCallback(run, "ls")
1573         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1574
1575         d.addCallback(run, "ls", "-l")
1576         def _check_ls_l((out,err)):
1577             lines = out.split("\n")
1578             for l in lines:
1579                 if "tahoe-file-stdin" in l:
1580                     self.failUnless(l.startswith("-r-- "), l)
1581                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1582                 if "file3" in l:
1583                     self.failUnless(l.startswith("-rw- "), l) # mutable
1584         d.addCallback(_check_ls_l)
1585
1586         d.addCallback(run, "ls", "--uri")
1587         def _check_ls_uri((out,err)):
1588             lines = out.split("\n")
1589             for l in lines:
1590                 if "file3" in l:
1591                     self.failUnless(self._mutable_file3_uri in l)
1592         d.addCallback(_check_ls_uri)
1593
1594         d.addCallback(run, "ls", "--readonly-uri")
1595         def _check_ls_rouri((out,err)):
1596             lines = out.split("\n")
1597             for l in lines:
1598                 if "file3" in l:
1599                     rw_uri = self._mutable_file3_uri
1600                     u = uri.from_string_mutable_filenode(rw_uri)
1601                     ro_uri = u.get_readonly().to_string()
1602                     self.failUnless(ro_uri in l)
1603         d.addCallback(_check_ls_rouri)
1604
1605
1606         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1607         d.addCallback(run, "ls")
1608         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1609
1610         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1611         d.addCallback(run, "ls")
1612         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1613
1614         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1615         d.addCallback(run, "ls")
1616         d.addCallback(_check_ls, ["file3", "file3-copy"])
1617         d.addCallback(run, "get", "tahoe:file3-copy")
1618         d.addCallback(_check_stdout_against, 3)
1619
1620         # copy from disk into tahoe
1621         d.addCallback(run, "cp", files[4], "tahoe:file4")
1622         d.addCallback(run, "ls")
1623         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1624         d.addCallback(run, "get", "tahoe:file4")
1625         d.addCallback(_check_stdout_against, 4)
1626
1627         # copy from tahoe into disk
1628         target_filename = os.path.join(self.basedir, "file-out")
1629         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1630         def _check_cp_out((out,err)):
1631             self.failUnless(os.path.exists(target_filename))
1632             got = open(target_filename,"rb").read()
1633             self.failUnlessEqual(got, datas[4])
1634         d.addCallback(_check_cp_out)
1635
1636         # copy from disk to disk (silly case)
1637         target2_filename = os.path.join(self.basedir, "file-out-copy")
1638         d.addCallback(run, "cp", target_filename, target2_filename)
1639         def _check_cp_out2((out,err)):
1640             self.failUnless(os.path.exists(target2_filename))
1641             got = open(target2_filename,"rb").read()
1642             self.failUnlessEqual(got, datas[4])
1643         d.addCallback(_check_cp_out2)
1644
1645         # copy from tahoe into disk, overwriting an existing file
1646         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1647         def _check_cp_out3((out,err)):
1648             self.failUnless(os.path.exists(target_filename))
1649             got = open(target_filename,"rb").read()
1650             self.failUnlessEqual(got, datas[3])
1651         d.addCallback(_check_cp_out3)
1652
1653         # copy from disk into tahoe, overwriting an existing immutable file
1654         d.addCallback(run, "cp", files[5], "tahoe:file4")
1655         d.addCallback(run, "ls")
1656         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1657         d.addCallback(run, "get", "tahoe:file4")
1658         d.addCallback(_check_stdout_against, 5)
1659
1660         # copy from disk into tahoe, overwriting an existing mutable file
1661         d.addCallback(run, "cp", files[5], "tahoe:file3")
1662         d.addCallback(run, "ls")
1663         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1664         d.addCallback(run, "get", "tahoe:file3")
1665         d.addCallback(_check_stdout_against, 5)
1666
1667         # recursive copy: setup
1668         dn = os.path.join(self.basedir, "dir1")
1669         os.makedirs(dn)
1670         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1671         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1672         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1673         sdn2 = os.path.join(dn, "subdir2")
1674         os.makedirs(sdn2)
1675         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1676         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1677
1678         # from disk into tahoe
1679         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1680         d.addCallback(run, "ls")
1681         d.addCallback(_check_ls, ["dir1"])
1682         d.addCallback(run, "ls", "dir1")
1683         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1684                       ["rfile4", "rfile5"])
1685         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1686         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1687                       ["rfile1", "rfile2", "rfile3"])
1688         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1689         d.addCallback(_check_stdout_against, data="rfile4")
1690
1691         # and back out again
1692         dn_copy = os.path.join(self.basedir, "dir1-copy")
1693         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1694         def _check_cp_r_out((out,err)):
1695             def _cmp(name):
1696                 old = open(os.path.join(dn, name), "rb").read()
1697                 newfn = os.path.join(dn_copy, name)
1698                 self.failUnless(os.path.exists(newfn))
1699                 new = open(newfn, "rb").read()
1700                 self.failUnlessEqual(old, new)
1701             _cmp("rfile1")
1702             _cmp("rfile2")
1703             _cmp("rfile3")
1704             _cmp(os.path.join("subdir2", "rfile4"))
1705             _cmp(os.path.join("subdir2", "rfile5"))
1706         d.addCallback(_check_cp_r_out)
1707
1708         # and copy it a second time, which ought to overwrite the same files
1709         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1710
1711         # and again, only writing filecaps
1712         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1713         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1714         def _check_capsonly((out,err)):
1715             # these should all be LITs
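                 # (--caps-only writes each file's cap string instead of its
                 # contents; the files here are tiny, so the caps are LIT
                 # URIs, which embed the data and can be unpacked locally)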
1716             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1717             y = uri.from_string_filenode(x)
1718             self.failUnlessEqual(y.data, "rfile4")
1719         d.addCallback(_check_capsonly)
1720
1721         # and tahoe-to-tahoe
1722         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1723         d.addCallback(run, "ls")
1724         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1725         d.addCallback(run, "ls", "dir1-copy")
1726         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1727                       ["rfile4", "rfile5"])
1728         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1729         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1730                       ["rfile1", "rfile2", "rfile3"])
1731         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1732         d.addCallback(_check_stdout_against, data="rfile4")
1733
1734         # and copy it a second time, which ought to overwrite the same files
1735         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1736
1737         # tahoe_ls doesn't currently handle the error correctly: it tries to
1738         # JSON-parse a traceback.
1739 ##         def _ls_missing(res):
1740 ##             argv = ["ls"] + nodeargs + ["bogus"]
1741 ##             return self._run_cli(argv)
1742 ##         d.addCallback(_ls_missing)
1743 ##         def _check_ls_missing((out,err)):
1744 ##             print "OUT", out
1745 ##             print "ERR", err
1746 ##             self.failUnlessEqual(err, "")
1747 ##         d.addCallback(_check_ls_missing)
1748
1749         return d
1750
1751     def _run_cli(self, argv, stdin=""):
1752         #print "CLI:", argv
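             # run the CLI in a worker thread: runner.runner makes blocking
             # network calls, and blocking the reactor would wedge the test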
1753         stdout, stderr = StringIO(), StringIO()
1754         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1755                                   stdin=StringIO(stdin),
1756                                   stdout=stdout, stderr=stderr)
1757         def _done(res):
1758             return stdout.getvalue(), stderr.getvalue()
1759         d.addCallback(_done)
1760         return d
1761
1762     def _test_checker(self, res):
1763         ut = upload.Data("too big to be literal" * 200, convergence=None)
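             # the repeated string is well over the LIT threshold, so this
             # becomes a real CHK upload with shares that the checker can
             # examine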
1764         d = self._personal_node.add_file(u"big file", ut)
1765
1766         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1767         def _check_dirnode_results(r):
1768             self.failUnless(r.is_healthy())
1769         d.addCallback(_check_dirnode_results)
1770         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1771         d.addCallback(_check_dirnode_results)
1772
1773         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1774         def _got_chk_filenode(n):
1775             self.failUnless(isinstance(n, filenode.FileNode))
1776             d = n.check(Monitor())
1777             def _check_filenode_results(r):
1778                 self.failUnless(r.is_healthy())
1779             d.addCallback(_check_filenode_results)
1780             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1781             d.addCallback(_check_filenode_results)
1782             return d
1783         d.addCallback(_got_chk_filenode)
1784
1785         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1786         def _got_lit_filenode(n):
1787             self.failUnless(isinstance(n, filenode.LiteralFileNode))
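                 # LIT files carry their data inside the URI and have no
                 # shares on the grid, so there is nothing to check; check()
                 # simply reports None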
1788             d = n.check(Monitor())
1789             def _check_lit_filenode_results(r):
1790                 self.failUnlessEqual(r, None)
1791             d.addCallback(_check_lit_filenode_results)
1792             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1793             d.addCallback(_check_lit_filenode_results)
1794             return d
1795         d.addCallback(_got_lit_filenode)
1796         return d
1797