# Source: git.rkrishnan.org — tahoe-lafs/tahoe-lafs.git, blob src/allmydata/test/test_system.py
# Commit message: "add a webserver for the Introducer, showing service announcements
# and subscribers"
1
2 from base64 import b32encode
3 import os, sys, time, re
4 from cStringIO import StringIO
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.internet import threads # CLI tests use deferToThread
8 from twisted.internet.error import ConnectionDone, ConnectionLost
9 from twisted.application import service
10 import allmydata
11 from allmydata import client, uri, download, upload, storage, mutable, offloaded
12 from allmydata.introducer import IntroducerNode
13 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
14 from allmydata.util import log
15 from allmydata.scripts import runner
16 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
17 from allmydata.mutable import NotMutableError
18 from allmydata.stats import PickleStatsGatherer
19 from foolscap.eventual import flushEventualQueue
20 from foolscap import DeadReferenceError, Tub
21 from twisted.python.failure import Failure
22 from twisted.web.client import getPage
23 from twisted.web.error import Error
24
def flush_but_dont_ignore(res):
    """Drain the foolscap eventual-send queue, then pass *res* through.

    Intended as an addBoth handler: it waits for queued eventual-sends
    to flush but still propagates the original result (or Failure).
    """
    def _pass_through(ignored):
        return res
    flushed = flushEventualQueue()
    flushed.addCallback(_pass_through)
    return flushed
31
# Sample payload for publish tests: long enough that it cannot be stored
# inline as a LIT (literal) URI and must be uploaded as real shares.
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
36
class CountingDataUploadable(upload.Data):
    """upload.Data variant that counts bytes read and can signal a test.

    bytes_read accumulates the total length requested via read().  When
    interrupt_after is set, the first read() that pushes bytes_read past
    that threshold fires interrupt_after_d (exactly once) with this
    uploadable as its result.
    """
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

    def read(self, length):
        self.bytes_read += length
        threshold = self.interrupt_after
        if threshold is not None and self.bytes_read > threshold:
            # clear the threshold first so we only fire once
            self.interrupt_after = None
            self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)
