from base64 import b32encode
import os, sys, time, re, simplejson
from cStringIO import StringIO
from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.internet.interfaces import IConsumer, IPushProducer
import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import download, filenode, offloaded, upload
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
     NoSuchChildError, NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap.api import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin, MemoryConsumer, \
     download_to_data

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""

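# An Uploadable that counts how many bytes have been read from it; once
# bytes_read exceeds interrupt_after, it fires interrupt_after_d (exactly
# once), which the tests below use to interrupt an upload in mid-flight.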
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

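# A minimal IConsumer that simply accumulates everything written to it,
# used to exercise the streaming download interface.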
class GrabEverythingConsumer:
    implements(IConsumer)

    def __init__(self):
        self.contents = ""

    def registerProducer(self, producer, streaming):
        assert streaming
        assert IPushProducer.providedBy(producer)

    def write(self, data):
        self.contents += data

    def unregisterProducer(self):
        pass

class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_serverids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = list(sb.get_servers("a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_serverids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = list(sb.get_servers("a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.uri
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        consumer = GrabEverythingConsumer()
        ct = download.ConsumerAdapter(consumer)
        d.addCallback(lambda res:
                      self.downloader.download(self.uri, ct))
        def _download_to_consumer_done(ign):
            self.failUnlessEqual(consumer.contents, DATA)
        d.addCallback(_download_to_consumer_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NotEnoughSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

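    # Walk basedir looking for .../storage/shares/$START/$SINDEX directories
    # that contain share files, and return a list of
    # (client_num, storage_index, filename, shnum) tuples.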
    def _find_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and there
                # are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

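    # Read a mutable (SDMF) share file, corrupt the named field, and write
    # the re-packed share back in place.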
    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may or may not be noticed, depending upon
                # which servers are queried first (see the
                # initial_query_count note above).
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that uses
        # the key generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

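    # return a copy of 'good' with the low bit of its last byte flipped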
    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata-tahoe: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1, "stub_client": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

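    # Hand-roll a multipart/form-data body (boundary "boogabooga") and POST
    # it, going through the helper node's webport when use_helper is True.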
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)

    def _test_web(self, res):
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            # XXX This test is oversensitive to formatting
            expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
            self.failUnless("Helper: 0 active uploads" in page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
            self.failUnless("Not running helper" in page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1182
1183         # can we replace files in place?
1184         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1185                                            "NEWER contents"))
1186         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1187         d.addCallback(self.failUnlessEqual, "NEWER contents")
1188
1189         # test unlinked POST
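             # (an unlinked upload returns a filecap without linking it into
             #  any directory)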
1190         d.addCallback(lambda res: self.POST("uri", t="upload",
1191                                             file=("new.txt", "data" * 10000)))
1192         # and again using the helper, which exercises different upload-status
1193         # display code
1194         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1195                                             file=("foo.txt", "data2" * 10000)))
1196
1197         # check that the status page exists
1198         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1199         def _got_status(res):
1200             # find an interesting upload and download to look at. LIT files
1201             # are not interesting.
1202             for ds in self.clients[0].list_all_download_statuses():
1203                 if ds.get_size() > 200:
1204                     self._down_status = ds.get_counter()
1205             for us in self.clients[0].list_all_upload_statuses():
1206                 if us.get_size() > 200:
1207                     self._up_status = us.get_counter()
1208             rs = list(self.clients[0].list_all_retrieve_statuses())[0]
1209             self._retrieve_status = rs.get_counter()
1210             ps = list(self.clients[0].list_all_publish_statuses())[0]
1211             self._publish_status = ps.get_counter()
1212             us = list(self.clients[0].list_all_mapupdate_statuses())[0]
1213             self._update_status = us.get_counter()
1214
1215             # and that there are some upload- and download- status pages
1216             return self.GET("status/up-%d" % self._up_status)
1217         d.addCallback(_got_status)
1218         def _got_up(res):
1219             return self.GET("status/down-%d" % self._down_status)
1220         d.addCallback(_got_up)
1221         def _got_down(res):
1222             return self.GET("status/mapupdate-%d" % self._update_status)
1223         d.addCallback(_got_down)
1224         def _got_update(res):
1225             return self.GET("status/publish-%d" % self._publish_status)
1226         d.addCallback(_got_update)
1227         def _got_publish(res):
1228             return self.GET("status/retrieve-%d" % self._retrieve_status)
1229         d.addCallback(_got_publish)
1230
1231         # check that the helper status page exists
1232         d.addCallback(lambda res:
1233                       self.GET("helper_status", followRedirect=True))
1234         def _got_helper_status(res):
1235             self.failUnless("Bytes Fetched:" in res)
1236             # touch a couple of files in the helper's working directory to
1237             # exercise more code paths
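                 # (the utime() calls below backdate them by ~3 days so that
                 #  they show up in the *_size_old counters checked by the
                 #  t=json helper-status test that follows)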
1238             workdir = os.path.join(self.getdir("client0"), "helper")
1239             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1240             f = open(incfile, "wb")
1241             f.write("small file")
1242             f.close()
1243             then = time.time() - 86400*3
1244             now = time.time()
1245             os.utime(incfile, (now, then))
1246             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1247             f = open(encfile, "wb")
1248             f.write("less small file")
1249             f.close()
1250             os.utime(encfile, (now, then))
1251         d.addCallback(_got_helper_status)
1252         # and that the json form exists
1253         d.addCallback(lambda res:
1254                       self.GET("helper_status?t=json", followRedirect=True))
1255         def _got_helper_status_json(res):
1256             data = simplejson.loads(res)
1257             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1258                                  1)
1259             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1260             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1261             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1262                                  10)
1263             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1264             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1265             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1266                                  15)
1267         d.addCallback(_got_helper_status_json)
1268
1269         # and check that client[3] (which uses a helper but does not run one
1270         # itself) doesn't explode when you ask for its status
1271         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1272         def _got_non_helper_status(res):
1273             self.failUnless("Upload and Download Status" in res)
1274         d.addCallback(_got_non_helper_status)
1275
1276         # or for helper status with t=json
1277         d.addCallback(lambda res:
1278                       getPage(self.helper_webish_url + "helper_status?t=json"))
1279         def _got_non_helper_status_json(res):
1280             data = simplejson.loads(res)
1281             self.failUnlessEqual(data, {})
1282         d.addCallback(_got_non_helper_status_json)
1283
1284         # see if the statistics page exists
1285         d.addCallback(lambda res: self.GET("statistics"))
1286         def _got_stats(res):
1287             self.failUnless("Node Statistics" in res)
1288             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1289         d.addCallback(_got_stats)
1290         d.addCallback(lambda res: self.GET("statistics?t=json"))
1291         def _got_stats_json(res):
1292             data = simplejson.loads(res)
1293             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1294             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1295         d.addCallback(_got_stats_json)
1296
1297         # TODO: mangle the second segment of a file, to test errors that
1298         # occur after we've already sent some good data, which uses a
1299         # different error path.
1300
1301         # TODO: download a URI with a form
1302         # TODO: create a directory by using a form
1303         # TODO: upload by using a form on the directory page
1304         #    url = base + "somedir/subdir1/freeform_post!!upload"
1305         # TODO: delete a file by using a button on the directory page
1306
1307         return d
1308
1309     def _test_runner(self, res):
1310         # exercise some of the diagnostic tools in runner.py
1311
1312         # find a share
1313         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1314             if "storage" not in dirpath:
1315                 continue
1316             if not filenames:
1317                 continue
1318             pieces = dirpath.split(os.sep)
1319             if (len(pieces) >= 4
1320                 and pieces[-4] == "storage"
1321                 and pieces[-3] == "shares"):
1322                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1323                 # are sharefiles here
1324                 filename = os.path.join(dirpath, filenames[0])
1325                 # peek at the magic to see if it is a chk share
1326                 magic = open(filename, "rb").read(4)
1327                 if magic == '\x00\x00\x00\x01':
1328                     break
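             # (for/else: the else clause below runs only if the loop never
             #  hit 'break', i.e. no CHK share file was found)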
1329         else:
1330             self.fail("unable to find any CHK share files in %s"
1331                       % self.basedir)
1332         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1333
1334         out,err = StringIO(), StringIO()
1335         rc = runner.runner(["debug", "dump-share", "--offsets",
1336                             filename],
1337                            stdout=out, stderr=err)
1338         output = out.getvalue()
1339         self.failUnlessEqual(rc, 0)
1340
1341         # we only upload a single file, so we can assert some things about
1342         # its size and shares.
1343         self.failUnless(("share filename: %s" % filename) in output)
1344         self.failUnless("size: %d\n" % len(self.data) in output)
1345         self.failUnless("num_segments: 1\n" in output)
1346         # segment_size is always a multiple of needed_shares
1347         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1348         self.failUnless("total_shares: 10\n" in output)
1349         # keys which are supposed to be present
1350         for key in ("size", "num_segments", "segment_size",
1351                     "needed_shares", "total_shares",
1352                     "codec_name", "codec_params", "tail_codec_params",
1353                     #"plaintext_hash", "plaintext_root_hash",
1354                     "crypttext_hash", "crypttext_root_hash",
1355                     "share_root_hash", "UEB_hash"):
1356             self.failUnless("%s: " % key in output, key)
1357         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1358
1359         # now use its storage index to find the other shares using the
1360         # 'find-shares' tool
1361         sharedir, shnum = os.path.split(filename)
1362         storagedir, storage_index_s = os.path.split(sharedir)
1363         out,err = StringIO(), StringIO()
1364         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1365         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1366         rc = runner.runner(cmd, stdout=out, stderr=err)
1367         self.failUnlessEqual(rc, 0)
1368         out.seek(0)
1369         sharefiles = [sfn.strip() for sfn in out.readlines()]
1370         self.failUnlessEqual(len(sharefiles), 10)
1371
1372         # also exercise the 'catalog-shares' tool
1373         out,err = StringIO(), StringIO()
1374         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1375         cmd = ["debug", "catalog-shares"] + nodedirs
1376         rc = runner.runner(cmd, stdout=out, stderr=err)
1377         self.failUnlessEqual(rc, 0)
1378         out.seek(0)
1379         descriptions = [sfn.strip() for sfn in out.readlines()]
1380         self.failUnlessEqual(len(descriptions), 30)
1381         matching = [line
1382                     for line in descriptions
1383                     if line.startswith("CHK %s " % storage_index_s)]
1384         self.failUnlessEqual(len(matching), 10)
1385
1386     def _test_control(self, res):
1387         # exercise the remote-control-the-client foolscap interfaces in
1388         # allmydata.control (mostly used for performance tests)
1389         c0 = self.clients[0]
1390         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1391         control_furl = open(control_furl_file, "r").read().strip()
1392         # it doesn't really matter which Tub we use to connect to the client,
1393         # so let's just use our IntroducerNode's
1394         d = self.introducer.tub.getReference(control_furl)
1395         d.addCallback(self._test_control2, control_furl_file)
1396         return d
1397     def _test_control2(self, rref, filename):
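             # 'filename' here is the control.furl file itself; any small
             # local file serves as test data for the upload/download
             # round-trip.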
1398         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1399         downfile = os.path.join(self.basedir, "control.downfile")
1400         d.addCallback(lambda uri:
1401                       rref.callRemote("download_from_uri_to_file",
1402                                       uri, downfile))
1403         def _check(res):
1404             self.failUnlessEqual(res, downfile)
1405             data = open(downfile, "r").read()
1406             expected_data = open(filename, "r").read()
1407             self.failUnlessEqual(data, expected_data)
1408         d.addCallback(_check)
1409         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1410         if sys.platform == "linux2":
1411             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1412         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1413         return d
1414
1415     def _test_cli(self, res):
1416         # run various CLI commands (in a thread, since they use blocking
1417         # network calls)
1418
1419         private_uri = self._private_node.get_uri()
1420         some_uri = self._root_directory_uri
1421         client0_basedir = self.getdir("client0")
1422
1423         nodeargs = [
1424             "--node-directory", client0_basedir,
1425             ]
1426         TESTDATA = "I will not write the same thing over and over.\n" * 100
1427
1428         d = defer.succeed(None)
1429
1430         # for compatibility with earlier versions, private/root_dir.cap is
1431         # supposed to be treated as an alias named "tahoe:". Start by making
1432         # sure that works, before we add other aliases.
1433
1434         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1435         f = open(root_file, "w")
1436         f.write(private_uri)
1437         f.close()
1438
1439         def run(ignored, verb, *args, **kwargs):
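                 # run "tahoe <verb> --node-directory <client0-basedir> <args>"
                 # in a thread and return (stdout, stderr)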
1440             stdin = kwargs.get("stdin", "")
1441             newargs = [verb] + nodeargs + list(args)
1442             return self._run_cli(newargs, stdin=stdin)
1443
1444         def _check_ls((out,err), expected_children, unexpected_children=[]):
1445             self.failUnlessEqual(err, "")
1446             for s in expected_children:
1447                 self.failUnless(s in out, (s,out))
1448             for s in unexpected_children:
1449                 self.failIf(s in out, (s,out))
1450
1451         def _check_ls_root((out,err)):
1452             self.failUnless("personal" in out)
1453             self.failUnless("s2-ro" in out)
1454             self.failUnless("s2-rw" in out)
1455             self.failUnlessEqual(err, "")
1456
1457         # this should reference private_uri
1458         d.addCallback(run, "ls")
1459         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1460
1461         d.addCallback(run, "list-aliases")
1462         def _check_aliases_1((out,err)):
1463             self.failUnlessEqual(err, "")
1464             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1465         d.addCallback(_check_aliases_1)
1466
1467         # now that that's out of the way, remove root_dir.cap and work with
1468         # new files
1469         d.addCallback(lambda res: os.unlink(root_file))
1470         d.addCallback(run, "list-aliases")
1471         def _check_aliases_2((out,err)):
1472             self.failUnlessEqual(err, "")
1473             self.failUnlessEqual(out, "")
1474         d.addCallback(_check_aliases_2)
1475
1476         d.addCallback(run, "mkdir")
1477         def _got_dir( (out,err) ):
1478             self.failUnless(uri.from_string_dirnode(out.strip()))
1479             return out.strip()
1480         d.addCallback(_got_dir)
1481         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1482
1483         d.addCallback(run, "list-aliases")
1484         def _check_aliases_3((out,err)):
1485             self.failUnlessEqual(err, "")
1486             self.failUnless("tahoe: " in out)
1487         d.addCallback(_check_aliases_3)
1488
1489         def _check_empty_dir((out,err)):
1490             self.failUnlessEqual(out, "")
1491             self.failUnlessEqual(err, "")
1492         d.addCallback(run, "ls")
1493         d.addCallback(_check_empty_dir)
1494
1495         def _check_missing_dir((out,err)):
1496             # TODO: check that rc==2
1497             self.failUnlessEqual(out, "")
1498             self.failUnlessEqual(err, "No such file or directory\n")
1499         d.addCallback(run, "ls", "bogus")
1500         d.addCallback(_check_missing_dir)
1501
1502         files = []
1503         datas = []
1504         for i in range(10):
1505             fn = os.path.join(self.basedir, "file%d" % i)
1506             files.append(fn)
1507             data = "data to be uploaded: file%d\n" % i
1508             datas.append(data)
1509             open(fn,"wb").write(data)
1510
1511         def _check_stdout_against((out,err), filenum=None, data=None):
1512             self.failUnlessEqual(err, "")
1513             if filenum is not None:
1514                 self.failUnlessEqual(out, datas[filenum])
1515             if data is not None:
1516                 self.failUnlessEqual(out, data)
1517
1518         # test both forms of put: from a file, and from stdin
1519         #  tahoe put bar FOO
1520         d.addCallback(run, "put", files[0], "tahoe-file0")
1521         def _put_out((out,err)):
1522             self.failUnless("URI:LIT:" in out, out)
1523             self.failUnless("201 Created" in err, err)
1524             uri0 = out.strip()
1525             return run(None, "get", uri0)
1526         d.addCallback(_put_out)
1527         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1528
1529         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1530         #  tahoe put bar tahoe:FOO
1531         d.addCallback(run, "put", files[2], "tahoe:file2")
1532         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1533         def _check_put_mutable((out,err)):
1534             self._mutable_file3_uri = out.strip()
1535         d.addCallback(_check_put_mutable)
1536         d.addCallback(run, "get", "tahoe:file3")
1537         d.addCallback(_check_stdout_against, 3)
1538
1539         #  tahoe put FOO
1540         STDIN_DATA = "This is the file to upload from stdin."
1541         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1542         #  tahoe put tahoe:FOO
1543         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1544                       stdin="Other file from stdin.")
1545
1546         d.addCallback(run, "ls")
1547         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1548                                   "tahoe-file-stdin", "from-stdin"])
1549         d.addCallback(run, "ls", "subdir")
1550         d.addCallback(_check_ls, ["tahoe-file1"])
1551
1552         # tahoe mkdir FOO
1553         d.addCallback(run, "mkdir", "subdir2")
1554         d.addCallback(run, "ls")
1555         # TODO: extract the URI, set an alias with it
1556         d.addCallback(_check_ls, ["subdir2"])
1557
1558         # tahoe get: (to stdout and to a file)
1559         d.addCallback(run, "get", "tahoe-file0")
1560         d.addCallback(_check_stdout_against, 0)
1561         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1562         d.addCallback(_check_stdout_against, 1)
1563         outfile0 = os.path.join(self.basedir, "outfile0")
1564         d.addCallback(run, "get", "file2", outfile0)
1565         def _check_outfile0((out,err)):
1566             data = open(outfile0,"rb").read()
1567             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1568         d.addCallback(_check_outfile0)
1569         outfile1 = os.path.join(self.basedir, "outfile1")
1570         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1571         def _check_outfile1((out,err)):
1572             data = open(outfile1,"rb").read()
1573             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1574         d.addCallback(_check_outfile1)
1575
1576         d.addCallback(run, "rm", "tahoe-file0")
1577         d.addCallback(run, "rm", "tahoe:file2")
1578         d.addCallback(run, "ls")
1579         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1580
1581         d.addCallback(run, "ls", "-l")
1582         def _check_ls_l((out,err)):
1583             lines = out.split("\n")
1584             for l in lines:
1585                 if "tahoe-file-stdin" in l:
1586                     self.failUnless(l.startswith("-r-- "), l)
1587                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1588                 if "file3" in l:
1589                     self.failUnless(l.startswith("-rw- "), l) # mutable
1590         d.addCallback(_check_ls_l)
1591
1592         d.addCallback(run, "ls", "--uri")
1593         def _check_ls_uri((out,err)):
1594             lines = out.split("\n")
1595             for l in lines:
1596                 if "file3" in l:
1597                     self.failUnless(self._mutable_file3_uri in l)
1598         d.addCallback(_check_ls_uri)
1599
1600         d.addCallback(run, "ls", "--readonly-uri")
1601         def _check_ls_rouri((out,err)):
1602             lines = out.split("\n")
1603             for l in lines:
1604                 if "file3" in l:
1605                     rw_uri = self._mutable_file3_uri
1606                     u = uri.from_string_mutable_filenode(rw_uri)
1607                     ro_uri = u.get_readonly().to_string()
1608                     self.failUnless(ro_uri in l)
1609         d.addCallback(_check_ls_rouri)
1610
1611
1612         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1613         d.addCallback(run, "ls")
1614         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1615
1616         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1617         d.addCallback(run, "ls")
1618         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1619
1620         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1621         d.addCallback(run, "ls")
1622         d.addCallback(_check_ls, ["file3", "file3-copy"])
1623         d.addCallback(run, "get", "tahoe:file3-copy")
1624         d.addCallback(_check_stdout_against, 3)
1625
1626         # copy from disk into tahoe
1627         d.addCallback(run, "cp", files[4], "tahoe:file4")
1628         d.addCallback(run, "ls")
1629         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1630         d.addCallback(run, "get", "tahoe:file4")
1631         d.addCallback(_check_stdout_against, 4)
1632
1633         # copy from tahoe into disk
1634         target_filename = os.path.join(self.basedir, "file-out")
1635         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1636         def _check_cp_out((out,err)):
1637             self.failUnless(os.path.exists(target_filename))
1638             got = open(target_filename,"rb").read()
1639             self.failUnlessEqual(got, datas[4])
1640         d.addCallback(_check_cp_out)
1641
1642         # copy from disk to disk (silly case)
1643         target2_filename = os.path.join(self.basedir, "file-out-copy")
1644         d.addCallback(run, "cp", target_filename, target2_filename)
1645         def _check_cp_out2((out,err)):
1646             self.failUnless(os.path.exists(target2_filename))
1647             got = open(target2_filename,"rb").read()
1648             self.failUnlessEqual(got, datas[4])
1649         d.addCallback(_check_cp_out2)
1650
1651         # copy from tahoe into disk, overwriting an existing file
1652         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1653         def _check_cp_out3((out,err)):
1654             self.failUnless(os.path.exists(target_filename))
1655             got = open(target_filename,"rb").read()
1656             self.failUnlessEqual(got, datas[3])
1657         d.addCallback(_check_cp_out3)
1658
1659         # copy from disk into tahoe, overwriting an existing immutable file
1660         d.addCallback(run, "cp", files[5], "tahoe:file4")
1661         d.addCallback(run, "ls")
1662         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1663         d.addCallback(run, "get", "tahoe:file4")
1664         d.addCallback(_check_stdout_against, 5)
1665
1666         # copy from disk into tahoe, overwriting an existing mutable file
1667         d.addCallback(run, "cp", files[5], "tahoe:file3")
1668         d.addCallback(run, "ls")
1669         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1670         d.addCallback(run, "get", "tahoe:file3")
1671         d.addCallback(_check_stdout_against, 5)
1672
1673         # recursive copy: setup
1674         dn = os.path.join(self.basedir, "dir1")
1675         os.makedirs(dn)
1676         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1677         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1678         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1679         sdn2 = os.path.join(dn, "subdir2")
1680         os.makedirs(sdn2)
1681         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1682         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1683
1684         # from disk into tahoe
1685         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1686         d.addCallback(run, "ls")
1687         d.addCallback(_check_ls, ["dir1"])
1688         d.addCallback(run, "ls", "dir1")
1689         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1690                       ["rfile4", "rfile5"])
1691         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1692         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1693                       ["rfile1", "rfile2", "rfile3"])
1694         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1695         d.addCallback(_check_stdout_against, data="rfile4")
1696
1697         # and back out again
1698         dn_copy = os.path.join(self.basedir, "dir1-copy")
1699         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1700         def _check_cp_r_out((out,err)):
1701             def _cmp(name):
1702                 old = open(os.path.join(dn, name), "rb").read()
1703                 newfn = os.path.join(dn_copy, name)
1704                 self.failUnless(os.path.exists(newfn))
1705                 new = open(newfn, "rb").read()
1706                 self.failUnlessEqual(old, new)
1707             _cmp("rfile1")
1708             _cmp("rfile2")
1709             _cmp("rfile3")
1710             _cmp(os.path.join("subdir2", "rfile4"))
1711             _cmp(os.path.join("subdir2", "rfile5"))
1712         d.addCallback(_check_cp_r_out)
1713
1714         # and copy it a second time, which ought to overwrite the same files
1715         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1716
1717         # and again, only writing filecaps
1718         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1719         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1720         def _check_capsonly((out,err)):
1721             # these should all be LITs
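                 # (--caps-only writes each file's cap instead of its
                 #  contents; files this small get LIT caps, which embed the
                 #  data, so the contents can be recovered from the cap alone)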
1722             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1723             y = uri.from_string_filenode(x)
1724             self.failUnlessEqual(y.data, "rfile4")
1725         d.addCallback(_check_capsonly)
1726
1727         # and tahoe-to-tahoe
1728         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1729         d.addCallback(run, "ls")
1730         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1731         d.addCallback(run, "ls", "dir1-copy")
1732         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1733                       ["rfile4", "rfile5"])
1734         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1735         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1736                       ["rfile1", "rfile2", "rfile3"])
1737         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1738         d.addCallback(_check_stdout_against, data="rfile4")
1739
1740         # and copy it a second time, which ought to overwrite the same files
1741         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1742
1743         # tahoe_ls doesn't currently handle the error correctly: it tries to
1744         # JSON-parse a traceback.
1745 ##         def _ls_missing(res):
1746 ##             argv = ["ls"] + nodeargs + ["bogus"]
1747 ##             return self._run_cli(argv)
1748 ##         d.addCallback(_ls_missing)
1749 ##         def _check_ls_missing((out,err)):
1750 ##             print "OUT", out
1751 ##             print "ERR", err
1752 ##             self.failUnlessEqual(err, "")
1753 ##         d.addCallback(_check_ls_missing)
1754
1755         return d
1756
1757     def _run_cli(self, argv, stdin=""):
1758         #print "CLI:", argv
1759         stdout, stderr = StringIO(), StringIO()
1760         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1761                                   stdin=StringIO(stdin),
1762                                   stdout=stdout, stderr=stderr)
1763         def _done(res):
1764             return stdout.getvalue(), stderr.getvalue()
1765         d.addCallback(_done)
1766         return d
1767
1768     def _test_checker(self, res):
1769         ut = upload.Data("too big to be literal" * 200, convergence=None)
1770         d = self._personal_node.add_file(u"big file", ut)
1771
1772         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1773         def _check_dirnode_results(r):
1774             self.failUnless(r.is_healthy())
1775         d.addCallback(_check_dirnode_results)
1776         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1777         d.addCallback(_check_dirnode_results)
1778
1779         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1780         def _got_chk_filenode(n):
1781             self.failUnless(isinstance(n, filenode.FileNode))
1782             d = n.check(Monitor())
1783             def _check_filenode_results(r):
1784                 self.failUnless(r.is_healthy())
1785             d.addCallback(_check_filenode_results)
1786             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1787             d.addCallback(_check_filenode_results)
1788             return d
1789         d.addCallback(_got_chk_filenode)
1790
1791         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1792         def _got_lit_filenode(n):
1793             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1794             d = n.check(Monitor())
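                 # a LIT file has no shares to examine, so check() is
                 # expected to return None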
1795             def _check_lit_filenode_results(r):
1796                 self.failUnlessEqual(r, None)
1797             d.addCallback(_check_lit_filenode_results)
1798             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1799             d.addCallback(_check_lit_filenode_results)
1800             return d
1801         d.addCallback(_got_lit_filenode)
1802         return d
1803