]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
stats: add /statistics web page to show them, add tests
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1
2 from base64 import b32encode
3 import os, sys, time, re, simplejson
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
10 import allmydata
11 from allmydata import client, uri, download, upload, storage, mutable, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable import NotMutableError
18 from allmydata.stats import PickleStatsGatherer
19 from allmydata.key_generator import KeyGeneratorService
20 from foolscap.eventual import flushEventualQueue, fireEventually
21 from foolscap import DeadReferenceError, Tub
22 from twisted.python.failure import Failure
23 from twisted.web.client import getPage
24 from twisted.web.error import Error
25
def flush_but_dont_ignore(res):
    """Flush the foolscap eventual-send queue, then pass `res` through.

    The returned Deferred fires with the original `res` once
    flushEventualQueue() has fired, so a value handed to this function via
    addBoth() (result or Failure) is not swallowed.
    """
    flushed = flushEventualQueue()
    flushed.addCallback(lambda _ignored: res)
    return flushed
32
# Sample payload for tests. Per its own text it is meant to be big enough
# that it cannot be stored inside a LIT uri.
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
37
class CountingDataUploadable(upload.Data):
    """upload.Data variant that tallies the bytes requested through read().

    When `interrupt_after` is set, the first read() that pushes the running
    total past that threshold fires `interrupt_after_d` (exactly once) with
    this uploadable as the result.
    """
    # running total of the `length` values passed to read()
    bytes_read = 0
    # threshold that arms the one-shot callback, or None to disable it
    interrupt_after = None
    # Deferred fired with `self` when the threshold is crossed
    interrupt_after_d = None

    def read(self, length):
        """Account for `length` requested bytes, then defer to upload.Data."""
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # disarm before firing so the Deferred is only called once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
50
51
52 class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
53
54     def setUp(self):
55         self.sparent = service.MultiService()
56         self.sparent.startService()
57     def tearDown(self):
58         log.msg("shutting down SystemTest services")
59         d = self.sparent.stopService()
60         d.addBoth(flush_but_dont_ignore)
61         return d
62
63     def getdir(self, subdir):
64         return os.path.join(self.basedir, subdir)
65
66     def add_service(self, s):
67         s.setServiceParent(self.sparent)
68         return s
69
70     def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
71         self.numclients = NUMCLIENTS
72         self.createprivdir = createprivdir
73         iv_dir = self.getdir("introducer")
74         if not os.path.isdir(iv_dir):
75             fileutil.make_dirs(iv_dir)
76         f = open(os.path.join(iv_dir, "webport"), "w")
77         f.write("tcp:0:interface=127.0.0.1\n")
78         f.close()
79         iv = IntroducerNode(basedir=iv_dir)
80         self.introducer = self.add_service(iv)
81         d = self.introducer.when_tub_ready()
82         d.addCallback(self._get_introducer_web)
83         d.addCallback(self._set_up_stats_gatherer)
84         d.addCallback(self._set_up_key_generator)
85         d.addCallback(self._set_up_nodes_2)
86         d.addCallback(self._grab_stats)
87         return d
88
89     def _get_introducer_web(self, res):
90         f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
91         self.introweb_url = f.read().strip()
92         f.close()
93
94     def _set_up_stats_gatherer(self, res):
95         statsdir = self.getdir("stats_gatherer")
96         fileutil.make_dirs(statsdir)
97         t = Tub()
98         self.add_service(t)
99         l = t.listenOn("tcp:0")
100         p = l.getPortnum()
101         t.setLocation("localhost:%d" % p)
102
103         self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
104         self.add_service(self.stats_gatherer)
105         self.stats_gatherer_furl = self.stats_gatherer.get_furl()
106
107     def _set_up_key_generator(self, res):
108         kgsdir = self.getdir("key_generator")
109         fileutil.make_dirs(kgsdir)
110
111         self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
112         self.key_generator_svc.key_generator.pool_size = 4
113         self.key_generator_svc.key_generator.pool_refresh_delay = 60
114         self.add_service(self.key_generator_svc)
115
116         d = fireEventually()
117         def check_for_furl():
118             return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
119         d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
120         def get_furl(junk):
121             kgf = os.path.join(kgsdir, 'key_generator.furl')
122             self.key_generator_furl = file(kgf, 'rb').read().strip()
123         d.addCallback(get_furl)
124         return d
125
126     def _set_up_nodes_2(self, res):
127         q = self.introducer
128         self.introducer_furl = q.introducer_url
129         self.clients = []
130         basedirs = []
131         for i in range(self.numclients):
132             basedir = self.getdir("client%d" % i)
133             basedirs.append(basedir)
134             fileutil.make_dirs(basedir)
135             if i == 0:
136                 # client[0] runs a webserver and a helper, no key_generator
137                 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
138                 open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
139             if i == 3:
140                 # client[3] runs a webserver and uses a helper, uses key_generator
141                 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
142                 kgf = "%s\n" % (self.key_generator_furl,)
143                 open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
144             if self.createprivdir:
145                 fileutil.make_dirs(os.path.join(basedir, "private"))
146                 open(os.path.join(basedir, "private", "root_dir.cap"), "w")
147             open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
148             open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
149
150         # start client[0], wait for it's tub to be ready (at which point it
151         # will have registered the helper furl).
152         c = self.add_service(client.Client(basedir=basedirs[0]))
153         self.clients.append(c)
154         d = c.when_tub_ready()
155         def _ready(res):
156             f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
157             helper_furl = f.read()
158             f.close()
159             self.helper_furl = helper_furl
160             f = open(os.path.join(basedirs[3],"helper.furl"), "w")
161             f.write(helper_furl)
162             f.close()
163
164             # this starts the rest of the clients
165             for i in range(1, self.numclients):
166                 c = self.add_service(client.Client(basedir=basedirs[i]))
167                 self.clients.append(c)
168             log.msg("STARTING")
169             return self.wait_for_connections()
170         d.addCallback(_ready)
171         def _connected(res):
172             log.msg("CONNECTED")
173             # now find out where the web port was
174             l = self.clients[0].getServiceNamed("webish").listener
175             port = l._port.getHost().port
176             self.webish_url = "http://localhost:%d/" % port
177             # and the helper-using webport
178             l = self.clients[3].getServiceNamed("webish").listener
179             port = l._port.getHost().port
180             self.helper_webish_url = "http://localhost:%d/" % port
181         d.addCallback(_connected)
182         return d
183
184     def _grab_stats(self, res):
185         d = self.stats_gatherer.poll()
186         return d
187
188     def bounce_client(self, num):
189         c = self.clients[num]
190         d = c.disownServiceParent()
191         # I think windows requires a moment to let the connection really stop
192         # and the port number made available for re-use. TODO: examine the
193         # behavior, see if this is really the problem, see if we can do
194         # better than blindly waiting for a second.
195         d.addCallback(self.stall, 1.0)
196         def _stopped(res):
197             new_c = client.Client(basedir=self.getdir("client%d" % num))
198             self.clients[num] = new_c
199             self.add_service(new_c)
200             return new_c.when_tub_ready()
201         d.addCallback(_stopped)
202         d.addCallback(lambda res: self.wait_for_connections())
203         def _maybe_get_webport(res):
204             if num == 0:
205                 # now find out where the web port was
206                 l = self.clients[0].getServiceNamed("webish").listener
207                 port = l._port.getHost().port
208                 self.webish_url = "http://localhost:%d/" % port
209         d.addCallback(_maybe_get_webport)
210         return d
211
212     def add_extra_node(self, client_num, helper_furl=None,
213                        add_to_sparent=False):
214         # usually this node is *not* parented to our self.sparent, so we can
215         # shut it down separately from the rest, to exercise the
216         # connection-lost code
217         basedir = self.getdir("client%d" % client_num)
218         if not os.path.isdir(basedir):
219             fileutil.make_dirs(basedir)
220         open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
221         if helper_furl:
222             f = open(os.path.join(basedir, "helper.furl") ,"w")
223             f.write(helper_furl+"\n")
224             f.close()
225
226         c = client.Client(basedir=basedir)
227         self.clients.append(c)
228         self.numclients += 1
229         if add_to_sparent:
230             c.setServiceParent(self.sparent)
231         else:
232             c.startService()
233         d = self.wait_for_connections()
234         d.addCallback(lambda res: c)
235         return d
236
237     def _check_connections(self):
238         for c in self.clients:
239             ic = c.introducer_client
240             if not ic.connected_to_introducer():
241                 return False
242             if len(ic.get_all_peerids()) != self.numclients:
243                 return False
244         return True
245
246     def wait_for_connections(self, ignored=None):
247         # TODO: replace this with something that takes a list of peerids and
248         # fires when they've all been heard from, instead of using a count
249         # and a threshold
250         return self.poll(self._check_connections, timeout=200)
251
252     def test_connections(self):
253         self.basedir = "system/SystemTest/test_connections"
254         d = self.set_up_nodes()
255         self.extra_node = None
256         d.addCallback(lambda res: self.add_extra_node(self.numclients))
257         def _check(extra_node):
258             self.extra_node = extra_node
259             for c in self.clients:
260                 all_peerids = list(c.get_all_peerids())
261                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
262                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
263                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
264
265         d.addCallback(_check)
266         def _shutdown_extra_node(res):
267             if self.extra_node:
268                 return self.extra_node.stopService()
269             return res
270         d.addBoth(_shutdown_extra_node)
271         return d
272     test_connections.timeout = 300
273     # test_connections is subsumed by test_upload_and_download, and takes
274     # quite a while to run on a slow machine (because of all the TLS
275     # connections that must be established). If we ever rework the introducer
276     # code to such an extent that we're not sure if it works anymore, we can
277     # reinstate this test until it does.
278     del test_connections
279
280     def test_upload_and_download_random_key(self):
281         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
282         return self._test_upload_and_download(convergence=None)
283     test_upload_and_download_random_key.timeout = 4800
284
285     def test_upload_and_download_convergent(self):
286         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
287         return self._test_upload_and_download(convergence="some convergence string")
288     test_upload_and_download_convergent.timeout = 4800
289
290     def _test_upload_and_download(self, convergence):
291         # we use 4000 bytes of data, which will result in about 400k written
292         # to disk among all our simulated nodes
293         DATA = "Some data to upload\n" * 200
294         d = self.set_up_nodes()
295         def _check_connections(res):
296             for c in self.clients:
297                 all_peerids = list(c.get_all_peerids())
298                 self.failUnlessEqual(len(all_peerids), self.numclients)
299                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
300                 self.failUnlessEqual(len(permuted_peers), self.numclients)
301         d.addCallback(_check_connections)
302
303         def _do_upload(res):
304             log.msg("UPLOADING")
305             u = self.clients[0].getServiceNamed("uploader")
306             self.uploader = u
307             # we crank the max segsize down to 1024b for the duration of this
308             # test, so we can exercise multiple segments. It is important
309             # that this is not a multiple of the segment size, so that the
310             # tail segment is not the same length as the others. This actualy
311             # gets rounded up to 1025 to be a multiple of the number of
312             # required shares (since we use 25 out of 100 FEC).
313             up = upload.Data(DATA, convergence=convergence)
314             up.max_segment_size = 1024
315             d1 = u.upload(up)
316             return d1
317         d.addCallback(_do_upload)
318         def _upload_done(results):
319             uri = results.uri
320             log.msg("upload finished: uri is %s" % (uri,))
321             self.uri = uri
322             dl = self.clients[1].getServiceNamed("downloader")
323             self.downloader = dl
324         d.addCallback(_upload_done)
325
326         def _upload_again(res):
327             # Upload again. If using convergent encryption then this ought to be
328             # short-circuited, however with the way we currently generate URIs
329             # (i.e. because they include the roothash), we have to do all of the
330             # encoding work, and only get to save on the upload part.
331             log.msg("UPLOADING AGAIN")
332             up = upload.Data(DATA, convergence=convergence)
333             up.max_segment_size = 1024
334             d1 = self.uploader.upload(up)
335         d.addCallback(_upload_again)
336
337         def _download_to_data(res):
338             log.msg("DOWNLOADING")
339             return self.downloader.download_to_data(self.uri)
340         d.addCallback(_download_to_data)
341         def _download_to_data_done(data):
342             log.msg("download finished")
343             self.failUnlessEqual(data, DATA)
344         d.addCallback(_download_to_data_done)
345
346         target_filename = os.path.join(self.basedir, "download.target")
347         def _download_to_filename(res):
348             return self.downloader.download_to_filename(self.uri,
349                                                         target_filename)
350         d.addCallback(_download_to_filename)
351         def _download_to_filename_done(res):
352             newdata = open(target_filename, "rb").read()
353             self.failUnlessEqual(newdata, DATA)
354         d.addCallback(_download_to_filename_done)
355
356         target_filename2 = os.path.join(self.basedir, "download.target2")
357         def _download_to_filehandle(res):
358             fh = open(target_filename2, "wb")
359             return self.downloader.download_to_filehandle(self.uri, fh)
360         d.addCallback(_download_to_filehandle)
361         def _download_to_filehandle_done(fh):
362             fh.close()
363             newdata = open(target_filename2, "rb").read()
364             self.failUnlessEqual(newdata, DATA)
365         d.addCallback(_download_to_filehandle_done)
366
367         def _download_nonexistent_uri(res):
368             baduri = self.mangle_uri(self.uri)
369             log.msg("about to download non-existent URI", level=log.UNUSUAL,
370                     facility="tahoe.tests")
371             d1 = self.downloader.download_to_data(baduri)
372             def _baduri_should_fail(res):
373                 log.msg("finished downloading non-existend URI",
374                         level=log.UNUSUAL, facility="tahoe.tests")
375                 self.failUnless(isinstance(res, Failure))
376                 self.failUnless(res.check(download.NotEnoughPeersError),
377                                 "expected NotEnoughPeersError, got %s" % res)
378                 # TODO: files that have zero peers should get a special kind
379                 # of NotEnoughPeersError, which can be used to suggest that
380                 # the URI might be wrong or that they've never uploaded the
381                 # file in the first place.
382             d1.addBoth(_baduri_should_fail)
383             return d1
384         d.addCallback(_download_nonexistent_uri)
385
386         # add a new node, which doesn't accept shares, and only uses the
387         # helper for upload.
388         d.addCallback(lambda res: self.add_extra_node(self.numclients,
389                                                       self.helper_furl,
390                                                       add_to_sparent=True))
391         def _added(extra_node):
392             self.extra_node = extra_node
393             extra_node.getServiceNamed("storage").sizelimit = 0
394         d.addCallback(_added)
395
396         HELPER_DATA = "Data that needs help to upload" * 1000
397         def _upload_with_helper(res):
398             u = upload.Data(HELPER_DATA, convergence=convergence)
399             d = self.extra_node.upload(u)
400             def _uploaded(results):
401                 uri = results.uri
402                 return self.downloader.download_to_data(uri)
403             d.addCallback(_uploaded)
404             def _check(newdata):
405                 self.failUnlessEqual(newdata, HELPER_DATA)
406             d.addCallback(_check)
407             return d
408         d.addCallback(_upload_with_helper)
409
410         def _upload_duplicate_with_helper(res):
411             u = upload.Data(HELPER_DATA, convergence=convergence)
412             u.debug_stash_RemoteEncryptedUploadable = True
413             d = self.extra_node.upload(u)
414             def _uploaded(results):
415                 uri = results.uri
416                 return self.downloader.download_to_data(uri)
417             d.addCallback(_uploaded)
418             def _check(newdata):
419                 self.failUnlessEqual(newdata, HELPER_DATA)
420                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
421                             "uploadable started uploading, should have been avoided")
422             d.addCallback(_check)
423             return d
424         if convergence is not None:
425             d.addCallback(_upload_duplicate_with_helper)
426
427         def _upload_resumable(res):
428             DATA = "Data that needs help to upload and gets interrupted" * 1000
429             u1 = CountingDataUploadable(DATA, convergence=convergence)
430             u2 = CountingDataUploadable(DATA, convergence=convergence)
431
432             # we interrupt the connection after about 5kB by shutting down
433             # the helper, then restartingit.
434             u1.interrupt_after = 5000
435             u1.interrupt_after_d = defer.Deferred()
436             u1.interrupt_after_d.addCallback(lambda res:
437                                              self.bounce_client(0))
438
439             # sneak into the helper and reduce its chunk size, so that our
440             # debug_interrupt will sever the connection on about the fifth
441             # chunk fetched. This makes sure that we've started to write the
442             # new shares before we abandon them, which exercises the
443             # abort/delete-partial-share code. TODO: find a cleaner way to do
444             # this. I know that this will affect later uses of the helper in
445             # this same test run, but I'm not currently worried about it.
446             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
447
448             d = self.extra_node.upload(u1)
449
450             def _should_not_finish(res):
451                 self.fail("interrupted upload should have failed, not finished"
452                           " with result %s" % (res,))
453             def _interrupted(f):
454                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
455
456                 # make sure we actually interrupted it before finishing the
457                 # file
458                 self.failUnless(u1.bytes_read < len(DATA),
459                                 "read %d out of %d total" % (u1.bytes_read,
460                                                              len(DATA)))
461
462                 log.msg("waiting for reconnect", level=log.NOISY,
463                         facility="tahoe.test.test_system")
464                 # now, we need to give the nodes a chance to notice that this
465                 # connection has gone away. When this happens, the storage
466                 # servers will be told to abort their uploads, removing the
467                 # partial shares. Unfortunately this involves TCP messages
468                 # going through the loopback interface, and we can't easily
469                 # predict how long that will take. If it were all local, we
470                 # could use fireEventually() to stall. Since we don't have
471                 # the right introduction hooks, the best we can do is use a
472                 # fixed delay. TODO: this is fragile.
473                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
474                 return u1.interrupt_after_d
475             d.addCallbacks(_should_not_finish, _interrupted)
476
477             def _disconnected(res):
478                 # check to make sure the storage servers aren't still hanging
479                 # on to the partial share: their incoming/ directories should
480                 # now be empty.
481                 log.msg("disconnected", level=log.NOISY,
482                         facility="tahoe.test.test_system")
483                 for i in range(self.numclients):
484                     incdir = os.path.join(self.getdir("client%d" % i),
485                                           "storage", "shares", "incoming")
486                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
487             d.addCallback(_disconnected)
488
489             # then we need to give the reconnector a chance to
490             # reestablish the connection to the helper.
491             d.addCallback(lambda res:
492                           log.msg("wait_for_connections", level=log.NOISY,
493                                   facility="tahoe.test.test_system"))
494             d.addCallback(lambda res: self.wait_for_connections())
495
496
497             d.addCallback(lambda res:
498                           log.msg("uploading again", level=log.NOISY,
499                                   facility="tahoe.test.test_system"))
500             d.addCallback(lambda res: self.extra_node.upload(u2))
501
502             def _uploaded(results):
503                 uri = results.uri
504                 log.msg("Second upload complete", level=log.NOISY,
505                         facility="tahoe.test.test_system")
506
507                 # this is really bytes received rather than sent, but it's
508                 # convenient and basically measures the same thing
509                 bytes_sent = results.ciphertext_fetched
510
511                 # We currently don't support resumption of upload if the data is
512                 # encrypted with a random key.  (Because that would require us
513                 # to store the key locally and re-use it on the next upload of
514                 # this file, which isn't a bad thing to do, but we currently
515                 # don't do it.)
516                 if convergence is not None:
517                     # Make sure we did not have to read the whole file the
518                     # second time around .
519                     self.failUnless(bytes_sent < len(DATA),
520                                 "resumption didn't save us any work:"
521                                 " read %d bytes out of %d total" %
522                                 (bytes_sent, len(DATA)))
523                 else:
524                     # Make sure we did have to read the whole file the second
525                     # time around -- because the one that we partially uploaded
526                     # earlier was encrypted with a different random key.
527                     self.failIf(bytes_sent < len(DATA),
528                                 "resumption saved us some work even though we were using random keys:"
529                                 " read %d bytes out of %d total" %
530                                 (bytes_sent, len(DATA)))
531                 return self.downloader.download_to_data(uri)
532             d.addCallback(_uploaded)
533
534             def _check(newdata):
535                 self.failUnlessEqual(newdata, DATA)
536                 # If using convergent encryption, then also check that the
537                 # helper has removed the temp file from its directories.
538                 if convergence is not None:
539                     basedir = os.path.join(self.getdir("client0"), "helper")
540                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
541                     self.failUnlessEqual(files, [])
542                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
543                     self.failUnlessEqual(files, [])
544             d.addCallback(_check)
545             return d
546         d.addCallback(_upload_resumable)
547
548         return d
549
550     def _find_shares(self, basedir):
551         shares = []
552         for (dirpath, dirnames, filenames) in os.walk(basedir):
553             if "storage" not in dirpath:
554                 continue
555             if not filenames:
556                 continue
557             pieces = dirpath.split(os.sep)
558             if pieces[-4] == "storage" and pieces[-3] == "shares":
559                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
560                 # are sharefiles here
561                 assert pieces[-5].startswith("client")
562                 client_num = int(pieces[-5][-1])
563                 storage_index_s = pieces[-1]
564                 storage_index = storage.si_a2b(storage_index_s)
565                 for sharename in filenames:
566                     shnum = int(sharename)
567                     filename = os.path.join(dirpath, sharename)
568                     data = (client_num, storage_index, filename, shnum)
569                     shares.append(data)
570         if not shares:
571             self.fail("unable to find any share files in %s" % basedir)
572         return shares
573
574     def _corrupt_mutable_share(self, filename, which):
575         msf = storage.MutableShareFile(filename)
576         datav = msf.readv([ (0, 1000000) ])
577         final_share = datav[0]
578         assert len(final_share) < 1000000 # ought to be truncated
579         pieces = mutable.unpack_share(final_share)
580         (seqnum, root_hash, IV, k, N, segsize, datalen,
581          verification_key, signature, share_hash_chain, block_hash_tree,
582          share_data, enc_privkey) = pieces
583
584         if which == "seqnum":
585             seqnum = seqnum + 15
586         elif which == "R":
587             root_hash = self.flip_bit(root_hash)
588         elif which == "IV":
589             IV = self.flip_bit(IV)
590         elif which == "segsize":
591             segsize = segsize + 15
592         elif which == "pubkey":
593             verification_key = self.flip_bit(verification_key)
594         elif which == "signature":
595             signature = self.flip_bit(signature)
596         elif which == "share_hash_chain":
597             nodenum = share_hash_chain.keys()[0]
598             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
599         elif which == "block_hash_tree":
600             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
601         elif which == "share_data":
602             share_data = self.flip_bit(share_data)
603         elif which == "encprivkey":
604             enc_privkey = self.flip_bit(enc_privkey)
605
606         prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
607                                      segsize, datalen)
608         final_share = mutable.pack_share(prefix,
609                                          verification_key,
610                                          signature,
611                                          share_hash_chain,
612                                          block_hash_tree,
613                                          share_data,
614                                          enc_privkey)
615         msf.writev( [(0, final_share)], None)
616
    def test_mutable(self):
        """End-to-end exercise of mutable files on a freshly started grid.

        The steps run in order on a single Deferred chain:

        1. create a mutable file holding DATA on clients[0],
        2. run the "dump-share" CLI command on one of its shares and check
           the printed fields,
        3. download the contents through the original node and through new
           nodes built from its URI on clients[0] and clients[1],
        4. replace the contents via update() and overwrite() and re-download,
        5. corrupt a different field in each of shares 0-6 and confirm that
           the newest contents can still be downloaded,
        6. create and read back an empty mutable file,
        7. exercise a few operations on a new empty dirnode,
        8. check that the key generator's key pool shrinks by one for each
           mutable file/directory created through clients[3].

        Returns the Deferred that fires when all steps have completed.
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes()

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                # stash the node so that later steps can reuse it
                self._mutable_node_1 = res
                # NOTE(review): this local is never read afterwards (and it
                # shadows the imported 'uri' module inside _done).
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            # run the CLI in-process, capturing what it prints
            out,err = StringIO(), StringIO()
            rc = runner.runner(["dump-share",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
            except unittest.FailTest:
                # show the full dump to make the failing check easy to
                # diagnose, then let the failure propagate
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_to_data())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            return newnode.download_to_data()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_to_data()
            # pass the node along with the data so the next step can reuse it
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.update(NEWDATA)
            d1.addCallback(lambda res: newnode.download_to_data())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            # kept for the download that follows the share corruption below
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_to_data())
            return d1
        d.addCallback(_check_download_4)

        # this check is registered twice: here, and again after the shares
        # have been corrupted
        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            # map each share number to the file that holds it
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling [NOTE(review): this sentence was left
                # unfinished by the original author]
        d.addCallback(_corrupt_shares)

        # despite the corrupted shares, the newest contents must still be
        # retrievable through the node that has not been used yet
        d.addCallback(lambda res: self._newnode3.download_to_data())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_to_data())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            # a new directory starts out empty
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            # add the directory to itself as a child, then check that the
            # manifest still contains a single entry
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            return d1
        d.addCallback(_created_dirnode)

        # wait until clients[3] has a key generator reference before
        # measuring the key pool
        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            # the pool is expected to hold pool_size + size_delta keys
            # (size_delta is zero or negative in the calls below)
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
848
849     def flip_bit(self, good):
850         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
851
852     def mangle_uri(self, gooduri):
853         # change the key, which changes the storage index, which means we'll
854         # be asking about the wrong file, so nobody will have any shares
855         u = IFileURI(gooduri)
856         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
857                             uri_extension_hash=u.uri_extension_hash,
858                             needed_shares=u.needed_shares,
859                             total_shares=u.total_shares,
860                             size=u.size)
861         return u2.to_string()
862
863     # TODO: add a test which mangles the uri_extension_hash instead, and
864     # should fail due to not being able to get a valid uri_extension block.
865     # Also a test which sneakily mangles the uri_extension block to change
866     # some of the validation data, so it will fail in the post-download phase
867     # when the file's crypttext integrity check fails. Do the same thing for
868     # the key, which should cause the download to fail the post-download
869     # plaintext_hash check.
870
871     def test_vdrive(self):
872         self.basedir = "system/SystemTest/test_vdrive"
873         self.data = LARGE_DATA
874         d = self.set_up_nodes(createprivdir=True)
875         d.addCallback(self._test_introweb)
876         d.addCallback(self.log, "starting publish")
877         d.addCallback(self._do_publish1)
878         d.addCallback(self._test_runner)
879         d.addCallback(self._do_publish2)
880         # at this point, we have the following filesystem (where "R" denotes
881         # self._root_directory_uri):
882         # R
883         # R/subdir1
884         # R/subdir1/mydata567
885         # R/subdir1/subdir2/
886         # R/subdir1/subdir2/mydata992
887
888         d.addCallback(lambda res: self.bounce_client(0))
889         d.addCallback(self.log, "bounced client0")
890
891         d.addCallback(self._check_publish1)
892         d.addCallback(self.log, "did _check_publish1")
893         d.addCallback(self._check_publish2)
894         d.addCallback(self.log, "did _check_publish2")
895         d.addCallback(self._do_publish_private)
896         d.addCallback(self.log, "did _do_publish_private")
897         # now we also have (where "P" denotes a new dir):
898         #  P/personal/sekrit data
899         #  P/s2-rw -> /subdir1/subdir2/
900         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
901         d.addCallback(self._check_publish_private)
902         d.addCallback(self.log, "did _check_publish_private")
903         d.addCallback(self._test_web)
904         d.addCallback(self._test_control)
905         d.addCallback(self._test_cli)
906         # P now has four top-level children:
907         # P/personal/sekrit data
908         # P/s2-ro/
909         # P/s2-rw/
910         # P/test_put/  (empty)
911         d.addCallback(self._test_checker)
912         d.addCallback(self._test_verifier)
913         d.addCallback(self._grab_stats)
914         return d
915     test_vdrive.timeout = 1100
916
917     def _test_introweb(self, res):
918         d = getPage(self.introweb_url, method="GET", followRedirect=True)
919         def _check(res):
920             try:
921                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
922                                 in res)
923                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
924                 self.failUnless("Subscription Summary: storage: 5" in res)
925             except unittest.FailTest:
926                 print
927                 print "GET %s output was:" % self.introweb_url
928                 print res
929                 raise
930         d.addCallback(_check)
931         d.addCallback(lambda res:
932                       getPage(self.introweb_url + "?t=json",
933                               method="GET", followRedirect=True))
934         def _check_json(res):
935             data = simplejson.loads(res)
936             try:
937                 self.failUnlessEqual(data["subscription_summary"],
938                                      {"storage": 5})
939                 self.failUnlessEqual(data["announcement_summary"],
940                                      {"storage": 5, "stub_client": 5})
941             except unittest.FailTest:
942                 print
943                 print "GET %s?t=json output was:" % self.introweb_url
944                 print res
945                 raise
946         d.addCallback(_check_json)
947         return d
948
949     def _do_publish1(self, res):
950         ut = upload.Data(self.data, convergence=None)
951         c0 = self.clients[0]
952         d = c0.create_empty_dirnode()
953         def _made_root(new_dirnode):
954             self._root_directory_uri = new_dirnode.get_uri()
955             return c0.create_node_from_uri(self._root_directory_uri)
956         d.addCallback(_made_root)
957         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
958         def _made_subdir1(subdir1_node):
959             self._subdir1_node = subdir1_node
960             d1 = subdir1_node.add_file(u"mydata567", ut)
961             d1.addCallback(self.log, "publish finished")
962             def _stash_uri(filenode):
963                 self.uri = filenode.get_uri()
964             d1.addCallback(_stash_uri)
965             return d1
966         d.addCallback(_made_subdir1)
967         return d
968
969     def _do_publish2(self, res):
970         ut = upload.Data(self.data, convergence=None)
971         d = self._subdir1_node.create_empty_directory(u"subdir2")
972         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
973         return d
974
975     def log(self, res, msg, **kwargs):
976         # print "MSG: %s  RES: %s" % (msg, res)
977         log.msg(msg, **kwargs)
978         return res
979
980     def stall(self, res, delay=1.0):
981         d = defer.Deferred()
982         reactor.callLater(delay, d.callback, res)
983         return d
984
985     def _do_publish_private(self, res):
986         self.smalldata = "sssh, very secret stuff"
987         ut = upload.Data(self.smalldata, convergence=None)
988         d = self.clients[0].create_empty_dirnode()
989         d.addCallback(self.log, "GOT private directory")
990         def _got_new_dir(privnode):
991             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
992             d1 = privnode.create_empty_directory(u"personal")
993             d1.addCallback(self.log, "made P/personal")
994             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
995             d1.addCallback(self.log, "made P/personal/sekrit data")
996             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
997             def _got_s2(s2node):
998                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
999                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
1000                 return d2
1001             d1.addCallback(_got_s2)
1002             d1.addCallback(lambda res: privnode)
1003             return d1
1004         d.addCallback(_got_new_dir)
1005         return d
1006
1007     def _check_publish1(self, res):
1008         # this one uses the iterative API
1009         c1 = self.clients[1]
1010         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
1011         d.addCallback(self.log, "check_publish1 got /")
1012         d.addCallback(lambda root: root.get(u"subdir1"))
1013         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
1014         d.addCallback(lambda filenode: filenode.download_to_data())
1015         d.addCallback(self.log, "get finished")
1016         def _get_done(data):
1017             self.failUnlessEqual(data, self.data)
1018         d.addCallback(_get_done)
1019         return d
1020
1021     def _check_publish2(self, res):
1022         # this one uses the path-based API
1023         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
1024         d = rootnode.get_child_at_path(u"subdir1")
1025         d.addCallback(lambda dirnode:
1026                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
1027         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
1028         d.addCallback(lambda filenode: filenode.download_to_data())
1029         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
1030
1031         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
1032         def _got_filenode(filenode):
1033             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
1034             assert fnode == filenode
1035         d.addCallback(_got_filenode)
1036         return d
1037
    def _check_publish_private(self, resnode):
        """Check the private directory tree whose root node is 'resnode'.

        Verifies that "personal" is a directory, that "personal/sekrit data"
        downloads to self.smalldata, that "s2-rw" is mutable and that "s2-ro"
        is mutable but read-only. It then confirms that every modifying
        operation attempted through the read-only node fails with
        NotMutableError, moves "sekrit data" out of "personal" and back
        again, and finally checks the size of the manifest.

        Stores 'resnode' as self._private_node and the "personal" node as
        self._personal_node. Returns the Deferred driving these checks.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash the node for the move tests below, and pass it on
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # shorthand: look up 'path' relative to the private root
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # 'dirnode' is the read-only view: reads must work, while every
            # modification must fail with NotMutableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            # asking for a child that does not exist fails with KeyError
            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            # moving a child out of, or into, the read-only directory must
            # fail as well
            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # move the file around between writable directories, ending with
            # it back in P/personal/
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            return d1
        d.addCallback(_got_home)
        return d
