]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
CLI: remove the '-r' shortcut for --dir-cap, to make it available for cp -r
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1
2 from base64 import b32encode
3 import os, sys, time, re, simplejson
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
10 import allmydata
11 from allmydata import client, uri, download, upload, storage, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner, cli
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable.common import NotMutableError
18 from allmydata.mutable import layout as mutable_layout
19 from allmydata.stats import PickleStatsGatherer
20 from allmydata.key_generator import KeyGeneratorService
21 from foolscap.eventual import flushEventualQueue, fireEventually
22 from foolscap import DeadReferenceError, Tub
23 from twisted.python.failure import Failure
24 from twisted.web.client import getPage
25 from twisted.web.error import Error
26
def flush_but_dont_ignore(res):
    """Flush the foolscap eventual-send queue, then pass *res* through.

    Intended as an addBoth handler: teardown waits for queued events
    without discarding the original callback/errback value.
    """
    def _restore(_ignored):
        return res
    flushing = flushEventualQueue()
    flushing.addCallback(_restore)
    return flushing
33
# Payload deliberately longer than the LIT-URI threshold, so uploading it
# produces a real distributed file rather than a literal (inline) URI.
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
38
class CountingDataUploadable(upload.Data):
    """An upload.Data that counts bytes read and can signal mid-upload.

    Once the cumulative bytes handed out by read() pass interrupt_after,
    interrupt_after_d is fired (exactly once) with this uploadable, letting
    a test react (e.g. bounce a node) while the upload is in flight.
    """
    bytes_read = 0            # running total of bytes handed out by read()
    interrupt_after = None    # byte threshold at which to fire the Deferred
    interrupt_after_d = None  # Deferred fired when the threshold is crossed

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            self.interrupt_after = None  # ensure we only fire once
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
51
52
53 class SystemTest(testutil.SignalMixin, testutil.PollMixin, testutil.StallMixin,
54                  unittest.TestCase):
55
56     def setUp(self):
57         self.sparent = service.MultiService()
58         self.sparent.startService()
59     def tearDown(self):
60         log.msg("shutting down SystemTest services")
61         d = self.sparent.stopService()
62         d.addBoth(flush_but_dont_ignore)
63         return d
64
65     def getdir(self, subdir):
66         return os.path.join(self.basedir, subdir)
67
    def add_service(self, s):
        """Attach *s* to the parent service (so tearDown stops it) and return it."""
        s.setServiceParent(self.sparent)
        return s
71
    def set_up_nodes(self, NUMCLIENTS=5):
        """Launch an introducer, stats gatherer, key generator, and
        NUMCLIENTS client nodes, then poll the stats gatherer once.

        Returns a Deferred that fires when everything is up and connected.
        """
        self.numclients = NUMCLIENTS
        iv_dir = self.getdir("introducer")
        if not os.path.isdir(iv_dir):
            fileutil.make_dirs(iv_dir)
        # ask the introducer's web server to pick an ephemeral localhost port
        f = open(os.path.join(iv_dir, "webport"), "w")
        f.write("tcp:0:interface=127.0.0.1\n")
        f.close()
        iv = IntroducerNode(basedir=iv_dir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        # the remaining services are brought up strictly in this sequence:
        # later steps consume furls/attributes recorded by earlier ones
        d.addCallback(self._get_introducer_web)
        d.addCallback(self._set_up_stats_gatherer)
        d.addCallback(self._set_up_key_generator)
        d.addCallback(self._set_up_nodes_2)
        d.addCallback(self._grab_stats)
        return d
89
90     def _get_introducer_web(self, res):
91         f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
92         self.introweb_url = f.read().strip()
93         f.close()
94
    def _set_up_stats_gatherer(self, res):
        """Start a PickleStatsGatherer on an ephemeral port and record its FURL."""
        statsdir = self.getdir("stats_gatherer")
        fileutil.make_dirs(statsdir)
        t = Tub()
        self.add_service(t)
        l = t.listenOn("tcp:0")
        p = l.getPortnum()
        # the Tub needs a location before it can hand out furls
        t.setLocation("localhost:%d" % p)

        self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
        self.add_service(self.stats_gatherer)
        self.stats_gatherer_furl = self.stats_gatherer.get_furl()
107
108     def _set_up_key_generator(self, res):
109         kgsdir = self.getdir("key_generator")
110         fileutil.make_dirs(kgsdir)
111
112         self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
113         self.key_generator_svc.key_generator.pool_size = 4
114         self.key_generator_svc.key_generator.pool_refresh_delay = 60
115         self.add_service(self.key_generator_svc)
116
117         d = fireEventually()
118         def check_for_furl():
119             return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
120         d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
121         def get_furl(junk):
122             kgf = os.path.join(kgsdir, 'key_generator.furl')
123             self.key_generator_furl = file(kgf, 'rb').read().strip()
124         d.addCallback(get_furl)
125         return d
126
    def _set_up_nodes_2(self, res):
        """Write per-client config files and start all self.numclients nodes.

        client[0] runs a webserver plus the upload helper; client[3] runs a
        webserver and *uses* both the helper and the key generator. client[0]
        is started first so its helper furl exists before client[3] needs it.
        Records self.webish_url and self.helper_webish_url when done.
        """
        q = self.introducer
        self.introducer_furl = q.introducer_url
        self.clients = []
        basedirs = []
        for i in range(self.numclients):
            basedir = self.getdir("client%d" % i)
            basedirs.append(basedir)
            fileutil.make_dirs(basedir)
            if i == 0:
                # client[0] runs a webserver and a helper, no key_generator
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
                open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
                open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n")
            if i == 3:
                # client[3] runs a webserver and uses a helper, uses key_generator
                open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
                kgf = "%s\n" % (self.key_generator_furl,)
                open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
            open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
            open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)

        # start client[0], wait for its tub to be ready (at which point it
        # will have registered the helper furl).
        c = self.add_service(client.Client(basedir=basedirs[0]))
        self.clients.append(c)
        d = c.when_tub_ready()
        def _ready(res):
            # copy client[0]'s helper furl into client[3]'s basedir before
            # client[3] starts, so it connects to the helper on boot
            f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
            helper_furl = f.read()
            f.close()
            self.helper_furl = helper_furl
            f = open(os.path.join(basedirs[3],"helper.furl"), "w")
            f.write(helper_furl)
            f.close()

            # this starts the rest of the clients
            for i in range(1, self.numclients):
                c = self.add_service(client.Client(basedir=basedirs[i]))
                self.clients.append(c)
            log.msg("STARTING")
            return self.wait_for_connections()
        d.addCallback(_ready)
        def _connected(res):
            log.msg("CONNECTED")
            # now find out where the web port was
            l = self.clients[0].getServiceNamed("webish").listener
            port = l._port.getHost().port
            self.webish_url = "http://localhost:%d/" % port
            # and the helper-using webport
            l = self.clients[3].getServiceNamed("webish").listener
            port = l._port.getHost().port
            self.helper_webish_url = "http://localhost:%d/" % port
        d.addCallback(_connected)
        return d
