from base64 import b32encode
import os, sys, time, re, simplejson
from cStringIO import StringIO
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import download, filenode, offloaded, upload
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
     NoSuchChildError, NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap.api import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
     download_to_data

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
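# (a LIT uri embeds a small file's entire contents inline in the uri
# itself, so the data must be big enough to need real shares)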

class CountingDataUploadable(upload.Data):
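    """An upload.Data that counts bytes as the uploader reads them, and
    fires interrupt_after_d once bytes_read passes interrupt_after, so a
    test can interrupt an upload partway through."""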
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

class GrabEverythingConsumer:
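    """A minimal IConsumer that just accumulates every byte written to it,
    used below to exercise the streaming download interface."""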
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents += data

    def unregisterProducer(self):
        pass

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # This actually gets rounded up to 1025 to be a multiple of the
            # number of required shares (since we use 25 out of 100 FEC).
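            # (with 25 required shares, 1024 rounds up to the next multiple
            # of 25: 25 * 41 = 1025)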
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
            return d1 # return the Deferred so we wait for the second upload
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
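        # exercise the streaming-download path: ConsumerAdapter wraps our
        # plain IConsumer so it can serve as a download target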
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

    def _find_shares(self, basedir):
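        """Walk basedir looking for .../storage/shares/$START/$SINDEX
        share files, and return a list of (client_num, storage_index,
        filename, shnum) tuples, one per share file found."""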
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
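        """Unpack the mutable share in 'filename', mangle the field named
        by 'which' (flip a bit, or bump a counter), then repack the share
        and write it back out."""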
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
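            # each mutable-file or dirnode creation should consume exactly
            # one key from the key generator's pool, so the pool length
            # should equal pool_size plus the expected (negative) delta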
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses the key generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

    def flip_bit(self, good):
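        # flip the low-order bit of the last byte, producing a string that
        # is almost-but-not-quite the original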
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
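        # meant to be attached with d.addBoth(): if res is a Failure of the
        # expected type (optionally containing 'substring'), swallow it;
        # otherwise flunk the test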
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
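        # like shouldFail, but invokes the callable itself (via
        # maybeDeferred) and asserts that the resulting Deferred fails
        # with expected_failure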
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
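        # build a multipart/form-data body by hand: each keyword argument
        # becomes one form field, and a tuple value is treated as a
        # (filename, contents) file-upload field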
1070         if use_helper:
1071             url = self.helper_webish_url + urlpath
1072         else:
1073             url = self.webish_url + urlpath
1074         sepbase = "boogabooga"
1075         sep = "--" + sepbase
1076         form = []
1077         form.append(sep)
1078         form.append('Content-Disposition: form-data; name="_charset"')
1079         form.append('')
1080         form.append('UTF-8')
1081         form.append(sep)
1082         for name, value in fields.iteritems():
1083             if isinstance(value, tuple):
1084                 filename, value = value
1085                 form.append('Content-Disposition: form-data; name="%s"; '
1086                             'filename="%s"' % (name, filename.encode("utf-8")))
1087             else:
1088                 form.append('Content-Disposition: form-data; name="%s"' % name)
1089             form.append('')
1090             form.append(str(value))
1091             form.append(sep)
1092         form[-1] += "--"
1093         body = "\r\n".join(form) + "\r\n"
1094         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1095                    }
1096         return getPage(url, method="POST", postdata=body,
1097                        headers=headers, followRedirect=followRedirect)
1098
1099     def _test_web(self, res):
1100         base = self.webish_url
1101         public = "uri/" + self._root_directory_uri
1102         d = getPage(base)
1103         def _got_welcome(page):
1104             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1105             self.failUnless(expected in page,
1106                             "I didn't see the right 'connected storage servers'"
1107                             " message in: %s" % page
1108                             )
1109             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1110             self.failUnless(expected in page,
1111                             "I didn't see the right 'My nodeid' message "
1112                             "in: %s" % page)
1113             self.failUnless("Helper: 0 active uploads" in page)
1114         d.addCallback(_got_welcome)
1115         d.addCallback(self.log, "done with _got_welcome")
1116
1117         # get the welcome page from the node that uses the helper too
1118         d.addCallback(lambda res: getPage(self.helper_webish_url))
1119         def _got_welcome_helper(page):
1120             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1121                             page)
1122             self.failUnless("Not running helper" in page)
1123         d.addCallback(_got_welcome_helper)
1124
1125         d.addCallback(lambda res: getPage(base + public))
1126         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1127         def _got_subdir1(page):
1128             # there ought to be an href for our file
1129             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1130             self.failUnless(">mydata567</a>" in page)
1131         d.addCallback(_got_subdir1)
1132         d.addCallback(self.log, "done with _got_subdir1")
1133         d.addCallback(lambda res:
1134                       getPage(base + public + "/subdir1/mydata567"))
1135         def _got_data(page):
1136             self.failUnlessEqual(page, self.data)
1137         d.addCallback(_got_data)
1138
1139         # download from a URI embedded in a URL
1140         d.addCallback(self.log, "_get_from_uri")
1141         def _get_from_uri(res):
1142             return getPage(base + "uri/%s?filename=%s"
1143                            % (self.uri, "mydata567"))
1144         d.addCallback(_get_from_uri)
1145         def _got_from_uri(page):
1146             self.failUnlessEqual(page, self.data)
1147         d.addCallback(_got_from_uri)
1148
1149         # download from a URI embedded in a URL, second form
1150         d.addCallback(self.log, "_get_from_uri2")
1151         def _get_from_uri2(res):
1152             return getPage(base + "uri?uri=%s" % (self.uri,))
1153         d.addCallback(_get_from_uri2)
1154         d.addCallback(_got_from_uri)
1155
1156         # download from a bogus URI, make sure we get a reasonable error
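        # (the mangled URI cannot be retrieved, so the webapi is expected to
        # report it as "410 Gone", which is what the assertion below checks)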
1157         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1158         def _get_from_bogus_uri(res):
1159             d1 = getPage(base + "uri/%s?filename=%s"
1160                          % (self.mangle_uri(self.uri), "mydata567"))
1161             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1162                        "410")
1163             return d1
1164         d.addCallback(_get_from_bogus_uri)
1165         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1166
1167         # upload a file with PUT
1168         d.addCallback(self.log, "about to try PUT")
1169         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1170                                            "new.txt contents"))
1171         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1172         d.addCallback(self.failUnlessEqual, "new.txt contents")
1173         # and again with something large enough to use multiple segments,
1174         # and hopefully trigger pauseProducing too
1175         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1176                                            "big" * 500000)) # 1.5MB
1177         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1178         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1179
1180         # can we replace files in place?
1181         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1182                                            "NEWER contents"))
1183         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1184         d.addCallback(self.failUnlessEqual, "NEWER contents")
1185
1186         # test unlinked POST
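        # (an unlinked upload attaches the file to no directory; the webapi
        # responds with just the new file's cap)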
1187         d.addCallback(lambda res: self.POST("uri", t="upload",
1188                                             file=("new.txt", "data" * 10000)))
1189         # and again using the helper, which exercises different upload-status
1190         # display code
1191         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1192                                             file=("foo.txt", "data2" * 10000)))
1193
1194         # check that the status page exists
1195         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1196         def _got_status(res):
1197             # find an interesting upload and download to look at. LIT files
1198             # are not interesting.
1199             for ds in self.clients[0].list_all_download_statuses():
1200                 if ds.get_size() > 200:
1201                     self._down_status = ds.get_counter()
1202             for us in self.clients[0].list_all_upload_statuses():
1203                 if us.get_size() > 200:
1204                     self._up_status = us.get_counter()
1205             rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1206             self._retrieve_status = rs.get_counter()
1207             ps = list(self.clients[0].list_all_publish_statuses())[0]
1208             self._publish_status = ps.get_counter()
1209             us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1210             self._update_status = us.get_counter()
1211
1212             # and that there are some upload- and download- status pages
1213             return self.GET("status/up-%d" % self._up_status)
1214         d.addCallback(_got_status)
1215         def _got_up(res):
1216             return self.GET("status/down-%d" % self._down_status)
1217         d.addCallback(_got_up)
1218         def _got_down(res):
1219             return self.GET("status/mapupdate-%d" % self._update_status)
1220         d.addCallback(_got_down)
1221         def _got_update(res):
1222             return self.GET("status/publish-%d" % self._publish_status)
1223         d.addCallback(_got_update)
1224         def _got_publish(res):
1225             return self.GET("status/retrieve-%d" % self._retrieve_status)
1226         d.addCallback(_got_publish)
1227
1228         # check that the helper status page exists
1229         d.addCallback(lambda res:
1230                       self.GET("helper_status", followRedirect=True))
1231         def _got_helper_status(res):
1232             self.failUnless("Bytes Fetched:" in res)
1233             # touch a couple of files in the helper's working directory to
1234             # exercise more code paths
1235             workdir = os.path.join(self.getdir("client0"), "helper")
1236             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1237             f = open(incfile, "wb")
1238             f.write("small file")
1239             f.close()
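            # os.utime takes (atime, mtime): backdate the mtime by three days
            # so this file lands in the *_old counters checked in the json form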
1240             then = time.time() - 86400*3
1241             now = time.time()
1242             os.utime(incfile, (now, then))
1243             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1244             f = open(encfile, "wb")
1245             f.write("less small file")
1246             f.close()
1247             os.utime(encfile, (now, then))
1248         d.addCallback(_got_helper_status)
1249         # and that the json form exists
1250         d.addCallback(lambda res:
1251                       self.GET("helper_status?t=json", followRedirect=True))
1252         def _got_helper_status_json(res):
1253             data = simplejson.loads(res)
1254             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1255                                  1)
1256             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1257             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1258             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1259                                  10)
1260             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1261             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1262             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1263                                  15)
1264         d.addCallback(_got_helper_status_json)
1265
1266         # and check that client[3] (which uses a helper but does not run one
1267         # itself) doesn't explode when you ask for its status
1268         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1269         def _got_non_helper_status(res):
1270             self.failUnless("Upload and Download Status" in res)
1271         d.addCallback(_got_non_helper_status)
1272
1273         # or for helper status with t=json
1274         d.addCallback(lambda res:
1275                       getPage(self.helper_webish_url + "helper_status?t=json"))
1276         def _got_non_helper_status_json(res):
1277             data = simplejson.loads(res)
1278             self.failUnlessEqual(data, {})
1279         d.addCallback(_got_non_helper_status_json)
1280
1281         # see if the statistics page exists
1282         d.addCallback(lambda res: self.GET("statistics"))
1283         def _got_stats(res):
1284             self.failUnless("Node Statistics" in res)
1285             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1286         d.addCallback(_got_stats)
1287         d.addCallback(lambda res: self.GET("statistics?t=json"))
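        # (the json form splits the numbers into event "counters" and
        # point-in-time "stats")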
1288         def _got_stats_json(res):
1289             data = simplejson.loads(res)
1290             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1291             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1292         d.addCallback(_got_stats_json)
1293
1294         # TODO: mangle the second segment of a file, to test errors that
1295         # occur after we've already sent some good data, which uses a
1296         # different error path.
1297
1298         # TODO: download a URI with a form
1299         # TODO: create a directory by using a form (see the sketch below)
1300         # TODO: upload by using a form on the directory page
1301         #    url = base + "somedir/subdir1/freeform_post!!upload"
1302         # TODO: delete a file by using a button on the directory page
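        # a possible sketch for the create-a-directory TODO, assuming the
        # webapi's t=mkdir form operation (not exercised here):
        #   d.addCallback(lambda res: self.POST(public + "/subdir3",
        #                                       t="mkdir", name="newdir"))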
1303
1304         return d
1305
1306     def _test_runner(self, res):
1307         # exercise some of the diagnostic tools in runner.py
1308
1309         # find a share
1310         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1311             if "storage" not in dirpath:
1312                 continue
1313             if not filenames:
1314                 continue
1315             pieces = dirpath.split(os.sep)
1316             if (len(pieces) >= 4
1317                 and pieces[-4] == "storage"
1318                 and pieces[-3] == "shares"):
1319                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1320                 # are sharefiles here
1321                 filename = os.path.join(dirpath, filenames[0])
1322                 # peek at the magic to see if it is a chk share
1323                 magic = open(filename, "rb").read(4)
1324                 if magic == '\x00\x00\x00\x01':
1325                     break
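        # (for/else: the else clause below runs only if the loop finished
        # without break-ing, i.e. no chk share was found)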
1326         else:
1327             self.fail("unable to find any immutable share files in %s"
1328                       % self.basedir)
1329         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1330
1331         out,err = StringIO(), StringIO()
1332         rc = runner.runner(["debug", "dump-share", "--offsets",
1333                             filename],
1334                            stdout=out, stderr=err)
1335         output = out.getvalue()
1336         self.failUnlessEqual(rc, 0)
1337
1338         # we only upload a single file, so we can assert some things about
1339         # its size and shares.
1340         self.failUnless(("share filename: %s" % filename) in output)
1341         self.failUnless("size: %d\n" % len(self.data) in output)
1342         self.failUnless("num_segments: 1\n" in output)
1343         # segment_size is always a multiple of needed_shares
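        # e.g. mathutil.next_multiple(100, 3) == 102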
1344         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1345         self.failUnless("total_shares: 10\n" in output)
1346         # keys which are supposed to be present
1347         for key in ("size", "num_segments", "segment_size",
1348                     "needed_shares", "total_shares",
1349                     "codec_name", "codec_params", "tail_codec_params",
1350                     #"plaintext_hash", "plaintext_root_hash",
1351                     "crypttext_hash", "crypttext_root_hash",
1352                     "share_root_hash", "UEB_hash"):
1353             self.failUnless("%s: " % key in output, key)
1354         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1355
1356         # now use its storage index to find the other shares using the
1357         # 'find-shares' tool
1358         sharedir, shnum = os.path.split(filename)
1359         storagedir, storage_index_s = os.path.split(sharedir)
1360         out,err = StringIO(), StringIO()
1361         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1362         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1363         rc = runner.runner(cmd, stdout=out, stderr=err)
1364         self.failUnlessEqual(rc, 0)
1365         out.seek(0)
1366         sharefiles = [sfn.strip() for sfn in out.readlines()]
1367         self.failUnlessEqual(len(sharefiles), 10)
1368
1369         # also exercise the 'catalog-shares' tool
1370         out,err = StringIO(), StringIO()
1371         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1372         cmd = ["debug", "catalog-shares"] + nodedirs
1373         rc = runner.runner(cmd, stdout=out, stderr=err)
1374         self.failUnlessEqual(rc, 0)
1375         out.seek(0)
1376         descriptions = [sfn.strip() for sfn in out.readlines()]
1377         self.failUnlessEqual(len(descriptions), 30)
1378         matching = [line
1379                     for line in descriptions
1380                     if line.startswith("CHK %s " % storage_index_s)]
1381         self.failUnlessEqual(len(matching), 10)
1382
1383     def _test_control(self, res):
1384         # exercise the remote-control-the-client foolscap interfaces in
1385         # allmydata.control (mostly used for performance tests)
1386         c0 = self.clients[0]
1387         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1388         control_furl = open(control_furl_file, "r").read().strip()
1389         # it doesn't really matter which Tub we use to connect to the client,
1390         # so let's just use our IntroducerNode's
1391         d = self.introducer.tub.getReference(control_furl)
1392         d.addCallback(self._test_control2, control_furl_file)
1393         return d
1394     def _test_control2(self, rref, filename):
1395         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1396         downfile = os.path.join(self.basedir, "control.downfile")
1397         d.addCallback(lambda uri:
1398                       rref.callRemote("download_from_uri_to_file",
1399                                       uri, downfile))
1400         def _check(res):
1401             self.failUnlessEqual(res, downfile)
1402             data = open(downfile, "r").read()
1403             expected_data = open(filename, "r").read()
1404             self.failUnlessEqual(data, expected_data)
1405         d.addCallback(_check)
1406         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1407         if sys.platform == "linux2": # only exercise get_memory_usage on linux (it reads /proc)
1408             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1409         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1410         return d
1411
1412     def _test_cli(self, res):
1413         # run various CLI commands (in a thread, since they use blocking
1414         # network calls)
1415
1416         private_uri = self._private_node.get_uri()
1417         some_uri = self._root_directory_uri
1418         client0_basedir = self.getdir("client0")
1419
1420         nodeargs = [
1421             "--node-directory", client0_basedir,
1422             ]
1423         TESTDATA = "I will not write the same thing over and over.\n" * 100
1424
1425         d = defer.succeed(None)
1426
1427         # for compatibility with earlier versions, private/root_dir.cap is
1428         # supposed to be treated as an alias named "tahoe:". Start by making
1429         # sure that works, before we add other aliases.
1430
1431         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1432         f = open(root_file, "w")
1433         f.write(private_uri)
1434         f.close()
1435
1436         def run(ignored, verb, *args, **kwargs):
1437             stdin = kwargs.get("stdin", "")
1438             newargs = [verb] + nodeargs + list(args)
1439             return self._run_cli(newargs, stdin=stdin)
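        # ('run' is meant to be chained via addCallback, so the previous
        # Deferred's result arrives as the 'ignored' argument)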
1440
1441         def _check_ls((out,err), expected_children, unexpected_children=[]):
1442             self.failUnlessEqual(err, "")
1443             for s in expected_children:
1444                 self.failUnless(s in out, (s,out))
1445             for s in unexpected_children:
1446                 self.failIf(s in out, (s,out))
1447
1448         def _check_ls_root((out,err)):
1449             self.failUnless("personal" in out)
1450             self.failUnless("s2-ro" in out)
1451             self.failUnless("s2-rw" in out)
1452             self.failUnlessEqual(err, "")
1453
1454         # this should reference private_uri
1455         d.addCallback(run, "ls")
1456         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1457
1458         d.addCallback(run, "list-aliases")
1459         def _check_aliases_1((out,err)):
1460             self.failUnlessEqual(err, "")
1461             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1462         d.addCallback(_check_aliases_1)
1463
1464         # now that that's out of the way, remove root_dir.cap and work with
1465         # new files
1466         d.addCallback(lambda res: os.unlink(root_file))
1467         d.addCallback(run, "list-aliases")
1468         def _check_aliases_2((out,err)):
1469             self.failUnlessEqual(err, "")
1470             self.failUnlessEqual(out, "")
1471         d.addCallback(_check_aliases_2)
1472
1473         d.addCallback(run, "mkdir")
1474         def _got_dir((out,err)):
1475             self.failUnless(uri.from_string_dirnode(out.strip()))
1476             return out.strip()
1477         d.addCallback(_got_dir)
1478         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1479
1480         d.addCallback(run, "list-aliases")
1481         def _check_aliases_3((out,err)):
1482             self.failUnlessEqual(err, "")
1483             self.failUnless("tahoe: " in out)
1484         d.addCallback(_check_aliases_3)
1485
1486         def _check_empty_dir((out,err)):
1487             self.failUnlessEqual(out, "")
1488             self.failUnlessEqual(err, "")
1489         d.addCallback(run, "ls")
1490         d.addCallback(_check_empty_dir)
1491
1492         def _check_missing_dir((out,err)):
1493             # TODO: check that rc==2
1494             self.failUnlessEqual(out, "")
1495             self.failUnlessEqual(err, "No such file or directory\n")
1496         d.addCallback(run, "ls", "bogus")
1497         d.addCallback(_check_missing_dir)
1498
1499         files = []
1500         datas = []
1501         for i in range(10):
1502             fn = os.path.join(self.basedir, "file%d" % i)
1503             files.append(fn)
1504             data = "data to be uploaded: file%d\n" % i
1505             datas.append(data)
1506             open(fn,"wb").write(data)
1507
1508         def _check_stdout_against((out,err), filenum=None, data=None):
1509             self.failUnlessEqual(err, "")
1510             if filenum is not None:
1511                 self.failUnlessEqual(out, datas[filenum])
1512             if data is not None:
1513                 self.failUnlessEqual(out, data)
1514
1515         # test both forms of put: from a file, and from stdin
1516         #  tahoe put bar FOO
1517         d.addCallback(run, "put", files[0], "tahoe-file0")
1518         def _put_out((out,err)):
1519             self.failUnless("URI:LIT:" in out, out)
1520             self.failUnless("201 Created" in err, err)
1521             uri0 = out.strip()
1522             return run(None, "get", uri0)
1523         d.addCallback(_put_out)
1524         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1525
1526         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1527         #  tahoe put bar tahoe:FOO
1528         d.addCallback(run, "put", files[2], "tahoe:file2")
1529         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1530         def _check_put_mutable((out,err)):
1531             self._mutable_file3_uri = out.strip()
1532         d.addCallback(_check_put_mutable)
1533         d.addCallback(run, "get", "tahoe:file3")
1534         d.addCallback(_check_stdout_against, 3)
1535
1536         #  tahoe put FOO
1537         STDIN_DATA = "This is the file to upload from stdin."
1538         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1539         #  tahoe put tahoe:FOO
1540         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1541                       stdin="Other file from stdin.")
1542
1543         d.addCallback(run, "ls")
1544         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1545                                   "tahoe-file-stdin", "from-stdin"])
1546         d.addCallback(run, "ls", "subdir")
1547         d.addCallback(_check_ls, ["tahoe-file1"])
1548
1549         # tahoe mkdir FOO
1550         d.addCallback(run, "mkdir", "subdir2")
1551         d.addCallback(run, "ls")
1552         # TODO: extract the URI, set an alias with it
1553         d.addCallback(_check_ls, ["subdir2"])
1554
1555         # tahoe get: (to stdin and to a file)
1556         d.addCallback(run, "get", "tahoe-file0")
1557         d.addCallback(_check_stdout_against, 0)
1558         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1559         d.addCallback(_check_stdout_against, 1)
1560         outfile0 = os.path.join(self.basedir, "outfile0")
1561         d.addCallback(run, "get", "file2", outfile0)
1562         def _check_outfile0((out,err)):
1563             data = open(outfile0,"rb").read()
1564             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1565         d.addCallback(_check_outfile0)
1566         outfile1 = os.path.join(self.basedir, "outfile1")
1567         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1568         def _check_outfile1((out,err)):
1569             data = open(outfile1,"rb").read()
1570             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1571         d.addCallback(_check_outfile1)
1572
1573         d.addCallback(run, "rm", "tahoe-file0")
1574         d.addCallback(run, "rm", "tahoe:file2")
1575         d.addCallback(run, "ls")
1576         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1577
1578         d.addCallback(run, "ls", "-l")
1579         def _check_ls_l((out,err)):
1580             lines = out.split("\n")
1581             for l in lines:
1582                 if "tahoe-file-stdin" in l:
1583                     self.failUnless(l.startswith("-r-- "), l)
1584                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1585                 if "file3" in l:
1586                     self.failUnless(l.startswith("-rw- "), l) # mutable
1587         d.addCallback(_check_ls_l)
1588
1589         d.addCallback(run, "ls", "--uri")
1590         def _check_ls_uri((out,err)):
1591             lines = out.split("\n")
1592             for l in lines:
1593                 if "file3" in l:
1594                     self.failUnless(self._mutable_file3_uri in l)
1595         d.addCallback(_check_ls_uri)
1596
1597         d.addCallback(run, "ls", "--readonly-uri")
1598         def _check_ls_rouri((out,err)):
1599             lines = out.split("\n")
1600             for l in lines:
1601                 if "file3" in l:
1602                     rw_uri = self._mutable_file3_uri
1603                     u = uri.from_string_mutable_filenode(rw_uri)
1604                     ro_uri = u.get_readonly().to_string()
1605                     self.failUnless(ro_uri in l)
1606         d.addCallback(_check_ls_rouri)
1607
1608
1609         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1610         d.addCallback(run, "ls")
1611         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1612
1613         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1614         d.addCallback(run, "ls")
1615         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1616
1617         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1618         d.addCallback(run, "ls")
1619         d.addCallback(_check_ls, ["file3", "file3-copy"])
1620         d.addCallback(run, "get", "tahoe:file3-copy")
1621         d.addCallback(_check_stdout_against, 3)
1622
1623         # copy from disk into tahoe
1624         d.addCallback(run, "cp", files[4], "tahoe:file4")
1625         d.addCallback(run, "ls")
1626         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1627         d.addCallback(run, "get", "tahoe:file4")
1628         d.addCallback(_check_stdout_against, 4)
1629
1630         # copy from tahoe into disk
1631         target_filename = os.path.join(self.basedir, "file-out")
1632         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1633         def _check_cp_out((out,err)):
1634             self.failUnless(os.path.exists(target_filename))
1635             got = open(target_filename,"rb").read()
1636             self.failUnlessEqual(got, datas[4])
1637         d.addCallback(_check_cp_out)
1638
1639         # copy from disk to disk (silly case)
1640         target2_filename = os.path.join(self.basedir, "file-out-copy")
1641         d.addCallback(run, "cp", target_filename, target2_filename)
1642         def _check_cp_out2((out,err)):
1643             self.failUnless(os.path.exists(target2_filename))
1644             got = open(target2_filename,"rb").read()
1645             self.failUnlessEqual(got, datas[4])
1646         d.addCallback(_check_cp_out2)
1647
1648         # copy from tahoe into disk, overwriting an existing file
1649         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1650         def _check_cp_out3((out,err)):
1651             self.failUnless(os.path.exists(target_filename))
1652             got = open(target_filename,"rb").read()
1653             self.failUnlessEqual(got, datas[3])
1654         d.addCallback(_check_cp_out3)
1655
1656         # copy from disk into tahoe, overwriting an existing immutable file
1657         d.addCallback(run, "cp", files[5], "tahoe:file4")
1658         d.addCallback(run, "ls")
1659         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1660         d.addCallback(run, "get", "tahoe:file4")
1661         d.addCallback(_check_stdout_against, 5)
1662
1663         # copy from disk into tahoe, overwriting an existing mutable file
1664         d.addCallback(run, "cp", files[5], "tahoe:file3")
1665         d.addCallback(run, "ls")
1666         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1667         d.addCallback(run, "get", "tahoe:file3")
1668         d.addCallback(_check_stdout_against, 5)
1669
1670         # recursive copy: setup
1671         dn = os.path.join(self.basedir, "dir1")
1672         os.makedirs(dn)
1673         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1674         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1675         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1676         sdn2 = os.path.join(dn, "subdir2")
1677         os.makedirs(sdn2)
1678         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1679         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1680
1681         # from disk into tahoe
1682         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1683         d.addCallback(run, "ls")
1684         d.addCallback(_check_ls, ["dir1"])
1685         d.addCallback(run, "ls", "dir1")
1686         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1687                       ["rfile4", "rfile5"])
1688         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1689         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1690                       ["rfile1", "rfile2", "rfile3"])
1691         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1692         d.addCallback(_check_stdout_against, data="rfile4")
1693
1694         # and back out again
1695         dn_copy = os.path.join(self.basedir, "dir1-copy")
1696         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1697         def _check_cp_r_out((out,err)):
1698             def _cmp(name):
1699                 old = open(os.path.join(dn, name), "rb").read()
1700                 newfn = os.path.join(dn_copy, name)
1701                 self.failUnless(os.path.exists(newfn))
1702                 new = open(newfn, "rb").read()
1703                 self.failUnlessEqual(old, new)
1704             _cmp("rfile1")
1705             _cmp("rfile2")
1706             _cmp("rfile3")
1707             _cmp(os.path.join("subdir2", "rfile4"))
1708             _cmp(os.path.join("subdir2", "rfile5"))
1709         d.addCallback(_check_cp_r_out)
1710
1711         # and copy it a second time, which ought to overwrite the same files
1712         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1713
1714         # and again, only writing filecaps
1715         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1716         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1717         def _check_capsonly((out,err)):
1718             # these should all be LITs
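            # (a LIT cap embeds the whole file body, so parsing the cap
            # recovers the data without touching the grid)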
1719             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1720             y = uri.from_string_filenode(x)
1721             self.failUnlessEqual(y.data, "rfile4")
1722         d.addCallback(_check_capsonly)
1723
1724         # and tahoe-to-tahoe
1725         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1726         d.addCallback(run, "ls")
1727         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1728         d.addCallback(run, "ls", "dir1-copy")
1729         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1730                       ["rfile4", "rfile5"])
1731         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1732         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1733                       ["rfile1", "rfile2", "rfile3"])
1734         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1735         d.addCallback(_check_stdout_against, data="rfile4")
1736
1737         # and copy it a second time, which ought to overwrite the same files
1738         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1739
1740         # tahoe_ls doesn't currently handle the error correctly: it tries to
1741         # JSON-parse a traceback.
1742 ##         def _ls_missing(res):
1743 ##             argv = ["ls"] + nodeargs + ["bogus"]
1744 ##             return self._run_cli(argv)
1745 ##         d.addCallback(_ls_missing)
1746 ##         def _check_ls_missing((out,err)):
1747 ##             print "OUT", out
1748 ##             print "ERR", err
1749 ##             self.failUnlessEqual(err, "")
1750 ##         d.addCallback(_check_ls_missing)
1751
1752         return d
1753
1754     def _run_cli(self, argv, stdin=""):
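        # runner.runner makes blocking HTTP calls, so run it in a worker
        # thread to keep the reactor responsive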
1755         #print "CLI:", argv
1756         stdout, stderr = StringIO(), StringIO()
1757         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1758                                   stdin=StringIO(stdin),
1759                                   stdout=stdout, stderr=stderr)
1760         def _done(res):
1761             return stdout.getvalue(), stderr.getvalue()
1762         d.addCallback(_done)
1763         return d
1764
1765     def _test_checker(self, res):
1766         ut = upload.Data("too big to be literal" * 200, convergence=None)
1767         d = self._personal_node.add_file(u"big file", ut)
1768
1769         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1770         def _check_dirnode_results(r):
1771             self.failUnless(r.is_healthy())
1772         d.addCallback(_check_dirnode_results)
1773         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1774         d.addCallback(_check_dirnode_results)
1775
1776         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1777         def _got_chk_filenode(n):
1778             self.failUnless(isinstance(n, filenode.FileNode))
1779             d = n.check(Monitor())
1780             def _check_filenode_results(r):
1781                 self.failUnless(r.is_healthy())
1782             d.addCallback(_check_filenode_results)
1783             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1784             d.addCallback(_check_filenode_results)
1785             return d
1786         d.addCallback(_got_chk_filenode)
1787
1788         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1789         def _got_lit_filenode(n):
1790             self.failUnless(isinstance(n, filenode.LiteralFileNode))
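            # LIT files have no shares on the grid, so there is nothing to
            # verify; the checker just reports None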
1791             d = n.check(Monitor())
1792             def _check_lit_filenode_results(r):
1793                 self.failUnlessEqual(r, None)
1794             d.addCallback(_check_lit_filenode_results)
1795             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1796             d.addCallback(_check_lit_filenode_results)
1797             return d
1798         d.addCallback(_got_lit_filenode)
1799         return d
1800