from base64 import b32encode
import os, sys, time, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread

import allmydata
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
from allmydata.immutable.filenode import ImmutableFileNode
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, \
     NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData
from foolscap.api import DeadReferenceError, fireEventually
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

# TODO: move this to common or common_util
from allmydata.test.test_runner import RunBinTahoeMixin

LARGE_DATA = """
This is some data to publish to the remote grid, which needs to be large
enough to not fit inside a LIT uri.
"""

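# An Uploadable that counts how many bytes have been read from it. Once
# bytes_read exceeds interrupt_after, it fires interrupt_after_d; the
# resumable-upload test below uses this to bounce a node mid-upload.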
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)

class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
    timeout = 3600 # It takes longer than 960 seconds on Zandr's ARM box.

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
                all_peerids = c.get_storage_broker().get_all_serverids()
                self.failUnlessEqual(len(all_peerids), self.numclients)
                sb = c.storage_broker
                permuted_peers = sb.get_servers_for_psi("a")
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # The 1024 actually gets rounded up to 1025, to be a multiple of
            # the number of required shares (since we use 25-out-of-100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            theuri = results.get_uri()
            log.msg("upload finished: uri is %s" % (theuri,))
            self.uri = theuri
            assert isinstance(self.uri, str), self.uri
            self.cap = uri.from_string(self.uri)
            self.n = self.clients[1].create_node_from_uri(self.uri)
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption, this ought to be
            # short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do
            # all of the encoding work, and only get to save on the upload
            # part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            return self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return download_to_data(self.n)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        def _test_read(res):
            n = self.clients[1].create_node_from_uri(self.uri)
            d = download_to_data(n)
            def _read_done(data):
                self.failUnlessEqual(data, DATA)
            d.addCallback(_read_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=1, size=4))
            def _read_portion_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[1:1+4])
            d.addCallback(_read_portion_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), offset=2, size=None))
            def _read_tail_done(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA[2:])
            d.addCallback(_read_tail_done)
            d.addCallback(lambda ign:
                          n.read(MemoryConsumer(), size=len(DATA)+1000))
            def _read_too_much(mc):
                self.failUnlessEqual("".join(mc.chunks), DATA)
            d.addCallback(_read_too_much)

            return d
        d.addCallback(_test_read)

        def _test_bad_read(res):
            bad_u = uri.from_string_filenode(self.uri)
            bad_u.key = self.flip_bit(bad_u.key)
            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
            # this should cause an error during download

            d = self.shouldFail2(NoSharesError, "'download bad node'",
                                 None,
                                 bad_n.read, MemoryConsumer(), offset=2)
            return d
        d.addCallback(_test_bad_read)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            badnode = self.clients[1].create_node_from_uri(baduri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = download_to_data(badnode)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(NoSharesError),
                                "expected NoSharesError, got %s" % res)
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            self.extra_node.DEFAULT_ENCODING_PARAMETERS['happy'] = 5
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                n = self.clients[1].create_node_from_uri(results.get_uri())
                return download_to_data(n)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        d.addCallback(fireEventually)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            bounced_d = defer.Deferred()
            def _do_bounce(res):
                d = self.bounce_client(0)
                d.addBoth(bounced_d.callback)
            u1.interrupt_after_d.addCallback(_do_bounce)

            # sneak into the helper and reduce its chunk size, so that our
            # interrupt_after hook will sever the connection on about the
            # fifth chunk fetched. This makes sure that we've started to
            # write the new shares before we abandon them, which exercises
            # the abort/delete-partial-share code. TODO: find a cleaner way
            # to do this. I know that this will affect later uses of the
            # helper in this same test run, but I'm not currently worried
            # about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            upload_d = self.extra_node.upload(u1)
            # The upload will start, and bounce_client() will be called after
            # about 5kB. bounced_d will fire after bounce_client() finishes
            # shutting down and restarting the node.
            d = bounced_d
            def _bounced(ign):
                # By this point, the upload should have failed because of the
                # interruption. upload_d will fire in a moment
                def _should_not_finish(res):
                    self.fail("interrupted upload should have failed, not"
                              " finished with result %s" % (res,))
                def _interrupted(f):
                    f.trap(DeadReferenceError)
                    # make sure we actually interrupted it before finishing
                    # the file
                    self.failUnless(u1.bytes_read < len(DATA),
                                    "read %d out of %d total" %
                                    (u1.bytes_read, len(DATA)))
                upload_d.addCallbacks(_should_not_finish, _interrupted)
                return upload_d
            d.addCallback(_bounced)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))

            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                cap = results.get_uri()
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.get_ciphertext_fetched()
                self.failUnless(isinstance(bytes_sent, (int, long)), bytes_sent)

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %r bytes out of %r total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %r bytes out of %r total" %
                                (bytes_sent, len(DATA)))
                n = self.clients[1].create_node_from_uri(cap)
                return download_to_data(n)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        def _grab_stats(ignored):
            # the StatsProvider doesn't normally publish a FURL:
            # instead it passes a live reference to the StatsGatherer
            # (if and when it connects). To exercise the remote stats
            # interface, we manually publish client0's StatsProvider
            # and use client1 to query it.
            sp = self.clients[0].stats_provider
            sp_furl = self.clients[0].tub.registerReference(sp)
            d = self.clients[1].tub.getReference(sp_furl)
            d.addCallback(lambda sp_rref: sp_rref.callRemote("get_stats"))
            def _got_stats(stats):
                #print "STATS"
                #from pprint import pprint
                #pprint(stats)
                s = stats["stats"]
                self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
                c = stats["counters"]
                self.failUnless("storage_server.allocate" in c)
            d.addCallback(_got_stats)
            return d
        d.addCallback(_grab_stats)

        return d

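    # Walk basedir looking for share files under each simulated client's
    # storage/shares/ tree, and return a list of (client_num, storage_index,
    # filename, shnum) tuples, failing the test if none are found.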
    def _find_all_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if (len(pieces) >= 5
                and pieces[-4] == "storage"
                and pieces[-3] == "shares"):
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

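    # Unpack an SDMF mutable share file, mangle the single field named by
    # `which`, then repack the share and write it back in place.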
    def _corrupt_mutable_share(self, filename, which):
        msf = MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)


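    # end-to-end exercise of SDMF mutable files: create one, read it back
    # from several clients, overwrite it, corrupt shares on disk to exercise
    # the hash/signature checks, and watch the key-generator pool drain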
    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        DATA_uploadable = MutableData(DATA)
        NEWDATA = "new contents yay"
        NEWDATA_uploadable = MutableData(NEWDATA)
        NEWERDATA = "this is getting old"
        NEWERDATA_uploadable = MutableData(NEWERDATA)

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA_uploadable)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_all_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out,err = StringIO(), StringIO()
            rc = runner.runner(["debug", "dump-share", "--offsets",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA_uploadable)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA_uploadable)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_all_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up
            # the segsize math
            d1 = self.clients[2].create_mutable_file(MutableData(""))
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest().when_done())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk:
            self.clients[3].create_mutable_file(MutableData('hello, world')))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # uses the key generator
        d.addCallback(lambda junk:
                      self.POST("uri?t=mkdir&name=george", use_helper=True))
        d.addCallback(check_kg_poolsize, -3)

        return d

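    # return a copy of `good` with the low bit of its last byte flipped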
    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = uri.from_string(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

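    # build a small directory tree and verify it from several clients, via
    # the node APIs, the web-API, and the CLI; later steps depend on the
    # state left behind by earlier ones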
    def test_filesystem(self):
        self.basedir = "system/SystemTest/test_filesystem"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        def _new_happy_semantics(ign):
            for c in self.clients:
                c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
        d.addCallback(_new_happy_semantics)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        return d

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("%s: %s" % (allmydata.__appname__, allmydata.__version__) in res)
                verstr = str(allmydata.__version__)

                # The Python "rational version numbering" convention
                # disallows "-r$REV" but allows ".post$REV"
                # instead. Eventually we'll probably move to
                # that. When we do, this test won't go red:
                ix = verstr.rfind('-r')
                if ix != -1:
                    altverstr = verstr[:ix] + '.post' + verstr[ix+2:]
                else:
                    ix = verstr.rfind('.post')
                    if ix != -1:
                        altverstr = verstr[:ix] + '-r' + verstr[ix+5:]
                    else:
                        altverstr = verstr

                appverstr = "%s: %s" % (allmydata.__appname__, verstr)
                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)

                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
                self.failUnless("Announcement Summary: storage: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
                self.failUnless("tahoe.css" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        # make sure it serves the CSS too
        d.addCallback(lambda res:
                      getPage(self.introweb_url+"tahoe.css", method="GET"))
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_distinct_hosts"],
                                     {"storage": 1})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
                assert isinstance(self.uri, str), (self.uri, filenode)
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_subdirectory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, *args, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, args)
        log.msg(*args, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_subdirectory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
                                      s2node.get_readonly_uri())
                d2.addCallback(lambda node:
                               privnode.set_uri(u"s2-ro",
                                                s2node.get_readonly_uri(),
                                                s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: download_to_data(filenode))
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))

            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest().when_done())
            d1.addCallback(self.log, "manifest")
            #  five items:
            # P/
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda res:
                           self.failUnlessEqual(len(res["manifest"]), 5))
            d1.addCallback(lambda res: home.start_deep_stats().when_done())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

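    # like shouldFail, but invokes `callable` itself and returns a Deferred
    # that fires once the expected failure (and optional substring) has been
    # verified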
    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

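    # hand-build a multipart/form-data body, with one part per keyword
    # argument (a (filename, value) tuple becomes a file-upload part), and
    # submit it via POST2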
    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = ""
        headers = {}
        if fields:
            body = "\r\n".join(form) + "\r\n"
            headers["content-type"] = "multipart/form-data; boundary=%s" % sepbase
        return self.POST2(urlpath, body, headers, followRedirect, use_helper)

    def POST2(self, urlpath, body="", headers={}, followRedirect=False,
              use_helper=False):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        return getPage(url, method="POST", postdata=body, headers=headers,
                       followRedirect=followRedirect)

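    # exercise the web-API: welcome pages, directory listings, downloads by
    # URI, error handling for mangled URIs, and PUT/GET round-trips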
1084     def _test_web(self, res):
1085         base = self.webish_url
1086         public = "uri/" + self._root_directory_uri
1087         d = getPage(base)
1088         def _got_welcome(page):
1089             # XXX This test is oversensitive to formatting
1090             expected = "Connected to <span>%d</span>\n     of <span>%d</span> known storage servers:" % (self.numclients, self.numclients)
1091             self.failUnless(expected in page,
1092                             "I didn't see the right 'connected storage servers'"
1093                             " message in: %s" % page
1094                             )
1095             expected = "<th>My nodeid:</th> <td class=\"nodeid mine data-chars\">%s</td>" % (b32encode(self.clients[0].nodeid).lower(),)
1096             self.failUnless(expected in page,
1097                             "I didn't see the right 'My nodeid' message "
1098                             "in: %s" % page)
1099             self.failUnless("Helper: 0 active uploads" in page)
1100         d.addCallback(_got_welcome)
1101         d.addCallback(self.log, "done with _got_welcome")
1102
1103         # get the welcome page from the node that uses the helper too
1104         d.addCallback(lambda res: getPage(self.helper_webish_url))
1105         def _got_welcome_helper(page):
1106             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1107                             page)
1108             self.failUnless("Not running helper" in page)
1109         d.addCallback(_got_welcome_helper)
1110
1111         d.addCallback(lambda res: getPage(base + public))
1112         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1113         def _got_subdir1(page):
1114             # there ought to be an href for our file
1115             self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1116             self.failUnless(">mydata567</a>" in page)
1117         d.addCallback(_got_subdir1)
1118         d.addCallback(self.log, "done with _got_subdir1")
1119         d.addCallback(lambda res:
1120                       getPage(base + public + "/subdir1/mydata567"))
1121         def _got_data(page):
1122             self.failUnlessEqual(page, self.data)
1123         d.addCallback(_got_data)
1124
1125         # download from a URI embedded in a URL
1126         d.addCallback(self.log, "_get_from_uri")
1127         def _get_from_uri(res):
1128             return getPage(base + "uri/%s?filename=%s"
1129                            % (self.uri, "mydata567"))
1130         d.addCallback(_get_from_uri)
1131         def _got_from_uri(page):
1132             self.failUnlessEqual(page, self.data)
1133         d.addCallback(_got_from_uri)
1134
1135         # download from a URI embedded in a URL, second form
1136         d.addCallback(self.log, "_get_from_uri2")
1137         def _get_from_uri2(res):
1138             return getPage(base + "uri?uri=%s" % (self.uri,))
1139         d.addCallback(_get_from_uri2)
1140         d.addCallback(_got_from_uri)
1141
1142         # download from a bogus URI, make sure we get a reasonable error
1143         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1144         def _get_from_bogus_uri(res):
1145             d1 = getPage(base + "uri/%s?filename=%s"
1146                          % (self.mangle_uri(self.uri), "mydata567"))
1147             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1148                        "410")
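                 # (410 Gone is the webapi's response when no shares can be
                 #  located for the given storage index)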
1149             return d1
1150         d.addCallback(_get_from_bogus_uri)
1151         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1152
1153         # upload a file with PUT
1154         d.addCallback(self.log, "about to try PUT")
1155         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1156                                            "new.txt contents"))
1157         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1158         d.addCallback(self.failUnlessEqual, "new.txt contents")
1159         # and again with something large enough to use multiple segments,
1160         # and hopefully trigger pauseProducing too
1161         def _new_happy_semantics(ign):
1162             for c in self.clients:
1163                 # these get reset somewhere along the way, so set them again
1164                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1165         d.addCallback(_new_happy_semantics)
1166         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1167                                            "big" * 500000)) # 1.5MB
1168         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1169         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1170
1171         # can we replace files in place?
1172         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1173                                            "NEWER contents"))
1174         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1175         d.addCallback(self.failUnlessEqual, "NEWER contents")
1176
1177         # test unlinked POST
1178         d.addCallback(lambda res: self.POST("uri", t="upload",
1179                                             file=("new.txt", "data" * 10000)))
1180         # and again using the helper, which exercises different upload-status
1181         # display code
1182         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1183                                             file=("foo.txt", "data2" * 10000)))
1184
1185         # check that the status page exists
1186         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1187         def _got_status(res):
1188             # find an interesting upload and download to look at. LIT files
1189             # are not interesting.
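                 # each status object exposes get_counter(), a small integer
                 # that is used below to build /status/<kind>-<counter> URLs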
1190             h = self.clients[0].get_history()
1191             for ds in h.list_all_download_statuses():
1192                 if ds.get_size() > 200:
1193                     self._down_status = ds.get_counter()
1194             for us in h.list_all_upload_statuses():
1195                 if us.get_size() > 200:
1196                     self._up_status = us.get_counter()
1197             rs = list(h.list_all_retrieve_statuses())[0]
1198             self._retrieve_status = rs.get_counter()
1199             ps = list(h.list_all_publish_statuses())[0]
1200             self._publish_status = ps.get_counter()
1201             us = list(h.list_all_mapupdate_statuses())[0]
1202             self._update_status = us.get_counter()
1203
1204             # and that there are some upload- and download-status pages
1205             return self.GET("status/up-%d" % self._up_status)
1206         d.addCallback(_got_status)
1207         def _got_up(res):
1208             return self.GET("status/down-%d" % self._down_status)
1209         d.addCallback(_got_up)
1210         def _got_down(res):
1211             return self.GET("status/mapupdate-%d" % self._update_status)
1212         d.addCallback(_got_down)
1213         def _got_update(res):
1214             return self.GET("status/publish-%d" % self._publish_status)
1215         d.addCallback(_got_update)
1216         def _got_publish(res):
1217             self.failUnlessIn("Publish Results", res)
1218             return self.GET("status/retrieve-%d" % self._retrieve_status)
1219         d.addCallback(_got_publish)
1220         def _got_retrieve(res):
1221             self.failUnlessIn("Retrieve Results", res)
1222         d.addCallback(_got_retrieve)
1223
1224         # check that the helper status page exists
1225         d.addCallback(lambda res:
1226                       self.GET("helper_status", followRedirect=True))
1227         def _got_helper_status(res):
1228             self.failUnless("Bytes Fetched:" in res)
1229             # touch a couple of files in the helper's working directory to
1230             # exercise more code paths
1231             workdir = os.path.join(self.getdir("client0"), "helper")
1232             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1233             f = open(incfile, "wb")
1234             f.write("small file")
1235             f.close()
1236             then = time.time() - 86400*3
1237             now = time.time()
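                 # os.utime takes (atime, mtime): backdating the mtime by
                 # three days makes these files count as "old" in the
                 # *_size_old counters checked below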
1238             os.utime(incfile, (now, then))
1239             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1240             f = open(encfile, "wb")
1241             f.write("less small file")
1242             f.close()
1243             os.utime(encfile, (now, then))
1244         d.addCallback(_got_helper_status)
1245         # and that the json form exists
1246         d.addCallback(lambda res:
1247                       self.GET("helper_status?t=json", followRedirect=True))
1248         def _got_helper_status_json(res):
1249             data = simplejson.loads(res)
1250             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1251                                  1)
1252             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1253             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1254             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1255                                  10)
1256             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1257             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1258             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1259                                  15)
1260         d.addCallback(_got_helper_status_json)
1261
1262         # and check that client[3] (which uses a helper but does not run one
1263         # itself) doesn't explode when you ask for its status
1264         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1265         def _got_non_helper_status(res):
1266             self.failUnless("Upload and Download Status" in res)
1267         d.addCallback(_got_non_helper_status)
1268
1269         # or for helper status with t=json
1270         d.addCallback(lambda res:
1271                       getPage(self.helper_webish_url + "helper_status?t=json"))
1272         def _got_non_helper_status_json(res):
1273             data = simplejson.loads(res)
1274             self.failUnlessEqual(data, {})
1275         d.addCallback(_got_non_helper_status_json)
1276
1277         # see if the statistics page exists
1278         d.addCallback(lambda res: self.GET("statistics"))
1279         def _got_stats(res):
1280             self.failUnless("Node Statistics" in res)
1281             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1282         d.addCallback(_got_stats)
1283         d.addCallback(lambda res: self.GET("statistics?t=json"))
1284         def _got_stats_json(res):
1285             data = simplejson.loads(res)
1286             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1287             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1288         d.addCallback(_got_stats_json)
1289
1290         # TODO: mangle the second segment of a file, to test errors that
1291         # occur after we've already sent some good data, which uses a
1292         # different error path.
1293
1294         # TODO: download a URI with a form
1295         # TODO: create a directory by using a form
1296         # TODO: upload by using a form on the directory page
1297         #    url = base + "somedir/subdir1/freeform_post!!upload"
1298         # TODO: delete a file by using a button on the directory page
1299
1300         return d
1301
1302     def _test_runner(self, res):
1303         # exercise some of the diagnostic tools in runner.py
1304
1305         # find a share
1306         for (dirpath, dirnames, filenames) in os.walk(unicode(self.basedir)):
1307             if "storage" not in dirpath:
1308                 continue
1309             if not filenames:
1310                 continue
1311             pieces = dirpath.split(os.sep)
1312             if (len(pieces) >= 4
1313                 and pieces[-4] == "storage"
1314                 and pieces[-3] == "shares"):
1315                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1316                 # are sharefiles here
1317                 filename = os.path.join(dirpath, filenames[0])
1318                 # peek at the magic to see if it is a chk share
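                     # (v1 immutable sharefiles start with a 4-byte
                     #  big-endian version number, hence '\x00\x00\x00\x01')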
1319                 magic = open(filename, "rb").read(4)
1320                 if magic == '\x00\x00\x00\x01':
1321                     break
1322         else:
1323             self.fail("unable to find any uri_extension files in %r"
1324                       % self.basedir)
1325         log.msg("test_system.SystemTest._test_runner using %r" % filename)
1326
1327         out,err = StringIO(), StringIO()
1328         rc = runner.runner(["debug", "dump-share", "--offsets",
1329                             unicode_to_argv(filename)],
1330                            stdout=out, stderr=err)
1331         output = out.getvalue()
1332         self.failUnlessEqual(rc, 0)
1333
1334         # we only upload a single file, so we can assert some things about
1335         # its size and shares.
1336         self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1337         self.failUnlessIn("size: %d\n" % len(self.data), output)
1338         self.failUnlessIn("num_segments: 1\n", output)
1339         # segment_size is always a multiple of needed_shares
1340         self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
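             # next_multiple(n, k) rounds n up to the nearest multiple of k,
             # e.g. next_multiple(100, 3) == 102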
1341         self.failUnlessIn("total_shares: 10\n", output)
1342         # keys which are supposed to be present
1343         for key in ("size", "num_segments", "segment_size",
1344                     "needed_shares", "total_shares",
1345                     "codec_name", "codec_params", "tail_codec_params",
1346                     #"plaintext_hash", "plaintext_root_hash",
1347                     "crypttext_hash", "crypttext_root_hash",
1348                     "share_root_hash", "UEB_hash"):
1349             self.failUnlessIn("%s: " % key, output)
1350         self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1351
1352         # now use its storage index to find the other shares using the
1353         # 'find-shares' tool
1354         sharedir, shnum = os.path.split(filename)
1355         storagedir, storage_index_s = os.path.split(sharedir)
1356         storage_index_s = str(storage_index_s)
1357         out,err = StringIO(), StringIO()
1358         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1359         cmd = ["debug", "find-shares", storage_index_s] + nodedirs
1360         rc = runner.runner(cmd, stdout=out, stderr=err)
1361         self.failUnlessEqual(rc, 0)
1362         out.seek(0)
1363         sharefiles = [sfn.strip() for sfn in out.readlines()]
1364         self.failUnlessEqual(len(sharefiles), 10)
1365
1366         # also exercise the 'catalog-shares' tool
1367         out,err = StringIO(), StringIO()
1368         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1369         cmd = ["debug", "catalog-shares"] + nodedirs
1370         rc = runner.runner(cmd, stdout=out, stderr=err)
1371         self.failUnlessEqual(rc, 0)
1372         out.seek(0)
1373         descriptions = [sfn.strip() for sfn in out.readlines()]
1374         self.failUnlessEqual(len(descriptions), 30)
1375         matching = [line
1376                     for line in descriptions
1377                     if line.startswith("CHK %s " % storage_index_s)]
1378         self.failUnlessEqual(len(matching), 10)
1379
1380     def _test_control(self, res):
1381         # exercise the remote-control-the-client foolscap interfaces in
1382         # allmydata.control (mostly used for performance tests)
1383         c0 = self.clients[0]
1384         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1385         control_furl = open(control_furl_file, "r").read().strip()
1386         # it doesn't really matter which Tub we use to connect to the client,
1387         # so let's just use our IntroducerNode's
1388         d = self.introducer.tub.getReference(control_furl)
1389         d.addCallback(self._test_control2, control_furl_file)
1390         return d
1391     def _test_control2(self, rref, filename):
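             # 'filename' is the control.furl file itself: any convenient
             # local file serves as upload test data here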
1392         d = rref.callRemote("upload_from_file_to_uri",
1393                             filename.encode(get_filesystem_encoding()), convergence=None)
1394         downfile = os.path.join(self.basedir, "control.downfile").encode(get_filesystem_encoding())
1395         d.addCallback(lambda uri:
1396                       rref.callRemote("download_from_uri_to_file",
1397                                       uri, downfile))
1398         def _check(res):
1399             self.failUnlessEqual(res, downfile)
1400             data = open(downfile, "r").read()
1401             expected_data = open(filename, "r").read()
1402             self.failUnlessEqual(data, expected_data)
1403         d.addCallback(_check)
1404         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1405         if sys.platform.startswith("linux"):
1406             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1407         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1408         return d
1409
1410     def _test_cli(self, res):
1411         # run various CLI commands (in a thread, since they use blocking
1412         # network calls)
1413
1414         private_uri = self._private_node.get_uri()
1415         client0_basedir = self.getdir("client0")
1416
1417         nodeargs = [
1418             "--node-directory", client0_basedir,
1419             ]
1420
1421         d = defer.succeed(None)
1422
1423         # for compatibility with earlier versions, private/root_dir.cap is
1424         # supposed to be treated as an alias named "tahoe:". Start by making
1425         # sure that works, before we add other aliases.
1426
1427         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1428         f = open(root_file, "w")
1429         f.write(private_uri)
1430         f.close()
1431
1432         def run(ignored, verb, *args, **kwargs):
1433             stdin = kwargs.get("stdin", "")
1434             newargs = [verb] + nodeargs + list(args)
1435             return self._run_cli(newargs, stdin=stdin)
1436
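             # e.g. run(None, "get", "tahoe:file3") runs the equivalent of
             #   tahoe get --node-directory <client0_basedir> tahoe:file3
             # in-process, via runner.runner (see _run_cli below)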
1437         def _check_ls((out,err), expected_children, unexpected_children=[]):
1438             self.failUnlessEqual(err, "")
1439             for s in expected_children:
1440                 self.failUnless(s in out, (s,out))
1441             for s in unexpected_children:
1442                 self.failIf(s in out, (s,out))
1443
1444         def _check_ls_root((out,err)):
1445             self.failUnless("personal" in out)
1446             self.failUnless("s2-ro" in out)
1447             self.failUnless("s2-rw" in out)
1448             self.failUnlessEqual(err, "")
1449
1450         # this should reference private_uri
1451         d.addCallback(run, "ls")
1452         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1453
1454         d.addCallback(run, "list-aliases")
1455         def _check_aliases_1((out,err)):
1456             self.failUnlessEqual(err, "")
1457             self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % private_uri)
1458         d.addCallback(_check_aliases_1)
1459
1460         # now that that's out of the way, remove root_dir.cap and work with
1461         # new files
1462         d.addCallback(lambda res: os.unlink(root_file))
1463         d.addCallback(run, "list-aliases")
1464         def _check_aliases_2((out,err)):
1465             self.failUnlessEqual(err, "")
1466             self.failUnlessEqual(out, "")
1467         d.addCallback(_check_aliases_2)
1468
1469         d.addCallback(run, "mkdir")
1470         def _got_dir( (out,err) ):
1471             self.failUnless(uri.from_string_dirnode(out.strip()))
1472             return out.strip()
1473         d.addCallback(_got_dir)
1474         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1475
1476         d.addCallback(run, "list-aliases")
1477         def _check_aliases_3((out,err)):
1478             self.failUnlessEqual(err, "")
1479             self.failUnless("tahoe: " in out)
1480         d.addCallback(_check_aliases_3)
1481
1482         def _check_empty_dir((out,err)):
1483             self.failUnlessEqual(out, "")
1484             self.failUnlessEqual(err, "")
1485         d.addCallback(run, "ls")
1486         d.addCallback(_check_empty_dir)
1487
1488         def _check_missing_dir((out,err)):
1489             # TODO: check that rc==2
1490             self.failUnlessEqual(out, "")
1491             self.failUnlessEqual(err, "No such file or directory\n")
1492         d.addCallback(run, "ls", "bogus")
1493         d.addCallback(_check_missing_dir)
1494
1495         files = []
1496         datas = []
1497         for i in range(10):
1498             fn = os.path.join(self.basedir, "file%d" % i)
1499             files.append(fn)
1500             data = "data to be uploaded: file%d\n" % i
1501             datas.append(data)
1502             open(fn,"wb").write(data)
1503
1504         def _check_stdout_against((out,err), filenum=None, data=None):
1505             self.failUnlessEqual(err, "")
1506             if filenum is not None:
1507                 self.failUnlessEqual(out, datas[filenum])
1508             if data is not None:
1509                 self.failUnlessEqual(out, data)
1510
1511         # test both forms of put: from a file, and from stdin
1512         #  tahoe put bar FOO
1513         d.addCallback(run, "put", files[0], "tahoe-file0")
1514         def _put_out((out,err)):
1515             self.failUnless("URI:LIT:" in out, out)
1516             self.failUnless("201 Created" in err, err)
1517             uri0 = out.strip()
1518             return run(None, "get", uri0)
1519         d.addCallback(_put_out)
1520         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1521
1522         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1523         #  tahoe put bar tahoe:FOO
1524         d.addCallback(run, "put", files[2], "tahoe:file2")
1525         d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1526         def _check_put_mutable((out,err)):
1527             self._mutable_file3_uri = out.strip()
1528         d.addCallback(_check_put_mutable)
1529         d.addCallback(run, "get", "tahoe:file3")
1530         d.addCallback(_check_stdout_against, 3)
1531
1532         #  tahoe put FOO
1533         STDIN_DATA = "This is the file to upload from stdin."
1534         d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1535         #  tahoe put tahoe:FOO
1536         d.addCallback(run, "put", "-", "tahoe:from-stdin",
1537                       stdin="Other file from stdin.")
1538
1539         d.addCallback(run, "ls")
1540         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1541                                   "tahoe-file-stdin", "from-stdin"])
1542         d.addCallback(run, "ls", "subdir")
1543         d.addCallback(_check_ls, ["tahoe-file1"])
1544
1545         # tahoe mkdir FOO
1546         d.addCallback(run, "mkdir", "subdir2")
1547         d.addCallback(run, "ls")
1548         # TODO: extract the URI, set an alias with it
1549         d.addCallback(_check_ls, ["subdir2"])
1550
1551         # tahoe get: (to stdout and to a file)
1552         d.addCallback(run, "get", "tahoe-file0")
1553         d.addCallback(_check_stdout_against, 0)
1554         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1555         d.addCallback(_check_stdout_against, 1)
1556         outfile0 = os.path.join(self.basedir, "outfile0")
1557         d.addCallback(run, "get", "file2", outfile0)
1558         def _check_outfile0((out,err)):
1559             data = open(outfile0,"rb").read()
1560             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1561         d.addCallback(_check_outfile0)
1562         outfile1 = os.path.join(self.basedir, "outfile1")
1563         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1564         def _check_outfile1((out,err)):
1565             data = open(outfile1,"rb").read()
1566             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1567         d.addCallback(_check_outfile1)
1568
1569         d.addCallback(run, "rm", "tahoe-file0")
1570         d.addCallback(run, "rm", "tahoe:file2")
1571         d.addCallback(run, "ls")
1572         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1573
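             # `tahoe ls -l` output is only loosely asserted below: each line
             # should start with a mode string ("-r-- " for immutable files,
             # "-rw- " for mutable ones) and contain the size as a column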
1574         d.addCallback(run, "ls", "-l")
1575         def _check_ls_l((out,err)):
1576             lines = out.split("\n")
1577             for l in lines:
1578                 if "tahoe-file-stdin" in l:
1579                     self.failUnless(l.startswith("-r-- "), l)
1580                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1581                 if "file3" in l:
1582                     self.failUnless(l.startswith("-rw- "), l) # mutable
1583         d.addCallback(_check_ls_l)
1584
1585         d.addCallback(run, "ls", "--uri")
1586         def _check_ls_uri((out,err)):
1587             lines = out.split("\n")
1588             for l in lines:
1589                 if "file3" in l:
1590                     self.failUnless(self._mutable_file3_uri in l)
1591         d.addCallback(_check_ls_uri)
1592
1593         d.addCallback(run, "ls", "--readonly-uri")
1594         def _check_ls_rouri((out,err)):
1595             lines = out.split("\n")
1596             for l in lines:
1597                 if "file3" in l:
1598                     rw_uri = self._mutable_file3_uri
1599                     u = uri.from_string_mutable_filenode(rw_uri)
1600                     ro_uri = u.get_readonly().to_string()
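                         # (for an SDMF writecap of the form URI:SSK:...,
                         #  this yields the matching URI:SSK-RO:... readcap)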
1601                     self.failUnless(ro_uri in l)
1602         d.addCallback(_check_ls_rouri)
1603
1604
1605         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1606         d.addCallback(run, "ls")
1607         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1608
1609         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1610         d.addCallback(run, "ls")
1611         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1612
1613         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1614         d.addCallback(run, "ls")
1615         d.addCallback(_check_ls, ["file3", "file3-copy"])
1616         d.addCallback(run, "get", "tahoe:file3-copy")
1617         d.addCallback(_check_stdout_against, 3)
1618
1619         # copy from disk into tahoe
1620         d.addCallback(run, "cp", files[4], "tahoe:file4")
1621         d.addCallback(run, "ls")
1622         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1623         d.addCallback(run, "get", "tahoe:file4")
1624         d.addCallback(_check_stdout_against, 4)
1625
1626         # copy from tahoe into disk
1627         target_filename = os.path.join(self.basedir, "file-out")
1628         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1629         def _check_cp_out((out,err)):
1630             self.failUnless(os.path.exists(target_filename))
1631             got = open(target_filename,"rb").read()
1632             self.failUnlessEqual(got, datas[4])
1633         d.addCallback(_check_cp_out)
1634
1635         # copy from disk to disk (silly case)
1636         target2_filename = os.path.join(self.basedir, "file-out-copy")
1637         d.addCallback(run, "cp", target_filename, target2_filename)
1638         def _check_cp_out2((out,err)):
1639             self.failUnless(os.path.exists(target2_filename))
1640             got = open(target2_filename,"rb").read()
1641             self.failUnlessEqual(got, datas[4])
1642         d.addCallback(_check_cp_out2)
1643
1644         # copy from tahoe into disk, overwriting an existing file
1645         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1646         def _check_cp_out3((out,err)):
1647             self.failUnless(os.path.exists(target_filename))
1648             got = open(target_filename,"rb").read()
1649             self.failUnlessEqual(got, datas[3])
1650         d.addCallback(_check_cp_out3)
1651
1652         # copy from disk into tahoe, overwriting an existing immutable file
1653         d.addCallback(run, "cp", files[5], "tahoe:file4")
1654         d.addCallback(run, "ls")
1655         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1656         d.addCallback(run, "get", "tahoe:file4")
1657         d.addCallback(_check_stdout_against, 5)
1658
1659         # copy from disk into tahoe, overwriting an existing mutable file
1660         d.addCallback(run, "cp", files[5], "tahoe:file3")
1661         d.addCallback(run, "ls")
1662         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1663         d.addCallback(run, "get", "tahoe:file3")
1664         d.addCallback(_check_stdout_against, 5)
1665
1666         # recursive copy: setup
1667         dn = os.path.join(self.basedir, "dir1")
1668         os.makedirs(dn)
1669         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1670         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1671         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1672         sdn2 = os.path.join(dn, "subdir2")
1673         os.makedirs(sdn2)
1674         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1675         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1676
1677         # from disk into tahoe
1678         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1679         d.addCallback(run, "ls")
1680         d.addCallback(_check_ls, ["dir1"])
1681         d.addCallback(run, "ls", "dir1")
1682         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1683                       ["rfile4", "rfile5"])
1684         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1685         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1686                       ["rfile1", "rfile2", "rfile3"])
1687         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1688         d.addCallback(_check_stdout_against, data="rfile4")
1689
1690         # and back out again
1691         dn_copy = os.path.join(self.basedir, "dir1-copy")
1692         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1693         def _check_cp_r_out((out,err)):
1694             def _cmp(name):
1695                 old = open(os.path.join(dn, name), "rb").read()
1696                 newfn = os.path.join(dn_copy, name)
1697                 self.failUnless(os.path.exists(newfn))
1698                 new = open(newfn, "rb").read()
1699                 self.failUnlessEqual(old, new)
1700             _cmp("rfile1")
1701             _cmp("rfile2")
1702             _cmp("rfile3")
1703             _cmp(os.path.join("subdir2", "rfile4"))
1704             _cmp(os.path.join("subdir2", "rfile5"))
1705         d.addCallback(_check_cp_r_out)
1706
1707         # and copy it a second time, which ought to overwrite the same files
1708         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1709
1710         # and again, only writing filecaps
1711         dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1712         d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1713         def _check_capsonly((out,err)):
1714             # these should all be LITs
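             # a LIT cap embeds the file's data itself; the cap written
             # for "rfile4" should look like URI:LIT:ojtgs3dfgq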
1715             x = open(os.path.join(dn_copy2, "subdir2", "rfile4")).read()
1716             y = uri.from_string_filenode(x)
1717             self.failUnlessEqual(y.data, "rfile4")
1718         d.addCallback(_check_capsonly)
1719
1720         # and tahoe-to-tahoe
1721         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1722         d.addCallback(run, "ls")
1723         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1724         d.addCallback(run, "ls", "dir1-copy")
1725         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1726                       ["rfile4", "rfile5"])
1727         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1728         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1729                       ["rfile1", "rfile2", "rfile3"])
1730         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1731         d.addCallback(_check_stdout_against, data="rfile4")
1732
1733         # and copy it a second time, which ought to overwrite the same files
1734         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1735
1736         # tahoe_ls doesn't currently handle the error correctly: it tries to
1737         # JSON-parse a traceback.
1738 ##         def _ls_missing(res):
1739 ##             argv = ["ls"] + nodeargs + ["bogus"]
1740 ##             return self._run_cli(argv)
1741 ##         d.addCallback(_ls_missing)
1742 ##         def _check_ls_missing((out,err)):
1743 ##             print "OUT", out
1744 ##             print "ERR", err
1745 ##             self.failUnlessEqual(err, "")
1746 ##         d.addCallback(_check_ls_missing)
1747
1748         return d
1749
1750     def test_filesystem_with_cli_in_subprocess(self):
1751         # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1752
1753         self.basedir = "system/SystemTest/test_filesystem_with_cli_in_subprocess"
1754         d = self.set_up_nodes()
1755         def _new_happy_semantics(ign):
1756             for c in self.clients:
1757                 c.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
1758         d.addCallback(_new_happy_semantics)
1759
1760         def _run_in_subprocess(ignored, verb, *args, **kwargs):
1761             stdin = kwargs.get("stdin")
1762             env = kwargs.get("env")
1763             newargs = [verb, "--node-directory", self.getdir("client0")] + list(args)
1764             return self.run_bintahoe(newargs, stdin=stdin, env=env)
1765
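             # (each call spawns bin/tahoe in a child process, so e.g. the
             #  put below runs roughly:
             #  tahoe put --node-directory <client0> - newalias:tahoe-file)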
1766         def _check_succeeded(res, check_stderr=True):
1767             out, err, rc_or_sig = res
1768             self.failUnlessEqual(rc_or_sig, 0, str(res))
1769             if check_stderr:
1770                 self.failUnlessEqual(err, "")
1771
1772         d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1773         d.addCallback(_check_succeeded)
1774
1775         STDIN_DATA = "This is the file to upload from stdin."
1776         d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1777         d.addCallback(_check_succeeded, check_stderr=False)
1778
1779         def _mv_with_http_proxy(ign):
1780             env = os.environ.copy() # don't mutate the test process's environment
1781             env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1782             return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1783         d.addCallback(_mv_with_http_proxy)
1784         d.addCallback(_check_succeeded)
1785
1786         d.addCallback(_run_in_subprocess, "ls", "newalias:")
1787         def _check_ls(res):
1788             out, err, rc_or_sig = res
1789             self.failUnlessEqual(rc_or_sig, 0, str(res))
1790             self.failUnlessEqual(err, "", str(res))
1791             self.failUnlessIn("tahoe-moved", out)
1792             self.failIfIn("tahoe-file", out)
1793         d.addCallback(_check_ls)
1794         return d
1795
1796     def test_debug_trial(self):
1797         def _check_for_line(lines, result, test):
1798             for l in lines:
1799                 if result in l and test in l:
1800                     return
1801             self.fail("output (prefixed with '##') does not have a line containing both %r and %r:\n## %s"
1802                       % (result, test, "\n## ".join(lines)))
1803
1804         def _check_for_outcome(lines, out, outcome):
1805             self.failUnlessIn(outcome, out, "output (prefixed with '##') does not contain %r:\n## %s"
1806                                             % (outcome, "\n## ".join(lines)))
1807
1808         d = self.run_bintahoe(['debug', 'trial', '--reporter=verbose',
1809                                'allmydata.test.trialtest'])
1810         def _check_failure( (out, err, rc) ):
1811             self.failUnlessEqual(rc, 1)
1812             lines = out.split('\n')
1813             _check_for_line(lines, "[SKIPPED]", "test_skip")
1814             _check_for_line(lines, "[TODO]",    "test_todo")
1815             _check_for_line(lines, "[FAIL]",    "test_fail")
1816             _check_for_line(lines, "[ERROR]",   "test_deferred_error")
1817             _check_for_line(lines, "[ERROR]",   "test_error")
1818             _check_for_outcome(lines, out, "FAILED")
1819         d.addCallback(_check_failure)
1820
1821         # the --quiet argument regression-tests a problem in finding which arguments to pass to trial
1822         d.addCallback(lambda ign: self.run_bintahoe(['--quiet', 'debug', 'trial', '--reporter=verbose',
1823                                                      'allmydata.test.trialtest.Success']))
1824         def _check_success( (out, err, rc) ):
1825             self.failUnlessEqual(rc, 0)
1826             lines = out.split('\n')
1827             _check_for_line(lines, "[SKIPPED]", "test_skip")
1828             _check_for_line(lines, "[TODO]",    "test_todo")
1829             _check_for_outcome(lines, out, "PASSED")
1830         d.addCallback(_check_success)
1831         return d
1832
1833     def _run_cli(self, argv, stdin=""):
1834         #print "CLI:", argv
1835         stdout, stderr = StringIO(), StringIO()
1836         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1837                                   stdin=StringIO(stdin),
1838                                   stdout=stdout, stderr=stderr)
1839         def _done(res):
1840             return stdout.getvalue(), stderr.getvalue()
1841         d.addCallback(_done)
1842         return d
1843
1844     def _test_checker(self, res):
1845         ut = upload.Data("too big to be literal" * 200, convergence=None)
1846         d = self._personal_node.add_file(u"big file", ut)
1847
1848         d.addCallback(lambda res: self._personal_node.check(Monitor()))
1849         def _check_dirnode_results(r):
1850             self.failUnless(r.is_healthy())
1851         d.addCallback(_check_dirnode_results)
1852         d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1853         d.addCallback(_check_dirnode_results)
1854
1855         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1856         def _got_chk_filenode(n):
1857             self.failUnless(isinstance(n, ImmutableFileNode))
1858             d = n.check(Monitor())
1859             def _check_filenode_results(r):
1860                 self.failUnless(r.is_healthy())
1861             d.addCallback(_check_filenode_results)
1862             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1863             d.addCallback(_check_filenode_results)
1864             return d
1865         d.addCallback(_got_chk_filenode)
1866
1867         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1868         def _got_lit_filenode(n):
1869             self.failUnless(isinstance(n, LiteralFileNode))
1870             d = n.check(Monitor())
1871             def _check_lit_filenode_results(r):
1872                 self.failUnlessEqual(r, None)
1873             d.addCallback(_check_lit_filenode_results)
1874             d.addCallback(lambda res: n.check(Monitor(), verify=True))
1875             d.addCallback(_check_lit_filenode_results)
1876             return d
1877         d.addCallback(_got_lit_filenode)
1878         return d