49
50
51 class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
52
53     def setUp(self):
54         self.sparent = service.MultiService()
55         self.sparent.startService()
56     def tearDown(self):
57         log.msg("shutting down SystemTest services")
58         d = self.sparent.stopService()
59         d.addBoth(flush_but_dont_ignore)
60         return d
61
62     def getdir(self, subdir):
63         return os.path.join(self.basedir, subdir)
64
65     def add_service(self, s):
66         s.setServiceParent(self.sparent)
67         return s
68
69     def set_up_nodes(self, NUMCLIENTS=5, createprivdir=False):
70         self.numclients = NUMCLIENTS
71         self.createprivdir = createprivdir
72         iv_dir = self.getdir("introducer")
73         if not os.path.isdir(iv_dir):
74             fileutil.make_dirs(iv_dir)
75         f = open(os.path.join(iv_dir, "webport"), "w")
76         f.write("tcp:0:interface=127.0.0.1\n")
77         f.close()
78         iv = IntroducerNode(basedir=iv_dir)
79         self.introducer = self.add_service(iv)
80         d = self.introducer.when_tub_ready()
81         d.addCallback(self._get_introducer_web)
82         d.addCallback(self._set_up_stats_gatherer)
83         d.addCallback(self._set_up_nodes_2)
84         d.addCallback(self._grab_stats)
85         return d
86
87     def _get_introducer_web(self, res):
88         f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
89         self.introweb_url = f.read().strip()
90         f.close()
91
92     def _set_up_stats_gatherer(self, res):
93         statsdir = self.getdir("stats_gatherer")
94         fileutil.make_dirs(statsdir)
95         t = Tub()
96         self.add_service(t)
97         l = t.listenOn("tcp:0")
98         p = l.getPortnum()
99         t.setLocation("localhost:%d" % p)
100
101         self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
102         self.add_service(self.stats_gatherer)
103         self.stats_gatherer_furl = self.stats_gatherer.get_furl()
104
105     def _set_up_nodes_2(self, res):
106         q = self.introducer
107         self.introducer_furl = q.introducer_url
108         self.clients = []
109         basedirs = []
110         for i in range(self.numclients):
111             basedir = self.getdir("client%d" % i)
112             basedirs.append(basedir)
113             fileutil.make_dirs(basedir)
114             if i == 0:
115                 # client[0] runs a webserver and a helper
116                 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
117                 open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
118             if i == 3:
119                 # client[3] runs a webserver and uses a helper
120                 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
121             if self.createprivdir:
122                 fileutil.make_dirs(os.path.join(basedir, "private"))
123                 open(os.path.join(basedir, "private", "root_dir.cap"), "w")
124             open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
125             open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
126
127         # start client[0], wait for it's tub to be ready (at which point it
128         # will have registered the helper furl).
129         c = self.add_service(client.Client(basedir=basedirs[0]))
130         self.clients.append(c)
131         d = c.when_tub_ready()
132         def _ready(res):
133             f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
134             helper_furl = f.read()
135             f.close()
136             self.helper_furl = helper_furl
137             f = open(os.path.join(basedirs[3],"helper.furl"), "w")
138             f.write(helper_furl)
139             f.close()
140
141             # this starts the rest of the clients
142             for i in range(1, self.numclients):
143                 c = self.add_service(client.Client(basedir=basedirs[i]))
144                 self.clients.append(c)
145             log.msg("STARTING")
146             return self.wait_for_connections()
147         d.addCallback(_ready)
148         def _connected(res):
149             log.msg("CONNECTED")
150             # now find out where the web port was
151             l = self.clients[0].getServiceNamed("webish").listener
152             port = l._port.getHost().port
153             self.webish_url = "http://localhost:%d/" % port
154             # and the helper-using webport
155             l = self.clients[3].getServiceNamed("webish").listener
156             port = l._port.getHost().port
157             self.helper_webish_url = "http://localhost:%d/" % port
158         d.addCallback(_connected)
159         return d
160
161     def _grab_stats(self, res):
162         d = self.stats_gatherer.poll()
163         return d
164
165     def bounce_client(self, num):
166         c = self.clients[num]
167         d = c.disownServiceParent()
168         # I think windows requires a moment to let the connection really stop
169         # and the port number made available for re-use. TODO: examine the
170         # behavior, see if this is really the problem, see if we can do
171         # better than blindly waiting for a second.
172         d.addCallback(self.stall, 1.0)
173         def _stopped(res):
174             new_c = client.Client(basedir=self.getdir("client%d" % num))
175             self.clients[num] = new_c
176             self.add_service(new_c)
177             return new_c.when_tub_ready()
178         d.addCallback(_stopped)
179         d.addCallback(lambda res: self.wait_for_connections())
180         def _maybe_get_webport(res):
181             if num == 0:
182                 # now find out where the web port was
183                 l = self.clients[0].getServiceNamed("webish").listener
184                 port = l._port.getHost().port
185                 self.webish_url = "http://localhost:%d/" % port
186         d.addCallback(_maybe_get_webport)
187         return d
188
189     def add_extra_node(self, client_num, helper_furl=None,
190                        add_to_sparent=False):
191         # usually this node is *not* parented to our self.sparent, so we can
192         # shut it down separately from the rest, to exercise the
193         # connection-lost code
194         basedir = self.getdir("client%d" % client_num)
195         if not os.path.isdir(basedir):
196             fileutil.make_dirs(basedir)
197         open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
198         if helper_furl:
199             f = open(os.path.join(basedir, "helper.furl") ,"w")
200             f.write(helper_furl+"\n")
201             f.close()
202
203         c = client.Client(basedir=basedir)
204         self.clients.append(c)
205         self.numclients += 1
206         if add_to_sparent:
207             c.setServiceParent(self.sparent)
208         else:
209             c.startService()
210         d = self.wait_for_connections()
211         d.addCallback(lambda res: c)
212         return d
213
214     def _check_connections(self):
215         for c in self.clients:
216             ic = c.introducer_client
217             if not ic.connected_to_introducer():
218                 return False
219             if len(ic.get_all_peerids()) != self.numclients:
220                 return False
221         return True
222
223     def wait_for_connections(self, ignored=None):
224         # TODO: replace this with something that takes a list of peerids and
225         # fires when they've all been heard from, instead of using a count
226         # and a threshold
227         return self.poll(self._check_connections, timeout=200)
228
229     def test_connections(self):
230         self.basedir = "system/SystemTest/test_connections"
231         d = self.set_up_nodes()
232         self.extra_node = None
233         d.addCallback(lambda res: self.add_extra_node(self.numclients))
234         def _check(extra_node):
235             self.extra_node = extra_node
236             for c in self.clients:
237                 all_peerids = list(c.get_all_peerids())
238                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
239                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
240                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
241
242         d.addCallback(_check)
243         def _shutdown_extra_node(res):
244             if self.extra_node:
245                 return self.extra_node.stopService()
246             return res
247         d.addBoth(_shutdown_extra_node)
248         return d
249     test_connections.timeout = 300
250     # test_connections is subsumed by test_upload_and_download, and takes
251     # quite a while to run on a slow machine (because of all the TLS
252     # connections that must be established). If we ever rework the introducer
253     # code to such an extent that we're not sure if it works anymore, we can
254     # reinstate this test until it does.
255     del test_connections
256
257     def test_upload_and_download_random_key(self):
258         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
259         return self._test_upload_and_download(False)
260     test_upload_and_download_random_key.timeout = 4800
261
262     def test_upload_and_download_content_hash_key(self):
263         self.basedir = "system/SystemTest/test_upload_and_download_CHK"
264         return self._test_upload_and_download(True)
265     test_upload_and_download_content_hash_key.timeout = 4800
266
267     def _test_upload_and_download(self, contenthashkey):
268         # we use 4000 bytes of data, which will result in about 400k written
269         # to disk among all our simulated nodes
270         DATA = "Some data to upload\n" * 200
271         d = self.set_up_nodes()
272         def _check_connections(res):
273             for c in self.clients:
274                 all_peerids = list(c.get_all_peerids())
275                 self.failUnlessEqual(len(all_peerids), self.numclients)
276                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
277                 self.failUnlessEqual(len(permuted_peers), self.numclients)
278         d.addCallback(_check_connections)
279
280         def _do_upload(res):
281             log.msg("UPLOADING")
282             u = self.clients[0].getServiceNamed("uploader")
283             self.uploader = u
284             # we crank the max segsize down to 1024b for the duration of this
285             # test, so we can exercise multiple segments. It is important
286             # that this is not a multiple of the segment size, so that the
287             # tail segment is not the same length as the others. This actualy
288             # gets rounded up to 1025 to be a multiple of the number of
289             # required shares (since we use 25 out of 100 FEC).
290             up = upload.Data(DATA, contenthashkey=contenthashkey)
291             up.max_segment_size = 1024
292             d1 = u.upload(up)
293             return d1
294         d.addCallback(_do_upload)
295         def _upload_done(results):
296             uri = results.uri
297             log.msg("upload finished: uri is %s" % (uri,))
298             self.uri = uri
299             dl = self.clients[1].getServiceNamed("downloader")
300             self.downloader = dl
301         d.addCallback(_upload_done)
302
303         def _upload_again(res):
304             # Upload again. If contenthashkey then this ought to be
305             # short-circuited, however with the way we currently generate URIs
306             # (i.e. because they include the roothash), we have to do all of the
307             # encoding work, and only get to save on the upload part.
308             log.msg("UPLOADING AGAIN")
309             up = upload.Data(DATA, contenthashkey=contenthashkey)
310             up.max_segment_size = 1024
311             d1 = self.uploader.upload(up)
312         d.addCallback(_upload_again)
313
314         def _download_to_data(res):
315             log.msg("DOWNLOADING")
316             return self.downloader.download_to_data(self.uri)
317         d.addCallback(_download_to_data)
318         def _download_to_data_done(data):
319             log.msg("download finished")
320             self.failUnlessEqual(data, DATA)
321         d.addCallback(_download_to_data_done)
322
323         target_filename = os.path.join(self.basedir, "download.target")
324         def _download_to_filename(res):
325             return self.downloader.download_to_filename(self.uri,
326                                                         target_filename)
327         d.addCallback(_download_to_filename)
328         def _download_to_filename_done(res):
329             newdata = open(target_filename, "rb").read()
330             self.failUnlessEqual(newdata, DATA)
331         d.addCallback(_download_to_filename_done)
332
333         target_filename2 = os.path.join(self.basedir, "download.target2")
334         def _download_to_filehandle(res):
335             fh = open(target_filename2, "wb")
336             return self.downloader.download_to_filehandle(self.uri, fh)
337         d.addCallback(_download_to_filehandle)
338         def _download_to_filehandle_done(fh):
339             fh.close()
340             newdata = open(target_filename2, "rb").read()
341             self.failUnlessEqual(newdata, DATA)
342         d.addCallback(_download_to_filehandle_done)
343
344         def _download_nonexistent_uri(res):
345             baduri = self.mangle_uri(self.uri)
346             log.msg("about to download non-existent URI", level=log.UNUSUAL,
347                     facility="tahoe.tests")
348             d1 = self.downloader.download_to_data(baduri)
349             def _baduri_should_fail(res):
350                 log.msg("finished downloading non-existend URI",
351                         level=log.UNUSUAL, facility="tahoe.tests")
352                 self.failUnless(isinstance(res, Failure))
353                 self.failUnless(res.check(download.NotEnoughPeersError),
354                                 "expected NotEnoughPeersError, got %s" % res)
355                 # TODO: files that have zero peers should get a special kind
356                 # of NotEnoughPeersError, which can be used to suggest that
357                 # the URI might be wrong or that they've never uploaded the
358                 # file in the first place.
359             d1.addBoth(_baduri_should_fail)
360             return d1
361         d.addCallback(_download_nonexistent_uri)
362
363         # add a new node, which doesn't accept shares, and only uses the
364         # helper for upload.
365         d.addCallback(lambda res: self.add_extra_node(self.numclients,
366                                                       self.helper_furl,
367                                                       add_to_sparent=True))
368         def _added(extra_node):
369             self.extra_node = extra_node
370             extra_node.getServiceNamed("storage").sizelimit = 0
371         d.addCallback(_added)
372
373         HELPER_DATA = "Data that needs help to upload" * 1000
374         def _upload_with_helper(res):
375             u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
376             d = self.extra_node.upload(u)
377             def _uploaded(results):
378                 uri = results.uri
379                 return self.downloader.download_to_data(uri)
380             d.addCallback(_uploaded)
381             def _check(newdata):
382                 self.failUnlessEqual(newdata, HELPER_DATA)
383             d.addCallback(_check)
384             return d
385         d.addCallback(_upload_with_helper)
386
387         def _upload_duplicate_with_helper(res):
388             u = upload.Data(HELPER_DATA, contenthashkey=contenthashkey)
389             u.debug_stash_RemoteEncryptedUploadable = True
390             d = self.extra_node.upload(u)
391             def _uploaded(results):
392                 uri = results.uri
393                 return self.downloader.download_to_data(uri)
394             d.addCallback(_uploaded)
395             def _check(newdata):
396                 self.failUnlessEqual(newdata, HELPER_DATA)
397                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
398                             "uploadable started uploading, should have been avoided")
399             d.addCallback(_check)
400             return d
401         if contenthashkey:
402             d.addCallback(_upload_duplicate_with_helper)
403
404         def _upload_resumable(res):
405             DATA = "Data that needs help to upload and gets interrupted" * 1000
406             u1 = CountingDataUploadable(DATA, contenthashkey=contenthashkey)
407             u2 = CountingDataUploadable(DATA, contenthashkey=contenthashkey)
408
409             # we interrupt the connection after about 5kB by shutting down
410             # the helper, then restartingit.
411             u1.interrupt_after = 5000
412             u1.interrupt_after_d = defer.Deferred()
413             u1.interrupt_after_d.addCallback(lambda res:
414                                              self.bounce_client(0))
415
416             # sneak into the helper and reduce its chunk size, so that our
417             # debug_interrupt will sever the connection on about the fifth
418             # chunk fetched. This makes sure that we've started to write the
419             # new shares before we abandon them, which exercises the
420             # abort/delete-partial-share code. TODO: find a cleaner way to do
421             # this. I know that this will affect later uses of the helper in
422             # this same test run, but I'm not currently worried about it.
423             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
424
425             d = self.extra_node.upload(u1)
426
427             def _should_not_finish(res):
428                 self.fail("interrupted upload should have failed, not finished"
429                           " with result %s" % (res,))
430             def _interrupted(f):
431                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
432
433                 # make sure we actually interrupted it before finishing the
434                 # file
435                 self.failUnless(u1.bytes_read < len(DATA),
436                                 "read %d out of %d total" % (u1.bytes_read,
437                                                              len(DATA)))
438
439                 log.msg("waiting for reconnect", level=log.NOISY,
440                         facility="tahoe.test.test_system")
441                 # now, we need to give the nodes a chance to notice that this
442                 # connection has gone away. When this happens, the storage
443                 # servers will be told to abort their uploads, removing the
444                 # partial shares. Unfortunately this involves TCP messages
445                 # going through the loopback interface, and we can't easily
446                 # predict how long that will take. If it were all local, we
447                 # could use fireEventually() to stall. Since we don't have
448                 # the right introduction hooks, the best we can do is use a
449                 # fixed delay. TODO: this is fragile.
450                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
451                 return u1.interrupt_after_d
452             d.addCallbacks(_should_not_finish, _interrupted)
453
454             def _disconnected(res):
455                 # check to make sure the storage servers aren't still hanging
456                 # on to the partial share: their incoming/ directories should
457                 # now be empty.
458                 log.msg("disconnected", level=log.NOISY,
459                         facility="tahoe.test.test_system")
460                 for i in range(self.numclients):
461                     incdir = os.path.join(self.getdir("client%d" % i),
462                                           "storage", "shares", "incoming")
463                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
464             d.addCallback(_disconnected)
465
466             # then we need to give the reconnector a chance to
467             # reestablish the connection to the helper.
468             d.addCallback(lambda res:
469                           log.msg("wait_for_connections", level=log.NOISY,
470                                   facility="tahoe.test.test_system"))
471             d.addCallback(lambda res: self.wait_for_connections())
472
473
474             d.addCallback(lambda res:
475                           log.msg("uploading again", level=log.NOISY,
476                                   facility="tahoe.test.test_system"))
477             d.addCallback(lambda res: self.extra_node.upload(u2))
478
479             def _uploaded(results):
480                 uri = results.uri
481                 log.msg("Second upload complete", level=log.NOISY,
482                         facility="tahoe.test.test_system")
483
484                 # this is really bytes received rather than sent, but it's
485                 # convenient and basically measures the same thing
486                 bytes_sent = results.ciphertext_fetched
487
488                 # We currently don't support resumption of upload if the data is
489                 # encrypted with a random key.  (Because that would require us
490                 # to store the key locally and re-use it on the next upload of
491                 # this file, which isn't a bad thing to do, but we currently
492                 # don't do it.)
493                 if contenthashkey:
494                     # Make sure we did not have to read the whole file the
495                     # second time around .
496                     self.failUnless(bytes_sent < len(DATA),
497                                 "resumption didn't save us any work:"
498                                 " read %d bytes out of %d total" %
499                                 (bytes_sent, len(DATA)))
500                 else:
501                     # Make sure we did have to read the whole file the second
502                     # time around -- because the one that we partially uploaded
503                     # earlier was encrypted with a different random key.
504                     self.failIf(bytes_sent < len(DATA),
505                                 "resumption saved us some work even though we were using random keys:"
506                                 " read %d bytes out of %d total" %
507                                 (bytes_sent, len(DATA)))
508                 return self.downloader.download_to_data(uri)
509             d.addCallback(_uploaded)
510
511             def _check(newdata):
512                 self.failUnlessEqual(newdata, DATA)
513                 # If using a content hash key, then also check that the helper
514                 # has removed the temp file from its directories.
515                 if contenthashkey:
516                     basedir = os.path.join(self.getdir("client0"), "helper")
517                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
518                     self.failUnlessEqual(files, [])
519                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
520                     self.failUnlessEqual(files, [])
521             d.addCallback(_check)
522             return d
523         d.addCallback(_upload_resumable)
524
525         return d
526
527     def _find_shares(self, basedir):
528         shares = []
529         for (dirpath, dirnames, filenames) in os.walk(basedir):
530             if "storage" not in dirpath:
531                 continue
532             if not filenames:
533                 continue
534             pieces = dirpath.split(os.sep)
535             if pieces[-4] == "storage" and pieces[-3] == "shares":
536                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
537                 # are sharefiles here
538                 assert pieces[-5].startswith("client")
539                 client_num = int(pieces[-5][-1])
540                 storage_index_s = pieces[-1]
541                 storage_index = storage.si_a2b(storage_index_s)
542                 for sharename in filenames:
543                     shnum = int(sharename)
544                     filename = os.path.join(dirpath, sharename)
545                     data = (client_num, storage_index, filename, shnum)
546                     shares.append(data)
547         if not shares:
548             self.fail("unable to find any share files in %s" % basedir)
549         return shares
550
    def _corrupt_mutable_share(self, filename, which):
        """Deliberately corrupt one field of the mutable share in *filename*.

        *which* selects the field to damage: "seqnum", "R" (root hash),
        "IV", "segsize", "pubkey", "signature", "share_hash_chain",
        "block_hash_tree", "share_data", or "encprivkey".  The share is
        unpacked, the chosen field is altered (bit-flipped or bumped by
        15), then repacked and written back in place.
        """
        msf = storage.MutableShareFile(filename)
        # read far more than the share can hold, so we get the whole thing
        datav = msf.readv([ (0, 1000000) ])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            # corrupt an arbitrary node of the hash chain
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        # repack with the (possibly corrupted) fields and overwrite the
        # share file in place
        prefix = mutable.pack_prefix(seqnum, root_hash, IV, k, N,
                                     segsize, datalen)
        final_share = mutable.pack_share(prefix,
                                         verification_key,
                                         signature,
                                         share_hash_chain,
                                         block_hash_tree,
                                         share_data,
                                         enc_privkey)
        msf.writev( [(0, final_share)], None)