182
183     def _grab_stats(self, res):
184         d = self.stats_gatherer.poll()
185         return d
186
    def bounce_client(self, num):
        """Stop client *num* and restart it from the same basedir.

        Returns a Deferred that fires once the restarted node's tub is ready
        and every client is reconnected. When client 0 is bounced,
        self.webish_url is refreshed (its ephemeral web port may change).
        """
        c = self.clients[num]
        d = c.disownServiceParent()
        # I think windows requires a moment to let the connection really stop
        # and the port number made available for re-use. TODO: examine the
        # behavior, see if this is really the problem, see if we can do
        # better than blindly waiting for a second.
        d.addCallback(self.stall, 1.0)
        def _stopped(res):
            new_c = client.Client(basedir=self.getdir("client%d" % num))
            self.clients[num] = new_c
            self.add_service(new_c)
            return new_c.when_tub_ready()
        d.addCallback(_stopped)
        d.addCallback(lambda res: self.wait_for_connections())
        def _maybe_get_webport(res):
            if num == 0:
                # now find out where the web port was
                l = self.clients[0].getServiceNamed("webish").listener
                port = l._port.getHost().port
                self.webish_url = "http://localhost:%d/" % port
        d.addCallback(_maybe_get_webport)
        return d
210
    def add_extra_node(self, client_num, helper_furl=None,
                       add_to_sparent=False):
        """Create and start one extra client node numbered *client_num*.

        Usually this node is *not* parented to self.sparent, so it can be
        shut down separately from the rest to exercise the connection-lost
        code; pass add_to_sparent=True to have tearDown stop it with the
        others. If *helper_furl* is given the node is configured to use
        that upload helper. Returns a Deferred firing with the new client
        once all nodes are connected.
        """
        basedir = self.getdir("client%d" % client_num)
        if not os.path.isdir(basedir):
            fileutil.make_dirs(basedir)
        open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
        if helper_furl:
            f = open(os.path.join(basedir, "helper.furl") ,"w")
            f.write(helper_furl+"\n")
            f.close()

        c = client.Client(basedir=basedir)
        self.clients.append(c)
        self.numclients += 1
        if add_to_sparent:
            c.setServiceParent(self.sparent)
        else:
            c.startService()
        d = self.wait_for_connections()
        d.addCallback(lambda res: c)
        return d
235
236     def _check_connections(self):
237         for c in self.clients:
238             ic = c.introducer_client
239             if not ic.connected_to_introducer():
240                 return False
241             if len(ic.get_all_peerids()) != self.numclients:
242                 return False
243         return True
244
245     def wait_for_connections(self, ignored=None):
246         # TODO: replace this with something that takes a list of peerids and
247         # fires when they've all been heard from, instead of using a count
248         # and a threshold
249         return self.poll(self._check_connections, timeout=200)
250
    def test_connections(self):
        """Bring up the grid plus one extra node; verify everyone sees N+1 peers.

        NOTE: disabled by the `del test_connections` below — see the
        trailing comment for why.
        """
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            # addBoth: shut the extra node down even on failure
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections
278
    def test_upload_and_download_random_key(self):
        # convergence=None: each upload uses a fresh random key, so a
        # duplicate upload cannot be short-circuited or resumed.
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800
283
    def test_upload_and_download_convergent(self):
        # a fixed convergence string makes repeat uploads of the same data
        # deduplicable/resumable; exercised by the duplicate-upload checks.
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800
288
    def _test_upload_and_download(self, convergence):
        """End-to-end upload/download exercise over the whole grid.

        Uploads multi-segment data, downloads it three ways (to data, to a
        filename, to a filehandle), checks a bad URI fails with
        NotEnoughSharesError, then exercises helper-based upload including
        duplicate-upload short-circuiting (convergent only) and resumption
        of an interrupted helper upload.
        """
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that this is not a multiple of the segment size, so that the
            # tail segment is not the same length as the others. This actually
            # gets rounded up to 1025 to be a multiple of the number of
            # required shares (since we use 25 out of 100 FEC).
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            uri = results.uri
            log.msg("upload finished: uri is %s" % (uri,))
            self.uri = uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to be
            # short-circuited, however with the way we currently generate URIs
            # (i.e. because they include the roothash), we have to do all of the
            # encoding work, and only get to save on the upload part.
            # NOTE(review): d1 is never returned, so this second upload is
            # not waited on and any failure is silently dropped — this looks
            # like it should `return d1`; confirm before changing.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existend URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(download.NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            # sizelimit=0 means this node stores no shares itself
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            # with convergence, re-uploading identical data should be
            # short-circuited before any ciphertext is pushed
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                "resumption didn't save us any work:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        return d
548
549     def _find_shares(self, basedir):
550         shares = []
551         for (dirpath, dirnames, filenames) in os.walk(basedir):
552             if "storage" not in dirpath:
553                 continue
554             if not filenames:
555                 continue
556             pieces = dirpath.split(os.sep)
557             if pieces[-4] == "storage" and pieces[-3] == "shares":
558                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
559                 # are sharefiles here
560                 assert pieces[-5].startswith("client")
561                 client_num = int(pieces[-5][-1])
562                 storage_index_s = pieces[-1]
563                 storage_index = storage.si_a2b(storage_index_s)
564                 for sharename in filenames:
565                     shnum = int(sharename)
566                     filename = os.path.join(dirpath, sharename)
567                     data = (client_num, storage_index, filename, shnum)
568                     shares.append(data)
569         if not shares:
570             self.fail("unable to find any share files in %s" % basedir)
571         return shares
572
    def _corrupt_mutable_share(self, filename, which):
        """Deliberately corrupt one field of the mutable share at *filename*.

        *which* selects the field to damage: "seqnum", "R" (root hash),
        "IV", "segsize", "pubkey", "signature", "share_hash_chain",
        "block_hash_tree", "share_data", or "encprivkey". Any other value
        repacks the share unmodified. The share is unpacked, one field is
        bit-flipped (or bumped by 15 for the integer fields), then the
        whole share is repacked and written back in place.
        """
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # repack with the (possibly corrupted) fields and overwrite in place
        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev( [(0, final_share)], None)
