]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
2d807b94cc20f1dbc66a9f1fd63317d161cbbb29
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1
2 from base64 import b32encode
3 import os, sys, time, re
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.application import service
9 from allmydata import client, uri, download, upload, storage, mutable
10 from allmydata.introducer import IntroducerNode
11 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
12 from allmydata.scripts import runner
13 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
14 from allmydata.mutable import NotMutableError
15 from foolscap.eventual import flushEventualQueue
16 from twisted.python import log
17 from twisted.python.failure import Failure
18 from twisted.web.client import getPage
19 from twisted.web.error import Error
20
21 def flush_but_dont_ignore(res):
22     d = flushEventualQueue()
23     def _done(ignored):
24         return res
25     d.addCallback(_done)
26     return d
27
28 LARGE_DATA = """
29 This is some data to publish to the virtual drive, which needs to be large
30 enough to not fit inside a LIT uri.
31 """
32
33 class SystemTest(testutil.SignalMixin, unittest.TestCase):
34
35     def setUp(self):
36         self.sparent = service.MultiService()
37         self.sparent.startService()
38     def tearDown(self):
39         log.msg("shutting down SystemTest services")
40         d = self.sparent.stopService()
41         d.addBoth(flush_but_dont_ignore)
42         return d
43
44     def getdir(self, subdir):
45         return os.path.join(self.basedir, subdir)
46
47     def add_service(self, s):
48         s.setServiceParent(self.sparent)
49         return s
50
51     def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
52         self.numclients = NUMCLIENTS
53         self.createprivdir = createprivdir
54         iv_dir = self.getdir("introducer")
55         if not os.path.isdir(iv_dir):
56             fileutil.make_dirs(iv_dir)
57         iv = IntroducerNode(basedir=iv_dir)
58         self.introducer = self.add_service(iv)
59         d = self.introducer.when_tub_ready()
60         d.addCallback(self._set_up_nodes_2)
61         return d
62
63     def _set_up_nodes_2(self, res):
64         q = self.introducer
65         self.introducer_furl = q.introducer_url
66         self.clients = []
67         for i in range(self.numclients):
68             basedir = self.getdir("client%d" % i)
69             fileutil.make_dirs(basedir)
70             if i == 0:
71                 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
72             if self.createprivdir:
73                 open(os.path.join(basedir, "my_private_dir.uri"), "w")
74             open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
75             c = self.add_service(client.Client(basedir=basedir))
76             self.clients.append(c)
77         log.msg("STARTING")
78         d = self.wait_for_connections()
79         def _connected(res):
80             log.msg("CONNECTED")
81             # now find out where the web port was
82             l = self.clients[0].getServiceNamed("webish").listener
83             port = l._port.getHost().port
84             self.webish_url = "http://localhost:%d/" % port
85         d.addCallback(_connected)
86         return d
87
88     def add_extra_node(self, client_num):
89         # this node is *not* parented to our self.sparent, so we can shut it
90         # down separately from the rest, to exercise the connection-lost code
91         basedir = self.getdir("client%d" % client_num)
92         if not os.path.isdir(basedir):
93             fileutil.make_dirs(basedir)
94         open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
95
96         c = client.Client(basedir=basedir)
97         self.clients.append(c)
98         self.numclients += 1
99         c.startService()
100         d = self.wait_for_connections()
101         d.addCallback(lambda res: c)
102         return d
103
104     def wait_for_connections(self, ignored=None):
105         # TODO: replace this with something that takes a list of peerids and
106         # fires when they've all been heard from, instead of using a count
107         # and a threshold
108         for c in self.clients:
109             if (not c.introducer_client or
110                 len(list(c.get_all_peerids())) != self.numclients):
111                 d = defer.Deferred()
112                 d.addCallback(self.wait_for_connections)
113                 reactor.callLater(0.05, d.callback, None)
114                 return d
115         return defer.succeed(None)
116
117     def test_connections(self):
118         self.basedir = "system/SystemTest/test_connections"
119         d = self.set_up_nodes()
120         self.extra_node = None
121         d.addCallback(lambda res: self.add_extra_node(self.numclients))
122         def _check(extra_node):
123             self.extra_node = extra_node
124             for c in self.clients:
125                 all_peerids = list(c.get_all_peerids())
126                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
127                 permuted_peers = list(c.get_permuted_peers("a", True))
128                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
129                 permuted_other_peers = list(c.get_permuted_peers("a", False))
130                 self.failUnlessEqual(len(permuted_other_peers), self.numclients)
131
132         d.addCallback(_check)
133         def _shutdown_extra_node(res):
134             if self.extra_node:
135                 return self.extra_node.stopService()
136             return res
137         d.addBoth(_shutdown_extra_node)
138         return d
139     test_connections.timeout = 300
140     # test_connections is subsumed by test_upload_and_download, and takes
141     # quite a while to run on a slow machine (because of all the TLS
142     # connections that must be established). If we ever rework the introducer
143     # code to such an extent that we're not sure if it works anymore, we can
144     # reinstate this test until it does.
145     del test_connections
146
147     def test_upload_and_download(self):
148         self.basedir = "system/SystemTest/test_upload_and_download"
149         # we use 4000 bytes of data, which will result in about 400k written
150         # to disk among all our simulated nodes
151         DATA = "Some data to upload\n" * 200
152         d = self.set_up_nodes()
153         def _check_connections(res):
154             for c in self.clients:
155                 all_peerids = list(c.get_all_peerids())
156                 self.failUnlessEqual(len(all_peerids), self.numclients)
157                 permuted_peers = list(c.get_permuted_peers("a", True))
158                 self.failUnlessEqual(len(permuted_peers), self.numclients)
159                 permuted_other_peers = list(c.get_permuted_peers("a", False))
160                 self.failUnlessEqual(len(permuted_other_peers), self.numclients-1)
161         d.addCallback(_check_connections)
162         def _do_upload(res):
163             log.msg("UPLOADING")
164             u = self.clients[0].getServiceNamed("uploader")
165             self.uploader = u
166             # we crank the max segsize down to 1024b for the duration of this
167             # test, so we can exercise multiple segments. It is important
168             # that this is not a multiple of the segment size, so that the
169             # tail segment is not the same length as the others. This actualy
170             # gets rounded up to 1025 to be a multiple of the number of
171             # required shares (since we use 25 out of 100 FEC).
172             options = {"max_segment_size": 1024}
173             d1 = u.upload_data(DATA, options)
174             return d1
175         d.addCallback(_do_upload)
176         def _upload_done(uri):
177             log.msg("upload finished: uri is %s" % (uri,))
178             self.uri = uri
179             dl = self.clients[1].getServiceNamed("downloader")
180             self.downloader = dl
181         d.addCallback(_upload_done)
182
183         def _upload_again(res):
184             # upload again. This ought to be short-circuited, however with
185             # the way we currently generate URIs (i.e. because they include
186             # the roothash), we have to do all of the encoding work, and only
187             # get to save on the upload part.
188             log.msg("UPLOADING AGAIN")
189             options = {"max_segment_size": 1024}
190             d1 = self.uploader.upload_data(DATA, options)
191         d.addCallback(_upload_again)
192
193         def _download_to_data(res):
194             log.msg("DOWNLOADING")
195             return self.downloader.download_to_data(self.uri)
196         d.addCallback(_download_to_data)
197         def _download_to_data_done(data):
198             log.msg("download finished")
199             self.failUnlessEqual(data, DATA)
200         d.addCallback(_download_to_data_done)
201
202         target_filename = os.path.join(self.basedir, "download.target")
203         def _download_to_filename(res):
204             return self.downloader.download_to_filename(self.uri,
205                                                         target_filename)
206         d.addCallback(_download_to_filename)
207         def _download_to_filename_done(res):
208             newdata = open(target_filename, "rb").read()
209             self.failUnlessEqual(newdata, DATA)
210         d.addCallback(_download_to_filename_done)
211
212         target_filename2 = os.path.join(self.basedir, "download.target2")
213         def _download_to_filehandle(res):
214             fh = open(target_filename2, "wb")
215             return self.downloader.download_to_filehandle(self.uri, fh)
216         d.addCallback(_download_to_filehandle)
217         def _download_to_filehandle_done(fh):
218             fh.close()
219             newdata = open(target_filename2, "rb").read()
220             self.failUnlessEqual(newdata, DATA)
221         d.addCallback(_download_to_filehandle_done)
222
223         def _download_nonexistent_uri(res):
224             baduri = self.mangle_uri(self.uri)
225             d1 = self.downloader.download_to_data(baduri)
226             def _baduri_should_fail(res):
227                 self.failUnless(isinstance(res, Failure))
228                 self.failUnless(res.check(download.NotEnoughPeersError),
229                                 "expected NotEnoughPeersError, got %s" % res)
230                 # TODO: files that have zero peers should get a special kind
231                 # of NotEnoughPeersError, which can be used to suggest that
232                 # the URI might be wrong or that they've never uploaded the
233                 # file in the first place.
234             d1.addBoth(_baduri_should_fail)
235             return d1
236         d.addCallback(_download_nonexistent_uri)
237         return d
238     test_upload_and_download.timeout = 4800
239
240     def _find_shares(self, basedir):
241         shares = []
242         for (dirpath, dirnames, filenames) in os.walk(basedir):
243             if "storage" not in dirpath:
244                 continue
245             if not filenames:
246                 continue
247             pieces = dirpath.split(os.sep)
248             if pieces[-3] == "storage" and pieces[-2] == "shares":
249                 # we're sitting in .../storage/shares/$SINDEX , and there
250                 # are sharefiles here
251                 assert pieces[-4].startswith("client")
252                 client_num = int(pieces[-4][-1])
253                 storage_index_s = pieces[-1]
254                 storage_index = idlib.a2b(storage_index_s)
255                 for sharename in filenames:
256                     shnum = int(sharename)
257                     filename = os.path.join(dirpath, sharename)
258                     data = (client_num, storage_index, filename, shnum)
259                     shares.append(data)
260         if not shares:
261             self.fail("unable to find any share files in %s" % basedir)
262         return shares
263
264     def _corrupt_mutable_share(self, filename, which):
265         msf = storage.MutableShareFile(filename)
266         datav = msf.readv([ (0, 1000000) ])
267         final_share = datav[0]
268         assert len(final_share) < 1000000 # ought to be truncated
269         pieces = mutable.unpack_share(final_share)
270         (seqnum, root_hash, IV, k, N, segsize, datalen,
271          verification_key, signature, share_hash_chain, block_hash_tree,
272          share_data, enc_privkey) = pieces
273
274         if which == "seqnum":
275             seqnum = seqnum + 15
276         elif which == "R":
277             root_hash = self.flip_bit(root_hash)
278         elif which == "IV":
279             IV = self.flip_bit(IV)
280         elif which == "segsize":
281             segsize = segsize + 15
282         elif which == "pubkey":
283             verification_key = self.flip_bit(verification_key)
284         elif which == "signature":
285             signature = self.flip_bit(signature)
286         elif which == "share_hash_chain":
287             nodenum = share_hash_chain.keys()[0]
288             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
289         elif which == "block_hash_tree":
290             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
291         elif which == "share_data":
292             share_data = self.flip_bit(share_data)
293         elif which == "encprivkey":
294             enc_privkey = self.flip_bit(enc_privkey)
295
296         prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
297                                      segsize, datalen)
298         final_share = mutable.pack_share(prefix,
299                                          verification_key,
300                                          signature,
301                                          share_hash_chain,
302                                          block_hash_tree,
303                                          share_data,
304                                          enc_privkey)
305         msf.writev( [(0, final_share)], None)
306
307     def test_mutable(self):
308         self.basedir = "system/SystemTest/test_mutable"
309         DATA = "initial contents go here."  # 25 bytes % 3 != 0
310         NEWDATA = "new contents yay"
311         NEWERDATA = "this is getting old"
312
313         d = self.set_up_nodes()
314
315         def _create_mutable(res):
316             c = self.clients[0]
317             log.msg("starting create_mutable_file")
318             d1 = c.create_mutable_file(DATA, wait_for_numpeers=self.numclients)
319             def _done(res):
320                 log.msg("DONE: %s" % (res,))
321                 self._mutable_node_1 = res
322                 uri = res.get_uri()
323             d1.addCallback(_done)
324             return d1
325         d.addCallback(_create_mutable)
326
327         def _test_debug(res):
328             # find a share. It is important to run this while there is only
329             # one slot in the grid.
330             shares = self._find_shares(self.basedir)
331             (client_num, storage_index, filename, shnum) = shares[0]
332             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
333                     % filename)
334             log.msg(" for clients[%d]" % client_num)
335
336             out,err = StringIO(), StringIO()
337             rc = runner.runner(["dump-share",
338                                 filename],
339                                stdout=out, stderr=err)
340             output = out.getvalue()
341             self.failUnlessEqual(rc, 0)
342             try:
343                 self.failUnless("Mutable slot found:\n" in output)
344                 self.failUnless("share_type: SDMF\n" in output)
345                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
346                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
347                 self.failUnless(" num_extra_leases: 0\n" in output)
348                 # the pubkey size can vary by a byte, so the container might
349                 # be a bit larger on some runs.
350                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
351                 self.failUnless(m)
352                 container_size = int(m.group(1))
353                 self.failUnless(2037 <= container_size <= 2049, container_size)
354                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
355                 self.failUnless(m)
356                 data_length = int(m.group(1))
357                 self.failUnless(2037 <= data_length <= 2049, data_length)
358                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
359                                 in output)
360                 self.failUnless(" SDMF contents:\n" in output)
361                 self.failUnless("  seqnum: 1\n" in output)
362                 self.failUnless("  required_shares: 3\n" in output)
363                 self.failUnless("  total_shares: 10\n" in output)
364                 self.failUnless("  segsize: 27\n" in output, (output, filename))
365                 self.failUnless("  datalen: 25\n" in output)
366                 # the exact share_hash_chain nodes depends upon the sharenum,
367                 # and is more of a hassle to compute than I want to deal with
368                 # now
369                 self.failUnless("  share_hash_chain: " in output)
370                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
371             except unittest.FailTest:
372                 print
373                 print "dump-share output was:"
374                 print output
375                 raise
376         d.addCallback(_test_debug)
377
378         # test retrieval
379
380         # first, let's see if we can use the existing node to retrieve the
381         # contents. This allows it to use the cached pubkey and maybe the
382         # latest-known sharemap.
383
384         d.addCallback(lambda res: self._mutable_node_1.download_to_data())
385         def _check_download_1(res):
386             self.failUnlessEqual(res, DATA)
387             # now we see if we can retrieve the data from a new node,
388             # constructed using the URI of the original one. We do this test
389             # on the same client that uploaded the data.
390             uri = self._mutable_node_1.get_uri()
391             log.msg("starting retrieve1")
392             newnode = self.clients[0].create_node_from_uri(uri)
393             return newnode.download_to_data()
394         d.addCallback(_check_download_1)
395
396         def _check_download_2(res):
397             self.failUnlessEqual(res, DATA)
398             # same thing, but with a different client
399             uri = self._mutable_node_1.get_uri()
400             newnode = self.clients[1].create_node_from_uri(uri)
401             log.msg("starting retrieve2")
402             d1 = newnode.download_to_data()
403             d1.addCallback(lambda res: (res, newnode))
404             return d1
405         d.addCallback(_check_download_2)
406
407         def _check_download_3((res, newnode)):
408             self.failUnlessEqual(res, DATA)
409             # replace the data
410             log.msg("starting replace1")
411             d1 = newnode.replace(NEWDATA, wait_for_numpeers=self.numclients)
412             d1.addCallback(lambda res: newnode.download_to_data())
413             return d1
414         d.addCallback(_check_download_3)
415
416         def _check_download_4(res):
417             self.failUnlessEqual(res, NEWDATA)
418             # now create an even newer node and replace the data on it. This
419             # new node has never been used for download before.
420             uri = self._mutable_node_1.get_uri()
421             newnode1 = self.clients[2].create_node_from_uri(uri)
422             newnode2 = self.clients[3].create_node_from_uri(uri)
423             self._newnode3 = self.clients[3].create_node_from_uri(uri)
424             log.msg("starting replace2")
425             d1 = newnode1.replace(NEWERDATA, wait_for_numpeers=self.numclients)
426             d1.addCallback(lambda res: newnode2.download_to_data())
427             return d1
428         d.addCallback(_check_download_4)
429
430         def _check_download_5(res):
431             log.msg("finished replace2")
432             self.failUnlessEqual(res, NEWERDATA)
433         d.addCallback(_check_download_5)
434
435         def _corrupt_shares(res):
436             # run around and flip bits in all but k of the shares, to test
437             # the hash checks
438             shares = self._find_shares(self.basedir)
439             ## sort by share number
440             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
441             where = dict([ (shnum, filename)
442                            for (client_num, storage_index, filename, shnum)
443                            in shares ])
444             assert len(where) == 10 # this test is designed for 3-of-10
445             for shnum, filename in where.items():
446                 # shares 7,8,9 are left alone. read will check
447                 # (share_hash_chain, block_hash_tree, share_data). New
448                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
449                 # segsize, signature).
450                 if shnum == 0:
451                     # read: this will trigger "pubkey doesn't match
452                     # fingerprint".
453                     self._corrupt_mutable_share(filename, "pubkey")
454                     self._corrupt_mutable_share(filename, "encprivkey")
455                 elif shnum == 1:
456                     # triggers "signature is invalid"
457                     self._corrupt_mutable_share(filename, "seqnum")
458                 elif shnum == 2:
459                     # triggers "signature is invalid"
460                     self._corrupt_mutable_share(filename, "R")
461                 elif shnum == 3:
462                     # triggers "signature is invalid"
463                     self._corrupt_mutable_share(filename, "segsize")
464                 elif shnum == 4:
465                     self._corrupt_mutable_share(filename, "share_hash_chain")
466                 elif shnum == 5:
467                     self._corrupt_mutable_share(filename, "block_hash_tree")
468                 elif shnum == 6:
469                     self._corrupt_mutable_share(filename, "share_data")
470                 # other things to correct: IV, signature
471                 # 7,8,9 are left alone
472
473                 # note that initial_query_count=5 means that we'll hit the
474                 # first 5 servers in effectively random order (based upon
475                 # response time), so we won't necessarily ever get a "pubkey
476                 # doesn't match fingerprint" error (if we hit shnum>=1 before
477                 # shnum=0, we pull the pubkey from there). To get repeatable
478                 # specific failures, we need to set initial_query_count=1,
479                 # but of course that will change the sequencing behavior of
480                 # the retrieval process. TODO: find a reasonable way to make
481                 # this a parameter, probably when we expand this test to test
482                 # for one failure mode at a time.
483
484                 # when we retrieve this, we should get three signature
485                 # failures (where we've mangled seqnum, R, and segsize). The
486                 # pubkey mangling
487         d.addCallback(_corrupt_shares)
488
489         d.addCallback(lambda res: self._newnode3.download_to_data())
490         d.addCallback(_check_download_5)
491
492         def _check_empty_file(res):
493             # make sure we can create empty files, this usually screws up the
494             # segsize math
495             d1 = self.clients[2].create_mutable_file("", wait_for_numpeers=self.numclients)
496             d1.addCallback(lambda newnode: newnode.download_to_data())
497             d1.addCallback(lambda res: self.failUnlessEqual("", res))
498             return d1
499         d.addCallback(_check_empty_file)
500
501         d.addCallback(lambda res: self.clients[0].create_empty_dirnode(wait_for_numpeers=self.numclients))
502         def _created_dirnode(dnode):
503             log.msg("_created_dirnode(%s)" % (dnode,))
504             d1 = dnode.list()
505             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
506             d1.addCallback(lambda res: dnode.has_child("edgar"))
507             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
508             d1.addCallback(lambda res: dnode.set_node("see recursive", dnode, wait_for_numpeers=self.numclients))
509             d1.addCallback(lambda res: dnode.has_child("see recursive"))
510             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
511             d1.addCallback(lambda res: dnode.build_manifest())
512             d1.addCallback(lambda manifest:
513                            self.failUnlessEqual(len(manifest), 1))
514             return d1
515         d.addCallback(_created_dirnode)
516
517         return d
518     # The default 120 second timeout went off when running it under valgrind
519     # on my old Windows laptop, so I'm bumping up the timeout.
520     test_mutable.timeout = 240
521
522     def flip_bit(self, good):
523         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
524
525     def mangle_uri(self, gooduri):
526         # change the key, which changes the storage index, which means we'll
527         # be asking about the wrong file, so nobody will have any shares
528         u = IFileURI(gooduri)
529         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
530                             uri_extension_hash=u.uri_extension_hash,
531                             needed_shares=u.needed_shares,
532                             total_shares=u.total_shares,
533                             size=u.size)
534         return u2.to_string()
535
536     # TODO: add a test which mangles the uri_extension_hash instead, and
537     # should fail due to not being able to get a valid uri_extension block.
538     # Also a test which sneakily mangles the uri_extension block to change
539     # some of the validation data, so it will fail in the post-download phase
540     # when the file's crypttext integrity check fails. Do the same thing for
541     # the key, which should cause the download to fail the post-download
542     # plaintext_hash check.
543
544     def test_vdrive(self):
545         self.basedir = "system/SystemTest/test_vdrive"
546         self.data = LARGE_DATA
547         d = self.set_up_nodes(createprivdir=True)
548         d.addCallback(self.log, "starting publish")
549         d.addCallback(self._do_publish1)
550         d.addCallback(self._test_runner)
551         d.addCallback(self._do_publish2)
552         # at this point, we have the following filesystem (where "R" denotes
553         # self._root_directory_uri):
554         # R
555         # R/subdir1
556         # R/subdir1/mydata567
557         # R/subdir1/subdir2/
558         # R/subdir1/subdir2/mydata992
559
560         d.addCallback(self._bounce_client0)
561         d.addCallback(self.log, "bounced client0")
562
563         d.addCallback(self._check_publish1)
564         d.addCallback(self.log, "did _check_publish1")
565         d.addCallback(self._check_publish2)
566         d.addCallback(self.log, "did _check_publish2")
567         d.addCallback(self._do_publish_private)
568         d.addCallback(self.log, "did _do_publish_private")
569         # now we also have (where "P" denotes clients[0]'s automatic private
570         # dir):
571         #  P/personal/sekrit data
572         #  P/s2-rw -> /subdir1/subdir2/
573         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
574         d.addCallback(self._check_publish_private)
575         d.addCallback(self.log, "did _check_publish_private")
576         d.addCallback(self._test_web)
577         d.addCallback(self._test_web_start)
578         d.addCallback(self._test_control)
579         d.addCallback(self._test_cli)
580         # P now has four top-level children:
581         # P/personal/sekrit data
582         # P/s2-ro/
583         # P/s2-rw/
584         # P/test_put/  (empty)
585         d.addCallback(self._test_checker)
586         d.addCallback(self._test_verifier)
587         return d
588     test_vdrive.timeout = 1100
589
590     def _do_publish1(self, res):
591         ut = upload.Data(self.data)
592         c0 = self.clients[0]
593         d = c0.create_empty_dirnode(wait_for_numpeers=self.numclients)
594         def _made_root(new_dirnode):
595             log.msg("ZZZ %s -> %s" % (hasattr(self, '_root_directory_uri') and self._root_directory_uri, new_dirnode.get_uri(),))
596             self._root_directory_uri = new_dirnode.get_uri()
597             return c0.create_node_from_uri(self._root_directory_uri)
598         d.addCallback(_made_root)
599         d.addCallback(lambda root: root.create_empty_directory("subdir1", wait_for_numpeers=self.numclients))
600         def _made_subdir1(subdir1_node):
601             self._subdir1_node = subdir1_node
602             d1 = subdir1_node.add_file("mydata567", ut, wait_for_numpeers=self.numclients)
603             d1.addCallback(self.log, "publish finished")
604             def _stash_uri(filenode):
605                 self.uri = filenode.get_uri()
606             d1.addCallback(_stash_uri)
607             return d1
608         d.addCallback(_made_subdir1)
609         return d
610
611     def _do_publish2(self, res):
612         ut = upload.Data(self.data)
613         d = self._subdir1_node.create_empty_directory("subdir2", wait_for_numpeers=self.numclients)
614         d.addCallback(lambda subdir2: subdir2.add_file("mydata992", ut, wait_for_numpeers=self.numclients))
615         return d
616
617     def _bounce_client0(self, res):
618         old_client0 = self.clients[0]
619         d = old_client0.disownServiceParent()
620         assert isinstance(d, defer.Deferred)
621         d.addCallback(self.log, "STOPPED")
622         # I think windows requires a moment to let the connection really stop
623         # and the port number made available for re-use. TODO: examine the
624         # behavior, see if this is really the problem, see if we can do
625         # better than blindly waiting for a second.
626         d.addCallback(self.stall, 1.0)
627         def _stopped(res):
628             new_client0 = client.Client(basedir=self.getdir("client0"))
629             self.add_service(new_client0)
630             self.clients[0] = new_client0
631             return self.wait_for_connections()
632         d.addCallback(_stopped)
633         d.addCallback(self.log, "CONNECTED")
634         def _connected(res):
635             # now find out where the web port was
636             l = self.clients[0].getServiceNamed("webish").listener
637             port = l._port.getHost().port
638             self.webish_url = "http://localhost:%d/" % port
639         d.addCallback(_connected)
640         d.addCallback(self.log, "GOT WEB LISTENER")
641         return d
642
643     def log(self, res, msg):
644         # print "MSG: %s  RES: %s" % (msg, res)
645         log.msg(msg)
646         return res
647
648     def stall(self, res, delay=1.0):
649         d = defer.Deferred()
650         reactor.callLater(delay, d.callback, res)
651         return d
652
653     def _do_publish_private(self, res):
654         self.smalldata = "sssh, very secret stuff"
655         ut = upload.Data(self.smalldata)
656         d = self.clients[0].get_private_uri()
657         d.addCallback(self.log, "GOT private directory")
658         def _got_root_uri(privuri):
659             assert privuri
660             privnode = self.clients[0].create_node_from_uri(privuri)
661             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
662             d1 = privnode.create_empty_directory("personal", wait_for_numpeers=self.numclients)
663             d1.addCallback(self.log, "made P/personal")
664             d1.addCallback(lambda node: node.add_file("sekrit data", ut, wait_for_numpeers=self.numclients))
665             d1.addCallback(self.log, "made P/personal/sekrit data")
666             d1.addCallback(lambda res: rootnode.get_child_at_path(["subdir1", "subdir2"]))
667             def _got_s2(s2node):
668                 d2 = privnode.set_uri("s2-rw", s2node.get_uri(), wait_for_numpeers=self.numclients)
669                 d2.addCallback(lambda node: privnode.set_uri("s2-ro", s2node.get_readonly_uri(), wait_for_numpeers=self.numclients))
670                 return d2
671             d1.addCallback(_got_s2)
672             return d1
673         d.addCallback(_got_root_uri)
674         return d
675
676     def _check_publish1(self, res):
677         # this one uses the iterative API
678         c1 = self.clients[1]
679         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
680         d.addCallback(self.log, "check_publish1 got /")
681         d.addCallback(lambda root: root.get("subdir1"))
682         d.addCallback(lambda subdir1: subdir1.get("mydata567"))
683         d.addCallback(lambda filenode: filenode.download_to_data())
684         d.addCallback(self.log, "get finished")
685         def _get_done(data):
686             self.failUnlessEqual(data, self.data)
687         d.addCallback(_get_done)
688         return d
689
690     def _check_publish2(self, res):
691         # this one uses the path-based API
692         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
693         d = rootnode.get_child_at_path("subdir1")
694         d.addCallback(lambda dirnode:
695                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
696         d.addCallback(lambda res: rootnode.get_child_at_path("subdir1/mydata567"))
697         d.addCallback(lambda filenode: filenode.download_to_data())
698         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
699
700         d.addCallback(lambda res: rootnode.get_child_at_path("subdir1/mydata567"))
701         def _got_filenode(filenode):
702             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
703             assert fnode == filenode
704         d.addCallback(_got_filenode)
705         return d
706
707     def _check_publish_private(self, res):
708         # this one uses the path-based API
709         d = self.clients[0].get_private_uri()
710         def _got_private_uri(privateuri):
711             self._private_node = self.clients[0].create_node_from_uri(privateuri)
712         d.addCallback(_got_private_uri)
713
714         d.addCallback(lambda res: self._private_node.get_child_at_path("personal"))
715         def _got_personal(personal):
716             self._personal_node = personal
717             return personal
718         d.addCallback(_got_personal)
719
720         d.addCallback(lambda dirnode:
721                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
722         def get_path(path):
723             return self._private_node.get_child_at_path(path)
724
725         d.addCallback(lambda res: get_path("personal/sekrit data"))
726         d.addCallback(lambda filenode: filenode.download_to_data())
727         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
728         d.addCallback(lambda res: get_path("s2-rw"))
729         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
730         d.addCallback(lambda res: get_path("s2-ro"))
731         def _got_s2ro(dirnode):
732             self.failUnless(dirnode.is_mutable(), dirnode)
733             self.failUnless(dirnode.is_readonly(), dirnode)
734             d1 = defer.succeed(None)
735             d1.addCallback(lambda res: dirnode.list())
736             d1.addCallback(self.log, "dirnode.list")
737
738             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, "nope"))
739
740             d1.addCallback(self.log, "doing add_file(ro)")
741             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.")
742             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, "hope", ut))
743
744             d1.addCallback(self.log, "doing get(ro)")
745             d1.addCallback(lambda res: dirnode.get("mydata992"))
746             d1.addCallback(lambda filenode:
747                            self.failUnless(IFileNode.providedBy(filenode)))
748
749             d1.addCallback(self.log, "doing delete(ro)")
750             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, "mydata992"))
751
752             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, "hopeless", self.uri))
753
754             d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, "missing"))
755
756             personal = self._personal_node
757             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, "mydata992", personal, "nope"))
758
759             d1.addCallback(self.log, "doing move_child_to(ro)2")
760             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, "sekrit data", dirnode, "nope"))
761
762             d1.addCallback(self.log, "finished with _got_s2ro")
763             return d1
764         d.addCallback(_got_s2ro)
765         def _got_home(dummy):
766             home = self._private_node
767             personal = self._personal_node
768             d1 = defer.succeed(None)
769             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
770             d1.addCallback(lambda res:
771                            personal.move_child_to("sekrit data",home,"sekrit"))
772
773             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
774             d1.addCallback(lambda res:
775                            home.move_child_to("sekrit", home, "sekrit data"))
776
777             d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
778             d1.addCallback(lambda res:
779                            home.move_child_to("sekrit data", personal))
780
781             d1.addCallback(lambda res: home.build_manifest())
782             d1.addCallback(self.log, "manifest")
783             #  four items:
784             # P/personal/
785             # P/personal/sekrit data
786             # P/s2-rw  (same as P/s2-ro)
787             # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
788             d1.addCallback(lambda manifest:
789                            self.failUnlessEqual(len(manifest), 4))
790             return d1
791         d.addCallback(_got_home)
792         return d
793
794     def shouldFail(self, res, expected_failure, which, substring=None):
795         if isinstance(res, Failure):
796             res.trap(expected_failure)
797             if substring:
798                 self.failUnless(substring in str(res),
799                                 "substring '%s' not in '%s'"
800                                 % (substring, str(res)))
801         else:
802             self.fail("%s was supposed to raise %s, not get '%s'" %
803                       (which, expected_failure, res))
804
805     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
806         assert substring is None or isinstance(substring, str)
807         d = defer.maybeDeferred(callable, *args, **kwargs)
808         def done(res):
809             if isinstance(res, Failure):
810                 res.trap(expected_failure)
811                 if substring:
812                     self.failUnless(substring in str(res),
813                                     "substring '%s' not in '%s'"
814                                     % (substring, str(res)))
815             else:
816                 self.fail("%s was supposed to raise %s, not get '%s'" %
817                           (which, expected_failure, res))
818         d.addBoth(done)
819         return d
820
821     def PUT(self, urlpath, data):
822         url = self.webish_url + urlpath
823         return getPage(url, method="PUT", postdata=data)
824
825     def GET(self, urlpath, followRedirect=False):
826         url = self.webish_url + urlpath
827         return getPage(url, method="GET", followRedirect=followRedirect)
828
829     def _test_web(self, res):
830         base = self.webish_url
831         public = "uri/" + self._root_directory_uri.replace("/", "!")
832         d = getPage(base)
833         def _got_welcome(page):
834             expected = "Connected Peers: <span>%d</span>" % (self.numclients)
835             self.failUnless(expected in page,
836                             "I didn't see the right 'connected peers' message "
837                             "in: %s" % page
838                             )
839             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
840             self.failUnless(expected in page,
841                             "I didn't see the right 'My nodeid' message "
842                             "in: %s" % page)
843         d.addCallback(_got_welcome)
844         d.addCallback(self.log, "done with _got_welcome")
845         d.addCallback(lambda res: getPage(base + public))
846         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
847         def _got_subdir1(page):
848             # there ought to be an href for our file
849             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
850             self.failUnless(">mydata567</a>" in page)
851         d.addCallback(_got_subdir1)
852         d.addCallback(self.log, "done with _got_subdir1")
853         d.addCallback(lambda res:
854                       getPage(base + public + "/subdir1/mydata567"))
855         def _got_data(page):
856             self.failUnlessEqual(page, self.data)
857         d.addCallback(_got_data)
858
859         # download from a URI embedded in a URL
860         d.addCallback(self.log, "_get_from_uri")
861         def _get_from_uri(res):
862             return getPage(base + "uri/%s?filename=%s"
863                            % (self.uri, "mydata567"))
864         d.addCallback(_get_from_uri)
865         def _got_from_uri(page):
866             self.failUnlessEqual(page, self.data)
867         d.addCallback(_got_from_uri)
868
869         # download from a URI embedded in a URL, second form
870         d.addCallback(self.log, "_get_from_uri2")
871         def _get_from_uri2(res):
872             return getPage(base + "uri?uri=%s" % (self.uri,))
873         d.addCallback(_get_from_uri2)
874         d.addCallback(_got_from_uri)
875
876         # download from a bogus URI, make sure we get a reasonable error
877         d.addCallback(self.log, "_get_from_bogus_uri")
878         def _get_from_bogus_uri(res):
879             d1 = getPage(base + "uri/%s?filename=%s"
880                          % (self.mangle_uri(self.uri), "mydata567"))
881             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
882                        "410")
883             return d1
884         d.addCallback(_get_from_bogus_uri)
885
886         # upload a file with PUT
887         d.addCallback(self.log, "about to try PUT")
888         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
889                                            "new.txt contents"))
890         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
891         d.addCallback(self.failUnlessEqual, "new.txt contents")
892         # and again with something large enough to use multiple segments,
893         # and hopefully trigger pauseProducing too
894         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
895                                            "big" * 500000)) # 1.5MB
896         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
897         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
898
899         # can we replace files in place?
900         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
901                                            "NEWER contents"))
902         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
903         d.addCallback(self.failUnlessEqual, "NEWER contents")
904
905
906         # TODO: mangle the second segment of a file, to test errors that
907         # occur after we've already sent some good data, which uses a
908         # different error path.
909
910         # TODO: download a URI with a form
911         # TODO: create a directory by using a form
912         # TODO: upload by using a form on the directory page
913         #    url = base + "somedir/subdir1/freeform_post!!upload"
914         # TODO: delete a file by using a button on the directory page
915
916         return d
917
918     def _test_web_start(self, res):
919         basedir = self.clients[0].basedir
920         startfile = os.path.join(basedir, "start.html")
921         self.failUnless(os.path.exists(startfile))
922         start_html = open(startfile, "r").read()
923         self.failUnless(self.webish_url in start_html)
924         d = self.clients[0].get_private_uri()
925         def done(private_uri):
926             private_url = self.webish_url + "uri/" + private_uri.replace("/","!")
927             self.failUnless(private_url in start_html)
928         d.addCallback(done)
929         return d
930
931     def _test_runner(self, res):
932         # exercise some of the diagnostic tools in runner.py
933
934         # find a share
935         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
936             if "storage" not in dirpath:
937                 continue
938             if not filenames:
939                 continue
940             pieces = dirpath.split(os.sep)
941             if pieces[-3] == "storage" and pieces[-2] == "shares":
942                 # we're sitting in .../storage/shares/$SINDEX , and there are
943                 # sharefiles here
944                 filename = os.path.join(dirpath, filenames[0])
945                 # peek at the magic to see if it is a chk share
946                 magic = open(filename, "rb").read(4)
947                 if magic == '\x00\x00\x00\x01':
948                     break
949         else:
950             self.fail("unable to find any uri_extension files in %s"
951                       % self.basedir)
952         log.msg("test_system.SystemTest._test_runner using %s" % filename)
953
954         out,err = StringIO(), StringIO()
955         rc = runner.runner(["dump-share",
956                             filename],
957                            stdout=out, stderr=err)
958         output = out.getvalue()
959         self.failUnlessEqual(rc, 0)
960
961         # we only upload a single file, so we can assert some things about
962         # its size and shares.
963         self.failUnless("size: %d\n" % len(self.data) in output)
964         self.failUnless("num_segments: 1\n" in output)
965         # segment_size is always a multiple of needed_shares
966         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
967         self.failUnless("total_shares: 10\n" in output)
968         # keys which are supposed to be present
969         for key in ("size", "num_segments", "segment_size",
970                     "needed_shares", "total_shares",
971                     "codec_name", "codec_params", "tail_codec_params",
972                     "plaintext_hash", "plaintext_root_hash",
973                     "crypttext_hash", "crypttext_root_hash",
974                     "share_root_hash",):
975             self.failUnless("%s: " % key in output, key)
976
977     def _test_control(self, res):
978         # exercise the remote-control-the-client foolscap interfaces in
979         # allmydata.control (mostly used for performance tests)
980         c0 = self.clients[0]
981         control_furl_file = os.path.join(c0.basedir, "control.furl")
982         control_furl = open(control_furl_file, "r").read().strip()
983         # it doesn't really matter which Tub we use to connect to the client,
984         # so let's just use our IntroducerNode's
985         d = self.introducer.tub.getReference(control_furl)
986         d.addCallback(self._test_control2, control_furl_file)
987         return d
988     def _test_control2(self, rref, filename):
989         d = rref.callRemote("upload_from_file_to_uri", filename)
990         downfile = os.path.join(self.basedir, "control.downfile")
991         d.addCallback(lambda uri:
992                       rref.callRemote("download_from_uri_to_file",
993                                       uri, downfile))
994         def _check(res):
995             self.failUnlessEqual(res, downfile)
996             data = open(downfile, "r").read()
997             expected_data = open(filename, "r").read()
998             self.failUnlessEqual(data, expected_data)
999         d.addCallback(_check)
1000         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1001         if sys.platform == "linux2":
1002             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1003         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1004         return d
1005
1006     def _test_cli(self, res):
1007         # run various CLI commands (in a thread, since they use blocking
1008         # network calls)
1009
1010         private_uri = self._private_node.get_uri()
1011         some_uri = self._root_directory_uri
1012
1013         nodeargs = [
1014             "--node-url", self.webish_url,
1015             "--root-uri", private_uri,
1016             ]
1017         public_nodeargs = [
1018             "--node-url", self.webish_url,
1019             "--root-uri", some_uri,
1020             ]
1021         TESTDATA = "I will not write the same thing over and over.\n" * 100
1022
1023         d = defer.succeed(None)
1024
1025         def _ls_root(res):
1026             argv = ["ls"] + nodeargs
1027             return self._run_cli(argv)
1028         d.addCallback(_ls_root)
1029         def _check_ls_root((out,err)):
1030             self.failUnless("personal" in out)
1031             self.failUnless("s2-ro" in out)
1032             self.failUnless("s2-rw" in out)
1033             self.failUnlessEqual(err, "")
1034         d.addCallback(_check_ls_root)
1035
1036         def _ls_subdir(res):
1037             argv = ["ls"] + nodeargs + ["personal"]
1038             return self._run_cli(argv)
1039         d.addCallback(_ls_subdir)
1040         def _check_ls_subdir((out,err)):
1041             self.failUnless("sekrit data" in out)
1042             self.failUnlessEqual(err, "")
1043         d.addCallback(_check_ls_subdir)
1044
1045         def _ls_public_subdir(res):
1046             argv = ["ls"] + public_nodeargs + ["subdir1"]
1047             return self._run_cli(argv)
1048         d.addCallback(_ls_public_subdir)
1049         def _check_ls_public_subdir((out,err)):
1050             self.failUnless("subdir2" in out)
1051             self.failUnless("mydata567" in out)
1052             self.failUnlessEqual(err, "")
1053         d.addCallback(_check_ls_public_subdir)
1054
1055         def _ls_file(res):
1056             argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
1057             return self._run_cli(argv)
1058         d.addCallback(_ls_file)
1059         def _check_ls_file((out,err)):
1060             self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
1061             self.failUnlessEqual(err, "")
1062         d.addCallback(_check_ls_file)
1063
1064         # tahoe_ls doesn't currently handle the error correctly: it tries to
1065         # JSON-parse a traceback.
1066 ##         def _ls_missing(res):
1067 ##             argv = ["ls"] + nodeargs + ["bogus"]
1068 ##             return self._run_cli(argv)
1069 ##         d.addCallback(_ls_missing)
1070 ##         def _check_ls_missing((out,err)):
1071 ##             print "OUT", out
1072 ##             print "ERR", err
1073 ##             self.failUnlessEqual(err, "")
1074 ##         d.addCallback(_check_ls_missing)
1075
1076         def _put(res):
1077             tdir = self.getdir("cli_put")
1078             fileutil.make_dirs(tdir)
1079             fn = os.path.join(tdir, "upload_me")
1080             f = open(fn, "wb")
1081             f.write(TESTDATA)
1082             f.close()
1083             argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
1084             return self._run_cli(argv)
1085         d.addCallback(_put)
1086         def _check_put((out,err)):
1087             self.failUnless("200 OK" in out)
1088             self.failUnlessEqual(err, "")
1089             d = self._private_node.get_child_at_path("test_put/upload.txt")
1090             d.addCallback(lambda filenode: filenode.download_to_data())
1091             def _check_put2(res):
1092                 self.failUnlessEqual(res, TESTDATA)
1093             d.addCallback(_check_put2)
1094             return d
1095         d.addCallback(_check_put)
1096
1097         def _get_to_stdout(res):
1098             argv = ["get"] + nodeargs + ["test_put/upload.txt"]
1099             return self._run_cli(argv)
1100         d.addCallback(_get_to_stdout)
1101         def _check_get_to_stdout((out,err)):
1102             self.failUnlessEqual(out, TESTDATA)
1103             self.failUnlessEqual(err, "")
1104         d.addCallback(_check_get_to_stdout)
1105
1106         get_to_file_target = self.basedir + "/get.downfile"
1107         def _get_to_file(res):
1108             argv = ["get"] + nodeargs + ["test_put/upload.txt",
1109                                          get_to_file_target]
1110             return self._run_cli(argv)
1111         d.addCallback(_get_to_file)
1112         def _check_get_to_file((out,err)):
1113             data = open(get_to_file_target, "rb").read()
1114             self.failUnlessEqual(data, TESTDATA)
1115             self.failUnlessEqual(out, "")
1116             self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
1117         d.addCallback(_check_get_to_file)
1118
1119
1120         def _mv(res):
1121             argv = ["mv"] + nodeargs + ["test_put/upload.txt",
1122                                         "test_put/moved.txt"]
1123             return self._run_cli(argv)
1124         d.addCallback(_mv)
1125         def _check_mv((out,err)):
1126             self.failUnless("OK" in out)
1127             self.failUnlessEqual(err, "")
1128             d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, "test_put/upload.txt")
1129
1130             d.addCallback(lambda res:
1131                           self._private_node.get_child_at_path("test_put/moved.txt"))
1132             d.addCallback(lambda filenode: filenode.download_to_data())
1133             def _check_mv2(res):
1134                 self.failUnlessEqual(res, TESTDATA)
1135             d.addCallback(_check_mv2)
1136             return d
1137         d.addCallback(_check_mv)
1138
1139         def _rm(res):
1140             argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
1141             return self._run_cli(argv)
1142         d.addCallback(_rm)
1143         def _check_rm((out,err)):
1144             self.failUnless("200 OK" in out)
1145             self.failUnlessEqual(err, "")
1146             d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, "test_put/moved.txt")
1147             return d
1148         d.addCallback(_check_rm)
1149         return d
1150
1151     def _run_cli(self, argv):
1152         stdout, stderr = StringIO(), StringIO()
1153         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1154                                   stdout=stdout, stderr=stderr)
1155         def _done(res):
1156             return stdout.getvalue(), stderr.getvalue()
1157         d.addCallback(_done)
1158         return d
1159
1160     def _test_checker(self, res):
1161         d = self._private_node.build_manifest()
1162         d.addCallback(self._test_checker_2)
1163         return d
1164
1165     def _test_checker_2(self, manifest):
1166         checker1 = self.clients[1].getServiceNamed("checker")
1167         self.failUnlessEqual(checker1.checker_results_for(None), [])
1168         self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
1169                              [])
1170         dl = []
1171         starting_time = time.time()
1172         for si in manifest:
1173             dl.append(checker1.check(si))
1174         d = deferredutil.DeferredListShouldSucceed(dl)
1175
1176         def _check_checker_results(res):
1177             for i in res:
1178                 if type(i) is bool:
1179                     self.failUnless(i is True)
1180                 else:
1181                     (needed, total, found, sharemap) = i
1182                     self.failUnlessEqual(needed, 3)
1183                     self.failUnlessEqual(total, 10)
1184                     self.failUnlessEqual(found, total)
1185                     self.failUnlessEqual(len(sharemap.keys()), 10)
1186                     peers = set()
1187                     for shpeers in sharemap.values():
1188                         peers.update(shpeers)
1189                     self.failUnlessEqual(len(peers), self.numclients-1)
1190         d.addCallback(_check_checker_results)
1191
1192         def _check_stored_results(res):
1193             finish_time = time.time()
1194             all_results = []
1195             for si in manifest:
1196                 results = checker1.checker_results_for(si)
1197                 if not results:
1198                     # TODO: implement checker for mutable files and implement tests of that checker
1199                     continue
1200                 self.failUnlessEqual(len(results), 1)
1201                 when, those_results = results[0]
1202                 self.failUnless(isinstance(when, (int, float)))
1203                 self.failUnless(starting_time <= when <= finish_time)
1204                 all_results.append(those_results)
1205             _check_checker_results(all_results)
1206         d.addCallback(_check_stored_results)
1207
1208         d.addCallback(self._test_checker_3)
1209         return d
1210
1211     def _test_checker_3(self, res):
1212         # check one file, through FileNode.check()
1213         d = self._private_node.get_child_at_path("personal/sekrit data")
1214         d.addCallback(lambda n: n.check())
1215         def _checked(results):
1216             # 'sekrit data' is small, and fits in a LiteralFileNode, so
1217             # checking it is trivial and always returns True
1218             self.failUnlessEqual(results, True)
1219         d.addCallback(_checked)
1220
1221         c0 = self.clients[1]
1222         n = c0.create_node_from_uri(self._root_directory_uri)
1223         d.addCallback(lambda res: n.get_child_at_path("subdir1/mydata567"))
1224         d.addCallback(lambda n: n.check())
1225         def _checked2(results):
1226             # mydata567 is large and lives in a CHK
1227             (needed, total, found, sharemap) = results
1228             self.failUnlessEqual(needed, 3)
1229             self.failUnlessEqual(total, 10)
1230             self.failUnlessEqual(found, 10)
1231             self.failUnlessEqual(len(sharemap), 10)
1232             for shnum in range(10):
1233                 self.failUnlessEqual(len(sharemap[shnum]), 1)
1234         d.addCallback(_checked2)
1235         return d
1236
1237
1238     def _test_verifier(self, res):
1239         checker1 = self.clients[1].getServiceNamed("checker")
1240         d = self._private_node.build_manifest()
1241         def _check_all(manifest):
1242             dl = []
1243             for si in manifest:
1244                 dl.append(checker1.verify(si))
1245             return deferredutil.DeferredListShouldSucceed(dl)
1246         d.addCallback(_check_all)
1247         def _done(res):
1248             for i in res:
1249                 self.failUnless(i is True)
1250         d.addCallback(_done)
1251         d.addCallback(lambda res: checker1.verify(None))
1252         d.addCallback(self.failUnlessEqual, True)
1253         return d