593
594     def test_mutable(self):
595         self.basedir = "system/SystemTest/test_mutable"
596         DATA = "initial contents go here."  # 25 bytes % 3 != 0
597         NEWDATA = "new contents yay"
598         NEWERDATA = "this is getting old"
599
600         d = self.set_up_nodes()
601
602         def _create_mutable(res):
603             c = self.clients[0]
604             log.msg("starting create_mutable_file")
605             d1 = c.create_mutable_file(DATA)
606             def _done(res):
607                 log.msg("DONE: %s" % (res,))
608                 self._mutable_node_1 = res
609                 uri = res.get_uri()
610             d1.addCallback(_done)
611             return d1
612         d.addCallback(_create_mutable)
613
614         def _test_debug(res):
615             # find a share. It is important to run this while there is only
616             # one slot in the grid.
617             shares = self._find_shares(self.basedir)
618             (client_num, storage_index, filename, shnum) = shares[0]
619             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
620                     % filename)
621             log.msg(" for clients[%d]" % client_num)
622
623             out,err = StringIO(), StringIO()
624             rc = runner.runner(["dump-share",
625                                 filename],
626                                stdout=out, stderr=err)
627             output = out.getvalue()
628             self.failUnlessEqual(rc, 0)
629             try:
630                 self.failUnless("Mutable slot found:\n" in output)
631                 self.failUnless("share_type: SDMF\n" in output)
632                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
633                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
634                 self.failUnless(" num_extra_leases: 0\n" in output)
635                 # the pubkey size can vary by a byte, so the container might
636                 # be a bit larger on some runs.
637                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
638                 self.failUnless(m)
639                 container_size = int(m.group(1))
640                 self.failUnless(2037 <= container_size <= 2049, container_size)
641                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
642                 self.failUnless(m)
643                 data_length = int(m.group(1))
644                 self.failUnless(2037 <= data_length <= 2049, data_length)
645                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
646                                 in output)
647                 self.failUnless(" SDMF contents:\n" in output)
648                 self.failUnless("  seqnum: 1\n" in output)
649                 self.failUnless("  required_shares: 3\n" in output)
650                 self.failUnless("  total_shares: 10\n" in output)
651                 self.failUnless("  segsize: 27\n" in output, (output, filename))
652                 self.failUnless("  datalen: 25\n" in output)
653                 # the exact share_hash_chain nodes depends upon the sharenum,
654                 # and is more of a hassle to compute than I want to deal with
655                 # now
656                 self.failUnless("  share_hash_chain: " in output)
657                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
658             except unittest.FailTest:
659                 print
660                 print "dump-share output was:"
661                 print output
662                 raise
663         d.addCallback(_test_debug)
664
665         # test retrieval
666
667         # first, let's see if we can use the existing node to retrieve the
668         # contents. This allows it to use the cached pubkey and maybe the
669         # latest-known sharemap.
670
671         d.addCallback(lambda res: self._mutable_node_1.download_to_data())
672         def _check_download_1(res):
673             self.failUnlessEqual(res, DATA)
674             # now we see if we can retrieve the data from a new node,
675             # constructed using the URI of the original one. We do this test
676             # on the same client that uploaded the data.
677             uri = self._mutable_node_1.get_uri()
678             log.msg("starting retrieve1")
679             newnode = self.clients[0].create_node_from_uri(uri)
680             return newnode.download_to_data()
681         d.addCallback(_check_download_1)
682
683         def _check_download_2(res):
684             self.failUnlessEqual(res, DATA)
685             # same thing, but with a different client
686             uri = self._mutable_node_1.get_uri()
687             newnode = self.clients[1].create_node_from_uri(uri)
688             log.msg("starting retrieve2")
689             d1 = newnode.download_to_data()
690             d1.addCallback(lambda res: (res, newnode))
691             return d1
692         d.addCallback(_check_download_2)
693
694         def _check_download_3((res, newnode)):
695             self.failUnlessEqual(res, DATA)
696             # replace the data
697             log.msg("starting replace1")
698             d1 = newnode.replace(NEWDATA)
699             d1.addCallback(lambda res: newnode.download_to_data())
700             return d1
701         d.addCallback(_check_download_3)
702
703         def _check_download_4(res):
704             self.failUnlessEqual(res, NEWDATA)
705             # now create an even newer node and replace the data on it. This
706             # new node has never been used for download before.
707             uri = self._mutable_node_1.get_uri()
708             newnode1 = self.clients[2].create_node_from_uri(uri)
709             newnode2 = self.clients[3].create_node_from_uri(uri)
710             self._newnode3 = self.clients[3].create_node_from_uri(uri)
711             log.msg("starting replace2")
712             d1 = newnode1.replace(NEWERDATA)
713             d1.addCallback(lambda res: newnode2.download_to_data())
714             return d1
715         d.addCallback(_check_download_4)
716
717         def _check_download_5(res):
718             log.msg("finished replace2")
719             self.failUnlessEqual(res, NEWERDATA)
720         d.addCallback(_check_download_5)
721
722         def _corrupt_shares(res):
723             # run around and flip bits in all but k of the shares, to test
724             # the hash checks
725             shares = self._find_shares(self.basedir)
726             ## sort by share number
727             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
728             where = dict([ (shnum, filename)
729                            for (client_num, storage_index, filename, shnum)
730                            in shares ])
731             assert len(where) == 10 # this test is designed for 3-of-10
732             for shnum, filename in where.items():
733                 # shares 7,8,9 are left alone. read will check
734                 # (share_hash_chain, block_hash_tree, share_data). New
735                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
736                 # segsize, signature).
737                 if shnum == 0:
738                     # read: this will trigger "pubkey doesn't match
739                     # fingerprint".
740                     self._corrupt_mutable_share(filename, "pubkey")
741                     self._corrupt_mutable_share(filename, "encprivkey")
742                 elif shnum == 1:
743                     # triggers "signature is invalid"
744                     self._corrupt_mutable_share(filename, "seqnum")
745                 elif shnum == 2:
746                     # triggers "signature is invalid"
747                     self._corrupt_mutable_share(filename, "R")
748                 elif shnum == 3:
749                     # triggers "signature is invalid"
750                     self._corrupt_mutable_share(filename, "segsize")
751                 elif shnum == 4:
752                     self._corrupt_mutable_share(filename, "share_hash_chain")
753                 elif shnum == 5:
754                     self._corrupt_mutable_share(filename, "block_hash_tree")
755                 elif shnum == 6:
756                     self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
758                 # 7,8,9 are left alone
759
760                 # note that initial_query_count=5 means that we'll hit the
761                 # first 5 servers in effectively random order (based upon
762                 # response time), so we won't necessarily ever get a "pubkey
763                 # doesn't match fingerprint" error (if we hit shnum>=1 before
764                 # shnum=0, we pull the pubkey from there). To get repeatable
765                 # specific failures, we need to set initial_query_count=1,
766                 # but of course that will change the sequencing behavior of
767                 # the retrieval process. TODO: find a reasonable way to make
768                 # this a parameter, probably when we expand this test to test
769                 # for one failure mode at a time.
770
771                 # when we retrieve this, we should get three signature
772                 # failures (where we've mangled seqnum, R, and segsize). The
773                 # pubkey mangling
774         d.addCallback(_corrupt_shares)
775
776         d.addCallback(lambda res: self._newnode3.download_to_data())
777         d.addCallback(_check_download_5)
778
779         def _check_empty_file(res):
780             # make sure we can create empty files, this usually screws up the
781             # segsize math
782             d1 = self.clients[2].create_mutable_file("")
783             d1.addCallback(lambda newnode: newnode.download_to_data())
784             d1.addCallback(lambda res: self.failUnlessEqual("", res))
785             return d1
786         d.addCallback(_check_empty_file)
787
788         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
789         def _created_dirnode(dnode):
790             log.msg("_created_dirnode(%s)" % (dnode,))
791             d1 = dnode.list()
792             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
793             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
794             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
795             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
796             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
797             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
798             d1.addCallback(lambda res: dnode.build_manifest())
799             d1.addCallback(lambda manifest:
800                            self.failUnlessEqual(len(manifest), 1))
801             return d1
802         d.addCallback(_created_dirnode)
803
804         return d
805     # The default 120 second timeout went off when running it under valgrind
806     # on my old Windows laptop, so I'm bumping up the timeout.
807     test_mutable.timeout = 240
808
809     def flip_bit(self, good):
810         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
811
812     def mangle_uri(self, gooduri):
813         # change the key, which changes the storage index, which means we'll
814         # be asking about the wrong file, so nobody will have any shares
815         u = IFileURI(gooduri)
816         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
817                             uri_extension_hash=u.uri_extension_hash,
818                             needed_shares=u.needed_shares,
819                             total_shares=u.total_shares,
820                             size=u.size)
821         return u2.to_string()
822
823     # TODO: add a test which mangles the uri_extension_hash instead, and
824     # should fail due to not being able to get a valid uri_extension block.
825     # Also a test which sneakily mangles the uri_extension block to change
826     # some of the validation data, so it will fail in the post-download phase
827     # when the file's crypttext integrity check fails. Do the same thing for
828     # the key, which should cause the download to fail the post-download
829     # plaintext_hash check.
830
    def test_vdrive(self):
        """End-to-end exercise of the virtual drive.

        Publishes a small directory tree, bounces client 0, then verifies
        the contents through direct node reads, the web server, the control
        port, and the CLI, finishing with the checker/verifier and a stats
        grab. Each step is a callback on one long Deferred chain, so the
        ordering here is significant.
        """
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(createprivdir=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        # restart client 0 to make sure published data survives a bounce
        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._test_verifier)
        d.addCallback(self._grab_stats)
        return d
    # this chain does a lot of real network/disk work, so give it a long
    # trial timeout
    test_vdrive.timeout = 1100