615
    def test_mutable(self):
        """System test for SDMF mutable files.

        Exercises, in one long Deferred chain: creating a mutable file,
        dumping one of its shares with the 'dump-share' CLI command,
        retrieving the contents via several different clients/nodes,
        overwriting the contents twice, corrupting all but 3 of the 10
        shares and re-downloading, creating an empty mutable file,
        basic dirnode operations, and the key-generator pool accounting
        on clients[3].
        """
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes()

        def _create_mutable(res):
            # upload DATA as a mutable file on client 0 and stash the node
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                # NOTE(review): 'uri' is assigned but never used here --
                # presumably left over from debugging
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            # run the CLI 'dump-share' command against that share file and
            # check its output. 'err' is captured but not inspected.
            out,err = StringIO(), StringIO()
            rc = runner.runner(["dump-share",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depends upon the sharenum,
                # and is more of a hassle to compute than I want to deal with
                # now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
            except unittest.FailTest:
                # dump the whole CLI output on failure, to aid debugging
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            # the client caches nodes by URI, so both lookups must return
            # the very same object
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            # _newnode3 is saved for the post-corruption download below
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to correct: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.

                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling
        d.addCallback(_corrupt_shares)

        # with only shares 7,8,9 intact (k=3), download must still succeed
        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        # basic dirnode sanity: list/has_child/set_node/build_manifest,
        # including a directory that contains itself
        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            return d1
        d.addCallback(_created_dirnode)

        # wait for clients[3] to connect to the key generator service
        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        # each key-consuming operation on clients[3] should shrink the
        # generator's keypool by one (relative to its nominal pool_size)
        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the using-key_gen client
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240
849
850     def flip_bit(self, good):
851         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
852
853     def mangle_uri(self, gooduri):
854         # change the key, which changes the storage index, which means we'll
855         # be asking about the wrong file, so nobody will have any shares
856         u = IFileURI(gooduri)
857         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
858                             uri_extension_hash=u.uri_extension_hash,
859                             needed_shares=u.needed_shares,
860                             total_shares=u.total_shares,
861                             size=u.size)
862         return u2.to_string()
863
864     # TODO: add a test which mangles the uri_extension_hash instead, and
865     # should fail due to not being able to get a valid uri_extension block.
866     # Also a test which sneakily mangles the uri_extension block to change
867     # some of the validation data, so it will fail in the post-download phase
868     # when the file's crypttext integrity check fails. Do the same thing for
869     # the key, which should cause the download to fail the post-download
870     # plaintext_hash check.
871
    def test_vdrive(self):
        """End-to-end virtual-drive system test.

        Publishes a small directory tree, bounces a client, verifies the
        published data through several APIs, then runs the web, control,
        CLI, checker, verifier, and stats sub-tests against the same grid.
        The sub-tests share state via attributes stashed on self
        (e.g. self._root_directory_uri, self.uri).
        """
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes()
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client 0 to prove the data survives a node bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._test_verifier)
        d.addCallback(self._grab_stats)
        return d
    test_vdrive.timeout = 1100