1121
1122     def shouldFail(self, res, expected_failure, which, substring=None):
1123         if isinstance(res, Failure):
1124             res.trap(expected_failure)
1125             if substring:
1126                 self.failUnless(substring in str(res),
1127                                 "substring '%s' not in '%s'"
1128                                 % (substring, str(res)))
1129         else:
1130             self.fail("%s was supposed to raise %s, not get '%s'" %
1131                       (which, expected_failure, res))
1132
1133     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1134         assert substring is None or isinstance(substring, str)
1135         d = defer.maybeDeferred(callable, *args, **kwargs)
1136         def done(res):
1137             if isinstance(res, Failure):
1138                 res.trap(expected_failure)
1139                 if substring:
1140                     self.failUnless(substring in str(res),
1141                                     "substring '%s' not in '%s'"
1142                                     % (substring, str(res)))
1143             else:
1144                 self.fail("%s was supposed to raise %s, not get '%s'" %
1145                           (which, expected_failure, res))
1146         d.addBoth(done)
1147         return d
1148
1149     def PUT(self, urlpath, data):
1150         url = self.webish_url + urlpath
1151         return getPage(url, method="PUT", postdata=data)
1152
1153     def GET(self, urlpath, followRedirect=False):
1154         url = self.webish_url + urlpath
1155         return getPage(url, method="GET", followRedirect=followRedirect)
1156
1157     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1158         if use_helper:
1159             url = self.helper_webish_url + urlpath
1160         else:
1161             url = self.webish_url + urlpath
1162         sepbase = "boogabooga"
1163         sep = "--" + sepbase
1164         form = []
1165         form.append(sep)
1166         form.append('Content-Disposition: form-data; name="_charset"')
1167         form.append('')
1168         form.append('UTF-8')
1169         form.append(sep)
1170         for name, value in fields.iteritems():
1171             if isinstance(value, tuple):
1172                 filename, value = value
1173                 form.append('Content-Disposition: form-data; name="%s"; '
1174                             'filename="%s"' % (name, filename.encode("utf-8")))
1175             else:
1176                 form.append('Content-Disposition: form-data; name="%s"' % name)
1177             form.append('')
1178             form.append(str(value))
1179             form.append(sep)
1180         form[-1] += "--"
1181         body = "\r\n".join(form) + "\r\n"
1182         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1183                    }
1184         return getPage(url, method="POST", postdata=body,
1185                        headers=headers, followRedirect=followRedirect)
1186
1187     def _test_web(self, res):
1188         base = self.webish_url
1189         public = "uri/" + self._root_directory_uri
1190         d = getPage(base)
1191         def _got_welcome(page):
1192             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1193             self.failUnless(expected in page,
1194                             "I didn't see the right 'connected storage servers'"
1195                             " message in: %s" % page
1196                             )
1197             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1198             self.failUnless(expected in page,
1199                             "I didn't see the right 'My nodeid' message "
1200                             "in: %s" % page)
1201         d.addCallback(_got_welcome)
1202         d.addCallback(self.log, "done with _got_welcome")
1203
1204         # get the welcome page from the node that uses the helper too
1205         d.addCallback(lambda res: getPage(self.helper_webish_url))
1206         def _got_welcome_helper(page):
1207             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1208                             page)
1209         d.addCallback(_got_welcome_helper)
1210
1211         d.addCallback(lambda res: getPage(base + public))
1212         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1213         def _got_subdir1(page):
1214             # there ought to be an href for our file
1215             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1216             self.failUnless(">mydata567</a>" in page)
1217         d.addCallback(_got_subdir1)
1218         d.addCallback(self.log, "done with _got_subdir1")
1219         d.addCallback(lambda res:
1220                       getPage(base + public + "/subdir1/mydata567"))
1221         def _got_data(page):
1222             self.failUnlessEqual(page, self.data)
1223         d.addCallback(_got_data)
1224
1225         # download from a URI embedded in a URL
1226         d.addCallback(self.log, "_get_from_uri")
1227         def _get_from_uri(res):
1228             return getPage(base + "uri/%s?filename=%s"
1229                            % (self.uri, "mydata567"))
1230         d.addCallback(_get_from_uri)
1231         def _got_from_uri(page):
1232             self.failUnlessEqual(page, self.data)
1233         d.addCallback(_got_from_uri)
1234
1235         # download from a URI embedded in a URL, second form
1236         d.addCallback(self.log, "_get_from_uri2")
1237         def _get_from_uri2(res):
1238             return getPage(base + "uri?uri=%s" % (self.uri,))
1239         d.addCallback(_get_from_uri2)
1240         d.addCallback(_got_from_uri)
1241
1242         # download from a bogus URI, make sure we get a reasonable error
1243         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1244         def _get_from_bogus_uri(res):
1245             d1 = getPage(base + "uri/%s?filename=%s"
1246                          % (self.mangle_uri(self.uri), "mydata567"))
1247             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1248                        "410")
1249             return d1
1250         d.addCallback(_get_from_bogus_uri)
1251         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1252
1253         # upload a file with PUT
1254         d.addCallback(self.log, "about to try PUT")
1255         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1256                                            "new.txt contents"))
1257         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1258         d.addCallback(self.failUnlessEqual, "new.txt contents")
1259         # and again with something large enough to use multiple segments,
1260         # and hopefully trigger pauseProducing too
1261         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1262                                            "big" * 500000)) # 1.5MB
1263         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1264         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1265
1266         # can we replace files in place?
1267         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1268                                            "NEWER contents"))
1269         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1270         d.addCallback(self.failUnlessEqual, "NEWER contents")
1271
1272         # test unlinked POST
1273         d.addCallback(lambda res: self.POST("uri", t="upload",
1274                                             file=("new.txt", "data" * 10000)))
1275         # and again using the helper, which exercises different upload-status
1276         # display code
1277         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1278                                             file=("foo.txt", "data2" * 10000)))
1279
1280         # check that the status page exists
1281         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1282         def _got_status(res):
1283             # find an interesting upload and download to look at. LIT files
1284             # are not interesting.
1285             for dl in self.clients[0].list_recent_downloads():
1286                 if dl.get_size() > 200:
1287                     self._down_status = dl.get_counter()
1288             for ul in self.clients[0].list_recent_uploads():
1289                 if ul.get_size() > 200:
1290                     self._up_status = ul.get_counter()
1291             rs = self.clients[0].list_recent_retrieve()[0]
1292             self._retrieve_status = rs.get_counter()
1293             ps = self.clients[0].list_recent_publish()[0]
1294             self._publish_status = ps.get_counter()
1295
1296             # and that there are some upload- and download- status pages
1297             return self.GET("status/up-%d" % self._up_status)
1298         d.addCallback(_got_status)
1299         def _got_up(res):
1300             return self.GET("status/down-%d" % self._down_status)
1301         d.addCallback(_got_up)
1302         def _got_down(res):
1303             return self.GET("status/publish-%d" % self._publish_status)
1304         d.addCallback(_got_down)
1305         def _got_publish(res):
1306             return self.GET("status/retrieve-%d" % self._retrieve_status)
1307         d.addCallback(_got_publish)
1308
1309         # check that the helper status page exists
1310         d.addCallback(lambda res:
1311                       self.GET("helper_status", followRedirect=True))
1312         def _got_helper_status(res):
1313             self.failUnless("Bytes Fetched:" in res)
1314             # touch a couple of files in the helper's working directory to
1315             # exercise more code paths
1316             workdir = os.path.join(self.getdir("client0"), "helper")
1317             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1318             f = open(incfile, "wb")
1319             f.write("small file")
1320             f.close()
1321             then = time.time() - 86400*3
1322             now = time.time()
1323             os.utime(incfile, (now, then))
1324             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1325             f = open(encfile, "wb")
1326             f.write("less small file")
1327             f.close()
1328             os.utime(encfile, (now, then))
1329         d.addCallback(_got_helper_status)
1330         # and that the json form exists
1331         d.addCallback(lambda res:
1332                       self.GET("helper_status?t=json", followRedirect=True))
1333         def _got_helper_status_json(res):
1334             data = simplejson.loads(res)
1335             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1336                                  1)
1337             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1338             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1339             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1340                                  10)
1341             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1342             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1343             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1344                                  15)
1345         d.addCallback(_got_helper_status_json)
1346
1347         # and check that client[3] (which uses a helper but does not run one
1348         # itself) doesn't explode when you ask for its helper status with
1349         # t=json
1350         d.addCallback(lambda res:
1351                       getPage(self.helper_webish_url + "helper_status?t=json"))
1352         def _got_non_helper_status_json(res):
1353             data = simplejson.loads(res)
1354             self.failUnlessEqual(data, {})
1355         d.addCallback(_got_non_helper_status_json)
1356
1357         # see if the statistics page exists
1358         d.addCallback(lambda res: self.GET("statistics"))
1359         def _got_stats(res):
1360             self.failUnless("Node Statistics" in res)
1361             self.failUnless("  'downloader.files_downloaded': 8," in res)
1362         d.addCallback(_got_stats)
1363         d.addCallback(lambda res: self.GET("statistics?t=json"))
1364         def _got_stats_json(res):
1365             data = simplejson.loads(res)
1366             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1367             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1368         d.addCallback(_got_stats_json)
1369
1370         # TODO: mangle the second segment of a file, to test errors that
1371         # occur after we've already sent some good data, which uses a
1372         # different error path.
1373
1374         # TODO: download a URI with a form
1375         # TODO: create a directory by using a form
1376         # TODO: upload by using a form on the directory page
1377         #    url = base + "somedir/subdir1/freeform_post!!upload"
1378         # TODO: delete a file by using a button on the directory page
1379
1380         return d
1381
1382     def _test_runner(self, res):
1383         # exercise some of the diagnostic tools in runner.py
1384
1385         # find a share
1386         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1387             if "storage" not in dirpath:
1388                 continue
1389             if not filenames:
1390                 continue
1391             pieces = dirpath.split(os.sep)
1392             if pieces[-4] == "storage" and pieces[-3] == "shares":
1393                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1394                 # are sharefiles here
1395                 filename = os.path.join(dirpath, filenames[0])
1396                 # peek at the magic to see if it is a chk share
1397                 magic = open(filename, "rb").read(4)
1398                 if magic == '\x00\x00\x00\x01':
1399                     break
1400         else:
1401             self.fail("unable to find any uri_extension files in %s"
1402                       % self.basedir)
1403         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1404
1405         out,err = StringIO(), StringIO()
1406         rc = runner.runner(["dump-share",
1407                             filename],
1408                            stdout=out, stderr=err)
1409         output = out.getvalue()
1410         self.failUnlessEqual(rc, 0)
1411
1412         # we only upload a single file, so we can assert some things about
1413         # its size and shares.
1414         self.failUnless(("share filename: %s" % filename) in output)
1415         self.failUnless("size: %d\n" % len(self.data) in output)
1416         self.failUnless("num_segments: 1\n" in output)
1417         # segment_size is always a multiple of needed_shares
1418         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1419         self.failUnless("total_shares: 10\n" in output)
1420         # keys which are supposed to be present
1421         for key in ("size", "num_segments", "segment_size",
1422                     "needed_shares", "total_shares",
1423                     "codec_name", "codec_params", "tail_codec_params",
1424                     #"plaintext_hash", "plaintext_root_hash",
1425                     "crypttext_hash", "crypttext_root_hash",
1426                     "share_root_hash", "UEB_hash"):
1427             self.failUnless("%s: " % key in output, key)
1428
1429         # now use its storage index to find the other shares using the
1430         # 'find-shares' tool
1431         sharedir, shnum = os.path.split(filename)
1432         storagedir, storage_index_s = os.path.split(sharedir)
1433         out,err = StringIO(), StringIO()
1434         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1435         cmd = ["find-shares", storage_index_s] + nodedirs
1436         rc = runner.runner(cmd, stdout=out, stderr=err)
1437         self.failUnlessEqual(rc, 0)
1438         out.seek(0)
1439         sharefiles = [sfn.strip() for sfn in out.readlines()]
1440         self.failUnlessEqual(len(sharefiles), 10)
1441
1442         # also exercise the 'catalog-shares' tool
1443         out,err = StringIO(), StringIO()
1444         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1445         cmd = ["catalog-shares"] + nodedirs
1446         rc = runner.runner(cmd, stdout=out, stderr=err)
1447         self.failUnlessEqual(rc, 0)
1448         out.seek(0)
1449         descriptions = [sfn.strip() for sfn in out.readlines()]
1450         self.failUnlessEqual(len(descriptions), 30)
1451         matching = [line
1452                     for line in descriptions
1453                     if line.startswith("CHK %s " % storage_index_s)]
1454         self.failUnlessEqual(len(matching), 10)
1455
1456     def _test_control(self, res):
1457         # exercise the remote-control-the-client foolscap interfaces in
1458         # allmydata.control (mostly used for performance tests)
1459         c0 = self.clients[0]
1460         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1461         control_furl = open(control_furl_file, "r").read().strip()
1462         # it doesn't really matter which Tub we use to connect to the client,
1463         # so let's just use our IntroducerNode's
1464         d = self.introducer.tub.getReference(control_furl)
1465         d.addCallback(self._test_control2, control_furl_file)
1466         return d
1467     def _test_control2(self, rref, filename):
1468         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1469         downfile = os.path.join(self.basedir, "control.downfile")
1470         d.addCallback(lambda uri:
1471                       rref.callRemote("download_from_uri_to_file",
1472                                       uri, downfile))
1473         def _check(res):
1474             self.failUnlessEqual(res, downfile)
1475             data = open(downfile, "r").read()
1476             expected_data = open(filename, "r").read()
1477             self.failUnlessEqual(data, expected_data)
1478         d.addCallback(_check)
1479         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1480         if sys.platform == "linux2":
1481             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1482         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1483         return d
1484
1485     def _test_cli(self, res):
1486         # run various CLI commands (in a thread, since they use blocking
1487         # network calls)
1488
1489         private_uri = self._private_node.get_uri()
1490         some_uri = self._root_directory_uri
1491         client0_basedir = self.getdir("client0")
1492
1493         nodeargs = [
1494             "--node-directory", client0_basedir,
1495             "--dir-cap", private_uri,
1496             ]
1497         public_nodeargs = [
1498             "--node-url", self.webish_url,
1499             "--dir-cap", some_uri,
1500             ]
1501         TESTDATA = "I will not write the same thing over and over.\n" * 100
1502
1503         d = defer.succeed(None)
1504
1505         def _ls_root(res):
1506             argv = ["ls"] + nodeargs
1507             return self._run_cli(argv)
1508         d.addCallback(_ls_root)
1509         def _check_ls_root((out,err)):
1510             self.failUnless("personal" in out)
1511             self.failUnless("s2-ro" in out)
1512             self.failUnless("s2-rw" in out)
1513             self.failUnlessEqual(err, "")
1514         d.addCallback(_check_ls_root)
1515
1516         def _ls_subdir(res):
1517             argv = ["ls"] + nodeargs + ["personal"]
1518             return self._run_cli(argv)
1519         d.addCallback(_ls_subdir)
1520         def _check_ls_subdir((out,err)):
1521             self.failUnless("sekrit data" in out)
1522             self.failUnlessEqual(err, "")
1523         d.addCallback(_check_ls_subdir)
1524
1525         def _ls_public_subdir(res):
1526             argv = ["ls"] + public_nodeargs + ["subdir1"]
1527             return self._run_cli(argv)
1528         d.addCallback(_ls_public_subdir)
1529         def _check_ls_public_subdir((out,err)):
1530             self.failUnless("subdir2" in out)
1531             self.failUnless("mydata567" in out)
1532             self.failUnlessEqual(err, "")
1533         d.addCallback(_check_ls_public_subdir)
1534
1535         def _ls_file(res):
1536             argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
1537             return self._run_cli(argv)
1538         d.addCallback(_ls_file)
1539         def _check_ls_file((out,err)):
1540             self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
1541             self.failUnlessEqual(err, "")
1542         d.addCallback(_check_ls_file)
1543
1544         # tahoe_ls doesn't currently handle the error correctly: it tries to
1545         # JSON-parse a traceback.
1546 ##         def _ls_missing(res):
1547 ##             argv = ["ls"] + nodeargs + ["bogus"]
1548 ##             return self._run_cli(argv)
1549 ##         d.addCallback(_ls_missing)
1550 ##         def _check_ls_missing((out,err)):
1551 ##             print "OUT", out
1552 ##             print "ERR", err
1553 ##             self.failUnlessEqual(err, "")
1554 ##         d.addCallback(_check_ls_missing)
1555
1556         def _put(res):
1557             tdir = self.getdir("cli_put")
1558             fileutil.make_dirs(tdir)
1559             fn = os.path.join(tdir, "upload_me")
1560             f = open(fn, "wb")
1561             f.write(TESTDATA)
1562             f.close()
1563             argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
1564             return self._run_cli(argv)
1565         d.addCallback(_put)
1566         def _check_put((out,err)):
1567             self.failUnless("200 OK" in out)
1568             self.failUnlessEqual(err, "")
1569             d = self._private_node.get_child_at_path(u"test_put/upload.txt")
1570             d.addCallback(lambda filenode: filenode.download_to_data())
1571             def _check_put2(res):
1572                 self.failUnlessEqual(res, TESTDATA)
1573             d.addCallback(_check_put2)
1574             return d
1575         d.addCallback(_check_put)
1576
1577         def _get_to_stdout(res):
1578             argv = ["get"] + nodeargs + ["test_put/upload.txt"]
1579             return self._run_cli(argv)
1580         d.addCallback(_get_to_stdout)
1581         def _check_get_to_stdout((out,err)):
1582             self.failUnlessEqual(out, TESTDATA)
1583             self.failUnlessEqual(err, "")
1584         d.addCallback(_check_get_to_stdout)
1585
1586         get_to_file_target = self.basedir + "/get.downfile"
1587         def _get_to_file(res):
1588             argv = ["get"] + nodeargs + ["test_put/upload.txt",
1589                                          get_to_file_target]
1590             return self._run_cli(argv)
1591         d.addCallback(_get_to_file)
1592         def _check_get_to_file((out,err)):
1593             data = open(get_to_file_target, "rb").read()
1594             self.failUnlessEqual(data, TESTDATA)
1595             self.failUnlessEqual(out, "")
1596             self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
1597         d.addCallback(_check_get_to_file)
1598
1599
1600         def _mv(res):
1601             argv = ["mv"] + nodeargs + ["test_put/upload.txt",
1602                                         "test_put/moved.txt"]
1603             return self._run_cli(argv)
1604         d.addCallback(_mv)
1605         def _check_mv((out,err)):
1606             self.failUnless("OK" in out)
1607             self.failUnlessEqual(err, "")
1608             d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")
1609
1610             d.addCallback(lambda res:
1611                           self._private_node.get_child_at_path(u"test_put/moved.txt"))
1612             d.addCallback(lambda filenode: filenode.download_to_data())
1613             def _check_mv2(res):
1614                 self.failUnlessEqual(res, TESTDATA)
1615             d.addCallback(_check_mv2)
1616             return d
1617         d.addCallback(_check_mv)
1618
1619         def _rm(res):
1620             argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
1621             return self._run_cli(argv)
1622         d.addCallback(_rm)
1623         def _check_rm((out,err)):
1624             self.failUnless("200 OK" in out)
1625             self.failUnlessEqual(err, "")
1626             d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
1627             return d
1628         d.addCallback(_check_rm)
1629         return d
1630
1631     def _run_cli(self, argv):
1632         stdout, stderr = StringIO(), StringIO()
1633         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1634                                   stdout=stdout, stderr=stderr)
1635         def _done(res):
1636             return stdout.getvalue(), stderr.getvalue()
1637         d.addCallback(_done)
1638         return d
1639
1640     def _test_checker(self, res):
1641         d = self._private_node.build_manifest()
1642         d.addCallback(self._test_checker_2)
1643         return d
1644
1645     def _test_checker_2(self, manifest):
1646         checker1 = self.clients[1].getServiceNamed("checker")
1647         self.failUnlessEqual(checker1.checker_results_for(None), [])
1648         self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
1649                              [])
1650         dl = []
1651         starting_time = time.time()
1652         for si in manifest:
1653             dl.append(checker1.check(si))
1654         d = deferredutil.DeferredListShouldSucceed(dl)
1655
1656         def _check_checker_results(res):
1657             for i in res:
1658                 if type(i) is bool:
1659                     self.failUnless(i is True)
1660                 else:
1661                     (needed, total, found, sharemap) = i
1662                     self.failUnlessEqual(needed, 3)
1663                     self.failUnlessEqual(total, 10)
1664                     self.failUnlessEqual(found, total)
1665                     self.failUnlessEqual(len(sharemap.keys()), 10)
1666                     peers = set()
1667                     for shpeers in sharemap.values():
1668                         peers.update(shpeers)
1669                     self.failUnlessEqual(len(peers), self.numclients)
1670         d.addCallback(_check_checker_results)
1671
1672         def _check_stored_results(res):
1673             finish_time = time.time()
1674             all_results = []
1675             for si in manifest:
1676                 results = checker1.checker_results_for(si)
1677                 if not results:
1678                     # TODO: implement checker for mutable files and implement tests of that checker
1679                     continue
1680                 self.failUnlessEqual(len(results), 1)
1681                 when, those_results = results[0]
1682                 self.failUnless(isinstance(when, (int, float)))
1683                 self.failUnless(starting_time <= when <= finish_time)
1684                 all_results.append(those_results)
1685             _check_checker_results(all_results)
1686         d.addCallback(_check_stored_results)
1687
1688         d.addCallback(self._test_checker_3)
1689         return d
1690
1691     def _test_checker_3(self, res):
1692         # check one file, through FileNode.check()
1693         d = self._private_node.get_child_at_path(u"personal/sekrit data")
1694         d.addCallback(lambda n: n.check())
1695         def _checked(results):
1696             # 'sekrit data' is small, and fits in a LiteralFileNode, so
1697             # checking it is trivial and always returns True
1698             self.failUnlessEqual(results, True)
1699         d.addCallback(_checked)
1700
1701         c0 = self.clients[1]
1702         n = c0.create_node_from_uri(self._root_directory_uri)
1703         d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1704         d.addCallback(lambda n: n.check())
1705         def _checked2(results):
1706             # mydata567 is large and lives in a CHK
1707             (needed, total, found, sharemap) = results
1708             self.failUnlessEqual(needed, 3)
1709             self.failUnlessEqual(total, 10)
1710             self.failUnlessEqual(found, 10)
1711             self.failUnlessEqual(len(sharemap), 10)
1712             for shnum in range(10):
1713                 self.failUnlessEqual(len(sharemap[shnum]), 1)
1714         d.addCallback(_checked2)
1715         return d
1716
1717
1718     def _test_verifier(self, res):
1719         checker1 = self.clients[1].getServiceNamed("checker")
1720         d = self._private_node.build_manifest()
1721         def _check_all(manifest):
1722             dl = []
1723             for si in manifest:
1724                 dl.append(checker1.verify(si))
1725             return deferredutil.DeferredListShouldSucceed(dl)
1726         d.addCallback(_check_all)
1727         def _done(res):
1728             for i in res:
1729                 self.failUnless(i is True)
1730         d.addCallback(_done)
1731         d.addCallback(lambda res: checker1.verify(None))
1732         d.addCallback(self.failUnlessEqual, True)
1733         return d