876
877     def _test_introweb(self, res):
878         d = getPage(self.introweb_url, method="GET", followRedirect=True)
879         def _check(res):
880             try:
881                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
882                                 in res)
883                 self.failUnless("Clients:" in res)
884             except unittest.FailTest:
885                 print
886                 print "GET %s output was:" % self.introweb_url
887                 print res
888                 raise
889         d.addCallback(_check)
890         return d
891
892     def _do_publish1(self, res):
893         ut = upload.Data(self.data)
894         c0 = self.clients[0]
895         d = c0.create_empty_dirnode()
896         def _made_root(new_dirnode):
897             self._root_directory_uri = new_dirnode.get_uri()
898             return c0.create_node_from_uri(self._root_directory_uri)
899         d.addCallback(_made_root)
900         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
901         def _made_subdir1(subdir1_node):
902             self._subdir1_node = subdir1_node
903             d1 = subdir1_node.add_file(u"mydata567", ut)
904             d1.addCallback(self.log, "publish finished")
905             def _stash_uri(filenode):
906                 self.uri = filenode.get_uri()
907             d1.addCallback(_stash_uri)
908             return d1
909         d.addCallback(_made_subdir1)
910         return d
911
912     def _do_publish2(self, res):
913         ut = upload.Data(self.data)
914         d = self._subdir1_node.create_empty_directory(u"subdir2")
915         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
916         return d
917
918     def log(self, res, msg, **kwargs):
919         # print "MSG: %s  RES: %s" % (msg, res)
920         log.msg(msg, **kwargs)
921         return res
922
923     def stall(self, res, delay=1.0):
924         d = defer.Deferred()
925         reactor.callLater(delay, d.callback, res)
926         return d
927
    def _do_publish_private(self, res):
        """Create the private directory "P".

        Builds P/personal/sekrit data, plus P/s2-rw and P/s2-ro links to
        the existing R/subdir1/subdir2/. Fires with the new private
        dirnode so _check_publish_private can use it.
        """
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            # reopen the public root so we can find subdir2 to link to
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                # link subdir2 in twice: once read-write, once read-only
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            # fire with the private dirnode, not the set_uri result
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d
949
950     def _check_publish1(self, res):
951         # this one uses the iterative API
952         c1 = self.clients[1]
953         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
954         d.addCallback(self.log, "check_publish1 got /")
955         d.addCallback(lambda root: root.get(u"subdir1"))
956         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
957         d.addCallback(lambda filenode: filenode.download_to_data())
958         d.addCallback(self.log, "get finished")
959         def _get_done(data):
960             self.failUnlessEqual(data, self.data)
961         d.addCallback(_get_done)
962         return d
963
964     def _check_publish2(self, res):
965         # this one uses the path-based API
966         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
967         d = rootnode.get_child_at_path(u"subdir1")
968         d.addCallback(lambda dirnode:
969                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
970         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
971         d.addCallback(lambda filenode: filenode.download_to_data())
972         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
973
974         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
975         def _got_filenode(filenode):
976             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
977             assert fnode == filenode
978         d.addCallback(_got_filenode)
979         return d
980
    def _check_publish_private(self, resnode):
        """Verify the private directory P created by _do_publish_private.

        Checks the file contents, confirms that the read-only s2-ro link
        rejects every mutating operation, then exercises move_child_to and
        the manifest through the read-write handles. 'resnode' is the
        private dirnode fired by _do_publish_private.
        """
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            # stash for the move_child_to checks further down
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            # the read-only link must advertise itself as such, and every
            # mutating operation on it must fail with NotMutableError
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            # plain reads are still allowed through the read-only handle
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            # a missing child raises KeyError regardless of mutability
            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            # now exercise move_child_to through the writable handles,
            # ending up back where we started
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data",home,u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            return d1
        d.addCallback(_got_home)
        return d
1064
1065     def shouldFail(self, res, expected_failure, which, substring=None):
1066         if isinstance(res, Failure):
1067             res.trap(expected_failure)
1068             if substring:
1069                 self.failUnless(substring in str(res),
1070                                 "substring '%s' not in '%s'"
1071                                 % (substring, str(res)))
1072         else:
1073             self.fail("%s was supposed to raise %s, not get '%s'" %
1074                       (which, expected_failure, res))
1075
1076     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1077         assert substring is None or isinstance(substring, str)
1078         d = defer.maybeDeferred(callable, *args, **kwargs)
1079         def done(res):
1080             if isinstance(res, Failure):
1081                 res.trap(expected_failure)
1082                 if substring:
1083                     self.failUnless(substring in str(res),
1084                                     "substring '%s' not in '%s'"
1085                                     % (substring, str(res)))
1086             else:
1087                 self.fail("%s was supposed to raise %s, not get '%s'" %
1088                           (which, expected_failure, res))
1089         d.addBoth(done)
1090         return d
1091
1092     def PUT(self, urlpath, data):
1093         url = self.webish_url + urlpath
1094         return getPage(url, method="PUT", postdata=data)
1095
1096     def GET(self, urlpath, followRedirect=False):
1097         url = self.webish_url + urlpath
1098         return getPage(url, method="GET", followRedirect=followRedirect)
1099
1100     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
1101         if use_helper:
1102             url = self.helper_webish_url + urlpath
1103         else:
1104             url = self.webish_url + urlpath
1105         sepbase = "boogabooga"
1106         sep = "--" + sepbase
1107         form = []
1108         form.append(sep)
1109         form.append('Content-Disposition: form-data; name="_charset"')
1110         form.append('')
1111         form.append('UTF-8')
1112         form.append(sep)
1113         for name, value in fields.iteritems():
1114             if isinstance(value, tuple):
1115                 filename, value = value
1116                 form.append('Content-Disposition: form-data; name="%s"; '
1117                             'filename="%s"' % (name, filename.encode("utf-8")))
1118             else:
1119                 form.append('Content-Disposition: form-data; name="%s"' % name)
1120             form.append('')
1121             form.append(str(value))
1122             form.append(sep)
1123         form[-1] += "--"
1124         body = "\r\n".join(form) + "\r\n"
1125         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1126                    }
1127         return getPage(url, method="POST", postdata=body,
1128                        headers=headers, followRedirect=followRedirect)
1129
    def _test_web(self, res):
        """Exercise the webish server: welcome pages, directory listings,
        downloads by path and by URI (including a deliberately bogus URI),
        PUT/GET/replace, unlinked POST uploads (direct and via the helper),
        and the status pages. One long ordered Deferred chain.
        """
        base = self.webish_url
        public = "uri/" + self._root_directory_uri
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected storage servers'"
                            " message in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
        d.addCallback(_got_welcome)
        d.addCallback(self.log, "done with _got_welcome")

        # get the welcome page from the node that uses the helper too
        d.addCallback(lambda res: getPage(self.helper_webish_url))
        def _got_welcome_helper(page):
            self.failUnless("Connected to helper?: <span>yes</span>" in page,
                            page)
        d.addCallback(_got_welcome_helper)

        d.addCallback(lambda res: getPage(base + public))
        d.addCallback(lambda res: getPage(base + public + "/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(("<td>%d</td>" % len(self.data)) in page)
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(self.log, "done with _got_subdir1")
        d.addCallback(lambda res:
                      getPage(base + public + "/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)

        # download from a URI embedded in a URL
        d.addCallback(self.log, "_get_from_uri")
        def _get_from_uri(res):
            return getPage(base + "uri/%s?filename=%s"
                           % (self.uri, "mydata567"))
        d.addCallback(_get_from_uri)
        def _got_from_uri(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_from_uri)

        # download from a URI embedded in a URL, second form
        d.addCallback(self.log, "_get_from_uri2")
        def _get_from_uri2(res):
            return getPage(base + "uri?uri=%s" % (self.uri,))
        d.addCallback(_get_from_uri2)
        d.addCallback(_got_from_uri)

        # download from a bogus URI, make sure we get a reasonable error
        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
        def _get_from_bogus_uri(res):
            # mangle_uri flips a key bit, so no server has shares; the
            # webserver is expected to answer with a 410 error
            d1 = getPage(base + "uri/%s?filename=%s"
                         % (self.mangle_uri(self.uri), "mydata567"))
            d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
                       "410")
            return d1
        d.addCallback(_get_from_bogus_uri)
        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)

        # upload a file with PUT
        d.addCallback(self.log, "about to try PUT")
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "new.txt contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "new.txt contents")
        # and again with something large enough to use multiple segments,
        # and hopefully trigger pauseProducing too
        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
                                           "big" * 500000)) # 1.5MB
        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))

        # can we replace files in place?
        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
                                           "NEWER contents"))
        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
        d.addCallback(self.failUnlessEqual, "NEWER contents")

        # test unlinked POST
        d.addCallback(lambda res: self.POST("uri", t="upload",
                                            file=("new.txt", "data" * 10000)))
        # and again using the helper, which exercises different upload-status
        # display code
        d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
                                            file=("foo.txt", "data2" * 10000)))

        # check that the status page exists
        d.addCallback(lambda res: self.GET("status", followRedirect=True))
        def _got_status(res):
            # find an interesting upload and download to look at. LIT files
            # are not interesting.
            for dl in self.clients[0].list_recent_downloads():
                if dl.get_size() > 200:
                    self._down_status = dl.get_counter()
            for ul in self.clients[0].list_recent_uploads():
                if ul.get_size() > 200:
                    self._up_status = ul.get_counter()
            rs = self.clients[0].list_recent_retrieve()[0]
            self._retrieve_status = rs.get_counter()
            ps = self.clients[0].list_recent_publish()[0]
            self._publish_status = ps.get_counter()

            # and that there are some upload- and download- status pages
            return self.GET("status/up-%d" % self._up_status)
        d.addCallback(_got_status)
        def _got_up(res):
            return self.GET("status/down-%d" % self._down_status)
        d.addCallback(_got_up)
        def _got_down(res):
            return self.GET("status/publish-%d" % self._publish_status)
        d.addCallback(_got_down)
        def _got_publish(res):
            return self.GET("status/retrieve-%d" % self._retrieve_status)
        d.addCallback(_got_publish)

        # TODO: mangle the second segment of a file, to test errors that
        # occur after we've already sent some good data, which uses a
        # different error path.

        # TODO: download a URI with a form
        # TODO: create a directory by using a form
        # TODO: upload by using a form on the directory page
        #    url = base + "somedir/subdir1/freeform_post!!upload"
        # TODO: delete a file by using a button on the directory page

        return d