917
    def _test_introweb(self, res):
        """Fetch the introducer's web status page (HTML and ?t=json forms)
        and check the version string plus announcement/subscription
        summaries for the 5-node grid."""
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                # dump the page body to aid debugging
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            # the JSON form must report the same summaries as the HTML page
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d
949
950     def _do_publish1(self, res):
951         ut = upload.Data(self.data, convergence=None)
952         c0 = self.clients[0]
953         d = c0.create_empty_dirnode()
954         def _made_root(new_dirnode):
955             self._root_directory_uri = new_dirnode.get_uri()
956             return c0.create_node_from_uri(self._root_directory_uri)
957         d.addCallback(_made_root)
958         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
959         def _made_subdir1(subdir1_node):
960             self._subdir1_node = subdir1_node
961             d1 = subdir1_node.add_file(u"mydata567", ut)
962             d1.addCallback(self.log, "publish finished")
963             def _stash_uri(filenode):
964                 self.uri = filenode.get_uri()
965             d1.addCallback(_stash_uri)
966             return d1
967         d.addCallback(_made_subdir1)
968         return d
969
970     def _do_publish2(self, res):
971         ut = upload.Data(self.data, convergence=None)
972         d = self._subdir1_node.create_empty_directory(u"subdir2")
973         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
974         return d
975
976     def log(self, res, msg, **kwargs):
977         # print "MSG: %s  RES: %s" % (msg, res)
978         log.msg(msg, **kwargs)
979         return res
980
    def _do_publish_private(self, res):
        """Build the private directory P with P/personal/'sekrit data' plus
        s2-rw and s2-ro links to R/subdir1/subdir2, and fire with the
        private dirnode. Stashes self.smalldata for later verification."""
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                # link subdir2 in twice: once writable, once read-only
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            # fire the outer Deferred with the private dirnode itself
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d
1002
1003     def _check_publish1(self, res):
1004         # this one uses the iterative API
1005         c1 = self.clients[1]
1006         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
1007         d.addCallback(self.log, "check_publish1 got /")
1008         d.addCallback(lambda root: root.get(u"subdir1"))
1009         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
1010         d.addCallback(lambda filenode: filenode.download_to_data())
1011         d.addCallback(self.log, "get finished")
1012         def _get_done(data):
1013             self.failUnlessEqual(data, self.data)
1014         d.addCallback(_get_done)
1015         return d
1016
1017     def _check_publish2(self, res):
1018         # this one uses the path-based API
1019         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
1020         d = rootnode.get_child_at_path(u"subdir1")
1021         d.addCallback(lambda dirnode:
1022                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
1023         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
1024         d.addCallback(lambda filenode: filenode.download_to_data())
1025         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
1026
1027         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
1028         def _got_filenode(filenode):
1029             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
1030             assert fnode == filenode
1031         d.addCallback(_got_filenode)
1032         return d
1033
    def _check_publish_private(self, resnode):
        """Verify the private directory P built by _do_publish_private.

        Checks the published secret data, enforces read-only semantics on
        the s2-ro link (every mutating call must raise NotMutableError),
        exercises move_child_to in both directions, then checks the
        manifest size and deep_stats of the whole tree.
        *resnode* is the private dirnode fired by _do_publish_private.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        # helper: resolve a path relative to the private root
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # s2-ro is a read-only view of a mutable directory
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            # plain reads must still work through the read-only view
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            # a missing child raises KeyError, not NotMutableError
            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # shuffle 'sekrit data' around between writable directories,
            # ending up back under P/personal/
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            d1.addCallback(lambda res: home.deep_stats())
            def _check_stats(stats):
                # exact counts/sizes of the tree built above; directory
                # sizes vary between runs so they only get range checks
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k,v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d
1142
1143     def shouldFail(self, res, expected_failure, which, substring=None):
1144         if isinstance(res, Failure):
1145             res.trap(expected_failure)
1146             if substring:
1147                 self.failUnless(substring in str(res),
1148                                 "substring '%s' not in '%s'"
1149                                 % (substring, str(res)))
1150         else:
1151             self.fail("%s was supposed to raise %s, not get '%s'" %
1152                       (which, expected_failure, res))
1153
1154     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1155         assert substring is None or isinstance(substring, str)
1156         d = defer.maybeDeferred(callable, *args, **kwargs)
1157         def done(res):
1158             if isinstance(res, Failure):
1159                 res.trap(expected_failure)
1160                 if substring:
1161                     self.failUnless(substring in str(res),
1162                                     "substring '%s' not in '%s'"
1163                                     % (substring, str(res)))
1164             else:
1165                 self.fail("%s was supposed to raise %s, not get '%s'" %
1166                           (which, expected_failure, res))
1167         d.addBoth(done)
1168         return d
1169
1170     def PUT(self, urlpath, data):
1171         url = self.webish_url + urlpath
1172         return getPage(url, method="PUT", postdata=data)
1173
1174     def GET(self, urlpath, followRedirect=False):
1175         url = self.webish_url + urlpath
1176         return getPage(url, method="GET", followRedirect=followRedirect)
1177
1178     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1179         if use_helper:
1180             url = self.helper_webish_url + urlpath
1181         else:
1182             url = self.webish_url + urlpath
1183         sepbase = "boogabooga"
1184         sep = "--" + sepbase
1185         form = []
1186         form.append(sep)
1187         form.append('Content-Disposition: form-data; name="_charset"')
1188         form.append('')
1189         form.append('UTF-8')
1190         form.append(sep)
1191         for name, value in fields.iteritems():
1192             if isinstance(value, tuple):
1193                 filename, value = value
1194                 form.append('Content-Disposition: form-data; name="%s"; '
1195                             'filename="%s"' % (name, filename.encode("utf-8")))
1196             else:
1197                 form.append('Content-Disposition: form-data; name="%s"' % name)
1198             form.append('')
1199             form.append(str(value))
1200             form.append(sep)
1201         form[-1] += "--"
1202         body = "\r\n".join(form) + "\r\n"
1203         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1204                    }
1205         return getPage(url, method="POST", postdata=body,
1206                        headers=headers, followRedirect=followRedirect)
1207
1208     def _test_web(self, res):
1209         base = self.webish_url
1210         public = "uri/" + self._root_directory_uri
1211         d = getPage(base)
1212         def _got_welcome(page):
1213             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1214             self.failUnless(expected in page,
1215                             "I didn't see the right 'connected storage servers'"
1216                             " message in: %s" % page
1217                             )
1218             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1219             self.failUnless(expected in page,
1220                             "I didn't see the right 'My nodeid' message "
1221                             "in: %s" % page)
1222             self.failUnless("Helper: 0 active uploads" in page)
1223         d.addCallback(_got_welcome)
1224         d.addCallback(self.log, "done with _got_welcome")
1225
1226         # get the welcome page from the node that uses the helper too
1227         d.addCallback(lambda res: getPage(self.helper_webish_url))
1228         def _got_welcome_helper(page):
1229             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1230                             page)
1231             self.failUnless("Not running helper" in page)
1232         d.addCallback(_got_welcome_helper)
1233
1234         d.addCallback(lambda res: getPage(base + public))
1235         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1236         def _got_subdir1(page):
1237             # there ought to be an href for our file
1238             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1239             self.failUnless(">mydata567</a>" in page)
1240         d.addCallback(_got_subdir1)
1241         d.addCallback(self.log, "done with _got_subdir1")
1242         d.addCallback(lambda res:
1243                       getPage(base + public + "/subdir1/mydata567"))
1244         def _got_data(page):
1245             self.failUnlessEqual(page, self.data)
1246         d.addCallback(_got_data)
1247
1248         # download from a URI embedded in a URL
1249         d.addCallback(self.log, "_get_from_uri")
1250         def _get_from_uri(res):
1251             return getPage(base + "uri/%s?filename=%s"
1252                            % (self.uri, "mydata567"))
1253         d.addCallback(_get_from_uri)
1254         def _got_from_uri(page):
1255             self.failUnlessEqual(page, self.data)
1256         d.addCallback(_got_from_uri)
1257
1258         # download from a URI embedded in a URL, second form
1259         d.addCallback(self.log, "_get_from_uri2")
1260         def _get_from_uri2(res):
1261             return getPage(base + "uri?uri=%s" % (self.uri,))
1262         d.addCallback(_get_from_uri2)
1263         d.addCallback(_got_from_uri)
1264
1265         # download from a bogus URI, make sure we get a reasonable error
1266         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1267         def _get_from_bogus_uri(res):
1268             d1 = getPage(base + "uri/%s?filename=%s"
1269                          % (self.mangle_uri(self.uri), "mydata567"))
1270             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1271                        "410")
1272             return d1
1273         d.addCallback(_get_from_bogus_uri)
1274         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1275
1276         # upload a file with PUT
1277         d.addCallback(self.log, "about to try PUT")
1278         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1279                                            "new.txt contents"))
1280         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1281         d.addCallback(self.failUnlessEqual, "new.txt contents")
1282         # and again with something large enough to use multiple segments,
1283         # and hopefully trigger pauseProducing too
1284         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1285                                            "big" * 500000)) # 1.5MB
1286         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1287         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1288
1289         # can we replace files in place?
1290         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1291                                            "NEWER contents"))
1292         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1293         d.addCallback(self.failUnlessEqual, "NEWER contents")
1294
1295         # test unlinked POST
1296         d.addCallback(lambda res: self.POST("uri", t="upload",
1297                                             file=("new.txt", "data" * 10000)))
1298         # and again using the helper, which exercises different upload-status
1299         # display code
1300         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1301                                             file=("foo.txt", "data2" * 10000)))
1302
1303         # check that the status page exists
1304         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1305         def _got_status(res):
1306             # find an interesting upload and download to look at. LIT files
1307             # are not interesting.
1308             for ds in self.clients[0].list_all_download_statuses():
1309                 if ds.get_size() > 200:
1310                     self._down_status = ds.get_counter()
1311             for us in self.clients[0].list_all_upload_statuses():
1312                 if us.get_size() > 200:
1313                     self._up_status = us.get_counter()
1314             rs = self.clients[0].list_all_retrieve_statuses()[0]
1315             self._retrieve_status = rs.get_counter()
1316             ps = self.clients[0].list_all_publish_statuses()[0]
1317             self._publish_status = ps.get_counter()
1318             us = self.clients[0].list_all_mapupdate_statuses()[0]
1319             self._update_status = us.get_counter()
1320
1321             # and that there are some upload- and download- status pages
1322             return self.GET("status/up-%d" % self._up_status)
1323         d.addCallback(_got_status)
1324         def _got_up(res):
1325             return self.GET("status/down-%d" % self._down_status)
1326         d.addCallback(_got_up)
1327         def _got_down(res):
1328             return self.GET("status/mapupdate-%d" % self._update_status)
1329         d.addCallback(_got_down)
1330         def _got_update(res):
1331             return self.GET("status/publish-%d" % self._publish_status)
1332         d.addCallback(_got_update)
1333         def _got_publish(res):
1334             return self.GET("status/retrieve-%d" % self._retrieve_status)
1335         d.addCallback(_got_publish)
1336
1337         # check that the helper status page exists
1338         d.addCallback(lambda res:
1339                       self.GET("helper_status", followRedirect=True))
1340         def _got_helper_status(res):
1341             self.failUnless("Bytes Fetched:" in res)
1342             # touch a couple of files in the helper's working directory to
1343             # exercise more code paths
1344             workdir = os.path.join(self.getdir("client0"), "helper")
1345             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1346             f = open(incfile, "wb")
1347             f.write("small file")
1348             f.close()
1349             then = time.time() - 86400*3
1350             now = time.time()
1351             os.utime(incfile, (now, then))
1352             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1353             f = open(encfile, "wb")
1354             f.write("less small file")
1355             f.close()
1356             os.utime(encfile, (now, then))
1357         d.addCallback(_got_helper_status)
1358         # and that the json form exists
1359         d.addCallback(lambda res:
1360                       self.GET("helper_status?t=json", followRedirect=True))
1361         def _got_helper_status_json(res):
1362             data = simplejson.loads(res)
1363             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1364                                  1)
1365             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1366             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1367             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1368                                  10)
1369             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1370             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1371             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1372                                  15)
1373         d.addCallback(_got_helper_status_json)
1374
1375         # and check that client[3] (which uses a helper but does not run one
1376         # itself) doesn't explode when you ask for its status
1377         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1378         def _got_non_helper_status(res):
1379             self.failUnless("Upload and Download Status" in res)
1380         d.addCallback(_got_non_helper_status)
1381
1382         # or for helper status with t=json
1383         d.addCallback(lambda res:
1384                       getPage(self.helper_webish_url + "helper_status?t=json"))
1385         def _got_non_helper_status_json(res):
1386             data = simplejson.loads(res)
1387             self.failUnlessEqual(data, {})
1388         d.addCallback(_got_non_helper_status_json)
1389
1390         # see if the statistics page exists
1391         d.addCallback(lambda res: self.GET("statistics"))
1392         def _got_stats(res):
1393             self.failUnless("Node Statistics" in res)
1394             self.failUnless("  'downloader.files_downloaded': 8," in res)
1395         d.addCallback(_got_stats)
1396         d.addCallback(lambda res: self.GET("statistics?t=json"))
1397         def _got_stats_json(res):
1398             data = simplejson.loads(res)
1399             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1400             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1401         d.addCallback(_got_stats_json)
1402
1403         # TODO: mangle the second segment of a file, to test errors that
1404         # occur after we've already sent some good data, which uses a
1405         # different error path.
1406
1407         # TODO: download a URI with a form
1408         # TODO: create a directory by using a form
1409         # TODO: upload by using a form on the directory page
1410         #    url = base + "somedir/subdir1/freeform_post!!upload"
1411         # TODO: delete a file by using a button on the directory page
1412
1413         return d
1414
1415     def _test_runner(self, res):
1416         # exercise some of the diagnostic tools in runner.py
1417
1418         # find a share
1419         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1420             if "storage" not in dirpath:
1421                 continue
1422             if not filenames:
1423                 continue
1424             pieces = dirpath.split(os.sep)
1425             if pieces[-4] == "storage" and pieces[-3] == "shares":
1426                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1427                 # are sharefiles here
1428                 filename = os.path.join(dirpath, filenames[0])
1429                 # peek at the magic to see if it is a chk share
1430                 magic = open(filename, "rb").read(4)
1431                 if magic == '\x00\x00\x00\x01':
1432                     break
1433         else:
1434             self.fail("unable to find any uri_extension files in %s"
1435                       % self.basedir)
1436         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1437
1438         out,err = StringIO(), StringIO()
1439         rc = runner.runner(["dump-share",
1440                             filename],
1441                            stdout=out, stderr=err)
1442         output = out.getvalue()
1443         self.failUnlessEqual(rc, 0)
1444
1445         # we only upload a single file, so we can assert some things about
1446         # its size and shares.
1447         self.failUnless(("share filename: %s" % filename) in output)
1448         self.failUnless("size: %d\n" % len(self.data) in output)
1449         self.failUnless("num_segments: 1\n" in output)
1450         # segment_size is always a multiple of needed_shares
1451         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1452         self.failUnless("total_shares: 10\n" in output)
1453         # keys which are supposed to be present
1454         for key in ("size", "num_segments", "segment_size",
1455                     "needed_shares", "total_shares",
1456                     "codec_name", "codec_params", "tail_codec_params",
1457                     #"plaintext_hash", "plaintext_root_hash",
1458                     "crypttext_hash", "crypttext_root_hash",
1459                     "share_root_hash", "UEB_hash"):
1460             self.failUnless("%s: " % key in output, key)
1461
1462         # now use its storage index to find the other shares using the
1463         # 'find-shares' tool
1464         sharedir, shnum = os.path.split(filename)
1465         storagedir, storage_index_s = os.path.split(sharedir)
1466         out,err = StringIO(), StringIO()
1467         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1468         cmd = ["find-shares", storage_index_s] + nodedirs
1469         rc = runner.runner(cmd, stdout=out, stderr=err)
1470         self.failUnlessEqual(rc, 0)
1471         out.seek(0)
1472         sharefiles = [sfn.strip() for sfn in out.readlines()]
1473         self.failUnlessEqual(len(sharefiles), 10)
1474
1475         # also exercise the 'catalog-shares' tool
1476         out,err = StringIO(), StringIO()
1477         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1478         cmd = ["catalog-shares"] + nodedirs
1479         rc = runner.runner(cmd, stdout=out, stderr=err)
1480         self.failUnlessEqual(rc, 0)
1481         out.seek(0)
1482         descriptions = [sfn.strip() for sfn in out.readlines()]
1483         self.failUnlessEqual(len(descriptions), 30)
1484         matching = [line
1485                     for line in descriptions
1486                     if line.startswith("CHK %s " % storage_index_s)]
1487         self.failUnlessEqual(len(matching), 10)
1488
1489     def _test_control(self, res):
1490         # exercise the remote-control-the-client foolscap interfaces in
1491         # allmydata.control (mostly used for performance tests)
1492         c0 = self.clients[0]
1493         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1494         control_furl = open(control_furl_file, "r").read().strip()
1495         # it doesn't really matter which Tub we use to connect to the client,
1496         # so let's just use our IntroducerNode's
1497         d = self.introducer.tub.getReference(control_furl)
1498         d.addCallback(self._test_control2, control_furl_file)
1499         return d
1500     def _test_control2(self, rref, filename):
1501         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1502         downfile = os.path.join(self.basedir, "control.downfile")
1503         d.addCallback(lambda uri:
1504                       rref.callRemote("download_from_uri_to_file",
1505                                       uri, downfile))
1506         def _check(res):
1507             self.failUnlessEqual(res, downfile)
1508             data = open(downfile, "r").read()
1509             expected_data = open(filename, "r").read()
1510             self.failUnlessEqual(data, expected_data)
1511         d.addCallback(_check)
1512         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1513         if sys.platform == "linux2":
1514             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1515         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1516         return d
1517
1518     def _test_cli(self, res):
1519         # run various CLI commands (in a thread, since they use blocking
1520         # network calls)
1521
1522         private_uri = self._private_node.get_uri()
1523         some_uri = self._root_directory_uri
1524         client0_basedir = self.getdir("client0")
1525
1526         nodeargs = [
1527             "--node-directory", client0_basedir,
1528             ]
1529         TESTDATA = "I will not write the same thing over and over.\n" * 100
1530
1531         d = defer.succeed(None)
1532
1533         # for compatibility with earlier versions, private/root_dir.cap is
1534         # supposed to be treated as an alias named "tahoe:". Start by making
1535         # sure that works, before we add other aliases.
1536
1537         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1538         f = open(root_file, "w")
1539         f.write(private_uri)
1540         f.close()
1541
1542         def run(ignored, verb, *args):
1543             newargs = [verb] + nodeargs + list(args)
1544             return self._run_cli(newargs)
1545
1546         def _check_ls((out,err), expected_children, unexpected_children=[]):
1547             self.failUnlessEqual(err, "")
1548             for s in expected_children:
1549                 self.failUnless(s in out, s)
1550             for s in unexpected_children:
1551                 self.failIf(s in out, s)
1552
1553         def _check_ls_root((out,err)):
1554             self.failUnless("personal" in out)
1555             self.failUnless("s2-ro" in out)
1556             self.failUnless("s2-rw" in out)
1557             self.failUnlessEqual(err, "")
1558
1559         # this should reference private_uri
1560         d.addCallback(run, "ls")
1561         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1562
1563         d.addCallback(run, "list-aliases")
1564         def _check_aliases_1((out,err)):
1565             self.failUnlessEqual(err, "")
1566             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1567         d.addCallback(_check_aliases_1)
1568
1569         # now that that's out of the way, remove root_dir.cap and work with
1570         # new files
1571         d.addCallback(lambda res: os.unlink(root_file))
1572         d.addCallback(run, "list-aliases")
1573         def _check_aliases_2((out,err)):
1574             self.failUnlessEqual(err, "")
1575             self.failUnlessEqual(out, "")
1576         d.addCallback(_check_aliases_2)
1577
1578         d.addCallback(run, "mkdir")
1579         def _got_dir( (out,err) ):
1580             self.failUnless(uri.from_string_dirnode(out.strip()))
1581             return out.strip()
1582         d.addCallback(_got_dir)
1583         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1584
1585         d.addCallback(run, "list-aliases")
1586         def _check_aliases_3((out,err)):
1587             self.failUnlessEqual(err, "")
1588             self.failUnless("tahoe: " in out)
1589         d.addCallback(_check_aliases_3)
1590
1591         def _check_empty_dir((out,err)):
1592             self.failUnlessEqual(out, "")
1593             self.failUnlessEqual(err, "")
1594         d.addCallback(run, "ls")
1595         d.addCallback(_check_empty_dir)
1596
1597         def _check_missing_dir((out,err)):
1598             # TODO: check that rc==2
1599             self.failUnlessEqual(out, "")
1600             self.failUnlessEqual(err, "No such file or directory\n")
1601         d.addCallback(run, "ls", "bogus")
1602         d.addCallback(_check_missing_dir)
1603
1604         files = []
1605         datas = []
1606         for i in range(10):
1607             fn = os.path.join(self.basedir, "file%d" % i)
1608             files.append(fn)
1609             data = "data to be uploaded: file%d\n" % i
1610             datas.append(data)
1611             open(fn,"wb").write(data)
1612
1613         def _check_stdout_against((out,err), filenum=None, data=None):
1614             self.failUnlessEqual(err, "")
1615             if filenum is not None:
1616                 self.failUnlessEqual(out, datas[filenum])
1617             if data is not None:
1618                 self.failUnlessEqual(out, data)
1619
1620         # test all both forms of put: from a file, and from stdin
1621         #  tahoe put bar FOO
1622         d.addCallback(run, "put", files[0], "tahoe-file0")
1623         def _put_out((out,err)):
1624             self.failUnless("URI:LIT:" in out, out)
1625             self.failUnless("201 Created" in err, err)
1626             uri0 = out.strip()
1627             return run(None, "get", uri0)
1628         d.addCallback(_put_out)
1629         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1630
1631         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1632         #  tahoe put bar tahoe:FOO
1633         d.addCallback(run, "put", files[2], "tahoe:file2")
1634         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1635         def _check_put_mutable((out,err)):
1636             self._mutable_file3_uri = out.strip()
1637         d.addCallback(_check_put_mutable)
1638         d.addCallback(run, "get", "tahoe:file3")
1639         d.addCallback(_check_stdout_against, 3)
1640
1641         def _put_from_stdin(res, data, *args):
1642             args = nodeargs + list(args)
1643             o = cli.PutOptions()
1644             o.parseOptions(args)
1645             stdin = StringIO(data)
1646             stdout, stderr = StringIO(), StringIO()
1647             d = threads.deferToThread(cli.put, o,
1648                                       stdout=stdout, stderr=stderr, stdin=stdin)
1649             def _done(res):
1650                 return stdout.getvalue(), stderr.getvalue()
1651             d.addCallback(_done)
1652             return d
1653
1654         #  tahoe put FOO
1655         STDIN_DATA = "This is the file to upload from stdin."
1656         d.addCallback(_put_from_stdin,
1657                       STDIN_DATA,
1658                       "tahoe-file-stdin")
1659         #  tahoe put tahoe:FOO
1660         d.addCallback(_put_from_stdin,
1661                       "Other file from stdin.",
1662                       "tahoe:from-stdin")
1663
1664         d.addCallback(run, "ls")
1665         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1666                                   "tahoe-file-stdin", "from-stdin"])
1667         d.addCallback(run, "ls", "subdir")
1668         d.addCallback(_check_ls, ["tahoe-file1"])
1669
1670         # tahoe mkdir FOO
1671         d.addCallback(run, "mkdir", "subdir2")
1672         d.addCallback(run, "ls")
1673         # TODO: extract the URI, set an alias with it
1674         d.addCallback(_check_ls, ["subdir2"])
1675
1676         # tahoe get: (to stdin and to a file)
1677         d.addCallback(run, "get", "tahoe-file0")
1678         d.addCallback(_check_stdout_against, 0)
1679         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1680         d.addCallback(_check_stdout_against, 1)
1681         outfile0 = os.path.join(self.basedir, "outfile0")
1682         d.addCallback(run, "get", "file2", outfile0)
1683         def _check_outfile0((out,err)):
1684             data = open(outfile0,"rb").read()
1685             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1686         d.addCallback(_check_outfile0)
1687         outfile1 = os.path.join(self.basedir, "outfile0")
1688         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1689         def _check_outfile1((out,err)):
1690             data = open(outfile1,"rb").read()
1691             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1692         d.addCallback(_check_outfile1)
1693
1694         d.addCallback(run, "rm", "tahoe-file0")
1695         d.addCallback(run, "rm", "tahoe:file2")
1696         d.addCallback(run, "ls")
1697         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1698
1699         d.addCallback(run, "ls", "-l")
1700         def _check_ls_l((out,err)):
1701             lines = out.split("\n")
1702             for l in lines:
1703                 if "tahoe-file-stdin" in l:
1704                     self.failUnless(l.startswith("-r-- "), l)
1705                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1706                 if "file3" in l:
1707                     self.failUnless(l.startswith("-rw- "), l) # mutable
1708         d.addCallback(_check_ls_l)
1709
1710         d.addCallback(run, "ls", "--uri")
1711         def _check_ls_uri((out,err)):
1712             lines = out.split("\n")
1713             for l in lines:
1714                 if "file3" in l:
1715                     self.failUnless(self._mutable_file3_uri in l)
1716         d.addCallback(_check_ls_uri)
1717
1718         d.addCallback(run, "ls", "--readonly-uri")
1719         def _check_ls_rouri((out,err)):
1720             lines = out.split("\n")
1721             for l in lines:
1722                 if "file3" in l:
1723                     rw_uri = self._mutable_file3_uri
1724                     u = uri.from_string_mutable_filenode(rw_uri)
1725                     ro_uri = u.get_readonly().to_string()
1726                     self.failUnless(ro_uri in l)
1727         d.addCallback(_check_ls_rouri)
1728
1729
1730         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1731         d.addCallback(run, "ls")
1732         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1733
1734         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1735         d.addCallback(run, "ls")
1736         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1737
1738         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1739         d.addCallback(run, "ls")
1740         d.addCallback(_check_ls, ["file3", "file3-copy"])
1741         d.addCallback(run, "get", "tahoe:file3-copy")
1742         d.addCallback(_check_stdout_against, 3)
1743
1744         # copy from disk into tahoe
1745         d.addCallback(run, "cp", files[4], "tahoe:file4")
1746         d.addCallback(run, "ls")
1747         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1748         d.addCallback(run, "get", "tahoe:file4")
1749         d.addCallback(_check_stdout_against, 4)
1750
1751         # copy from tahoe into disk
1752         target_filename = os.path.join(self.basedir, "file-out")
1753         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1754         def _check_cp_out((out,err)):
1755             self.failUnless(os.path.exists(target_filename))
1756             got = open(target_filename,"rb").read()
1757             self.failUnlessEqual(got, datas[4])
1758         d.addCallback(_check_cp_out)
1759
1760         # copy from disk to disk (silly case)
1761         target2_filename = os.path.join(self.basedir, "file-out-copy")
1762         d.addCallback(run, "cp", target_filename, target2_filename)
1763         def _check_cp_out2((out,err)):
1764             self.failUnless(os.path.exists(target2_filename))
1765             got = open(target2_filename,"rb").read()
1766             self.failUnlessEqual(got, datas[4])
1767         d.addCallback(_check_cp_out2)
1768
1769         # copy from tahoe into disk, overwriting an existing file
1770         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1771         def _check_cp_out3((out,err)):
1772             self.failUnless(os.path.exists(target_filename))
1773             got = open(target_filename,"rb").read()
1774             self.failUnlessEqual(got, datas[3])
1775         d.addCallback(_check_cp_out3)
1776
1777         # copy from disk into tahoe, overwriting an existing immutable file
1778         d.addCallback(run, "cp", files[5], "tahoe:file4")
1779         d.addCallback(run, "ls")
1780         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1781         d.addCallback(run, "get", "tahoe:file4")
1782         d.addCallback(_check_stdout_against, 5)
1783
1784         # copy from disk into tahoe, overwriting an existing mutable file
1785         d.addCallback(run, "cp", files[5], "tahoe:file3")
1786         d.addCallback(run, "ls")
1787         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1788         d.addCallback(run, "get", "tahoe:file3")
1789         d.addCallback(_check_stdout_against, 5)
1790
1791         # recursive copy: setup
1792         dn = os.path.join(self.basedir, "dir1")
1793         os.makedirs(dn)
1794         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1795         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1796         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1797         sdn2 = os.path.join(dn, "subdir2")
1798         os.makedirs(sdn2)
1799         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1800         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1801
1802         # from disk into tahoe
1803         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1804         d.addCallback(run, "ls")
1805         d.addCallback(_check_ls, ["dir1"])
1806         d.addCallback(run, "ls", "dir1")
1807         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1808                       ["rfile4", "rfile5"])
1809         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1810         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1811                       ["rfile1", "rfile2", "rfile3"])
1812         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1813         d.addCallback(_check_stdout_against, data="rfile4")
1814
1815         # tahoe_ls doesn't currently handle the error correctly: it tries to
1816         # JSON-parse a traceback.
1817 ##         def _ls_missing(res):
1818 ##             argv = ["ls"] + nodeargs + ["bogus"]
1819 ##             return self._run_cli(argv)
1820 ##         d.addCallback(_ls_missing)
1821 ##         def _check_ls_missing((out,err)):
1822 ##             print "OUT", out
1823 ##             print "ERR", err
1824 ##             self.failUnlessEqual(err, "")
1825 ##         d.addCallback(_check_ls_missing)
1826
1827         return d
1828
1829     def _run_cli(self, argv):
1830         #print "CLI:", argv
1831         stdout, stderr = StringIO(), StringIO()
1832         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1833                                   stdout=stdout, stderr=stderr)
1834         def _done(res):
1835             return stdout.getvalue(), stderr.getvalue()
1836         d.addCallback(_done)
1837         return d
1838
1839     def _test_checker(self, res):
1840         d = self._private_node.build_manifest()
1841         d.addCallback(self._test_checker_2)
1842         return d
1843
    def _test_checker_2(self, manifest):
        """Run client1's checker over every storage index in 'manifest',
        validate the live results, then validate that the same results were
        stored and are retrievable via checker_results_for()."""
        checker1 = self.clients[1].getServiceNamed("checker")
        # asking about an unknown (None) storage index yields an empty list
        self.failUnlessEqual(checker1.checker_results_for(None), [])
        # nothing has been checked yet, so there are no stored results either
        self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
                             [])
        dl = []
        # record the time window during which the checks run, so we can
        # later validate the 'when' timestamps on the stored results
        starting_time = time.time()
        for si in manifest:
            dl.append(checker1.check(si))
        d = deferredutil.DeferredListShouldSucceed(dl)

        def _check_checker_results(res):
            # each result is either a bool (True for trivially-checkable
            # small files, per _test_checker_3 below) or a
            # (needed, total, found, sharemap) tuple for CHK files
            for i in res:
                if type(i) is bool:
                    self.failUnless(i is True)
                else:
                    (needed, total, found, sharemap) = i
                    self.failUnlessEqual(needed, 3)
                    self.failUnlessEqual(total, 10)
                    self.failUnlessEqual(found, total)
                    self.failUnlessEqual(len(sharemap.keys()), 10)
                    # the shares should collectively live on every client
                    peers = set()
                    for shpeers in sharemap.values():
                        peers.update(shpeers)
                    self.failUnlessEqual(len(peers), self.numclients)
        d.addCallback(_check_checker_results)

        def _check_stored_results(res):
            # fetch the stored copy of each result and run the same
            # validation over them, plus a timestamp sanity check
            finish_time = time.time()
            all_results = []
            for si in manifest:
                results = checker1.checker_results_for(si)
                if not results:
                    # TODO: implement checker for mutable files and implement tests of that checker
                    continue
                self.failUnlessEqual(len(results), 1)
                when, those_results = results[0]
                self.failUnless(isinstance(when, (int, float)))
                self.failUnless(starting_time <= when <= finish_time)
                all_results.append(those_results)
            _check_checker_results(all_results)
        d.addCallback(_check_stored_results)

        d.addCallback(self._test_checker_3)
        return d
1889
1890     def _test_checker_3(self, res):
1891         # check one file, through FileNode.check()
1892         d = self._private_node.get_child_at_path(u"personal/sekrit data")
1893         d.addCallback(lambda n: n.check())
1894         def _checked(results):
1895             # 'sekrit data' is small, and fits in a LiteralFileNode, so
1896             # checking it is trivial and always returns True
1897             self.failUnlessEqual(results, True)
1898         d.addCallback(_checked)
1899
1900         c0 = self.clients[1]
1901         n = c0.create_node_from_uri(self._root_directory_uri)
1902         d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1903         d.addCallback(lambda n: n.check())
1904         def _checked2(results):
1905             # mydata567 is large and lives in a CHK
1906             (needed, total, found, sharemap) = results
1907             self.failUnlessEqual(needed, 3)
1908             self.failUnlessEqual(total, 10)
1909             self.failUnlessEqual(found, 10)
1910             self.failUnlessEqual(len(sharemap), 10)
1911             for shnum in range(10):
1912                 self.failUnlessEqual(len(sharemap[shnum]), 1)
1913         d.addCallback(_checked2)
1914         return d
1915
1916
1917     def _test_verifier(self, res):
1918         checker1 = self.clients[1].getServiceNamed("checker")
1919         d = self._private_node.build_manifest()
1920         def _check_all(manifest):
1921             dl = []
1922             for si in manifest:
1923                 dl.append(checker1.verify(si))
1924             return deferredutil.DeferredListShouldSucceed(dl)
1925         d.addCallback(_check_all)
1926         def _done(res):
1927             for i in res:
1928                 self.failUnless(i is True)
1929         d.addCallback(_done)
1930         d.addCallback(lambda res: checker1.verify(None))
1931         d.addCallback(self.failUnlessEqual, True)
1932         return d