
from base64 import b32encode
import os, sys, time, re, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.application import service
import allmydata
from allmydata import client, uri, download, upload, storage, mutable, offloaded
from allmydata.introducer import IntroducerNode
from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
from allmydata.util import log
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable import NotMutableError
from allmydata.stats import PickleStatsGatherer
from allmydata.key_generator import KeyGeneratorService
from foolscap.eventual import flushEventualQueue, fireEventually
from foolscap import DeadReferenceError, Tub
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
def flush_but_dont_ignore(res):
    d = flushEventualQueue()
    def _done(ignored):
        return res
    d.addCallback(_done)
    return d
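# (Used via d.addBoth() in tearDown below: it drains foolscap's
# eventual-send queue but still passes the original result or Failure
# through, so shutdown errors are not silently swallowed.)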

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
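    # Usage sketch (mirrors _upload_resumable below): set
    #   u.interrupt_after = 5000
    #   u.interrupt_after_d = defer.Deferred()
    # and add a callback that severs the connection; read() fires the
    # Deferred once the running byte count passes the threshold.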


class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self.sparent.startService()
    def tearDown(self):
        log.msg("shutting down SystemTest services")
        d = self.sparent.stopService()
        d.addBoth(flush_but_dont_ignore)
        return d

    def getdir(self, subdir):
        return os.path.join(self.basedir, subdir)

    def add_service(self, s):
        s.setServiceParent(self.sparent)
        return s

    def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
        self.numclients = NUMCLIENTS
        self.createprivdir = createprivdir
        iv_dir = self.getdir("introducer")
        if not os.path.isdir(iv_dir):
            fileutil.make_dirs(iv_dir)
        f = open(os.path.join(iv_dir, "webport"), "w")
        f.write("tcp:0:interface=127.0.0.1\n")
        f.close()
        iv = IntroducerNode(basedir=iv_dir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        d.addCallback(self._get_introducer_web)
        d.addCallback(self._set_up_stats_gatherer)
        d.addCallback(self._set_up_key_generator)
        d.addCallback(self._set_up_nodes_2)
        d.addCallback(self._grab_stats)
        return d

    def _get_introducer_web(self, res):
        f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
        self.introweb_url = f.read().strip()
        f.close()

    def _set_up_stats_gatherer(self, res):
        statsdir = self.getdir("stats_gatherer")
        fileutil.make_dirs(statsdir)
        t = Tub()
        self.add_service(t)
        l = t.listenOn("tcp:0")
        p = l.getPortnum()
        t.setLocation("localhost:%d" % p)
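        # ("tcp:0" lets the kernel pick any free port; getPortnum() reports
        #  which one was chosen so setLocation() can advertise it in the
        #  gatherer's furl.)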

        self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
        self.add_service(self.stats_gatherer)
        self.stats_gatherer_furl = self.stats_gatherer.get_furl()

    def _set_up_key_generator(self, res):
        kgsdir = self.getdir("key_generator")
        fileutil.make_dirs(kgsdir)

        self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
        self.key_generator_svc.key_generator.pool_size = 4
        self.key_generator_svc.key_generator.pool_refresh_delay = 60
        self.add_service(self.key_generator_svc)

        d = fireEventually()
        def check_for_furl():
            return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
        d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
        def get_furl(junk):
            kgf = os.path.join(kgsdir, 'key_generator.furl')
            self.key_generator_furl = file(kgf, 'rb').read().strip()
        d.addCallback(get_furl)
        return d

    def _set_up_nodes_2(self, res):
        q = self.introducer
        self.introducer_furl = q.introducer_url
        self.clients = []
        basedirs = []
        for i in range(self.numclients):
            basedir = self.getdir("client%d" % i)
            basedirs.append(basedir)
            fileutil.make_dirs(basedir)
            if i == 0:
                # client[0] runs a webserver and a helper, no key_generator
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
                open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
            if i == 3:
                # client[3] runs a webserver and uses a helper, uses key_generator
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
                kgf = "%s\n" % (self.key_generator_furl,)
                open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
            if self.createprivdir:
                fileutil.make_dirs(os.path.join(basedir, "private"))
                open(os.path.join(basedir, "private", "root_dir.cap"), "w")
            open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
            open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)

        # start client[0], wait for its tub to be ready (at which point it
        # will have registered the helper furl).
        c = self.add_service(client.Client(basedir=basedirs[0]))
        self.clients.append(c)
        d = c.when_tub_ready()
        def _ready(res):
            f = open(os.path.join(basedirs[0], "private", "helper.furl"), "r")
            helper_furl = f.read()
            f.close()
            self.helper_furl = helper_furl
            f = open(os.path.join(basedirs[3], "helper.furl"), "w")
            f.write(helper_furl)
            f.close()

            # this starts the rest of the clients
            for i in range(1, self.numclients):
                c = self.add_service(client.Client(basedir=basedirs[i]))
                self.clients.append(c)
            log.msg("STARTING")
            return self.wait_for_connections()
        d.addCallback(_ready)
        def _connected(res):
            log.msg("CONNECTED")
            # now find out where the web port was
            l = self.clients[0].getServiceNamed("webish").listener
            port = l._port.getHost().port
            self.webish_url = "http://localhost:%d/" % port
            # and the helper-using webport
            l = self.clients[3].getServiceNamed("webish").listener
            port = l._port.getHost().port
            self.helper_webish_url = "http://localhost:%d/" % port
        d.addCallback(_connected)
        return d

    def _grab_stats(self, res):
        d = self.stats_gatherer.poll()
        return d

    def bounce_client(self, num):
        c = self.clients[num]
        d = c.disownServiceParent()
        # I think Windows requires a moment to let the connection really stop
        # and the port number be made available for re-use. TODO: examine the
        # behavior, see if this is really the problem, see if we can do
        # better than blindly waiting for a second.
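        # (A hypothetical tightening: poll until the old port is actually
        #  free again instead of stalling a fixed second; not attempted
        #  here.)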
        d.addCallback(self.stall, 1.0)
        def _stopped(res):
            new_c = client.Client(basedir=self.getdir("client%d" % num))
            self.clients[num] = new_c
            self.add_service(new_c)
            return new_c.when_tub_ready()
        d.addCallback(_stopped)
        d.addCallback(lambda res: self.wait_for_connections())
        def _maybe_get_webport(res):
            if num == 0:
                # now find out where the web port was
                l = self.clients[0].getServiceNamed("webish").listener
                port = l._port.getHost().port
                self.webish_url = "http://localhost:%d/" % port
        d.addCallback(_maybe_get_webport)
        return d

    def add_extra_node(self, client_num, helper_furl=None,
                       add_to_sparent=False):
        # usually this node is *not* parented to our self.sparent, so we can
        # shut it down separately from the rest, to exercise the
        # connection-lost code
        basedir = self.getdir("client%d" % client_num)
        if not os.path.isdir(basedir):
            fileutil.make_dirs(basedir)
        open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
        if helper_furl:
            f = open(os.path.join(basedir, "helper.furl"), "w")
            f.write(helper_furl+"\n")
            f.close()

        c = client.Client(basedir=basedir)
        self.clients.append(c)
        self.numclients += 1
        if add_to_sparent:
            c.setServiceParent(self.sparent)
        else:
            c.startService()
        d = self.wait_for_connections()
        d.addCallback(lambda res: c)
        return d

    def _check_connections(self):
        for c in self.clients:
            ic = c.introducer_client
            if not ic.connected_to_introducer():
                return False
            if len(ic.get_all_peerids()) != self.numclients:
                return False
        return True

    def wait_for_connections(self, ignored=None):
        # TODO: replace this with something that takes a list of peerids and
        # fires when they've all been heard from, instead of using a count
        # and a threshold
        return self.poll(self._check_connections, timeout=200)
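    # A hypothetical peerid-based variant of the above (sketch only, not
    # wired up anywhere):
    #   def wait_for_peers(self, peerids):
    #       def _all_heard():
    #           return all(pid in c.get_all_peerids()
    #                      for c in self.clients for pid in peerids)
    #       return self.poll(_all_heard, timeout=200)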

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
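        # ("Some data to upload\n" is 20 bytes, so 20 * 200 = 4000; the
        #  25-of-100 encoding below expands the ciphertext 4x, and hash
        #  trees plus container overhead account for the rest.)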
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data size is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others.
            # The segsize actually gets rounded up to 1025 to be a multiple
            # of the number of required shares (since we use 25 out of 100
            # FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
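            # (1024 is not divisible by 25, so the encoder bumps it to
            #  1025 = 41 * 25; the 4000-byte DATA then spans three 1025-byte
            #  segments plus a 925-byte tail.)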
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            uri = results.uri
            log.msg("upload finished: uri is %s" % (uri,))
            self.uri = uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
            return d1
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(download.NotEnoughPeersError),
                                "expected NotEnoughPeersError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughPeersError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        return d

    def _find_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
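                # (concretely, something like
                #  .../client0/storage/shares/aa/aa7x.../0 -- hypothetical
                #  names, where the trailing "0" is a share number)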
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([(0, 1000000)])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
                                     segsize, datalen)
        final_share = mutable.pack_share(prefix,
                                         verification_key,
                                         signature,
                                         share_hash_chain,
                                         block_hash_tree,
                                         share_data,
                                         enc_privkey)
        msf.writev([(0, final_share)], None)

    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes()

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out, err = StringIO(), StringIO()
            rc = runner.runner(["dump-share",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_to_data())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            return newnode.download_to_data()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_to_data()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.update(NEWDATA)
            d1.addCallback(lambda res: newnode.download_to_data())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_to_data())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_to_data())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_to_data())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the key_generator-using client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
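    # (e.g. flip_bit("abc") == "abb": the low bit of the final byte is
    #  inverted, which suffices to invalidate any hash over the string.)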

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(createprivdir=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._test_verifier)
        d.addCallback(self._grab_stats)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, msg, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, res)
        log.msg(msg, **kwargs)
        return res

    def stall(self, res, delay=1.0):
        d = defer.Deferred()
        reactor.callLater(delay, d.callback, res)
        return d
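    # (stall() is Deferred-chain friendly: d.addCallback(self.stall, 1.0)
    #  passes the previous result through after a one-second delay.)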

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data", home, u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d
1148
1149     def PUT(self, urlpath, data):
1150         url = self.webish_url + urlpath
1151         return getPage(url, method="PUT", postdata=data)
1152
1153     def GET(self, urlpath, followRedirect=False):
1154         url = self.webish_url + urlpath
1155         return getPage(url, method="GET", followRedirect=followRedirect)
1156
1157     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1158         if use_helper:
1159             url = self.helper_webish_url + urlpath
1160         else:
1161             url = self.webish_url + urlpath
1162         sepbase = "boogabooga"
1163         sep = "--" + sepbase
1164         form = []
1165         form.append(sep)
1166         form.append('Content-Disposition: form-data; name="_charset"')
1167         form.append('')
1168         form.append('UTF-8')
1169         form.append(sep)
1170         for name, value in fields.iteritems():
1171             if isinstance(value, tuple):
1172                 filename, value = value
1173                 form.append('Content-Disposition: form-data; name="%s"; '
1174                             'filename="%s"' % (name, filename.encode("utf-8")))
1175             else:
1176                 form.append('Content-Disposition: form-data; name="%s"' % name)
1177             form.append('')
1178             form.append(str(value))
1179             form.append(sep)
1180         form[-1] += "--"   # the final separator becomes the closing boundary
1181         body = "\r\n".join(form) + "\r\n"
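              # for reference, a sketch of the body built above for a single
              # field t="upload" (with the hard-coded "boogabooga" boundary);
              # every line is CRLF-terminated:
              #   --boogabooga
              #   Content-Disposition: form-data; name="_charset"
              #
              #   UTF-8
              #   --boogabooga
              #   Content-Disposition: form-data; name="t"
              #
              #   upload
              #   --boogabooga--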
1182         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1183                    }
1184         return getPage(url, method="POST", postdata=body,
1185                        headers=headers, followRedirect=followRedirect)
1186
1187     def _test_web(self, res):
1188         base = self.webish_url
1189         public = "uri/" + self._root_directory_uri
1190         d = getPage(base)
1191         def _got_welcome(page):
1192             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1193             self.failUnless(expected in page,
1194                             "I didn't see the right 'connected storage servers'"
1195                             " message in: %s" % page
1196                             )
1197             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1198             self.failUnless(expected in page,
1199                             "I didn't see the right 'My nodeid' message "
1200                             "in: %s" % page)
1201         d.addCallback(_got_welcome)
1202         d.addCallback(self.log, "done with _got_welcome")
1203
1204         # get the welcome page from the node that uses the helper too
1205         d.addCallback(lambda res: getPage(self.helper_webish_url))
1206         def _got_welcome_helper(page):
1207             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1208                             page)
1209         d.addCallback(_got_welcome_helper)
1210
1211         d.addCallback(lambda res: getPage(base + public))
1212         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1213         def _got_subdir1(page):
1214             # there ought to be an href for our file
1215             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1216             self.failUnless(">mydata567</a>" in page)
1217         d.addCallback(_got_subdir1)
1218         d.addCallback(self.log, "done with _got_subdir1")
1219         d.addCallback(lambda res:
1220                       getPage(base + public + "/subdir1/mydata567"))
1221         def _got_data(page):
1222             self.failUnlessEqual(page, self.data)
1223         d.addCallback(_got_data)
1224
1225         # download from a URI embedded in a URL
1226         d.addCallback(self.log, "_get_from_uri")
1227         def _get_from_uri(res):
1228             return getPage(base + "uri/%s?filename=%s"
1229                            % (self.uri, "mydata567"))
1230         d.addCallback(_get_from_uri)
1231         def _got_from_uri(page):
1232             self.failUnlessEqual(page, self.data)
1233         d.addCallback(_got_from_uri)
1234
1235         # download from a URI embedded in a URL, second form
1236         d.addCallback(self.log, "_get_from_uri2")
1237         def _get_from_uri2(res):
1238             return getPage(base + "uri?uri=%s" % (self.uri,))
1239         d.addCallback(_get_from_uri2)
1240         d.addCallback(_got_from_uri)
1241
1242         # download from a bogus URI, make sure we get a reasonable error
1243         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1244         def _get_from_bogus_uri(res):
1245             d1 = getPage(base + "uri/%s?filename=%s"
1246                          % (self.mangle_uri(self.uri), "mydata567"))
1247             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1248                        "410")
1249             return d1
1250         d.addCallback(_get_from_bogus_uri)
1251         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1252
1253         # upload a file with PUT
1254         d.addCallback(self.log, "about to try PUT")
1255         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1256                                            "new.txt contents"))
1257         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1258         d.addCallback(self.failUnlessEqual, "new.txt contents")
1259         # and again with something large enough to use multiple segments,
1260         # and hopefully trigger pauseProducing too
1261         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1262                                            "big" * 500000)) # 1.5MB
1263         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1264         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1265
1266         # can we replace files in place?
1267         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1268                                            "NEWER contents"))
1269         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1270         d.addCallback(self.failUnlessEqual, "NEWER contents")
1271
1272         # test unlinked POST
1273         d.addCallback(lambda res: self.POST("uri", t="upload",
1274                                             file=("new.txt", "data" * 10000)))
1275         # and again using the helper, which exercises different upload-status
1276         # display code
1277         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1278                                             file=("foo.txt", "data2" * 10000)))
1279
1280         # check that the status page exists
1281         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1282         def _got_status(res):
1283             # find an interesting upload and download to look at. LIT files
1284             # are not interesting.
1285             for dl in self.clients[0].list_recent_downloads():
1286                 if dl.get_size() > 200:
1287                     self._down_status = dl.get_counter()
1288             for ul in self.clients[0].list_recent_uploads():
1289                 if ul.get_size() > 200:
1290                     self._up_status = ul.get_counter()
1291             rs = self.clients[0].list_recent_retrieve()[0]
1292             self._retrieve_status = rs.get_counter()
1293             ps = self.clients[0].list_recent_publish()[0]
1294             self._publish_status = ps.get_counter()
1295
1296             # and that there are some upload- and download-status pages
1297             return self.GET("status/up-%d" % self._up_status)
1298         d.addCallback(_got_status)
1299         def _got_up(res):
1300             return self.GET("status/down-%d" % self._down_status)
1301         d.addCallback(_got_up)
1302         def _got_down(res):
1303             return self.GET("status/publish-%d" % self._publish_status)
1304         d.addCallback(_got_down)
1305         def _got_publish(res):
1306             return self.GET("status/retrieve-%d" % self._retrieve_status)
1307         d.addCallback(_got_publish)
1308
1309         # check that the helper status page exists
1310         d.addCallback(lambda res:
1311                       self.GET("helper_status", followRedirect=True))
1312         def _got_helper_status(res):
1313             self.failUnless("Bytes Fetched:" in res)
1314             # touch a couple of files in the helper's working directory to
1315             # exercise more code paths
1316             workdir = os.path.join(self.getdir("client0"), "helper")
1317             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1318             f = open(incfile, "wb")
1319             f.write("small file")
1320             f.close()
1321             then = time.time() - 86400*3
1322             now = time.time()
1323             os.utime(incfile, (now, then))
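                  # note that os.utime takes (atime, mtime): atime stays
                  # current while mtime is backdated three days, which should
                  # make the helper treat this as an old incoming file (cf.
                  # the incoming_size_old check below)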
1324             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1325             f = open(encfile, "wb")
1326             f.write("less small file")
1327             f.close()
1328             os.utime(encfile, (now, then))
1329         d.addCallback(_got_helper_status)
1330         # and that the json form exists
1331         d.addCallback(lambda res:
1332                       self.GET("helper_status?t=json", followRedirect=True))
1333         def _got_helper_status_json(res):
1334             data = simplejson.loads(res)
1335             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1336                                  1)
1337             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1338             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1339             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1340                                  10)
1341             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1342             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1343             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1344                                  15)
1345         d.addCallback(_got_helper_status_json)
1346
1347         # and check that client[3] (which uses a helper but does not run one
1348         # itself) doesn't explode when you ask for its helper status with
1349         # t=json
1350         d.addCallback(lambda res:
1351                       getPage(self.helper_webish_url + "helper_status?t=json"))
1352         def _got_non_helper_status_json(res):
1353             data = simplejson.loads(res)
1354             self.failUnlessEqual(data, {})
1355         d.addCallback(_got_non_helper_status_json)
1356
1357
1358         # TODO: mangle the second segment of a file, to test errors that
1359         # occur after we've already sent some good data, which uses a
1360         # different error path.
1361
1362         # TODO: download a URI with a form
1363         # TODO: create a directory by using a form
1364         # TODO: upload by using a form on the directory page
1365         #    url = base + "somedir/subdir1/freeform_post!!upload"
1366         # TODO: delete a file by using a button on the directory page
1367
1368         return d
1369
1370     def _test_runner(self, res):
1371         # exercise some of the diagnostic tools in runner.py
1372
1373         # find a share
1374         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1375             if "storage" not in dirpath:
1376                 continue
1377             if not filenames:
1378                 continue
1379             pieces = dirpath.split(os.sep)
1380             if pieces[-4] == "storage" and pieces[-3] == "shares":
1381                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1382                 # are sharefiles here
1383                 filename = os.path.join(dirpath, filenames[0])
1384                 # peek at the magic to see if it is a chk share
1385                 magic = open(filename, "rb").read(4)
1386                 if magic == '\x00\x00\x00\x01':
1387                     break
1388         else:
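                  # this 'else' belongs to the for loop: it only runs if the
                  # walk finished without hitting 'break', i.e. no CHK share
                  # file was found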
1389             self.fail("unable to find any CHK share files in %s"
1390                       % self.basedir)
1391         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1392
1393         out,err = StringIO(), StringIO()
1394         rc = runner.runner(["dump-share",
1395                             filename],
1396                            stdout=out, stderr=err)
1397         output = out.getvalue()
1398         self.failUnlessEqual(rc, 0)
1399
1400         # we only upload a single file, so we can assert some things about
1401         # its size and shares.
1402         self.failUnless(("share filename: %s" % filename) in output)
1403         self.failUnless("size: %d\n" % len(self.data) in output)
1404         self.failUnless("num_segments: 1\n" in output)
1405         # segment_size is always a multiple of needed_shares
1406         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
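              # worked example: the single uploaded file is 112 bytes (the
              # CLI 'ls' check below asserts "112 subdir1/mydata567"), and
              # the 3 here is needed_shares, so next_multiple(112, 3) == 114
              # and the output should contain "segment_size: 114\n"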
1407         self.failUnless("total_shares: 10\n" in output)
1408         # keys which are supposed to be present
1409         for key in ("size", "num_segments", "segment_size",
1410                     "needed_shares", "total_shares",
1411                     "codec_name", "codec_params", "tail_codec_params",
1412                     #"plaintext_hash", "plaintext_root_hash",
1413                     "crypttext_hash", "crypttext_root_hash",
1414                     "share_root_hash", "UEB_hash"):
1415             self.failUnless("%s: " % key in output, key)
1416
1417         # now use its storage index to find the other shares using the
1418         # 'find-shares' tool
1419         sharedir, shnum = os.path.split(filename)
1420         storagedir, storage_index_s = os.path.split(sharedir)
1421         out,err = StringIO(), StringIO()
1422         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1423         cmd = ["find-shares", storage_index_s] + nodedirs
1424         rc = runner.runner(cmd, stdout=out, stderr=err)
1425         self.failUnlessEqual(rc, 0)
1426         out.seek(0)
1427         sharefiles = [sfn.strip() for sfn in out.readlines()]
1428         self.failUnlessEqual(len(sharefiles), 10)
1429
1430         # also exercise the 'catalog-shares' tool
1431         out,err = StringIO(), StringIO()
1432         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1433         cmd = ["catalog-shares"] + nodedirs
1434         rc = runner.runner(cmd, stdout=out, stderr=err)
1435         self.failUnlessEqual(rc, 0)
1436         out.seek(0)
1437         descriptions = [sfn.strip() for sfn in out.readlines()]
1438         self.failUnlessEqual(len(descriptions), 30)
1439         matching = [line
1440                     for line in descriptions
1441                     if line.startswith("CHK %s " % storage_index_s)]
1442         self.failUnlessEqual(len(matching), 10)
1443
1444     def _test_control(self, res):
1445         # exercise the remote-control-the-client foolscap interfaces in
1446         # allmydata.control (mostly used for performance tests)
1447         c0 = self.clients[0]
1448         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1449         control_furl = open(control_furl_file, "r").read().strip()
1450         # it doesn't really matter which Tub we use to connect to the client,
1451         # so let's just use our IntroducerNode's
1452         d = self.introducer.tub.getReference(control_furl)
1453         d.addCallback(self._test_control2, control_furl_file)
1454         return d
1455     def _test_control2(self, rref, filename):
1456         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
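              # (the file we upload is the control.furl file itself, passed
              # in from _test_control above: a conveniently small local file
              # whose contents we can compare against the downloaded copy)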
1457         downfile = os.path.join(self.basedir, "control.downfile")
1458         d.addCallback(lambda uri:
1459                       rref.callRemote("download_from_uri_to_file",
1460                                       uri, downfile))
1461         def _check(res):
1462             self.failUnlessEqual(res, downfile)
1463             data = open(downfile, "r").read()
1464             expected_data = open(filename, "r").read()
1465             self.failUnlessEqual(data, expected_data)
1466         d.addCallback(_check)
1467         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1468         if sys.platform == "linux2":
1469             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1470         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1471         return d
1472
1473     def _test_cli(self, res):
1474         # run various CLI commands (in a thread, since they use blocking
1475         # network calls)
1476
1477         private_uri = self._private_node.get_uri()
1478         some_uri = self._root_directory_uri
1479         client0_basedir = self.getdir("client0")
1480
1481         nodeargs = [
1482             "--node-directory", client0_basedir,
1483             "--dir-cap", private_uri,
1484             ]
1485         public_nodeargs = [
1486             "--node-url", self.webish_url,
1487             "--dir-cap", some_uri,
1488             ]
1489         TESTDATA = "I will not write the same thing over and over.\n" * 100
1490
1491         d = defer.succeed(None)
1492
1493         def _ls_root(res):
1494             argv = ["ls"] + nodeargs
1495             return self._run_cli(argv)
1496         d.addCallback(_ls_root)
1497         def _check_ls_root((out,err)):
1498             self.failUnless("personal" in out)
1499             self.failUnless("s2-ro" in out)
1500             self.failUnless("s2-rw" in out)
1501             self.failUnlessEqual(err, "")
1502         d.addCallback(_check_ls_root)
1503
1504         def _ls_subdir(res):
1505             argv = ["ls"] + nodeargs + ["personal"]
1506             return self._run_cli(argv)
1507         d.addCallback(_ls_subdir)
1508         def _check_ls_subdir((out,err)):
1509             self.failUnless("sekrit data" in out)
1510             self.failUnlessEqual(err, "")
1511         d.addCallback(_check_ls_subdir)
1512
1513         def _ls_public_subdir(res):
1514             argv = ["ls"] + public_nodeargs + ["subdir1"]
1515             return self._run_cli(argv)
1516         d.addCallback(_ls_public_subdir)
1517         def _check_ls_public_subdir((out,err)):
1518             self.failUnless("subdir2" in out)
1519             self.failUnless("mydata567" in out)
1520             self.failUnlessEqual(err, "")
1521         d.addCallback(_check_ls_public_subdir)
1522
1523         def _ls_file(res):
1524             argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
1525             return self._run_cli(argv)
1526         d.addCallback(_ls_file)
1527         def _check_ls_file((out,err)):
1528             self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
1529             self.failUnlessEqual(err, "")
1530         d.addCallback(_check_ls_file)
1531
1532         # tahoe_ls doesn't currently handle the error correctly: it tries to
1533         # JSON-parse a traceback.
1534 ##         def _ls_missing(res):
1535 ##             argv = ["ls"] + nodeargs + ["bogus"]
1536 ##             return self._run_cli(argv)
1537 ##         d.addCallback(_ls_missing)
1538 ##         def _check_ls_missing((out,err)):
1539 ##             print "OUT", out
1540 ##             print "ERR", err
1541 ##             self.failUnlessEqual(err, "")
1542 ##         d.addCallback(_check_ls_missing)
1543
1544         def _put(res):
1545             tdir = self.getdir("cli_put")
1546             fileutil.make_dirs(tdir)
1547             fn = os.path.join(tdir, "upload_me")
1548             f = open(fn, "wb")
1549             f.write(TESTDATA)
1550             f.close()
1551             argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
1552             return self._run_cli(argv)
1553         d.addCallback(_put)
1554         def _check_put((out,err)):
1555             self.failUnless("200 OK" in out)
1556             self.failUnlessEqual(err, "")
1557             d = self._private_node.get_child_at_path(u"test_put/upload.txt")
1558             d.addCallback(lambda filenode: filenode.download_to_data())
1559             def _check_put2(res):
1560                 self.failUnlessEqual(res, TESTDATA)
1561             d.addCallback(_check_put2)
1562             return d
1563         d.addCallback(_check_put)
1564
1565         def _get_to_stdout(res):
1566             argv = ["get"] + nodeargs + ["test_put/upload.txt"]
1567             return self._run_cli(argv)
1568         d.addCallback(_get_to_stdout)
1569         def _check_get_to_stdout((out,err)):
1570             self.failUnlessEqual(out, TESTDATA)
1571             self.failUnlessEqual(err, "")
1572         d.addCallback(_check_get_to_stdout)
1573
1574         get_to_file_target = self.basedir + "/get.downfile"
1575         def _get_to_file(res):
1576             argv = ["get"] + nodeargs + ["test_put/upload.txt",
1577                                          get_to_file_target]
1578             return self._run_cli(argv)
1579         d.addCallback(_get_to_file)
1580         def _check_get_to_file((out,err)):
1581             data = open(get_to_file_target, "rb").read()
1582             self.failUnlessEqual(data, TESTDATA)
1583             self.failUnlessEqual(out, "")
1584             self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
1585         d.addCallback(_check_get_to_file)
1586
1587
1588         def _mv(res):
1589             argv = ["mv"] + nodeargs + ["test_put/upload.txt",
1590                                         "test_put/moved.txt"]
1591             return self._run_cli(argv)
1592         d.addCallback(_mv)
1593         def _check_mv((out,err)):
1594             self.failUnless("OK" in out)
1595             self.failUnlessEqual(err, "")
1596             d = self.shouldFail2(KeyError, "test_cli._check_mv", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")
1597
1598             d.addCallback(lambda res:
1599                           self._private_node.get_child_at_path(u"test_put/moved.txt"))
1600             d.addCallback(lambda filenode: filenode.download_to_data())
1601             def _check_mv2(res):
1602                 self.failUnlessEqual(res, TESTDATA)
1603             d.addCallback(_check_mv2)
1604             return d
1605         d.addCallback(_check_mv)
1606
1607         def _rm(res):
1608             argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
1609             return self._run_cli(argv)
1610         d.addCallback(_rm)
1611         def _check_rm((out,err)):
1612             self.failUnless("200 OK" in out)
1613             self.failUnlessEqual(err, "")
1614             d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
1615             return d
1616         d.addCallback(_check_rm)
1617         return d
1618
1619     def _run_cli(self, argv):
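              # runs the CLI in a thread (the runner makes blocking network
              # calls) and fires with a (stdout, stderr) tuple of captured
              # output. Usage sketch, as in the checks above:
              #   d = self._run_cli(["ls"] + nodeargs)
              #   d.addCallback(lambda (out,err): self.failUnlessEqual(err, ""))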
1620         stdout, stderr = StringIO(), StringIO()
1621         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1622                                   stdout=stdout, stderr=stderr)
1623         def _done(res):
1624             return stdout.getvalue(), stderr.getvalue()
1625         d.addCallback(_done)
1626         return d
1627
1628     def _test_checker(self, res):
1629         d = self._private_node.build_manifest()
1630         d.addCallback(self._test_checker_2)
1631         return d
1632
1633     def _test_checker_2(self, manifest):
1634         checker1 = self.clients[1].getServiceNamed("checker")
1635         self.failUnlessEqual(checker1.checker_results_for(None), [])
1636         self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
1637                              [])
1638         dl = []
1639         starting_time = time.time()
1640         for si in manifest:
1641             dl.append(checker1.check(si))
1642         d = deferredutil.DeferredListShouldSucceed(dl)
1643
1644         def _check_checker_results(res):
1645             for i in res:
1646                 if type(i) is bool:
1647                     self.failUnless(i is True)
1648                 else:
1649                     (needed, total, found, sharemap) = i
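                          # sharemap maps share number -> the peers holding
                          # that share; flattening it below should yield one
                          # peer per client node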
1650                     self.failUnlessEqual(needed, 3)
1651                     self.failUnlessEqual(total, 10)
1652                     self.failUnlessEqual(found, total)
1653                     self.failUnlessEqual(len(sharemap.keys()), 10)
1654                     peers = set()
1655                     for shpeers in sharemap.values():
1656                         peers.update(shpeers)
1657                     self.failUnlessEqual(len(peers), self.numclients)
1658         d.addCallback(_check_checker_results)
1659
1660         def _check_stored_results(res):
1661             finish_time = time.time()
1662             all_results = []
1663             for si in manifest:
1664                 results = checker1.checker_results_for(si)
1665                 if not results:
1666                     # TODO: implement a checker for mutable files, plus tests for it
1667                     continue
1668                 self.failUnlessEqual(len(results), 1)
1669                 when, those_results = results[0]
1670                 self.failUnless(isinstance(when, (int, float)))
1671                 self.failUnless(starting_time <= when <= finish_time)
1672                 all_results.append(those_results)
1673             _check_checker_results(all_results)
1674         d.addCallback(_check_stored_results)
1675
1676         d.addCallback(self._test_checker_3)
1677         return d
1678
1679     def _test_checker_3(self, res):
1680         # check one file, through FileNode.check()
1681         d = self._private_node.get_child_at_path(u"personal/sekrit data")
1682         d.addCallback(lambda n: n.check())
1683         def _checked(results):
1684             # 'sekrit data' is small, and fits in a LiteralFileNode, so
1685             # checking it is trivial and always returns True
1686             self.failUnlessEqual(results, True)
1687         d.addCallback(_checked)
1688
1689         c1 = self.clients[1]
1690         n = c1.create_node_from_uri(self._root_directory_uri)
1691         d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1692         d.addCallback(lambda n: n.check())
1693         def _checked2(results):
1694             # mydata567 is large and lives in a CHK
1695             (needed, total, found, sharemap) = results
1696             self.failUnlessEqual(needed, 3)
1697             self.failUnlessEqual(total, 10)
1698             self.failUnlessEqual(found, 10)
1699             self.failUnlessEqual(len(sharemap), 10)
1700             for shnum in range(10):
1701                 self.failUnlessEqual(len(sharemap[shnum]), 1)
1702         d.addCallback(_checked2)
1703         return d
1704
1705
1706     def _test_verifier(self, res):
1707         checker1 = self.clients[1].getServiceNamed("checker")
1708         d = self._private_node.build_manifest()
1709         def _check_all(manifest):
1710             dl = []
1711             for si in manifest:
1712                 dl.append(checker1.verify(si))
1713             return deferredutil.DeferredListShouldSucceed(dl)
1714         d.addCallback(_check_all)
1715         def _done(res):
1716             for i in res:
1717                 self.failUnless(i is True)
1718         d.addCallback(_done)
1719         d.addCallback(lambda res: checker1.verify(None))
1720         d.addCallback(self.failUnlessEqual, True)
1721         return d