1263
1264     def _test_runner(self, res):
1265         # exercise some of the diagnostic tools in runner.py
1266
1267         # find a share
1268         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1269             if "storage" not in dirpath:
1270                 continue
1271             if not filenames:
1272                 continue
1273             pieces = dirpath.split(os.sep)
1274             if pieces[-4] == "storage" and pieces[-3] == "shares":
1275                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1276                 # are sharefiles here
1277                 filename = os.path.join(dirpath, filenames[0])
1278                 # peek at the magic to see if it is a chk share
1279                 magic = open(filename, "rb").read(4)
1280                 if magic == '\x00\x00\x00\x01':
1281                     break
1282         else:
1283             self.fail("unable to find any uri_extension files in %s"
1284                       % self.basedir)
1285         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1286
1287         out,err = StringIO(), StringIO()
1288         rc = runner.runner(["dump-share",
1289                             filename],
1290                            stdout=out, stderr=err)
1291         output = out.getvalue()
1292         self.failUnlessEqual(rc, 0)
1293
1294         # we only upload a single file, so we can assert some things about
1295         # its size and shares.
1296         self.failUnless(("share filename: %s" % filename) in output)
1297         self.failUnless("size: %d\n" % len(self.data) in output)
1298         self.failUnless("num_segments: 1\n" in output)
1299         # segment_size is always a multiple of needed_shares
1300         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1301         self.failUnless("total_shares: 10\n" in output)
1302         # keys which are supposed to be present
1303         for key in ("size", "num_segments", "segment_size",
1304                     "needed_shares", "total_shares",
1305                     "codec_name", "codec_params", "tail_codec_params",
1306                     "plaintext_hash", "plaintext_root_hash",
1307                     "crypttext_hash", "crypttext_root_hash",
1308                     "share_root_hash", "UEB_hash"):
1309             self.failUnless("%s: " % key in output, key)
1310
1311         # now use its storage index to find the other shares using the
1312         # 'find-shares' tool
1313         sharedir, shnum = os.path.split(filename)
1314         storagedir, storage_index_s = os.path.split(sharedir)
1315         out,err = StringIO(), StringIO()
1316         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1317         cmd = ["find-shares", storage_index_s] + nodedirs
1318         rc = runner.runner(cmd, stdout=out, stderr=err)
1319         self.failUnlessEqual(rc, 0)
1320         out.seek(0)
1321         sharefiles = [sfn.strip() for sfn in out.readlines()]
1322         self.failUnlessEqual(len(sharefiles), 10)
1323
1324         # also exercise the 'catalog-shares' tool
1325         out,err = StringIO(), StringIO()
1326         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1327         cmd = ["catalog-shares"] + nodedirs
1328         rc = runner.runner(cmd, stdout=out, stderr=err)
1329         self.failUnlessEqual(rc, 0)
1330         out.seek(0)
1331         descriptions = [sfn.strip() for sfn in out.readlines()]
1332         self.failUnlessEqual(len(descriptions), 30)
1333         matching = [line
1334                     for line in descriptions
1335                     if line.startswith("CHK %s " % storage_index_s)]
1336         self.failUnlessEqual(len(matching), 10)
1337
1338     def _test_control(self, res):
1339         # exercise the remote-control-the-client foolscap interfaces in
1340         # allmydata.control (mostly used for performance tests)
1341         c0 = self.clients[0]
1342         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1343         control_furl = open(control_furl_file, "r").read().strip()
1344         # it doesn't really matter which Tub we use to connect to the client,
1345         # so let's just use our IntroducerNode's
1346         d = self.introducer.tub.getReference(control_furl)
1347         d.addCallback(self._test_control2, control_furl_file)
1348         return d
1349     def _test_control2(self, rref, filename):
1350         d = rref.callRemote("upload_from_file_to_uri", filename)
1351         downfile = os.path.join(self.basedir, "control.downfile")
1352         d.addCallback(lambda uri:
1353                       rref.callRemote("download_from_uri_to_file",
1354                                       uri, downfile))
1355         def _check(res):
1356             self.failUnlessEqual(res, downfile)
1357             data = open(downfile, "r").read()
1358             expected_data = open(filename, "r").read()
1359             self.failUnlessEqual(data, expected_data)
1360         d.addCallback(_check)
1361         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1362         if sys.platform == "linux2":
1363             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1364         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1365         return d
1366
    def _test_cli(self, res):
        """Exercise the 'tahoe' CLI commands (ls/put/get/mv/rm) against the
        running grid.

        Each command is executed via _run_cli in a thread, because the CLI
        makes blocking HTTP calls to the node's webish port. The steps are
        chained on a single Deferred so they run strictly in order: later
        steps depend on files created (or removed) by earlier ones.
        """
        # run various CLI commands (in a thread, since they use blocking
        # network calls)

        private_uri = self._private_node.get_uri()
        some_uri = self._root_directory_uri
        client0_basedir = self.getdir("client0")

        # args targeting the private (read-write) directory of client0
        nodeargs = [
            "--node-directory", client0_basedir,
            "--dir-cap", private_uri,
            ]
        # args targeting the shared public directory via the web URL
        public_nodeargs = [
            "--node-url", self.webish_url,
            "--dir-cap", some_uri,
            ]
        TESTDATA = "I will not write the same thing over and over.\n" * 100

        d = defer.succeed(None)

        # 'tahoe ls' on the root of the private directory
        def _ls_root(res):
            argv = ["ls"] + nodeargs
            return self._run_cli(argv)
        d.addCallback(_ls_root)
        def _check_ls_root((out,err)):
            self.failUnless("personal" in out)
            self.failUnless("s2-ro" in out)
            self.failUnless("s2-rw" in out)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_root)

        # 'tahoe ls' on a subdirectory
        def _ls_subdir(res):
            argv = ["ls"] + nodeargs + ["personal"]
            return self._run_cli(argv)
        d.addCallback(_ls_subdir)
        def _check_ls_subdir((out,err)):
            self.failUnless("sekrit data" in out)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_subdir)

        # 'tahoe ls' through the public web URL
        def _ls_public_subdir(res):
            argv = ["ls"] + public_nodeargs + ["subdir1"]
            return self._run_cli(argv)
        d.addCallback(_ls_public_subdir)
        def _check_ls_public_subdir((out,err)):
            self.failUnless("subdir2" in out)
            self.failUnless("mydata567" in out)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_public_subdir)

        # 'tahoe ls' on a single file reports its size and path
        def _ls_file(res):
            argv = ["ls"] + public_nodeargs + ["subdir1/mydata567"]
            return self._run_cli(argv)
        d.addCallback(_ls_file)
        def _check_ls_file((out,err)):
            self.failUnlessEqual(out.strip(), "112 subdir1/mydata567")
            self.failUnlessEqual(err, "")
        d.addCallback(_check_ls_file)

        # tahoe_ls doesn't currently handle the error correctly: it tries to
        # JSON-parse a traceback.
