from base64 import b32encode
import os, sys, time, re, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.application import service
import allmydata
from allmydata import client, uri, download, upload, storage, offloaded
from allmydata.introducer import IntroducerNode
from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
from allmydata.util import log
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from allmydata.stats import PickleStatsGatherer
from allmydata.key_generator import KeyGeneratorService
from foolscap.eventual import flushEventualQueue, fireEventually
from foolscap import DeadReferenceError, Tub
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

def flush_but_dont_ignore(res):
    d = flushEventualQueue()
    def _done(ignored):
        return res
    d.addCallback(_done)
    return d

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""

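# An upload.Data variant for interruption tests: it counts the bytes handed
# out via read(), and the first time bytes_read exceeds interrupt_after it
# fires interrupt_after_d, letting a test sever the connection partway
# through an upload.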
class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)


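# SystemTest stands up a real grid in one process: an introducer, a stats
# gatherer, a key generator, and (by default) five client nodes, then runs
# end-to-end upload/download, mutable-file, and webapi checks against it.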
class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):

    def setUp(self):
        self.sparent = service.MultiService()
        self.sparent.startService()
    def tearDown(self):
        log.msg("shutting down SystemTest services")
        d = self.sparent.stopService()
        d.addBoth(flush_but_dont_ignore)
        return d

    def getdir(self, subdir):
        return os.path.join(self.basedir, subdir)

    def add_service(self, s):
        s.setServiceParent(self.sparent)
        return s

    def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
        self.numclients = NUMCLIENTS
        self.createprivdir = createprivdir
        iv_dir = self.getdir("introducer")
        if not os.path.isdir(iv_dir):
            fileutil.make_dirs(iv_dir)
        f = open(os.path.join(iv_dir, "webport"), "w")
        f.write("tcp:0:interface=127.0.0.1\n")
        f.close()
        iv = IntroducerNode(basedir=iv_dir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        d.addCallback(self._get_introducer_web)
        d.addCallback(self._set_up_stats_gatherer)
        d.addCallback(self._set_up_key_generator)
        d.addCallback(self._set_up_nodes_2)
        d.addCallback(self._grab_stats)
        return d

    def _get_introducer_web(self, res):
        f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
        self.introweb_url = f.read().strip()
        f.close()

    def _set_up_stats_gatherer(self, res):
        statsdir = self.getdir("stats_gatherer")
        fileutil.make_dirs(statsdir)
        t = Tub()
        self.add_service(t)
        l = t.listenOn("tcp:0")
        p = l.getPortnum()
        t.setLocation("localhost:%d" % p)

        self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
        self.add_service(self.stats_gatherer)
        self.stats_gatherer_furl = self.stats_gatherer.get_furl()

    def _set_up_key_generator(self, res):
        kgsdir = self.getdir("key_generator")
        fileutil.make_dirs(kgsdir)

        self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
        self.key_generator_svc.key_generator.pool_size = 4
        self.key_generator_svc.key_generator.pool_refresh_delay = 60
        self.add_service(self.key_generator_svc)

        d = fireEventually()
        def check_for_furl():
            return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
        d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
        def get_furl(junk):
            kgf = os.path.join(kgsdir, 'key_generator.furl')
            self.key_generator_furl = file(kgf, 'rb').read().strip()
        d.addCallback(get_furl)
        return d

    def _set_up_nodes_2(self, res):
        q = self.introducer
        self.introducer_furl = q.introducer_url
        self.clients = []
        basedirs = []
        for i in range(self.numclients):
            basedir = self.getdir("client%d" % i)
            basedirs.append(basedir)
            fileutil.make_dirs(basedir)
            if i == 0:
                # client[0] runs a webserver and a helper, no key_generator
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
                open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
            if i == 3:
                # client[3] runs a webserver, uses the helper, and uses
                # the key_generator
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
                kgf = "%s\n" % (self.key_generator_furl,)
                open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
            if self.createprivdir:
                fileutil.make_dirs(os.path.join(basedir, "private"))
                open(os.path.join(basedir, "private", "root_dir.cap"), "w")
            open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
            open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)

        # start client[0], wait for its tub to be ready (at which point it
        # will have registered the helper furl).
        c = self.add_service(client.Client(basedir=basedirs[0]))
        self.clients.append(c)
        d = c.when_tub_ready()
        def _ready(res):
            f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
            helper_furl = f.read()
            f.close()
            self.helper_furl = helper_furl
            f = open(os.path.join(basedirs[3],"helper.furl"), "w")
            f.write(helper_furl)
            f.close()

            # this starts the rest of the clients
            for i in range(1, self.numclients):
                c = self.add_service(client.Client(basedir=basedirs[i]))
                self.clients.append(c)
            log.msg("STARTING")
            return self.wait_for_connections()
        d.addCallback(_ready)
        def _connected(res):
            log.msg("CONNECTED")
            # now find out where the web port was
            l = self.clients[0].getServiceNamed("webish").listener
            port = l._port.getHost().port
            self.webish_url = "http://localhost:%d/" % port
            # and the helper-using webport
            l = self.clients[3].getServiceNamed("webish").listener
            port = l._port.getHost().port
            self.helper_webish_url = "http://localhost:%d/" % port
        d.addCallback(_connected)
        return d

    def _grab_stats(self, res):
        d = self.stats_gatherer.poll()
        return d

    def bounce_client(self, num):
        c = self.clients[num]
        d = c.disownServiceParent()
        # I think windows requires a moment to let the connection really stop
        # and the port number to be made available for re-use. TODO: examine
        # the behavior, see if this is really the problem, see if we can do
        # better than blindly waiting for a second.
        d.addCallback(self.stall, 1.0)
        def _stopped(res):
            new_c = client.Client(basedir=self.getdir("client%d" % num))
            self.clients[num] = new_c
            self.add_service(new_c)
            return new_c.when_tub_ready()
        d.addCallback(_stopped)
        d.addCallback(lambda res: self.wait_for_connections())
        def _maybe_get_webport(res):
            if num == 0:
                # now find out where the web port was
                l = self.clients[0].getServiceNamed("webish").listener
                port = l._port.getHost().port
                self.webish_url = "http://localhost:%d/" % port
        d.addCallback(_maybe_get_webport)
        return d

    def add_extra_node(self, client_num, helper_furl=None,
                       add_to_sparent=False):
        # usually this node is *not* parented to our self.sparent, so we can
        # shut it down separately from the rest, to exercise the
        # connection-lost code
        basedir = self.getdir("client%d" % client_num)
        if not os.path.isdir(basedir):
            fileutil.make_dirs(basedir)
        open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
        if helper_furl:
            f = open(os.path.join(basedir, "helper.furl"), "w")
            f.write(helper_furl+"\n")
            f.close()

        c = client.Client(basedir=basedir)
        self.clients.append(c)
        self.numclients += 1
        if add_to_sparent:
            c.setServiceParent(self.sparent)
        else:
            c.startService()
        d = self.wait_for_connections()
        d.addCallback(lambda res: c)
        return d

    def _check_connections(self):
        for c in self.clients:
            ic = c.introducer_client
            if not ic.connected_to_introducer():
                return False
            if len(ic.get_all_peerids()) != self.numclients:
                return False
        return True

    def wait_for_connections(self, ignored=None):
        # TODO: replace this with something that takes a list of peerids and
        # fires when they've all been heard from, instead of using a count
        # and a threshold
        return self.poll(self._check_connections, timeout=200)

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024 bytes for the duration of
            # this test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others. The
            # 1024 actually gets rounded up to 1025 to be a multiple of the
            # number of required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            uri = results.uri
            log.msg("upload finished: uri is %s" % (uri,))
            self.uri = uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
            return d1
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(download.NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        return d

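    # Walk basedir looking for stored share files. This assumes the on-disk
    # layout clientN/storage/shares/$START/$SINDEX/$SHNUM and returns a list
    # of (client_num, storage_index, filename, shnum) tuples.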
    def _find_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

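    # Unpack a single SDMF mutable share, perturb the one field named by
    # `which` (flip a bit in it, or bump a counter), then re-pack and write
    # it back in place, so that retrieval must detect the corruption.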
    def _corrupt_mutable_share(self, filename, which):
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([(0, 1000000)])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev([(0, final_share)], None)

    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes()

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out, err = StringIO(), StringIO()
            rc = runner.runner(["dump-share",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_to_data())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            return newnode.download_to_data()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_to_data()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.update(NEWDATA)
            d1.addCallback(lambda res: newnode.download_to_data())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_to_data())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. A read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may only be noticed if we happen to query
                # shnum 0 before pulling the pubkey from an intact share.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_to_data())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files; this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_to_data())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

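        # each mutable-file or dirnode creation on clients[3] should consume
        # exactly one key from the key_generator's pool, so the pool length
        # drops by one per call (until the background refresh refills it)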
        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # uses the key_generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

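    # flip the low bit of the last byte, e.g.
    # flip_bit("abc") == "ab" + chr(ord("c") ^ 0x01)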
    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(createprivdir=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._test_verifier)
        d.addCallback(self._grab_stats)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, msg, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, res)
        log.msg(msg, **kwargs)
        return res

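    # return a Deferred that fires with `res` after `delay` seconds; handy
    # for inserting a fixed pause into a callback chain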
    def stall(self, res, delay=1.0):
        d = defer.Deferred()
        reactor.callLater(delay, d.callback, res)
        return d

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

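    # invoke callable(*args, **kwargs) via maybeDeferred and require that it
    # fail with expected_failure (optionally with `substring` appearing in
    # the failure text); `which` just labels the case in failure messages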
1134     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1135         assert substring is None or isinstance(substring, str)
1136         d = defer.maybeDeferred(callable, *args, **kwargs)
1137         def done(res):
1138             if isinstance(res, Failure):
1139                 res.trap(expected_failure)
1140                 if substring:
1141                     self.failUnless(substring in str(res),
1142                                     "substring '%s' not in '%s'"
1143                                     % (substring, str(res)))
1144             else:
1145                 self.fail("%s was supposed to raise %s, not get '%s'" %
1146                           (which, expected_failure, res))
1147         d.addBoth(done)
1148         return d
1149
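    # A hedged usage sketch for the two helpers above: shouldFail() is
    # attached with addBoth() to a Deferred that may have failed, while
    # shouldFail2() makes the call itself via maybeDeferred. The u"nope"
    # child name is purely illustrative:
    #
    #   d = self.shouldFail2(NotMutableError, "delete(nope)", None,
    #                        dirnode.delete, u"nope")
    #
    # Because the checker is added with addBoth(), an expected Failure is
    # trapped and consumed, so the returned Deferred fires successfully
    # and the enclosing callback chain keeps going; anything other than
    # the expected failure is re-raised by trap().
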
1150     def PUT(self, urlpath, data):
1151         url = self.webish_url + urlpath
1152         return getPage(url, method="PUT", postdata=data)
1153
1154     def GET(self, urlpath, followRedirect=False):
1155         url = self.webish_url + urlpath
1156         return getPage(url, method="GET", followRedirect=followRedirect)
1157
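    # getPage() returns a Deferred that fires with the response body as a
    # string, so these wrappers compose directly with callback chains,
    # e.g. (the SOMEDIRCAP path component is illustrative only):
    #
    #   d = self.PUT("uri/SOMEDIRCAP/new.txt", "contents")
    #   d.addCallback(lambda res: self.GET("uri/SOMEDIRCAP/new.txt"))
    #   d.addCallback(self.failUnlessEqual, "contents")
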
1158     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1159         if use_helper:
1160             url = self.helper_webish_url + urlpath
1161         else:
1162             url = self.webish_url + urlpath
1163         sepbase = "boogabooga"
1164         sep = "--" + sepbase
1165         form = []
1166         form.append(sep)
1167         form.append('Content-Disposition: form-data; name="_charset"')
1168         form.append('')
1169         form.append('UTF-8')
1170         form.append(sep)
1171         for name, value in fields.iteritems():
1172             if isinstance(value, tuple):
1173                 filename, value = value
1174                 form.append('Content-Disposition: form-data; name="%s"; '
1175                             'filename="%s"' % (name, filename.encode("utf-8")))
1176             else:
1177                 form.append('Content-Disposition: form-data; name="%s"' % name)
1178             form.append('')
1179             form.append(str(value))
1180             form.append(sep)
1181         form[-1] += "--"
1182         body = "\r\n".join(form) + "\r\n"
1183         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1184                    }
1185         return getPage(url, method="POST", postdata=body,
1186                        headers=headers, followRedirect=followRedirect)
1187
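    # For reference, the multipart body assembled by POST() looks roughly
    # like this on the wire ("t=upload" is just an example field):
    #
    #   --boogabooga\r\n
    #   Content-Disposition: form-data; name="_charset"\r\n
    #   \r\n
    #   UTF-8\r\n
    #   --boogabooga\r\n
    #   Content-Disposition: form-data; name="t"\r\n
    #   \r\n
    #   upload\r\n
    #   --boogabooga--\r\n
    #
    # Note the Content-Type header carries the bare boundary
    # ("boogabooga") while each delimiter line in the body is the boundary
    # prefixed with "--", and the final one gets an extra trailing "--".
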
1188     def _test_web(self, res):
1189         base = self.webish_url
1190         public = "uri/" + self._root_directory_uri
1191         d = getPage(base)
1192         def _got_welcome(page):
1193             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1194             self.failUnless(expected in page,
1195                             "I didn't see the right 'connected storage servers'"
1196                             " message in: %s" % page
1197                             )
1198             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1199             self.failUnless(expected in page,
1200                             "I didn't see the right 'My nodeid' message "
1201                             "in: %s" % page)
1202             self.failUnless("Helper: 0 active uploads" in page)
1203         d.addCallback(_got_welcome)
1204         d.addCallback(self.log, "done with _got_welcome")
1205
1206         # get the welcome page from the node that uses the helper too
1207         d.addCallback(lambda res: getPage(self.helper_webish_url))
1208         def _got_welcome_helper(page):
1209             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1210                             page)
1211             self.failUnless("Not running helper" in page)
1212         d.addCallback(_got_welcome_helper)
1213
1214         d.addCallback(lambda res: getPage(base + public))
1215         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1216         def _got_subdir1(page):
1217             # there ought to be an href for our file
1218             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1219             self.failUnless(">mydata567</a>" in page)
1220         d.addCallback(_got_subdir1)
1221         d.addCallback(self.log, "done with _got_subdir1")
1222         d.addCallback(lambda res:
1223                       getPage(base + public + "/subdir1/mydata567"))
1224         def _got_data(page):
1225             self.failUnlessEqual(page, self.data)
1226         d.addCallback(_got_data)
1227
1228         # download from a URI embedded in a URL
1229         d.addCallback(self.log, "_get_from_uri")
1230         def _get_from_uri(res):
1231             return getPage(base + "uri/%s?filename=%s"
1232                            % (self.uri, "mydata567"))
1233         d.addCallback(_get_from_uri)
1234         def _got_from_uri(page):
1235             self.failUnlessEqual(page, self.data)
1236         d.addCallback(_got_from_uri)
1237
1238         # download from a URI embedded in a URL, second form
1239         d.addCallback(self.log, "_get_from_uri2")
1240         def _get_from_uri2(res):
1241             return getPage(base + "uri?uri=%s" % (self.uri,))
1242         d.addCallback(_get_from_uri2)
1243         d.addCallback(_got_from_uri)
1244
1245         # download from a bogus URI, make sure we get a reasonable error
1246         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1247         def _get_from_bogus_uri(res):
1248             d1 = getPage(base + "uri/%s?filename=%s"
1249                          % (self.mangle_uri(self.uri), "mydata567"))
1250             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1251                        "410")
1252             return d1
1253         d.addCallback(_get_from_bogus_uri)
1254         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1255
1256         # upload a file with PUT
1257         d.addCallback(self.log, "about to try PUT")
1258         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1259                                            "new.txt contents"))
1260         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1261         d.addCallback(self.failUnlessEqual, "new.txt contents")
1262         # and again with something large enough to use multiple segments,
1263         # and hopefully trigger pauseProducing too
1264         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1265                                            "big" * 500000)) # 1.5MB
1266         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1267         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1268
1269         # can we replace files in place?
1270         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1271                                            "NEWER contents"))
1272         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1273         d.addCallback(self.failUnlessEqual, "NEWER contents")
1274
1275         # test unlinked POST
1276         d.addCallback(lambda res: self.POST("uri", t="upload",
1277                                             file=("new.txt", "data" * 10000)))
1278         # and again using the helper, which exercises different upload-status
1279         # display code
1280         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1281                                             file=("foo.txt", "data2" * 10000)))
1282
1283         # check that the status page exists
1284         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1285         def _got_status(res):
1286             # find an interesting upload and download to look at. LIT files
1287             # are not interesting.
1288             for dl in self.clients[0].list_recent_downloads():
1289                 if dl.get_size() > 200:
1290                     self._down_status = dl.get_counter()
1291             for ul in self.clients[0].list_recent_uploads():
1292                 if ul.get_size() > 200:
1293                     self._up_status = ul.get_counter()
1294             rs = self.clients[0].list_recent_retrieve()[0]
1295             self._retrieve_status = rs.get_counter()
1296             ps = self.clients[0].list_recent_publish()[0]
1297             self._publish_status = ps.get_counter()
1298
1299             # and that there are some upload- and download-status pages
1300             return self.GET("status/up-%d" % self._up_status)
1301         d.addCallback(_got_status)
1302         def _got_up(res):
1303             return self.GET("status/down-%d" % self._down_status)
1304         d.addCallback(_got_up)
1305         def _got_down(res):
1306             return self.GET("status/publish-%d" % self._publish_status)
1307         d.addCallback(_got_down)
1308         def _got_publish(res):
1309             return self.GET("status/retrieve-%d" % self._retrieve_status)
1310         d.addCallback(_got_publish)
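
        # Summary of the URL patterns exercised above: per-operation
        # status pages live at status/up-N, status/down-N,
        # status/publish-N and status/retrieve-N, where N is the counter
        # pulled from the corresponding recent-operations list.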
1311
1312         # check that the helper status page exists
1313         d.addCallback(lambda res:
1314                       self.GET("helper_status", followRedirect=True))
1315         def _got_helper_status(res):
1316             self.failUnless("Bytes Fetched:" in res)
1317             # touch a couple of files in the helper's working directory to
1318             # exercise more code paths
1319             workdir = os.path.join(self.getdir("client0"), "helper")
1320             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1321             f = open(incfile, "wb")
1322             f.write("small file")
1323             f.close()
1324             then = time.time() - 86400*3
1325             now = time.time()
1326             os.utime(incfile, (now, then))
1327             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1328             f = open(encfile, "wb")
1329             f.write("less small file")
1330             f.close()
1331             os.utime(encfile, (now, then))
1332         d.addCallback(_got_helper_status)
1333         # and that the json form exists
1334         d.addCallback(lambda res:
1335                       self.GET("helper_status?t=json", followRedirect=True))
1336         def _got_helper_status_json(res):
1337             data = simplejson.loads(res)
1338             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1339                                  1)
1340             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1341             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1342             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1343                                  10)
1344             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1345             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1346             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1347                                  15)
1348         d.addCallback(_got_helper_status_json)
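
        # Taken together, the assertions above imply a JSON document
        # shaped like this (only the keys checked here are shown):
        #
        #   {"chk_upload_helper.upload_need_upload": 1,
        #    "chk_upload_helper.incoming_count": 1,
        #    "chk_upload_helper.incoming_size": 10,
        #    "chk_upload_helper.incoming_size_old": 10,
        #    "chk_upload_helper.encoding_count": 1,
        #    "chk_upload_helper.encoding_size": 15,
        #    "chk_upload_helper.encoding_size_old": 15}
        #
        # 10 and 15 are the byte counts of the "small file" and
        # "less small file" bodies written in _got_helper_status above.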
1349
1350         # and check that client[3] (which uses a helper but does not run one
1351         # itself) doesn't explode when you ask for its status
1352         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1353         def _got_non_helper_status(res):
1354             self.failUnless("Upload and Download Status" in res)
1355         d.addCallback(_got_non_helper_status)
1356
1357         # or for helper status with t=json
1358         d.addCallback(lambda res:
1359                       getPage(self.helper_webish_url + "helper_status?t=json"))
1360         def _got_non_helper_status_json(res):
1361             data = simplejson.loads(res)
1362             self.failUnlessEqual(data, {})
1363         d.addCallback(_got_non_helper_status_json)
1364
1365         # see if the statistics page exists
1366         d.addCallback(lambda res: self.GET("statistics"))
1367         def _got_stats(res):
1368             self.failUnless("Node Statistics" in res)
1369             self.failUnless("  'downloader.files_downloaded': 8," in res)
1370         d.addCallback(_got_stats)
1371         d.addCallback(lambda res: self.GET("statistics?t=json"))
1372         def _got_stats_json(res):
1373             data = simplejson.loads(res)
1374             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1375             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1376         d.addCallback(_got_stats_json)
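
        # Shape of the statistics JSON, as implied by the two lookups
        # above (other counters and stats elided):
        #
        #   {"counters": {"uploader.files_uploaded": 5, ...},
        #    "stats": {"chk_upload_helper.upload_need_upload": 1, ...}}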
1377
1378         # TODO: mangle the second segment of a file, to test errors that
1379         # occur after we've already sent some good data, which uses a
1380         # different error path.
1381
1382         # TODO: download a URI with a form
1383         # TODO: create a directory by using a form
1384         # TODO: upload by using a form on the directory page
1385         #    url = base + "somedir/subdir1/freeform_post!!upload"
1386         # TODO: delete a file by using a button on the directory page
1387
1388         return d
1389
1390     def _test_runner(self, res):
1391         # exercise some of the diagnostic tools in runner.py
1392
1393         # find a share
1394         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1395             if "storage" not in dirpath:
1396                 continue
1397             if not filenames:
1398                 continue
1399             pieces = dirpath.split(os.sep)
1400             if pieces[-4] == "storage" and pieces[-3] == "shares":
1401                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
1402                 # are sharefiles here
1403                 filename = os.path.join(dirpath, filenames[0])
1404                 # peek at the magic to see if it is a chk share
1405                 magic = open(filename, "rb").read(4)
1406                 if magic == '\x00\x00\x00\x01':
1407                     break
1408         else:
1409             self.fail("unable to find any CHK share files in %s"
1410                       % self.basedir)
1411         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1412
1413         out,err = StringIO(), StringIO()
1414         rc = runner.runner(["dump-share",
1415                             filename],
1416                            stdout=out, stderr=err)
1417         output = out.getvalue()
1418         self.failUnlessEqual(rc, 0)
1419
1420         # we only upload a single file, so we can assert some things about
1421         # its size and shares.
1422         self.failUnless(("share filename: %s" % filename) in output)
1423         self.failUnless("size: %d\n" % len(self.data) in output)
1424         self.failUnless("num_segments: 1\n" in output)
1425         # segment_size is always a multiple of needed_shares
1426         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1427         self.failUnless("total_shares: 10\n" in output)
1428         # keys which are supposed to be present
1429         for key in ("size", "num_segments", "segment_size",
1430                     "needed_shares", "total_shares",
1431                     "codec_name", "codec_params", "tail_codec_params",
1432                     #"plaintext_hash", "plaintext_root_hash",
1433                     "crypttext_hash", "crypttext_root_hash",
1434                     "share_root_hash", "UEB_hash"):
1435             self.failUnless("%s: " % key in output, key)
1436
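        # Putting the assertions together, a dump-share transcript for
        # this particular file contains lines like the following (hash and
        # codec values elided, since only their presence is asserted;
        # needed_shares=3 is the k established elsewhere in this test):
        #
        #   share filename: <path ending in storage/shares/.../N>
        #   size: 112
        #   num_segments: 1
        #   segment_size: 114
        #   needed_shares: 3
        #   total_shares: 10
        #   codec_name: ...
        #   UEB_hash: ...
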
1437         # now use its storage index to find the other shares using the
1438         # 'find-shares' tool
1439         sharedir, shnum = os.path.split(filename)
1440         storagedir, storage_index_s = os.path.split(sharedir)
1441         out,err = StringIO(), StringIO()
1442         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1443         cmd = ["find-shares", storage_index_s] + nodedirs
1444         rc = runner.runner(cmd, stdout=out, stderr=err)
1445         self.failUnlessEqual(rc, 0)
1446         out.seek(0)
1447         sharefiles = [sfn.strip() for sfn in out.readlines()]
1448         self.failUnlessEqual(len(sharefiles), 10)
1449
1450         # also exercise the 'catalog-shares' tool
1451         out,err = StringIO(), StringIO()
1452         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1453         cmd = ["catalog-shares"] + nodedirs
1454         rc = runner.runner(cmd, stdout=out, stderr=err)
1455         self.failUnlessEqual(rc, 0)
1456         out.seek(0)
1457         descriptions = [sfn.strip() for sfn in out.readlines()]
1458         self.failUnlessEqual(len(descriptions), 30)
1459         matching = [line
1460                     for line in descriptions
1461                     if line.startswith("CHK %s " % storage_index_s)]
1462         self.failUnlessEqual(len(matching), 10)
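
        # Each catalog-shares output line describes one share file found
        # under the given node directories; lines for our CHK file start
        # with "CHK <storage-index> ", which is what the startswith()
        # filter above keys on. This layout yields 30 share files in all,
        # 10 of them belonging to the one CHK file we uploaded.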
1463
1464     def _test_control(self, res):
1465         # exercise the remote-control-the-client foolscap interfaces in
1466         # allmydata.control (mostly used for performance tests)
1467         c0 = self.clients[0]
1468         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1469         control_furl = open(control_furl_file, "r").read().strip()
1470         # it doesn't really matter which Tub we use to connect to the client,
1471         # so let's just use our IntroducerNode's
1472         d = self.introducer.tub.getReference(control_furl)
1473         d.addCallback(self._test_control2, control_furl_file)
1474         return d
1475     def _test_control2(self, rref, filename):
1476         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1477         downfile = os.path.join(self.basedir, "control.downfile")
1478         d.addCallback(lambda uri:
1479                       rref.callRemote("download_from_uri_to_file",
1480                                       uri, downfile))
1481         def _check(res):
1482             self.failUnlessEqual(res, downfile)
1483             data = open(downfile, "r").read()
1484             expected_data = open(filename, "r").read()
1485             self.failUnlessEqual(data, expected_data)
1486         d.addCallback(_check)
1487         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1488         if sys.platform == "linux2":
1489             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1490         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1491         return d
1492
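    # A minimal standalone sketch of the same control channel, assuming a
    # running client node (CLIENTDIR is a placeholder; the remote method
    # names are the ones exercised above):
    #
    #   from foolscap import Tub
    #   tub = Tub()
    #   tub.startService()
    #   furl = open("CLIENTDIR/private/control.furl").read().strip()
    #   d = tub.getReference(furl)
    #   d.addCallback(lambda rref:
    #                 rref.callRemote("upload_from_file_to_uri",
    #                                 "somefile", convergence=None))
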
1493     def _test_cli(self, res):
1494         # run various CLI commands (in a thread, since they use blocking
1495         # network calls)
1496
1497         private_uri = self._private_node.get_uri()
1498         some_uri = self._root_directory_uri
1499         client0_basedir = self.getdir("client0")
1500
1501         nodeargs = [
1502             "--node-directory", client0_basedir,
1503             "--dir-cap", private_uri,
1504             ]
1505         public_nodeargs = [
1506             "--node-url", self.webish_url,
1507             "--dir-cap", some_uri,
1508             ]
1509         TESTDATA = "I will not write the same thing over and over.\n" * 100
1510
1511         d = defer.succeed(None)
1512
1513         def _ls_root(res):
1514             argv = ["ls"] + nodeargs
1515             return self._run_cli(argv)
1516         d.addCallback(_ls_root)
1517         def _check_ls_root((out,err)):
1518             self.failUnless("personal" in out)
1519             self.failUnless("s2-ro" in out)
1520             self.failUnless("s2-rw" in out)
1521             self.failUnlessEqual(err, "")
1522         d.addCallback(_check_ls_root)
1523
1524         def _ls_subdir(res):
1525             argv = ["ls"] + nodeargs + ["personal"]
1526             return self._run_cli(argv)
1527         d.addCallback(_ls_subdir)
1528         def _check_ls_subdir((out,err)):
1529             self.failUnless("sekrit data" in out)
1530             self.failUnlessEqual(err, "")
1531         d.addCallback(_check_ls_subdir)
1532
1533         def _ls_public_subdir(res):
1534             argv = ["ls"] + public_nodeargs + ["subdir1"]
1535             return self._run_cli(argv)
1536         d.addCallback(_ls_public_subdir)
1537         def _check_ls_public_subdir((out,err)):
1538             self.failUnless("subdir2" in out)
1539             self.failUnless("mydata567" in out)
1540             self.failUnlessEqual(err, "")
1541         d.addCallback(_check_ls_public_subdir)
1542
1543         def _ls_file(res):
1544             argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
1545             return self._run_cli(argv)
1546         d.addCallback(_ls_file)
1547         def _check_ls_file((out,err)):
1548             self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
1549             self.failUnlessEqual(err, "")
1550         d.addCallback(_check_ls_file)
1551
1552         # tahoe_ls doesn't currently handle the error correctly: it tries to
1553         # JSON-parse a traceback.
1554 ##         def _ls_missing(res):
1555 ##             argv = ["ls"] + nodeargs + ["bogus"]
1556 ##             return self._run_cli(argv)
1557 ##         d.addCallback(_ls_missing)
1558 ##         def _check_ls_missing((out,err)):
1559 ##             print "OUT", out
1560 ##             print "ERR", err
1561 ##             self.failUnlessEqual(err, "")
1562 ##         d.addCallback(_check_ls_missing)
1563
1564         def _put(res):
1565             tdir = self.getdir("cli_put")
1566             fileutil.make_dirs(tdir)
1567             fn = os.path.join(tdir, "upload_me")
1568             f = open(fn, "wb")
1569             f.write(TESTDATA)
1570             f.close()
1571             argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
1572             return self._run_cli(argv)
1573         d.addCallback(_put)
1574         def _check_put((out,err)):
1575             self.failUnless("200 OK" in out)
1576             self.failUnlessEqual(err, "")
1577             d = self._private_node.get_child_at_path(u"test_put/upload.txt")
1578             d.addCallback(lambda filenode: filenode.download_to_data())
1579             def _check_put2(res):
1580                 self.failUnlessEqual(res, TESTDATA)
1581             d.addCallback(_check_put2)
1582             return d
1583         d.addCallback(_check_put)
1584
1585         def _get_to_stdout(res):
1586             argv = ["get"] + nodeargs + ["test_put/upload.txt"]
1587             return self._run_cli(argv)
1588         d.addCallback(_get_to_stdout)
1589         def _check_get_to_stdout((out,err)):
1590             self.failUnlessEqual(out, TESTDATA)
1591             self.failUnlessEqual(err, "")
1592         d.addCallback(_check_get_to_stdout)
1593
1594         get_to_file_target = self.basedir + "/get.downfile"
1595         def _get_to_file(res):
1596             argv = ["get"] + nodeargs + ["test_put/upload.txt",
1597                                          get_to_file_target]
1598             return self._run_cli(argv)
1599         d.addCallback(_get_to_file)
1600         def _check_get_to_file((out,err)):
1601             data = open(get_to_file_target, "rb").read()
1602             self.failUnlessEqual(data, TESTDATA)
1603             self.failUnlessEqual(out, "")
1604             self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
1605         d.addCallback(_check_get_to_file)
1606
1607
1608         def _mv(res):
1609             argv = ["mv"] + nodeargs + ["test_put/upload.txt",
1610                                         "test_put/moved.txt"]
1611             return self._run_cli(argv)
1612         d.addCallback(_mv)
1613         def _check_mv((out,err)):
1614             self.failUnless("OK" in out)
1615             self.failUnlessEqual(err, "")
1616             d = self.shouldFail2(KeyError, "test_cli._check_mv", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")
1617
1618             d.addCallback(lambda res:
1619                           self._private_node.get_child_at_path(u"test_put/moved.txt"))
1620             d.addCallback(lambda filenode: filenode.download_to_data())
1621             def _check_mv2(res):
1622                 self.failUnlessEqual(res, TESTDATA)
1623             d.addCallback(_check_mv2)
1624             return d
1625         d.addCallback(_check_mv)
1626
1627         def _rm(res):
1628             argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
1629             return self._run_cli(argv)
1630         d.addCallback(_rm)
1631         def _check_rm((out,err)):
1632             self.failUnless("200 OK" in out)
1633             self.failUnlessEqual(err, "")
1634             d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
1635             return d
1636         d.addCallback(_check_rm)
1637         return d
1638
1639     def _run_cli(self, argv):
1640         stdout, stderr = StringIO(), StringIO()
1641         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1642                                   stdout=stdout, stderr=stderr)
1643         def _done(res):
1644             return stdout.getvalue(), stderr.getvalue()
1645         d.addCallback(_done)
1646         return d
1647
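    # _run_cli() usage sketch: each CLI test above reduces to something
    # like the following (argument values are illustrative):
    #
    #   d = self._run_cli(["ls", "--node-directory", client0_basedir,
    #                      "--dir-cap", private_uri])
    #   def _check((out, err)):
    #       self.failUnlessEqual(err, "")
    #   d.addCallback(_check)
    #
    # runner.runner() is blocking, hence deferToThread; the Deferred
    # fires with the captured (stdout, stderr) pair.
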
1648     def _test_checker(self, res):
1649         d = self._private_node.build_manifest()
1650         d.addCallback(self._test_checker_2)
1651         return d
1652
1653     def _test_checker_2(self, manifest):
1654         checker1 = self.clients[1].getServiceNamed("checker")
1655         self.failUnlessEqual(checker1.checker_results_for(None), [])
1656         self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
1657                              [])
1658         dl = []
1659         starting_time = time.time()
1660         for si in manifest:
1661             dl.append(checker1.check(si))
1662         d = deferredutil.DeferredListShouldSucceed(dl)
1663
1664         def _check_checker_results(res):
1665             for i in res:
1666                 if type(i) is bool:
1667                     self.failUnless(i is True)
1668                 else:
1669                     (needed, total, found, sharemap) = i
1670                     self.failUnlessEqual(needed, 3)
1671                     self.failUnlessEqual(total, 10)
1672                     self.failUnlessEqual(found, total)
1673                     self.failUnlessEqual(len(sharemap.keys()), 10)
1674                     peers = set()
1675                     for shpeers in sharemap.values():
1676                         peers.update(shpeers)
1677                     self.failUnlessEqual(len(peers), self.numclients)
1678         d.addCallback(_check_checker_results)
1679
1680         def _check_stored_results(res):
1681             finish_time = time.time()
1682             all_results = []
1683             for si in manifest:
1684                 results = checker1.checker_results_for(si)
1685                 if not results:
1686                 # TODO: implement a checker for mutable files, plus tests for it
1687                     continue
1688                 self.failUnlessEqual(len(results), 1)
1689                 when, those_results = results[0]
1690                 self.failUnless(isinstance(when, (int, float)))
1691                 self.failUnless(starting_time <= when <= finish_time)
1692                 all_results.append(those_results)
1693             _check_checker_results(all_results)
1694         d.addCallback(_check_stored_results)
1695
1696         d.addCallback(self._test_checker_3)
1697         return d
1698
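    # Checker results come in two shapes, both handled above: a bare True
    # for trivially-valid (e.g. LIT) files, or a tuple of
    #
    #   (needed_shares, total_shares, shares_found, sharemap)
    #
    # where sharemap maps each share number to the set of peers holding a
    # copy of that share.
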
1699     def _test_checker_3(self, res):
1700         # check one file, through FileNode.check()
1701         d = self._private_node.get_child_at_path(u"personal/sekrit data")
1702         d.addCallback(lambda n: n.check())
1703         def _checked(results):
1704             # 'sekrit data' is small, and fits in a LiteralFileNode, so
1705             # checking it is trivial and always returns True
1706             self.failUnlessEqual(results, True)
1707         d.addCallback(_checked)
1708
1709         c1 = self.clients[1]
1710         n = c1.create_node_from_uri(self._root_directory_uri)
1711         d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1712         d.addCallback(lambda n: n.check())
1713         def _checked2(results):
1714             # mydata567 is large and lives in a CHK
1715             (needed, total, found, sharemap) = results
1716             self.failUnlessEqual(needed, 3)
1717             self.failUnlessEqual(total, 10)
1718             self.failUnlessEqual(found, 10)
1719             self.failUnlessEqual(len(sharemap), 10)
1720             for shnum in range(10):
1721                 self.failUnlessEqual(len(sharemap[shnum]), 1)
1722         d.addCallback(_checked2)
1723         return d
1724
1725
1726     def _test_verifier(self, res):
1727         checker1 = self.clients[1].getServiceNamed("checker")
1728         d = self._private_node.build_manifest()
1729         def _check_all(manifest):
1730             dl = []
1731             for si in manifest:
1732                 dl.append(checker1.verify(si))
1733             return deferredutil.DeferredListShouldSucceed(dl)
1734         d.addCallback(_check_all)
1735         def _done(res):
1736             for i in res:
1737                 self.failUnless(i is True)
1738         d.addCallback(_done)
1739         d.addCallback(lambda res: checker1.verify(None))
1740         d.addCallback(self.failUnlessEqual, True)
1741         return d