##         def _ls_missing(res):
##             argv = ["ls"] + nodeargs + ["bogus"]
##             return self._run_cli(argv)
##         d.addCallback(_ls_missing)
##         def _check_ls_missing((out,err)):
##             print "OUT", out
##             print "ERR", err
##             self.failUnlessEqual(err, "")
##         d.addCallback(_check_ls_missing)

        # 'tahoe put': upload a local file into the private directory
        def _put(res):
            tdir = self.getdir("cli_put")
            fileutil.make_dirs(tdir)
            fn = os.path.join(tdir, "upload_me")
            f = open(fn, "wb")
            f.write(TESTDATA)
            f.close()
            argv = ["put"] + nodeargs + [fn, "test_put/upload.txt"]
            return self._run_cli(argv)
        d.addCallback(_put)
        def _check_put((out,err)):
            self.failUnless("200 OK" in out)
            self.failUnlessEqual(err, "")
            # verify through the node API that the uploaded data round-trips
            d = self._private_node.get_child_at_path(u"test_put/upload.txt")
            d.addCallback(lambda filenode: filenode.download_to_data())
            def _check_put2(res):
                self.failUnlessEqual(res, TESTDATA)
            d.addCallback(_check_put2)
            return d
        d.addCallback(_check_put)

        # 'tahoe get' to stdout
        def _get_to_stdout(res):
            argv = ["get"] + nodeargs + ["test_put/upload.txt"]
            return self._run_cli(argv)
        d.addCallback(_get_to_stdout)
        def _check_get_to_stdout((out,err)):
            self.failUnlessEqual(out, TESTDATA)
            self.failUnlessEqual(err, "")
        d.addCallback(_check_get_to_stdout)

        # 'tahoe get' into a local file
        get_to_file_target = self.basedir + "/get.downfile"
        def _get_to_file(res):
            argv = ["get"] + nodeargs + ["test_put/upload.txt",
                                         get_to_file_target]
            return self._run_cli(argv)
        d.addCallback(_get_to_file)
        def _check_get_to_file((out,err)):
            data = open(get_to_file_target, "rb").read()
            self.failUnlessEqual(data, TESTDATA)
            self.failUnlessEqual(out, "")
            self.failUnlessEqual(err, "test_put/upload.txt retrieved and written to system/SystemTest/test_vdrive/get.downfile\n")
        d.addCallback(_check_get_to_file)


        # 'tahoe mv': the old name must be gone, the new name must have the data
        def _mv(res):
            argv = ["mv"] + nodeargs + ["test_put/upload.txt",
                                        "test_put/moved.txt"]
            return self._run_cli(argv)
        d.addCallback(_mv)
        def _check_mv((out,err)):
            self.failUnless("OK" in out)
            self.failUnlessEqual(err, "")
            # the old path should now raise KeyError
            d = self.shouldFail2(KeyError, "test_cli._check_rm", "'upload.txt'", self._private_node.get_child_at_path, u"test_put/upload.txt")

            d.addCallback(lambda res:
                          self._private_node.get_child_at_path(u"test_put/moved.txt"))
            d.addCallback(lambda filenode: filenode.download_to_data())
            def _check_mv2(res):
                self.failUnlessEqual(res, TESTDATA)
            d.addCallback(_check_mv2)
            return d
        d.addCallback(_check_mv)

        # 'tahoe rm': the file must disappear from the directory
        def _rm(res):
            argv = ["rm"] + nodeargs + ["test_put/moved.txt"]
            return self._run_cli(argv)
        d.addCallback(_rm)
        def _check_rm((out,err)):
            self.failUnless("200 OK" in out)
            self.failUnlessEqual(err, "")
            d = self.shouldFail2(KeyError, "test_cli._check_rm", "'moved.txt'", self._private_node.get_child_at_path, u"test_put/moved.txt")
            return d
        d.addCallback(_check_rm)
        return d
1512
1513     def _run_cli(self, argv):
1514         stdout, stderr = StringIO(), StringIO()
1515         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1516                                   stdout=stdout, stderr=stderr)
1517         def _done(res):
1518             return stdout.getvalue(), stderr.getvalue()
1519         d.addCallback(_done)
1520         return d
1521
1522     def _test_checker(self, res):
1523         d = self._private_node.build_manifest()
1524         d.addCallback(self._test_checker_2)
1525         return d
1526
    def _test_checker_2(self, manifest):
        """Run client1's checker over every storage index in 'manifest'.

        First asserts that no results are cached yet, then checks every SI
        and validates both the live results and the stored copies (whose
        timestamps must fall inside the [starting_time, finish_time] window).
        Chains into _test_checker_3 when done.
        """
        checker1 = self.clients[1].getServiceNamed("checker")
        # no checks have run yet, so there must be no cached results
        self.failUnlessEqual(checker1.checker_results_for(None), [])
        self.failUnlessEqual(checker1.checker_results_for(list(manifest)[0]),
                             [])
        dl = []
        starting_time = time.time()
        for si in manifest:
            dl.append(checker1.check(si))
        d = deferredutil.DeferredListShouldSucceed(dl)

        # shared by both the live-results and stored-results callbacks below
        def _check_checker_results(res):
            for i in res:
                if type(i) is bool:
                    # a bare boolean result must be True (e.g. the trivial
                    # check of a small literal file -- see _test_checker_3)
                    self.failUnless(i is True)
                else:
                    # otherwise the result is a (needed, total, found,
                    # sharemap) tuple for a 3-of-10 encoded file
                    (needed, total, found, sharemap) = i
                    self.failUnlessEqual(needed, 3)
                    self.failUnlessEqual(total, 10)
                    self.failUnlessEqual(found, total)
                    self.failUnlessEqual(len(sharemap.keys()), 10)
                    peers = set()
                    for shpeers in sharemap.values():
                        peers.update(shpeers)
                    # the shares must be spread across every client node
                    self.failUnlessEqual(len(peers), self.numclients)
        d.addCallback(_check_checker_results)

        def _check_stored_results(res):
            finish_time = time.time()
            all_results = []
            for si in manifest:
                results = checker1.checker_results_for(si)
                if not results:
                    # TODO: implement checker for mutable files and implement tests of that checker
                    continue
                self.failUnlessEqual(len(results), 1)
                when, those_results = results[0]
                self.failUnless(isinstance(when, (int, float)))
                # the stored timestamp must lie within the check window
                self.failUnless(starting_time <= when <= finish_time)
                all_results.append(those_results)
            # the stored copies must satisfy the same invariants as the
            # live results did
            _check_checker_results(all_results)
        d.addCallback(_check_stored_results)

        d.addCallback(self._test_checker_3)
        return d
1572
1573     def _test_checker_3(self, res):
1574         # check one file, through FileNode.check()
1575         d = self._private_node.get_child_at_path(u"personal/sekrit data")
1576         d.addCallback(lambda n: n.check())
1577         def _checked(results):
1578             # 'sekrit data' is small, and fits in a LiteralFileNode, so
1579             # checking it is trivial and always returns True
1580             self.failUnlessEqual(results, True)
1581         d.addCallback(_checked)
1582
1583         c0 = self.clients[1]
1584         n = c0.create_node_from_uri(self._root_directory_uri)
1585         d.addCallback(lambda res: n.get_child_at_path(u"subdir1/mydata567"))
1586         d.addCallback(lambda n: n.check())
1587         def _checked2(results):
1588             # mydata567 is large and lives in a CHK
1589             (needed, total, found, sharemap) = results
1590             self.failUnlessEqual(needed, 3)
1591             self.failUnlessEqual(total, 10)
1592             self.failUnlessEqual(found, 10)
1593             self.failUnlessEqual(len(sharemap), 10)
1594             for shnum in range(10):
1595                 self.failUnlessEqual(len(sharemap[shnum]), 1)
1596         d.addCallback(_checked2)
1597         return d
1598
1599
1600     def _test_verifier(self, res):
1601         checker1 = self.clients[1].getServiceNamed("checker")
1602         d = self._private_node.build_manifest()
1603         def _check_all(manifest):
1604             dl = []
1605             for si in manifest:
1606                 dl.append(checker1.verify(si))
1607             return deferredutil.DeferredListShouldSucceed(dl)
1608         d.addCallback(_check_all)
1609         def _done(res):
1610             for i in res:
1611                 self.failUnless(i is True)
1612         d.addCallback(_done)
1613         d.addCallback(lambda res: checker1.verify(None))
1614         d.addCallback(self.failUnlessEqual, True)
1615         return d