src/allmydata/test/test_system.py (tahoe-lafs)
1 from base64 import b32encode
2 import os, random, struct, sys, time, re, simplejson
3 from cStringIO import StringIO
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.internet import threads # CLI tests use deferToThread
7 from twisted.internet.error import ConnectionDone, ConnectionLost
8 import allmydata
9 from allmydata import uri, storage, offloaded
10 from allmydata.immutable import download, upload, filenode
11 from allmydata.util import idlib, mathutil, testutil
12 from allmydata.util import log, base32
13 from allmydata.scripts import runner, cli
14 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
15 from allmydata.mutable.common import NotMutableError
16 from allmydata.mutable import layout as mutable_layout
17 from foolscap import DeadReferenceError
18 from twisted.python.failure import Failure
19 from twisted.web.client import getPage
20 from twisted.web.error import Error
21
22 from allmydata.test.common import SystemTestMixin
23
24 LARGE_DATA = """
25 This is some data to publish to the virtual drive, which needs to be large
26 enough to not fit inside a LIT uri.
27 """
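# (LARGE_DATA works out to 112 bytes -- the "size-immutable-files": 112 stat
# asserted in test_vdrive depends on that -- which keeps it safely above the
# small-file threshold, roughly 55 bytes in this version, below which contents
# are packed directly into a LIT URI instead of being uploaded as shares.)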
28
29 class CountingDataUploadable(upload.Data):
30     bytes_read = 0
31     interrupt_after = None
32     interrupt_after_d = None
33
34     def read(self, length):
35         self.bytes_read += length
36         if self.interrupt_after is not None:
37             if self.bytes_read > self.interrupt_after:
38                 self.interrupt_after = None
39                 self.interrupt_after_d.callback(self)
40         return upload.Data.read(self, length)
41
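# CountingDataUploadable is driven from _upload_resumable below, roughly like
# this (a condensed sketch of the calls actually made there):
#
#     u1 = CountingDataUploadable(DATA, convergence=convergence)
#     u1.interrupt_after = 5000          # fire once ~5kB have been read
#     u1.interrupt_after_d = defer.Deferred()
#     u1.interrupt_after_d.addCallback(lambda res: self.bounce_client(0))
#
# i.e. as soon as read() pushes bytes_read past interrupt_after, the Deferred
# fires and its callback shuts down and restarts the helper mid-upload.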
42
43 class SystemTest(SystemTestMixin, unittest.TestCase):
44
45     def test_connections(self):
46         self.basedir = "system/SystemTest/test_connections"
47         d = self.set_up_nodes()
48         self.extra_node = None
49         d.addCallback(lambda res: self.add_extra_node(self.numclients))
50         def _check(extra_node):
51             self.extra_node = extra_node
52             for c in self.clients:
53                 all_peerids = list(c.get_all_peerids())
54                 self.failUnlessEqual(len(all_peerids), self.numclients+1)
55                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
56                 self.failUnlessEqual(len(permuted_peers), self.numclients+1)
57
58         d.addCallback(_check)
59         def _shutdown_extra_node(res):
60             if self.extra_node:
61                 return self.extra_node.stopService()
62             return res
63         d.addBoth(_shutdown_extra_node)
64         return d
65     test_connections.timeout = 300
66     # test_connections is subsumed by test_upload_and_download, and takes
67     # quite a while to run on a slow machine (because of all the TLS
68     # connections that must be established). If we ever rework the introducer
69     # code to such an extent that we're not sure if it works anymore, we can
70     # reinstate this test until it does.
71     del test_connections
72
73     def test_upload_and_download_random_key(self):
74         self.basedir = "system/SystemTest/test_upload_and_download_random_key"
75         return self._test_upload_and_download(convergence=None)
76     test_upload_and_download_random_key.timeout = 4800
77
78     def test_upload_and_download_convergent(self):
79         self.basedir = "system/SystemTest/test_upload_and_download_convergent"
80         return self._test_upload_and_download(convergence="some convergence string")
81     test_upload_and_download_convergent.timeout = 4800
82
83     def _test_upload_and_download(self, convergence):
84         # we use 4000 bytes of data, which will result in about 400k written
85         # to disk among all our simulated nodes
86         DATA = "Some data to upload\n" * 200
87         d = self.set_up_nodes()
88         def _check_connections(res):
89             for c in self.clients:
90                 all_peerids = list(c.get_all_peerids())
91                 self.failUnlessEqual(len(all_peerids), self.numclients)
92                 permuted_peers = list(c.get_permuted_peers("storage", "a"))
93                 self.failUnlessEqual(len(permuted_peers), self.numclients)
94         d.addCallback(_check_connections)
95
96         def _do_upload(res):
97             log.msg("UPLOADING")
98             u = self.clients[0].getServiceNamed("uploader")
99             self.uploader = u
100             # we crank the max segsize down to 1024b for the duration of this
101             # test, so we can exercise multiple segments. It is important
102             # that this is not a multiple of the segment size, so that the
103             # tail segment is not the same length as the others. This actually
104             # gets rounded up to 1025 to be a multiple of the number of
105             # required shares (since we use 25 out of 100 FEC).
106             up = upload.Data(DATA, convergence=convergence)
107             up.max_segment_size = 1024
108             d1 = u.upload(up)
109             return d1
110         d.addCallback(_do_upload)
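        # A quick arithmetic check of the rounding described above (plain Python,
        # not the uploader's actual code path), with k = 25 required shares:
        #
        #     k, requested = 25, 1024
        #     segsize = ((requested + k - 1) // k) * k    # -> 1025
        #
        # Each share's block is then segsize/k = 41 bytes, and the 4000-byte DATA
        # splits into three full 1025-byte segments plus a 925-byte tail segment.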
111         def _upload_done(results):
112             uri = results.uri
113             log.msg("upload finished: uri is %s" % (uri,))
114             self.uri = uri
115             dl = self.clients[1].getServiceNamed("downloader")
116             self.downloader = dl
117         d.addCallback(_upload_done)
118
119         def _upload_again(res):
120             # Upload again. If using convergent encryption then this ought to be
121             # short-circuited; however, with the way we currently generate URIs
122             # (i.e. because they include the roothash), we have to do all of the
123             # encoding work, and only get to save on the upload part.
124             log.msg("UPLOADING AGAIN")
125             up = upload.Data(DATA, convergence=convergence)
126             up.max_segment_size = 1024
127             return self.uploader.upload(up)
128         d.addCallback(_upload_again)
129
130         def _download_to_data(res):
131             log.msg("DOWNLOADING")
132             return self.downloader.download_to_data(self.uri)
133         d.addCallback(_download_to_data)
134         def _download_to_data_done(data):
135             log.msg("download finished")
136             self.failUnlessEqual(data, DATA)
137         d.addCallback(_download_to_data_done)
138
139         target_filename = os.path.join(self.basedir, "download.target")
140         def _download_to_filename(res):
141             return self.downloader.download_to_filename(self.uri,
142                                                         target_filename)
143         d.addCallback(_download_to_filename)
144         def _download_to_filename_done(res):
145             newdata = open(target_filename, "rb").read()
146             self.failUnlessEqual(newdata, DATA)
147         d.addCallback(_download_to_filename_done)
148
149         target_filename2 = os.path.join(self.basedir, "download.target2")
150         def _download_to_filehandle(res):
151             fh = open(target_filename2, "wb")
152             return self.downloader.download_to_filehandle(self.uri, fh)
153         d.addCallback(_download_to_filehandle)
154         def _download_to_filehandle_done(fh):
155             fh.close()
156             newdata = open(target_filename2, "rb").read()
157             self.failUnlessEqual(newdata, DATA)
158         d.addCallback(_download_to_filehandle_done)
159
160         def _download_nonexistent_uri(res):
161             baduri = self.mangle_uri(self.uri)
162             log.msg("about to download non-existent URI", level=log.UNUSUAL,
163                     facility="tahoe.tests")
164             d1 = self.downloader.download_to_data(baduri)
165             def _baduri_should_fail(res):
166                 log.msg("finished downloading non-existent URI",
167                         level=log.UNUSUAL, facility="tahoe.tests")
168                 self.failUnless(isinstance(res, Failure))
169                 self.failUnless(res.check(download.NotEnoughSharesError),
170                                 "expected NotEnoughSharesError, got %s" % res)
171                 # TODO: files that have zero peers should get a special kind
172                 # of NotEnoughSharesError, which can be used to suggest that
173                 # the URI might be wrong or that they've never uploaded the
174                 # file in the first place.
175             d1.addBoth(_baduri_should_fail)
176             return d1
177         d.addCallback(_download_nonexistent_uri)
178
179         # add a new node, which doesn't accept shares, and only uses the
180         # helper for upload.
181         d.addCallback(lambda res: self.add_extra_node(self.numclients,
182                                                       self.helper_furl,
183                                                       add_to_sparent=True))
184         def _added(extra_node):
185             self.extra_node = extra_node
186             extra_node.getServiceNamed("storage").sizelimit = 0
187         d.addCallback(_added)
188
189         HELPER_DATA = "Data that needs help to upload" * 1000
190         def _upload_with_helper(res):
191             u = upload.Data(HELPER_DATA, convergence=convergence)
192             d = self.extra_node.upload(u)
193             def _uploaded(results):
194                 uri = results.uri
195                 return self.downloader.download_to_data(uri)
196             d.addCallback(_uploaded)
197             def _check(newdata):
198                 self.failUnlessEqual(newdata, HELPER_DATA)
199             d.addCallback(_check)
200             return d
201         d.addCallback(_upload_with_helper)
202
203         def _upload_duplicate_with_helper(res):
204             u = upload.Data(HELPER_DATA, convergence=convergence)
205             u.debug_stash_RemoteEncryptedUploadable = True
206             d = self.extra_node.upload(u)
207             def _uploaded(results):
208                 uri = results.uri
209                 return self.downloader.download_to_data(uri)
210             d.addCallback(_uploaded)
211             def _check(newdata):
212                 self.failUnlessEqual(newdata, HELPER_DATA)
213                 self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
214                             "uploadable started uploading, should have been avoided")
215             d.addCallback(_check)
216             return d
217         if convergence is not None:
218             d.addCallback(_upload_duplicate_with_helper)
219
220         def _upload_resumable(res):
221             DATA = "Data that needs help to upload and gets interrupted" * 1000
222             u1 = CountingDataUploadable(DATA, convergence=convergence)
223             u2 = CountingDataUploadable(DATA, convergence=convergence)
224
225             # we interrupt the connection after about 5kB by shutting down
226             # the helper, then restarting it.
227             u1.interrupt_after = 5000
228             u1.interrupt_after_d = defer.Deferred()
229             u1.interrupt_after_d.addCallback(lambda res:
230                                              self.bounce_client(0))
231
232             # sneak into the helper and reduce its chunk size, so that our
233             # interrupt_after hook will sever the connection on about the fifth
234             # chunk fetched. This makes sure that we've started to write the
235             # new shares before we abandon them, which exercises the
236             # abort/delete-partial-share code. TODO: find a cleaner way to do
237             # this. I know that this will affect later uses of the helper in
238             # this same test run, but I'm not currently worried about it.
239             offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
240
241             d = self.extra_node.upload(u1)
242
243             def _should_not_finish(res):
244                 self.fail("interrupted upload should have failed, not finished"
245                           " with result %s" % (res,))
246             def _interrupted(f):
247                 f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)
248
249                 # make sure we actually interrupted it before finishing the
250                 # file
251                 self.failUnless(u1.bytes_read < len(DATA),
252                                 "read %d out of %d total" % (u1.bytes_read,
253                                                              len(DATA)))
254
255                 log.msg("waiting for reconnect", level=log.NOISY,
256                         facility="tahoe.test.test_system")
257                 # now, we need to give the nodes a chance to notice that this
258                 # connection has gone away. When this happens, the storage
259                 # servers will be told to abort their uploads, removing the
260                 # partial shares. Unfortunately this involves TCP messages
261                 # going through the loopback interface, and we can't easily
262                 # predict how long that will take. If it were all local, we
263                 # could use fireEventually() to stall. Since we don't have
264                 # the right introduction hooks, the best we can do is use a
265                 # fixed delay. TODO: this is fragile.
266                 u1.interrupt_after_d.addCallback(self.stall, 2.0)
267                 return u1.interrupt_after_d
268             d.addCallbacks(_should_not_finish, _interrupted)
269
270             def _disconnected(res):
271                 # check to make sure the storage servers aren't still hanging
272                 # on to the partial share: their incoming/ directories should
273                 # now be empty.
274                 log.msg("disconnected", level=log.NOISY,
275                         facility="tahoe.test.test_system")
276                 for i in range(self.numclients):
277                     incdir = os.path.join(self.getdir("client%d" % i),
278                                           "storage", "shares", "incoming")
279                     self.failIf(os.path.exists(incdir) and os.listdir(incdir))
280             d.addCallback(_disconnected)
281
282             # then we need to give the reconnector a chance to
283             # reestablish the connection to the helper.
284             d.addCallback(lambda res:
285                           log.msg("wait_for_connections", level=log.NOISY,
286                                   facility="tahoe.test.test_system"))
287             d.addCallback(lambda res: self.wait_for_connections())
288
289
290             d.addCallback(lambda res:
291                           log.msg("uploading again", level=log.NOISY,
292                                   facility="tahoe.test.test_system"))
293             d.addCallback(lambda res: self.extra_node.upload(u2))
294
295             def _uploaded(results):
296                 uri = results.uri
297                 log.msg("Second upload complete", level=log.NOISY,
298                         facility="tahoe.test.test_system")
299
300                 # this is really bytes received rather than sent, but it's
301                 # convenient and basically measures the same thing
302                 bytes_sent = results.ciphertext_fetched
303
304                 # We currently don't support resumption of upload if the data is
305                 # encrypted with a random key.  (Because that would require us
306                 # to store the key locally and re-use it on the next upload of
307                 # this file, which isn't a bad thing to do, but we currently
308                 # don't do it.)
309                 if convergence is not None:
310                     # Make sure we did not have to read the whole file the
311                     # second time around.
312                     self.failUnless(bytes_sent < len(DATA),
313                                 "resumption didn't save us any work:"
314                                 " read %d bytes out of %d total" %
315                                 (bytes_sent, len(DATA)))
316                 else:
317                     # Make sure we did have to read the whole file the second
318                     # time around -- because the one that we partially uploaded
319                     # earlier was encrypted with a different random key.
320                     self.failIf(bytes_sent < len(DATA),
321                                 "resumption saved us some work even though we were using random keys:"
322                                 " read %d bytes out of %d total" %
323                                 (bytes_sent, len(DATA)))
324                 return self.downloader.download_to_data(uri)
325             d.addCallback(_uploaded)
326
327             def _check(newdata):
328                 self.failUnlessEqual(newdata, DATA)
329                 # If using convergent encryption, then also check that the
330                 # helper has removed the temp file from its directories.
331                 if convergence is not None:
332                     basedir = os.path.join(self.getdir("client0"), "helper")
333                     files = os.listdir(os.path.join(basedir, "CHK_encoding"))
334                     self.failUnlessEqual(files, [])
335                     files = os.listdir(os.path.join(basedir, "CHK_incoming"))
336                     self.failUnlessEqual(files, [])
337             d.addCallback(_check)
338             return d
339         d.addCallback(_upload_resumable)
340
341         return d
342
343     def _find_shares(self, basedir):
344         shares = []
345         for (dirpath, dirnames, filenames) in os.walk(basedir):
346             if "storage" not in dirpath:
347                 continue
348             if not filenames:
349                 continue
350             pieces = dirpath.split(os.sep)
351             if pieces[-4] == "storage" and pieces[-3] == "shares":
352                 # we're sitting in .../storage/shares/$START/$SINDEX, and there
353                 # are sharefiles here
354                 assert pieces[-5].startswith("client")
355                 client_num = int(pieces[-5][-1])
356                 storage_index_s = pieces[-1]
357                 storage_index = storage.si_a2b(storage_index_s)
358                 for sharename in filenames:
359                     shnum = int(sharename)
360                     filename = os.path.join(dirpath, sharename)
361                     data = (client_num, storage_index, filename, shnum)
362                     shares.append(data)
363         if not shares:
364             self.fail("unable to find any share files in %s" % basedir)
365         return shares
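    # For reference, _find_shares maps a share file at (for example)
    #   <basedir>/client1/storage/shares/$START/$SINDEX/3
    # to the tuple (1, storage.si_a2b($SINDEX), <full path>, 3), using the same
    # $START/$SINDEX placeholders as the comment above.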
366
367     def _corrupt_mutable_share(self, filename, which):
368         msf = storage.MutableShareFile(filename)
369         datav = msf.readv([ (0, 1000000) ])
370         final_share = datav[0]
371         assert len(final_share) < 1000000 # ought to be truncated
372         pieces = mutable_layout.unpack_share(final_share)
373         (seqnum, root_hash, IV, k, N, segsize, datalen,
374          verification_key, signature, share_hash_chain, block_hash_tree,
375          share_data, enc_privkey) = pieces
376
377         if which == "seqnum":
378             seqnum = seqnum + 15
379         elif which == "R":
380             root_hash = self.flip_bit(root_hash)
381         elif which == "IV":
382             IV = self.flip_bit(IV)
383         elif which == "segsize":
384             segsize = segsize + 15
385         elif which == "pubkey":
386             verification_key = self.flip_bit(verification_key)
387         elif which == "signature":
388             signature = self.flip_bit(signature)
389         elif which == "share_hash_chain":
390             nodenum = share_hash_chain.keys()[0]
391             share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
392         elif which == "block_hash_tree":
393             block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
394         elif which == "share_data":
395             share_data = self.flip_bit(share_data)
396         elif which == "encprivkey":
397             enc_privkey = self.flip_bit(enc_privkey)
398
399         prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
400                                             segsize, datalen)
401         final_share = mutable_layout.pack_share(prefix,
402                                                 verification_key,
403                                                 signature,
404                                                 share_hash_chain,
405                                                 block_hash_tree,
406                                                 share_data,
407                                                 enc_privkey)
408         msf.writev( [(0, final_share)], None)
409
410
411     def test_mutable(self):
412         self.basedir = "system/SystemTest/test_mutable"
413         DATA = "initial contents go here."  # 25 bytes % 3 != 0
414         NEWDATA = "new contents yay"
415         NEWERDATA = "this is getting old"
416
417         d = self.set_up_nodes(use_key_generator=True)
418
419         def _create_mutable(res):
420             c = self.clients[0]
421             log.msg("starting create_mutable_file")
422             d1 = c.create_mutable_file(DATA)
423             def _done(res):
424                 log.msg("DONE: %s" % (res,))
425                 self._mutable_node_1 = res
426                 uri = res.get_uri()
427             d1.addCallback(_done)
428             return d1
429         d.addCallback(_create_mutable)
430
431         def _test_debug(res):
432             # find a share. It is important to run this while there is only
433             # one slot in the grid.
434             shares = self._find_shares(self.basedir)
435             (client_num, storage_index, filename, shnum) = shares[0]
436             log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
437                     % filename)
438             log.msg(" for clients[%d]" % client_num)
439
440             out,err = StringIO(), StringIO()
441             rc = runner.runner(["dump-share",
442                                 filename],
443                                stdout=out, stderr=err)
444             output = out.getvalue()
445             self.failUnlessEqual(rc, 0)
446             try:
447                 self.failUnless("Mutable slot found:\n" in output)
448                 self.failUnless("share_type: SDMF\n" in output)
449                 peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
450                 self.failUnless(" WE for nodeid: %s\n" % peerid in output)
451                 self.failUnless(" num_extra_leases: 0\n" in output)
452                 # the pubkey size can vary by a byte, so the container might
453                 # be a bit larger on some runs.
454                 m = re.search(r'^ container_size: (\d+)$', output, re.M)
455                 self.failUnless(m)
456                 container_size = int(m.group(1))
457                 self.failUnless(2037 <= container_size <= 2049, container_size)
458                 m = re.search(r'^ data_length: (\d+)$', output, re.M)
459                 self.failUnless(m)
460                 data_length = int(m.group(1))
461                 self.failUnless(2037 <= data_length <= 2049, data_length)
462                 self.failUnless("  secrets are for nodeid: %s\n" % peerid
463                                 in output)
464                 self.failUnless(" SDMF contents:\n" in output)
465                 self.failUnless("  seqnum: 1\n" in output)
466                 self.failUnless("  required_shares: 3\n" in output)
467                 self.failUnless("  total_shares: 10\n" in output)
468                 self.failUnless("  segsize: 27\n" in output, (output, filename))
469                 self.failUnless("  datalen: 25\n" in output)
470                 # the exact share_hash_chain nodes depend upon the sharenum,
471                 # and is more of a hassle to compute than I want to deal with
472                 # now
473                 self.failUnless("  share_hash_chain: " in output)
474                 self.failUnless("  block_hash_tree: 1 nodes\n" in output)
475                 expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
476                             base32.b2a(storage_index))
477                 self.failUnless(expected in output)
478             except unittest.FailTest:
479                 print
480                 print "dump-share output was:"
481                 print output
482                 raise
483         d.addCallback(_test_debug)
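        # (The "segsize: 27" / "datalen: 25" pair asserted above is consistent
        # with the same round-up-to-a-multiple-of-k rule noted in the CHK test:
        # 25 bytes of contents with required_shares=3 rounds up to a 27-byte
        # segment.)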
484
485         # test retrieval
486
487         # first, let's see if we can use the existing node to retrieve the
488         # contents. This allows it to use the cached pubkey and maybe the
489         # latest-known sharemap.
490
491         d.addCallback(lambda res: self._mutable_node_1.download_best_version())
492         def _check_download_1(res):
493             self.failUnlessEqual(res, DATA)
494             # now we see if we can retrieve the data from a new node,
495             # constructed using the URI of the original one. We do this test
496             # on the same client that uploaded the data.
497             uri = self._mutable_node_1.get_uri()
498             log.msg("starting retrieve1")
499             newnode = self.clients[0].create_node_from_uri(uri)
500             newnode_2 = self.clients[0].create_node_from_uri(uri)
501             self.failUnlessIdentical(newnode, newnode_2)
502             return newnode.download_best_version()
503         d.addCallback(_check_download_1)
504
505         def _check_download_2(res):
506             self.failUnlessEqual(res, DATA)
507             # same thing, but with a different client
508             uri = self._mutable_node_1.get_uri()
509             newnode = self.clients[1].create_node_from_uri(uri)
510             log.msg("starting retrieve2")
511             d1 = newnode.download_best_version()
512             d1.addCallback(lambda res: (res, newnode))
513             return d1
514         d.addCallback(_check_download_2)
515
516         def _check_download_3((res, newnode)):
517             self.failUnlessEqual(res, DATA)
518             # replace the data
519             log.msg("starting replace1")
520             d1 = newnode.overwrite(NEWDATA)
521             d1.addCallback(lambda res: newnode.download_best_version())
522             return d1
523         d.addCallback(_check_download_3)
524
525         def _check_download_4(res):
526             self.failUnlessEqual(res, NEWDATA)
527             # now create an even newer node and replace the data on it. This
528             # new node has never been used for download before.
529             uri = self._mutable_node_1.get_uri()
530             newnode1 = self.clients[2].create_node_from_uri(uri)
531             newnode2 = self.clients[3].create_node_from_uri(uri)
532             self._newnode3 = self.clients[3].create_node_from_uri(uri)
533             log.msg("starting replace2")
534             d1 = newnode1.overwrite(NEWERDATA)
535             d1.addCallback(lambda res: newnode2.download_best_version())
536             return d1
537         d.addCallback(_check_download_4)
538
539         def _check_download_5(res):
540             log.msg("finished replace2")
541             self.failUnlessEqual(res, NEWERDATA)
542         d.addCallback(_check_download_5)
543
544         def _corrupt_shares(res):
545             # run around and flip bits in all but k of the shares, to test
546             # the hash checks
547             shares = self._find_shares(self.basedir)
548             ## sort by share number
549             #shares.sort( lambda a,b: cmp(a[3], b[3]) )
550             where = dict([ (shnum, filename)
551                            for (client_num, storage_index, filename, shnum)
552                            in shares ])
553             assert len(where) == 10 # this test is designed for 3-of-10
554             for shnum, filename in where.items():
555                 # shares 7,8,9 are left alone. read will check
556                 # (share_hash_chain, block_hash_tree, share_data). New
557                 # seqnum+R pairs will trigger a check of (seqnum, R, IV,
558                 # segsize, signature).
559                 if shnum == 0:
560                     # read: this will trigger "pubkey doesn't match
561                     # fingerprint".
562                     self._corrupt_mutable_share(filename, "pubkey")
563                     self._corrupt_mutable_share(filename, "encprivkey")
564                 elif shnum == 1:
565                     # triggers "signature is invalid"
566                     self._corrupt_mutable_share(filename, "seqnum")
567                 elif shnum == 2:
568                     # triggers "signature is invalid"
569                     self._corrupt_mutable_share(filename, "R")
570                 elif shnum == 3:
571                     # triggers "signature is invalid"
572                     self._corrupt_mutable_share(filename, "segsize")
573                 elif shnum == 4:
574                     self._corrupt_mutable_share(filename, "share_hash_chain")
575                 elif shnum == 5:
576                     self._corrupt_mutable_share(filename, "block_hash_tree")
577                 elif shnum == 6:
578                     self._corrupt_mutable_share(filename, "share_data")
579                 # other things to corrupt: IV, signature
580                 # 7,8,9 are left alone
581
582                 # note that initial_query_count=5 means that we'll hit the
583                 # first 5 servers in effectively random order (based upon
584                 # response time), so we won't necessarily ever get a "pubkey
585                 # doesn't match fingerprint" error (if we hit shnum>=1 before
586                 # shnum=0, we pull the pubkey from there). To get repeatable
587                 # specific failures, we need to set initial_query_count=1,
588                 # but of course that will change the sequencing behavior of
589                 # the retrieval process. TODO: find a reasonable way to make
590                 # this a parameter, probably when we expand this test to test
591                 # for one failure mode at a time.
592
593                 # when we retrieve this, we should get three signature
594                 # failures (where we've mangled seqnum, R, and segsize). The
595                 # pubkey mangling in sh0 is only noticed if sh0 happens to be queried before the other shares (see the note above).
596         d.addCallback(_corrupt_shares)
597
598         d.addCallback(lambda res: self._newnode3.download_best_version())
599         d.addCallback(_check_download_5)
600
601         def _check_empty_file(res):
602             # make sure we can create empty files; this usually screws up the
603             # segsize math
604             d1 = self.clients[2].create_mutable_file("")
605             d1.addCallback(lambda newnode: newnode.download_best_version())
606             d1.addCallback(lambda res: self.failUnlessEqual("", res))
607             return d1
608         d.addCallback(_check_empty_file)
609
610         d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
611         def _created_dirnode(dnode):
612             log.msg("_created_dirnode(%s)" % (dnode,))
613             d1 = dnode.list()
614             d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
615             d1.addCallback(lambda res: dnode.has_child(u"edgar"))
616             d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
617             d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
618             d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
619             d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
620             d1.addCallback(lambda res: dnode.build_manifest())
621             d1.addCallback(lambda manifest:
622                            self.failUnlessEqual(len(manifest), 1))
623             return d1
624         d.addCallback(_created_dirnode)
625
626         def wait_for_c3_kg_conn():
627             return self.clients[3]._key_generator is not None
628         d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))
629
630         def check_kg_poolsize(junk, size_delta):
631             self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
632                                  self.key_generator_svc.key_generator.pool_size + size_delta)
633
634         d.addCallback(check_kg_poolsize, 0)
635         d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
636         d.addCallback(check_kg_poolsize, -1)
637         d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
638         d.addCallback(check_kg_poolsize, -2)
639         # use_helper induces use of clients[3], which is the client that uses the key generator
640         d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
641         d.addCallback(check_kg_poolsize, -3)
642
643         return d
644     # The default 120 second timeout went off when running it under valgrind
645     # on my old Windows laptop, so I'm bumping up the timeout.
646     test_mutable.timeout = 240
647
648     def flip_bit(self, good):
649         return good[:-1] + chr(ord(good[-1]) ^ 0x01)
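    # e.g. flip_bit("abc") returns "abb": only the low bit of the final byte is
    # toggled (0x63 -> 0x62), which is enough to invalidate any hash of the data.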
650
651     def mangle_uri(self, gooduri):
652         # change the key, which changes the storage index, which means we'll
653         # be asking about the wrong file, so nobody will have any shares
654         u = IFileURI(gooduri)
655         u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
656                             uri_extension_hash=u.uri_extension_hash,
657                             needed_shares=u.needed_shares,
658                             total_shares=u.total_shares,
659                             size=u.size)
660         return u2.to_string()
661
662     # TODO: add a test which mangles the uri_extension_hash instead, and
663     # should fail due to not being able to get a valid uri_extension block.
664     # Also a test which sneakily mangles the uri_extension block to change
665     # some of the validation data, so it will fail in the post-download phase
666     # when the file's crypttext integrity check fails. Do the same thing for
667     # the key, which should cause the download to fail the post-download
668     # plaintext_hash check.
669
670     def test_vdrive(self):
671         self.basedir = "system/SystemTest/test_vdrive"
672         self.data = LARGE_DATA
673         d = self.set_up_nodes(use_stats_gatherer=True)
674         d.addCallback(self._test_introweb)
675         d.addCallback(self.log, "starting publish")
676         d.addCallback(self._do_publish1)
677         d.addCallback(self._test_runner)
678         d.addCallback(self._do_publish2)
679         # at this point, we have the following filesystem (where "R" denotes
680         # self._root_directory_uri):
681         # R
682         # R/subdir1
683         # R/subdir1/mydata567
684         # R/subdir1/subdir2/
685         # R/subdir1/subdir2/mydata992
686
687         d.addCallback(lambda res: self.bounce_client(0))
688         d.addCallback(self.log, "bounced client0")
689
690         d.addCallback(self._check_publish1)
691         d.addCallback(self.log, "did _check_publish1")
692         d.addCallback(self._check_publish2)
693         d.addCallback(self.log, "did _check_publish2")
694         d.addCallback(self._do_publish_private)
695         d.addCallback(self.log, "did _do_publish_private")
696         # now we also have (where "P" denotes a new dir):
697         #  P/personal/sekrit data
698         #  P/s2-rw -> /subdir1/subdir2/
699         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
700         d.addCallback(self._check_publish_private)
701         d.addCallback(self.log, "did _check_publish_private")
702         d.addCallback(self._test_web)
703         d.addCallback(self._test_control)
704         d.addCallback(self._test_cli)
705         # P now has four top-level children:
706         # P/personal/sekrit data
707         # P/s2-ro/
708         # P/s2-rw/
709         # P/test_put/  (empty)
710         d.addCallback(self._test_checker)
711         d.addCallback(self._grab_stats)
712         return d
713     test_vdrive.timeout = 1100
714
715     def _test_introweb(self, res):
716         d = getPage(self.introweb_url, method="GET", followRedirect=True)
717         def _check(res):
718             try:
719                 self.failUnless("allmydata: %s" % str(allmydata.__version__)
720                                 in res)
721                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
722                 self.failUnless("Subscription Summary: storage: 5" in res)
723             except unittest.FailTest:
724                 print
725                 print "GET %s output was:" % self.introweb_url
726                 print res
727                 raise
728         d.addCallback(_check)
729         d.addCallback(lambda res:
730                       getPage(self.introweb_url + "?t=json",
731                               method="GET", followRedirect=True))
732         def _check_json(res):
733             data = simplejson.loads(res)
734             try:
735                 self.failUnlessEqual(data["subscription_summary"],
736                                      {"storage": 5})
737                 self.failUnlessEqual(data["announcement_summary"],
738                                      {"storage": 5, "stub_client": 5})
739             except unittest.FailTest:
740                 print
741                 print "GET %s?t=json output was:" % self.introweb_url
742                 print res
743                 raise
744         d.addCallback(_check_json)
745         return d
746
747     def _do_publish1(self, res):
748         ut = upload.Data(self.data, convergence=None)
749         c0 = self.clients[0]
750         d = c0.create_empty_dirnode()
751         def _made_root(new_dirnode):
752             self._root_directory_uri = new_dirnode.get_uri()
753             return c0.create_node_from_uri(self._root_directory_uri)
754         d.addCallback(_made_root)
755         d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
756         def _made_subdir1(subdir1_node):
757             self._subdir1_node = subdir1_node
758             d1 = subdir1_node.add_file(u"mydata567", ut)
759             d1.addCallback(self.log, "publish finished")
760             def _stash_uri(filenode):
761                 self.uri = filenode.get_uri()
762             d1.addCallback(_stash_uri)
763             return d1
764         d.addCallback(_made_subdir1)
765         return d
766
767     def _do_publish2(self, res):
768         ut = upload.Data(self.data, convergence=None)
769         d = self._subdir1_node.create_empty_directory(u"subdir2")
770         d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
771         return d
772
773     def log(self, res, msg, **kwargs):
774         # print "MSG: %s  RES: %s" % (msg, res)
775         log.msg(msg, **kwargs)
776         return res
777
778     def _do_publish_private(self, res):
779         self.smalldata = "sssh, very secret stuff"
780         ut = upload.Data(self.smalldata, convergence=None)
781         d = self.clients[0].create_empty_dirnode()
782         d.addCallback(self.log, "GOT private directory")
783         def _got_new_dir(privnode):
784             rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
785             d1 = privnode.create_empty_directory(u"personal")
786             d1.addCallback(self.log, "made P/personal")
787             d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
788             d1.addCallback(self.log, "made P/personal/sekrit data")
789             d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
790             def _got_s2(s2node):
791                 d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
792                 d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
793                 return d2
794             d1.addCallback(_got_s2)
795             d1.addCallback(lambda res: privnode)
796             return d1
797         d.addCallback(_got_new_dir)
798         return d
799
800     def _check_publish1(self, res):
801         # this one uses the iterative API
802         c1 = self.clients[1]
803         d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
804         d.addCallback(self.log, "check_publish1 got /")
805         d.addCallback(lambda root: root.get(u"subdir1"))
806         d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
807         d.addCallback(lambda filenode: filenode.download_to_data())
808         d.addCallback(self.log, "get finished")
809         def _get_done(data):
810             self.failUnlessEqual(data, self.data)
811         d.addCallback(_get_done)
812         return d
813
814     def _check_publish2(self, res):
815         # this one uses the path-based API
816         rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
817         d = rootnode.get_child_at_path(u"subdir1")
818         d.addCallback(lambda dirnode:
819                       self.failUnless(IDirectoryNode.providedBy(dirnode)))
820         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
821         d.addCallback(lambda filenode: filenode.download_to_data())
822         d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
823
824         d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
825         def _got_filenode(filenode):
826             fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
827             assert fnode == filenode
828         d.addCallback(_got_filenode)
829         return d
830
831     def _check_publish_private(self, resnode):
832         # this one uses the path-based API
833         self._private_node = resnode
834
835         d = self._private_node.get_child_at_path(u"personal")
836         def _got_personal(personal):
837             self._personal_node = personal
838             return personal
839         d.addCallback(_got_personal)
840
841         d.addCallback(lambda dirnode:
842                       self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
843         def get_path(path):
844             return self._private_node.get_child_at_path(path)
845
846         d.addCallback(lambda res: get_path(u"personal/sekrit data"))
847         d.addCallback(lambda filenode: filenode.download_to_data())
848         d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
849         d.addCallback(lambda res: get_path(u"s2-rw"))
850         d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
851         d.addCallback(lambda res: get_path(u"s2-ro"))
852         def _got_s2ro(dirnode):
853             self.failUnless(dirnode.is_mutable(), dirnode)
854             self.failUnless(dirnode.is_readonly(), dirnode)
855             d1 = defer.succeed(None)
856             d1.addCallback(lambda res: dirnode.list())
857             d1.addCallback(self.log, "dirnode.list")
858
859             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))
860
861             d1.addCallback(self.log, "doing add_file(ro)")
862             ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
863             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
864
865             d1.addCallback(self.log, "doing get(ro)")
866             d1.addCallback(lambda res: dirnode.get(u"mydata992"))
867             d1.addCallback(lambda filenode:
868                            self.failUnless(IFileNode.providedBy(filenode)))
869
870             d1.addCallback(self.log, "doing delete(ro)")
871             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
872
873             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))
874
875             d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))
876
877             personal = self._personal_node
878             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
879
880             d1.addCallback(self.log, "doing move_child_to(ro)2")
881             d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
882
883             d1.addCallback(self.log, "finished with _got_s2ro")
884             return d1
885         d.addCallback(_got_s2ro)
886         def _got_home(dummy):
887             home = self._private_node
888             personal = self._personal_node
889             d1 = defer.succeed(None)
890             d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
891             d1.addCallback(lambda res:
892                            personal.move_child_to(u"sekrit data",home,u"sekrit"))
893
894             d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
895             d1.addCallback(lambda res:
896                            home.move_child_to(u"sekrit", home, u"sekrit data"))
897
898             d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
899             d1.addCallback(lambda res:
900                            home.move_child_to(u"sekrit data", personal))
901
902             d1.addCallback(lambda res: home.build_manifest())
903             d1.addCallback(self.log, "manifest")
904             #  four items:
905             # P/personal/
906             # P/personal/sekrit data
907             # P/s2-rw  (same as P/s2-ro)
908             # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
909             d1.addCallback(lambda manifest:
910                            self.failUnlessEqual(len(manifest), 4))
911             d1.addCallback(lambda res: home.deep_stats())
912             def _check_stats(stats):
913                 expected = {"count-immutable-files": 1,
914                             "count-mutable-files": 0,
915                             "count-literal-files": 1,
916                             "count-files": 2,
917                             "count-directories": 3,
918                             "size-immutable-files": 112,
919                             "size-literal-files": 23,
920                             #"size-directories": 616, # varies
921                             #"largest-directory": 616,
922                             "largest-directory-children": 3,
923                             "largest-immutable-file": 112,
924                             }
925                 for k,v in expected.iteritems():
926                     self.failUnlessEqual(stats[k], v,
927                                          "stats[%s] was %s, not %s" %
928                                          (k, stats[k], v))
929                 self.failUnless(stats["size-directories"] > 1300,
930                                 stats["size-directories"])
931                 self.failUnless(stats["largest-directory"] > 800,
932                                 stats["largest-directory"])
933                 self.failUnlessEqual(stats["size-files-histogram"],
934                                      [ (11, 31, 1), (101, 316, 1) ])
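                # (Where those numbers come from: size-literal-files 23 is
                # len("sssh, very secret stuff"), size-immutable-files 112 is
                # len(LARGE_DATA), and the two histogram buckets -- 11-31 and
                # 101-316 bytes -- each appear to hold exactly one of those files.)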
935             d1.addCallback(_check_stats)
936             return d1
937         d.addCallback(_got_home)
938         return d
939
940     def shouldFail(self, res, expected_failure, which, substring=None):
941         if isinstance(res, Failure):
942             res.trap(expected_failure)
943             if substring:
944                 self.failUnless(substring in str(res),
945                                 "substring '%s' not in '%s'"
946                                 % (substring, str(res)))
947         else:
948             self.fail("%s was supposed to raise %s, not get '%s'" %
949                       (which, expected_failure, res))
950
951     def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
952         assert substring is None or isinstance(substring, str)
953         d = defer.maybeDeferred(callable, *args, **kwargs)
954         def done(res):
955             if isinstance(res, Failure):
956                 res.trap(expected_failure)
957                 if substring:
958                     self.failUnless(substring in str(res),
959                                     "substring '%s' not in '%s'"
960                                     % (substring, str(res)))
961             else:
962                 self.fail("%s was supposed to raise %s, not get '%s'" %
963                           (which, expected_failure, res))
964         d.addBoth(done)
965         return d
966
967     def PUT(self, urlpath, data):
968         url = self.webish_url + urlpath
969         return getPage(url, method="PUT", postdata=data)
970
971     def GET(self, urlpath, followRedirect=False):
972         url = self.webish_url + urlpath
973         return getPage(url, method="GET", followRedirect=followRedirect)
974
975     def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
976         if use_helper:
977             url = self.helper_webish_url + urlpath
978         else:
979             url = self.webish_url + urlpath
980         sepbase = "boogabooga"
981         sep = "--" + sepbase
982         form = []
983         form.append(sep)
984         form.append('Content-Disposition: form-data; name="_charset"')
985         form.append('')
986         form.append('UTF-8')
987         form.append(sep)
988         for name, value in fields.iteritems():
989             if isinstance(value, tuple):
990                 filename, value = value
991                 form.append('Content-Disposition: form-data; name="%s"; '
992                             'filename="%s"' % (name, filename.encode("utf-8")))
993             else:
994                 form.append('Content-Disposition: form-data; name="%s"' % name)
995             form.append('')
996             form.append(str(value))
997             form.append(sep)
998         form[-1] += "--"
999         body = "\r\n".join(form) + "\r\n"
1000         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
1001                    }
1002         return getPage(url, method="POST", postdata=body,
1003                        headers=headers, followRedirect=followRedirect)
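    # As a concrete illustration (derived from the code above, not captured from
    # the wire), POST("uri", t="upload") produces a body like this, with CRLF
    # line endings and a matching multipart/form-data boundary header:
    #
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   upload
    #   --boogabooga--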
1004
1005     def _test_web(self, res):
1006         base = self.webish_url
1007         public = "uri/" + self._root_directory_uri
1008         d = getPage(base)
1009         def _got_welcome(page):
1010             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1011             self.failUnless(expected in page,
1012                             "I didn't see the right 'connected storage servers'"
1013                             " message in: %s" % page
1014                             )
1015             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1016             self.failUnless(expected in page,
1017                             "I didn't see the right 'My nodeid' message "
1018                             "in: %s" % page)
1019             self.failUnless("Helper: 0 active uploads" in page)
1020         d.addCallback(_got_welcome)
1021         d.addCallback(self.log, "done with _got_welcome")
1022
1023         # get the welcome page from the node that uses the helper too
1024         d.addCallback(lambda res: getPage(self.helper_webish_url))
1025         def _got_welcome_helper(page):
1026             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1027                             page)
1028             self.failUnless("Not running helper" in page)
1029         d.addCallback(_got_welcome_helper)
1030
1031         d.addCallback(lambda res: getPage(base + public))
1032         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1033         def _got_subdir1(page):
1034             # there ought to be an href for our file
1035             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1036             self.failUnless(">mydata567</a>" in page)
1037         d.addCallback(_got_subdir1)
1038         d.addCallback(self.log, "done with _got_subdir1")
1039         d.addCallback(lambda res:
1040                       getPage(base + public + "/subdir1/mydata567"))
1041         def _got_data(page):
1042             self.failUnlessEqual(page, self.data)
1043         d.addCallback(_got_data)
1044
1045         # download from a URI embedded in a URL
1046         d.addCallback(self.log, "_get_from_uri")
1047         def _get_from_uri(res):
1048             return getPage(base + "uri/%s?filename=%s"
1049                            % (self.uri, "mydata567"))
1050         d.addCallback(_get_from_uri)
1051         def _got_from_uri(page):
1052             self.failUnlessEqual(page, self.data)
1053         d.addCallback(_got_from_uri)
1054
1055         # download from a URI embedded in a URL, second form
1056         d.addCallback(self.log, "_get_from_uri2")
1057         def _get_from_uri2(res):
1058             return getPage(base + "uri?uri=%s" % (self.uri,))
1059         d.addCallback(_get_from_uri2)
1060         d.addCallback(_got_from_uri)
1061
1062         # download from a bogus URI, make sure we get a reasonable error
1063         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1064         def _get_from_bogus_uri(res):
1065             d1 = getPage(base + "uri/%s?filename=%s"
1066                          % (self.mangle_uri(self.uri), "mydata567"))
1067             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1068                        "410")
1069             return d1
1070         d.addCallback(_get_from_bogus_uri)
1071         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1072
1073         # upload a file with PUT
1074         d.addCallback(self.log, "about to try PUT")
1075         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1076                                            "new.txt contents"))
1077         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1078         d.addCallback(self.failUnlessEqual, "new.txt contents")
1079         # and again with something large enough to use multiple segments,
1080         # and hopefully trigger pauseProducing too
1081         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1082                                            "big" * 500000)) # 1.5MB
1083         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1084         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1085
1086         # can we replace files in place?
1087         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1088                                            "NEWER contents"))
1089         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1090         d.addCallback(self.failUnlessEqual, "NEWER contents")
1091
1092         # test unlinked POST
1093         d.addCallback(lambda res: self.POST("uri", t="upload",
1094                                             file=("new.txt", "data" * 10000)))
1095         # and again using the helper, which exercises different upload-status
1096         # display code
1097         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1098                                             file=("foo.txt", "data2" * 10000)))
1099
1100         # check that the status page exists
1101         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1102         def _got_status(res):
1103             # find an interesting upload and download to look at. LIT files
1104             # are not interesting.
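                 # (the >200-byte cutoff below is just a comfortable margin
                 # above the small LIT size threshold: LIT files live entirely
                 # inside their URI and never touch the storage servers, so
                 # they produce no upload/download status worth inspecting)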
1105             for ds in self.clients[0].list_all_download_statuses():
1106                 if ds.get_size() > 200:
1107                     self._down_status = ds.get_counter()
1108             for us in self.clients[0].list_all_upload_statuses():
1109                 if us.get_size() > 200:
1110                     self._up_status = us.get_counter()
1111             rs = self.clients[0].list_all_retrieve_statuses()[0]
1112             self._retrieve_status = rs.get_counter()
1113             ps = self.clients[0].list_all_publish_statuses()[0]
1114             self._publish_status = ps.get_counter()
1115             us = self.clients[0].list_all_mapupdate_statuses()[0]
1116             self._update_status = us.get_counter()
1117
1118             # and that there are some upload- and download- status pages
1119             return self.GET("status/up-%d" % self._up_status)
1120         d.addCallback(_got_status)
1121         def _got_up(res):
1122             return self.GET("status/down-%d" % self._down_status)
1123         d.addCallback(_got_up)
1124         def _got_down(res):
1125             return self.GET("status/mapupdate-%d" % self._update_status)
1126         d.addCallback(_got_down)
1127         def _got_update(res):
1128             return self.GET("status/publish-%d" % self._publish_status)
1129         d.addCallback(_got_update)
1130         def _got_publish(res):
1131             return self.GET("status/retrieve-%d" % self._retrieve_status)
1132         d.addCallback(_got_publish)
1133
1134         # check that the helper status page exists
1135         d.addCallback(lambda res:
1136                       self.GET("helper_status", followRedirect=True))
1137         def _got_helper_status(res):
1138             self.failUnless("Bytes Fetched:" in res)
1139             # touch a couple of files in the helper's working directory to
1140             # exercise more code paths
1141             workdir = os.path.join(self.getdir("client0"), "helper")
1142             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1143             f = open(incfile, "wb")
1144             f.write("small file")
1145             f.close()
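                 # (back-date the mtimes by three days so these spurious files
                 # also show up in the *_old counters checked in the JSON form
                 # below)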
1146             then = time.time() - 86400*3
1147             now = time.time()
1148             os.utime(incfile, (now, then))
1149             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1150             f = open(encfile, "wb")
1151             f.write("less small file")
1152             f.close()
1153             os.utime(encfile, (now, then))
1154         d.addCallback(_got_helper_status)
1155         # and that the json form exists
1156         d.addCallback(lambda res:
1157                       self.GET("helper_status?t=json", followRedirect=True))
1158         def _got_helper_status_json(res):
1159             data = simplejson.loads(res)
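                 # (the expected sizes match the spurious files planted above:
                 # "small file" is 10 bytes, "less small file" is 15)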
1160             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1161                                  1)
1162             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1163             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1164             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1165                                  10)
1166             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1167             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1168             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1169                                  15)
1170         d.addCallback(_got_helper_status_json)
1171
1172         # and check that client[3] (which uses a helper but does not run one
1173         # itself) doesn't explode when you ask for its status
1174         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1175         def _got_non_helper_status(res):
1176             self.failUnless("Upload and Download Status" in res)
1177         d.addCallback(_got_non_helper_status)
1178
1179         # or for helper status with t=json
1180         d.addCallback(lambda res:
1181                       getPage(self.helper_webish_url + "helper_status?t=json"))
1182         def _got_non_helper_status_json(res):
1183             data = simplejson.loads(res)
1184             self.failUnlessEqual(data, {})
1185         d.addCallback(_got_non_helper_status_json)
1186
1187         # see if the statistics page exists
1188         d.addCallback(lambda res: self.GET("statistics"))
1189         def _got_stats(res):
1190             self.failUnless("Node Statistics" in res)
1191             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1192         d.addCallback(_got_stats)
1193         d.addCallback(lambda res: self.GET("statistics?t=json"))
1194         def _got_stats_json(res):
1195             data = simplejson.loads(res)
1196             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1197             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1198         d.addCallback(_got_stats_json)
1199
1200         # TODO: mangle the second segment of a file, to test errors that
1201         # occur after we've already sent some good data, which uses a
1202         # different error path.
1203
1204         # TODO: download a URI with a form
1205         # TODO: create a directory by using a form
1206         # TODO: upload by using a form on the directory page
1207         #    url = base + "somedir/subdir1/freeform_post!!upload"
1208         # TODO: delete a file by using a button on the directory page
1209
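         # a hedged, parked sketch of the mkdir-by-form TODO above: it assumes
         # the self.POST helper passes extra keyword arguments through as form
         # fields (as it does for t=upload) and that the directory webapi
         # accepts POST ?t=mkdir&name=CHILDNAME -- left disabled, like the
         # _ls_missing block in _test_cli, until someone verifies both
     ##         d.addCallback(lambda res: self.POST(public + "/subdir3",
     ##                                             t="mkdir",
     ##                                             name="newdir-by-form"))
     ##         d.addCallback(lambda res:
     ##                       self.GET(public + "/subdir3/newdir-by-form?t=json"))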
1210         return d
1211
1212     def _test_runner(self, res):
1213         # exercise some of the diagnostic tools in runner.py
1214
1215         # find a share
1216         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1217             if "storage" not in dirpath:
1218                 continue
1219             if not filenames:
1220                 continue
1221             pieces = dirpath.split(os.sep)
1222             if pieces[-4] == "storage" and pieces[-3] == "shares":
1223                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1224                 # are sharefiles here
1225                 filename = os.path.join(dirpath, filenames[0])
1226                 # peek at the magic to see if it is a chk share
1227                 magic = open(filename, "rb").read(4)
1228                 if magic == '\x00\x00\x00\x01':
1229                     break
1230         else:
1231             self.fail("unable to find any CHK share files in %s"
1232                       % self.basedir)
1233         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1234
1235         out,err = StringIO(), StringIO()
1236         rc = runner.runner(["dump-share",
1237                             filename],
1238                            stdout=out, stderr=err)
1239         output = out.getvalue()
1240         self.failUnlessEqual(rc, 0)
1241
1242         # we only upload a single file, so we can assert some things about
1243         # its size and shares.
1244         self.failUnless(("share filename: %s" % filename) in output)
1245         self.failUnless("size: %d\n" % len(self.data) in output)
1246         self.failUnless("num_segments: 1\n" in output)
1247         # segment_size is rounded up to a multiple of needed_shares (k=3 here, the default)
1248         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1249         self.failUnless("total_shares: 10\n" in output)
1250         # keys which are supposed to be present
1251         for key in ("size", "num_segments", "segment_size",
1252                     "needed_shares", "total_shares",
1253                     "codec_name", "codec_params", "tail_codec_params",
1254                     #"plaintext_hash", "plaintext_root_hash",
1255                     "crypttext_hash", "crypttext_root_hash",
1256                     "share_root_hash", "UEB_hash"):
1257             self.failUnless("%s: " % key in output, key)
1258         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1259
1260         # now use its storage index to find the other shares using the
1261         # 'find-shares' tool
1262         sharedir, shnum = os.path.split(filename)
1263         storagedir, storage_index_s = os.path.split(sharedir)
1264         out,err = StringIO(), StringIO()
1265         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1266         cmd = ["find-shares", storage_index_s] + nodedirs
1267         rc = runner.runner(cmd, stdout=out, stderr=err)
1268         self.failUnlessEqual(rc, 0)
1269         out.seek(0)
1270         sharefiles = [sfn.strip() for sfn in out.readlines()]
1271         self.failUnlessEqual(len(sharefiles), 10)
1272
1273         # also exercise the 'catalog-shares' tool
1274         out,err = StringIO(), StringIO()
1275         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1276         cmd = ["catalog-shares"] + nodedirs
1277         rc = runner.runner(cmd, stdout=out, stderr=err)
1278         self.failUnlessEqual(rc, 0)
1279         out.seek(0)
1280         descriptions = [sfn.strip() for sfn in out.readlines()]
1281         self.failUnlessEqual(len(descriptions), 30)
1282         matching = [line
1283                     for line in descriptions
1284                     if line.startswith("CHK %s " % storage_index_s)]
1285         self.failUnlessEqual(len(matching), 10)
1286
1287     def _test_control(self, res):
1288         # exercise the remote-control-the-client foolscap interfaces in
1289         # allmydata.control (mostly used for performance tests)
1290         c0 = self.clients[0]
1291         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1292         control_furl = open(control_furl_file, "r").read().strip()
1293         # it doesn't really matter which Tub we use to connect to the client,
1294         # so let's just use our IntroducerNode's
1295         d = self.introducer.tub.getReference(control_furl)
1296         d.addCallback(self._test_control2, control_furl_file)
1297         return d
1298     def _test_control2(self, rref, filename):
1299         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1300         downfile = os.path.join(self.basedir, "control.downfile")
1301         d.addCallback(lambda uri:
1302                       rref.callRemote("download_from_uri_to_file",
1303                                       uri, downfile))
1304         def _check(res):
1305             self.failUnlessEqual(res, downfile)
1306             data = open(downfile, "r").read()
1307             expected_data = open(filename, "r").read()
1308             self.failUnlessEqual(data, expected_data)
1309         d.addCallback(_check)
1310         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1311         if sys.platform == "linux2":
1312             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1313         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1314         return d
1315
1316     def _test_cli(self, res):
1317         # run various CLI commands (in a thread, since they use blocking
1318         # network calls)
1319
1320         private_uri = self._private_node.get_uri()
1321         some_uri = self._root_directory_uri
1322         client0_basedir = self.getdir("client0")
1323
1324         nodeargs = [
1325             "--node-directory", client0_basedir,
1326             ]
1327         TESTDATA = "I will not write the same thing over and over.\n" * 100
1328
1329         d = defer.succeed(None)
1330
1331         # for compatibility with earlier versions, private/root_dir.cap is
1332         # supposed to be treated as an alias named "tahoe:". Start by making
1333         # sure that works, before we add other aliases.
1334
1335         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1336         f = open(root_file, "w")
1337         f.write(private_uri)
1338         f.close()
1339
1340         def run(ignored, verb, *args):
1341             newargs = [verb] + nodeargs + list(args)
1342             return self._run_cli(newargs)
1343
1344         def _check_ls((out,err), expected_children, unexpected_children=[]):
1345             self.failUnlessEqual(err, "")
1346             for s in expected_children:
1347                 self.failUnless(s in out, (s,out))
1348             for s in unexpected_children:
1349                 self.failIf(s in out, (s,out))
1350
1351         def _check_ls_root((out,err)):
1352             self.failUnless("personal" in out)
1353             self.failUnless("s2-ro" in out)
1354             self.failUnless("s2-rw" in out)
1355             self.failUnlessEqual(err, "")
1356
1357         # this should reference private_uri
1358         d.addCallback(run, "ls")
1359         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1360
1361         d.addCallback(run, "list-aliases")
1362         def _check_aliases_1((out,err)):
1363             self.failUnlessEqual(err, "")
1364             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1365         d.addCallback(_check_aliases_1)
1366
1367         # now that that's out of the way, remove root_dir.cap and work with
1368         # new files
1369         d.addCallback(lambda res: os.unlink(root_file))
1370         d.addCallback(run, "list-aliases")
1371         def _check_aliases_2((out,err)):
1372             self.failUnlessEqual(err, "")
1373             self.failUnlessEqual(out, "")
1374         d.addCallback(_check_aliases_2)
1375
1376         d.addCallback(run, "mkdir")
1377         def _got_dir( (out,err) ):
1378             self.failUnless(uri.from_string_dirnode(out.strip()))
1379             return out.strip()
1380         d.addCallback(_got_dir)
1381         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1382
1383         d.addCallback(run, "list-aliases")
1384         def _check_aliases_3((out,err)):
1385             self.failUnlessEqual(err, "")
1386             self.failUnless("tahoe: " in out)
1387         d.addCallback(_check_aliases_3)
1388
1389         def _check_empty_dir((out,err)):
1390             self.failUnlessEqual(out, "")
1391             self.failUnlessEqual(err, "")
1392         d.addCallback(run, "ls")
1393         d.addCallback(_check_empty_dir)
1394
1395         def _check_missing_dir((out,err)):
1396             # TODO: check that rc==2
1397             self.failUnlessEqual(out, "")
1398             self.failUnlessEqual(err, "No such file or directory\n")
1399         d.addCallback(run, "ls", "bogus")
1400         d.addCallback(_check_missing_dir)
1401
1402         files = []
1403         datas = []
1404         for i in range(10):
1405             fn = os.path.join(self.basedir, "file%d" % i)
1406             files.append(fn)
1407             data = "data to be uploaded: file%d\n" % i
1408             datas.append(data)
1409             open(fn,"wb").write(data)
1410
1411         def _check_stdout_against((out,err), filenum=None, data=None):
1412             self.failUnlessEqual(err, "")
1413             if filenum is not None:
1414                 self.failUnlessEqual(out, datas[filenum])
1415             if data is not None:
1416                 self.failUnlessEqual(out, data)
1417
1418         # test both forms of put: from a file, and from stdin
1419         #  tahoe put bar FOO
1420         d.addCallback(run, "put", files[0], "tahoe-file0")
1421         def _put_out((out,err)):
1422             self.failUnless("URI:LIT:" in out, out)
1423             self.failUnless("201 Created" in err, err)
1424             uri0 = out.strip()
1425             return run(None, "get", uri0)
1426         d.addCallback(_put_out)
1427         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1428
1429         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1430         #  tahoe put bar tahoe:FOO
1431         d.addCallback(run, "put", files[2], "tahoe:file2")
1432         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1433         def _check_put_mutable((out,err)):
1434             self._mutable_file3_uri = out.strip()
1435         d.addCallback(_check_put_mutable)
1436         d.addCallback(run, "get", "tahoe:file3")
1437         d.addCallback(_check_stdout_against, 3)
1438
1439         def _put_from_stdin(res, data, *args):
1440             args = nodeargs + list(args)
1441             o = cli.PutOptions()
1442             o.parseOptions(args)
1443             stdin = StringIO(data)
1444             stdout, stderr = StringIO(), StringIO()
1445             o.stdin = stdin
1446             o.stdout = stdout
1447             o.stderr = stderr
1448             d = threads.deferToThread(cli.put, o)
1449             def _done(res):
1450                 return stdout.getvalue(), stderr.getvalue()
1451             d.addCallback(_done)
1452             return d
1453
1454         #  tahoe put FOO
1455         STDIN_DATA = "This is the file to upload from stdin."
1456         d.addCallback(_put_from_stdin,
1457                       STDIN_DATA,
1458                       "tahoe-file-stdin")
1459         #  tahoe put tahoe:FOO
1460         d.addCallback(_put_from_stdin,
1461                       "Other file from stdin.",
1462                       "tahoe:from-stdin")
1463
1464         d.addCallback(run, "ls")
1465         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1466                                   "tahoe-file-stdin", "from-stdin"])
1467         d.addCallback(run, "ls", "subdir")
1468         d.addCallback(_check_ls, ["tahoe-file1"])
1469
1470         # tahoe mkdir FOO
1471         d.addCallback(run, "mkdir", "subdir2")
1472         d.addCallback(run, "ls")
1473         # TODO: extract the URI, set an alias with it
1474         d.addCallback(_check_ls, ["subdir2"])
1475
1476         # tahoe get: (to stdin and to a file)
1477         d.addCallback(run, "get", "tahoe-file0")
1478         d.addCallback(_check_stdout_against, 0)
1479         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1480         d.addCallback(_check_stdout_against, 1)
1481         outfile0 = os.path.join(self.basedir, "outfile0")
1482         d.addCallback(run, "get", "file2", outfile0)
1483         def _check_outfile0((out,err)):
1484             data = open(outfile0,"rb").read()
1485             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1486         d.addCallback(_check_outfile0)
1487         outfile1 = os.path.join(self.basedir, "outfile1")
1488         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1489         def _check_outfile1((out,err)):
1490             data = open(outfile1,"rb").read()
1491             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1492         d.addCallback(_check_outfile1)
1493
1494         d.addCallback(run, "rm", "tahoe-file0")
1495         d.addCallback(run, "rm", "tahoe:file2")
1496         d.addCallback(run, "ls")
1497         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1498
1499         d.addCallback(run, "ls", "-l")
1500         def _check_ls_l((out,err)):
1501             lines = out.split("\n")
1502             for l in lines:
1503                 if "tahoe-file-stdin" in l:
1504                     self.failUnless(l.startswith("-r-- "), l)
1505                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1506                 if "file3" in l:
1507                     self.failUnless(l.startswith("-rw- "), l) # mutable
1508         d.addCallback(_check_ls_l)
1509
1510         d.addCallback(run, "ls", "--uri")
1511         def _check_ls_uri((out,err)):
1512             lines = out.split("\n")
1513             for l in lines:
1514                 if "file3" in l:
1515                     self.failUnless(self._mutable_file3_uri in l)
1516         d.addCallback(_check_ls_uri)
1517
1518         d.addCallback(run, "ls", "--readonly-uri")
1519         def _check_ls_rouri((out,err)):
1520             lines = out.split("\n")
1521             for l in lines:
1522                 if "file3" in l:
1523                     rw_uri = self._mutable_file3_uri
1524                     u = uri.from_string_mutable_filenode(rw_uri)
1525                     ro_uri = u.get_readonly().to_string()
1526                     self.failUnless(ro_uri in l)
1527         d.addCallback(_check_ls_rouri)
1528
1529
1530         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1531         d.addCallback(run, "ls")
1532         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1533
1534         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1535         d.addCallback(run, "ls")
1536         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1537
1538         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1539         d.addCallback(run, "ls")
1540         d.addCallback(_check_ls, ["file3", "file3-copy"])
1541         d.addCallback(run, "get", "tahoe:file3-copy")
1542         d.addCallback(_check_stdout_against, 3)
1543
1544         # copy from disk into tahoe
1545         d.addCallback(run, "cp", files[4], "tahoe:file4")
1546         d.addCallback(run, "ls")
1547         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1548         d.addCallback(run, "get", "tahoe:file4")
1549         d.addCallback(_check_stdout_against, 4)
1550
1551         # copy from tahoe into disk
1552         target_filename = os.path.join(self.basedir, "file-out")
1553         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1554         def _check_cp_out((out,err)):
1555             self.failUnless(os.path.exists(target_filename))
1556             got = open(target_filename,"rb").read()
1557             self.failUnlessEqual(got, datas[4])
1558         d.addCallback(_check_cp_out)
1559
1560         # copy from disk to disk (silly case)
1561         target2_filename = os.path.join(self.basedir, "file-out-copy")
1562         d.addCallback(run, "cp", target_filename, target2_filename)
1563         def _check_cp_out2((out,err)):
1564             self.failUnless(os.path.exists(target2_filename))
1565             got = open(target2_filename,"rb").read()
1566             self.failUnlessEqual(got, datas[4])
1567         d.addCallback(_check_cp_out2)
1568
1569         # copy from tahoe into disk, overwriting an existing file
1570         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1571         def _check_cp_out3((out,err)):
1572             self.failUnless(os.path.exists(target_filename))
1573             got = open(target_filename,"rb").read()
1574             self.failUnlessEqual(got, datas[3])
1575         d.addCallback(_check_cp_out3)
1576
1577         # copy from disk into tahoe, overwriting an existing immutable file
1578         d.addCallback(run, "cp", files[5], "tahoe:file4")
1579         d.addCallback(run, "ls")
1580         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1581         d.addCallback(run, "get", "tahoe:file4")
1582         d.addCallback(_check_stdout_against, 5)
1583
1584         # copy from disk into tahoe, overwriting an existing mutable file
1585         d.addCallback(run, "cp", files[5], "tahoe:file3")
1586         d.addCallback(run, "ls")
1587         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1588         d.addCallback(run, "get", "tahoe:file3")
1589         d.addCallback(_check_stdout_against, 5)
1590
1591         # recursive copy: setup
1592         dn = os.path.join(self.basedir, "dir1")
1593         os.makedirs(dn)
1594         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1595         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1596         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1597         sdn2 = os.path.join(dn, "subdir2")
1598         os.makedirs(sdn2)
1599         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1600         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1601
1602         # from disk into tahoe
1603         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1604         d.addCallback(run, "ls")
1605         d.addCallback(_check_ls, ["dir1"])
1606         d.addCallback(run, "ls", "dir1")
1607         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1608                       ["rfile4", "rfile5"])
1609         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1610         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1611                       ["rfile1", "rfile2", "rfile3"])
1612         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1613         d.addCallback(_check_stdout_against, data="rfile4")
1614
1615         # and back out again
1616         dn_copy = os.path.join(self.basedir, "dir1-copy")
1617         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1618         def _check_cp_r_out((out,err)):
1619             def _cmp(name):
1620                 old = open(os.path.join(dn, name), "rb").read()
1621                 newfn = os.path.join(dn_copy, name)
1622                 self.failUnless(os.path.exists(newfn))
1623                 new = open(newfn, "rb").read()
1624                 self.failUnlessEqual(old, new)
1625             _cmp("rfile1")
1626             _cmp("rfile2")
1627             _cmp("rfile3")
1628             _cmp(os.path.join("subdir2", "rfile4"))
1629             _cmp(os.path.join("subdir2", "rfile5"))
1630         d.addCallback(_check_cp_r_out)
1631
1632         # and copy it a second time, which ought to overwrite the same files
1633         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1634
1635         # and tahoe-to-tahoe
1636         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1637         d.addCallback(run, "ls")
1638         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1639         d.addCallback(run, "ls", "dir1-copy")
1640         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1641                       ["rfile4", "rfile5"])
1642         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1643         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1644                       ["rfile1", "rfile2", "rfile3"])
1645         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1646         d.addCallback(_check_stdout_against, data="rfile4")
1647
1648         # and copy it a second time, which ought to overwrite the same files
1649         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1650
1651         # tahoe_ls doesn't currently handle the error correctly: it tries to
1652         # JSON-parse a traceback.
1653 ##         def _ls_missing(res):
1654 ##             argv = ["ls"] + nodeargs + ["bogus"]
1655 ##             return self._run_cli(argv)
1656 ##         d.addCallback(_ls_missing)
1657 ##         def _check_ls_missing((out,err)):
1658 ##             print "OUT", out
1659 ##             print "ERR", err
1660 ##             self.failUnlessEqual(err, "")
1661 ##         d.addCallback(_check_ls_missing)
1662
1663         return d
1664
1665     def _run_cli(self, argv):
1666         #print "CLI:", argv
1667         stdout, stderr = StringIO(), StringIO()
1668         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1669                                   stdout=stdout, stderr=stderr)
1670         def _done(res):
1671             return stdout.getvalue(), stderr.getvalue()
1672         d.addCallback(_done)
1673         return d
1674
1675     def _test_checker(self, res):
1676         ut = upload.Data("too big to be literal" * 200, convergence=None)
1677         d = self._personal_node.add_file(u"big file", ut)
1678
1679         d.addCallback(lambda res: self._personal_node.check())
1680         def _check_dirnode_results(r):
1681             self.failUnless(r.is_healthy())
1682         d.addCallback(_check_dirnode_results)
1683         d.addCallback(lambda res: self._personal_node.check(verify=True))
1684         d.addCallback(_check_dirnode_results)
1685
1686         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1687         def _got_chk_filenode(n):
1688             self.failUnless(isinstance(n, filenode.FileNode))
1689             d = n.check()
1690             def _check_filenode_results(r):
1691                 self.failUnless(r.is_healthy())
1692             d.addCallback(_check_filenode_results)
1693             d.addCallback(lambda res: n.check(verify=True))
1694             d.addCallback(_check_filenode_results)
1695             return d
1696         d.addCallback(_got_chk_filenode)
1697
1698         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1699         def _got_lit_filenode(n):
1700             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1701             d = n.check()
1702             def _check_filenode_results(r):
1703                 self.failUnless(r.is_healthy())
1704             d.addCallback(_check_filenode_results)
1705             d.addCallback(lambda res: n.check(verify=True))
1706             d.addCallback(_check_filenode_results)
1707             return d
1708         d.addCallback(_got_lit_filenode)
1709         return d
1710
1711
1712 class Checker(SystemTestMixin, unittest.TestCase):
1713     def setUp(self):
1714         # Set self.basedir to a temp dir whose name includes the name of
1715         # the current test method.
1716         self.basedir = self.mktemp()
1717         TEST_DATA = "\x02" * (upload.Uploader.URI_LIT_SIZE_THRESHOLD + 1)
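         # one byte over the LIT threshold, so the upload below produces real
         # CHK shares on disk for _find_shares and the checker to work with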
1718
1719         d = defer.maybeDeferred(SystemTestMixin.setUp, self)
1720         d.addCallback(lambda x: self.set_up_nodes())
1721
1722         def _upload_a_file(ignored):
1723             d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
1724             d2.addCallback(lambda u: self.clients[0].create_node_from_uri(u.uri))
1725             return d2
1726         d.addCallback(_upload_a_file)
1727
1728         def _stash_it(filenode):
1729             self.filenode = filenode
1730         d.addCallback(_stash_it)
1731         return d
1732
1733     def _find_shares(self, unused=None):
1734         shares = {} # k: (i, sharenum), v: data
1735
1736         for i, c in enumerate(self.clients):
1737             sharedir = c.getServiceNamed("storage").sharedir
1738             for (dirp, dirns, fns) in os.walk(sharedir):
1739                 for fn in fns:
1740                     try:
1741                         sharenum = int(fn)
1742                     except ValueError:
1743                         # Whoops, I guess that's not a share file then.
1744                         pass
1745                     else:
1746                         data = open(os.path.join(sharedir, dirp, fn), "r").read()
1747                         shares[(i, sharenum)] = data
1748
1749         return shares
1750
1751     def _replace_shares(self, newshares):
1752         for i, c in enumerate(self.clients):
1753             sharedir = c.getServiceNamed("storage").sharedir
1754             for (dirp, dirns, fns) in os.walk(sharedir):
1755                 for fn in fns:
1756                     try:
1757                         sharenum = int(fn)
1758                     except ValueError:
1759                         # Whoops, I guess that's not a share file then.
1760                         pass
1761                     else:
1762                         pathtosharefile = os.path.join(sharedir, dirp, fn)
1763                         os.unlink(pathtosharefile)
1764                         newdata = newshares.get((i, sharenum))
1765                         if newdata is not None:
1766                             open(pathtosharefile, "w").write(newdata)
1767
1768     def _corrupt_a_share(self, unused=None):
1769         """ Exactly one bit of exactly one share on disk will be flipped (randomly selected from
1770         among the bits of the 'share data' -- the verifiable bits)."""
1771
1772         shares = self._find_shares()
1773         ks = shares.keys()
1774         k = random.choice(ks)
1775         data = shares[k]
1776
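         # the immutable share file starts with a 12-byte (0xc) header of
         # three big-endian uint32s -- format version, share-data length, and
         # lease count -- and the share data itself follows immediately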
1777         (version, size, num_leases) = struct.unpack(">LLL", data[:0xc])
1778         sharedata = data[0xc:0xc+size]
1779
1780         corruptedsharedata = testutil.flip_one_bit(sharedata)
1781         corrupteddata = data[:0xc]+corruptedsharedata+data[0xc+size:]
1782         shares[k] = corrupteddata
1783
1784         self._replace_shares(shares)
1785
1786     def test_test_code(self):
1787         # The following process of stashing the shares, running
1788         # _replace_shares, and asserting that the new set of shares equals the
1789         # old is more to test this test code than to test the Tahoe code...
1790         d = defer.succeed(None)
1791         d.addCallback(self._find_shares)
1792         stash = [None]
1793         def _stash_it(res):
1794             stash[0] = res
1795             return res
1796
1797         d.addCallback(_stash_it)
1798         d.addCallback(self._replace_shares)
1799
1800         def _compare(res):
1801             oldshares = stash[0]
1802             self.failUnless(isinstance(oldshares, dict), oldshares)
1803             self.failUnlessEqual(oldshares, res)
1804
1805         d.addCallback(self._find_shares)
1806         d.addCallback(_compare)
1807
1808         d.addCallback(lambda ignore: self._replace_shares({}))
1809         d.addCallback(self._find_shares)
1810         d.addCallback(lambda x: self.failUnlessEqual(x, {}))
1811
1812         return d
1813
1814     def _count_reads(self):
1815         sum_of_read_counts = 0
1816         for client in self.clients:
1817             counters = client.stats_provider.get_stats()['counters']
1818             sum_of_read_counts += counters.get('storage_server.read', 0)
1819         return sum_of_read_counts
1820
1821     def test_check_without_verify(self):
1822         """ Check says the file is healthy when none of the shares have been
1823         touched.  It says that the file is unhealthy when all of them have
1824         been removed. It says that the file is healthy if one bit of one share
1825         has been flipped."""
1826         d = defer.succeed(self.filenode)
1827         def _check1(filenode):
1828             before_check_reads = self._count_reads()
1829
1830             d2 = filenode.check(verify=False, repair=False)
1831             def _after_check(checkresults):
1832                 after_check_reads = self._count_reads()
1833                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1834                 self.failUnless(checkresults.is_healthy())
1835
1836             d2.addCallback(_after_check)
1837             return d2
1838         d.addCallback(_check1)
1839
1840         d.addCallback(self._corrupt_a_share)
1841         def _check2(ignored):
1842             before_check_reads = self._count_reads()
1843             d2 = self.filenode.check(verify=False, repair=False)
1844
1845             def _after_check(checkresults):
1846                 after_check_reads = self._count_reads()
1847                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
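                     # hedged addition: per the docstring, a check without
                     # verify should not notice a single flipped bit, so the
                     # file should still look healthy
                     self.failUnless(checkresults.is_healthy())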
1848
1849             d2.addCallback(_after_check)
1850             return d2
1851         d.addCallback(_check2)
1853
1854         d.addCallback(lambda ignore: self._replace_shares({}))
1855         def _check3(ignored):
1856             before_check_reads = self._count_reads()
1857             d2 = self.filenode.check(verify=False, repair=False)
1858
1859             def _after_check(checkresults):
1860                 after_check_reads = self._count_reads()
1861                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1862                 self.failIf(checkresults.is_healthy())
1863
1864             d2.addCallback(_after_check)
1865             return d2
1866         d.addCallback(_check3)
1867
1868         return d
1869
1870     def test_check_with_verify(self):
1871         """ Check says the file is healthy when none of the shares have been touched.  It says
1872         that the file is unhealthy if one bit of one share has been flipped."""
1873         DELTA_READS = 10 * 2 # N == 10
1874         d = defer.succeed(self.filenode)
1875         def _check1(filenode):
1876             before_check_reads = self._count_reads()
1877
1878             d2 = filenode.check(verify=True, repair=False)
1879             def _after_check(checkresults):
1880                 after_check_reads = self._count_reads()
1881                 # print "delta was ", after_check_reads - before_check_reads
1882                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
1883                 self.failUnless(checkresults.is_healthy())
1884
1885             d2.addCallback(_after_check)
1886             return d2
1887         d.addCallback(_check1)
1888
1889         d.addCallback(self._corrupt_a_share)
1890         def _check2(ignored):
1891             before_check_reads = self._count_reads()
1892             d2 = self.filenode.check(verify=True, repair=False)
1893
1894             def _after_check(checkresults):
1895                 after_check_reads = self._count_reads()
1896                 # print "delta was ", after_check_reads - before_check_reads
1897                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
1898                 self.failIf(checkresults.is_healthy())
1899
1900             d2.addCallback(_after_check)
1901             return d2
1902         d.addCallback(_check2)
1903         return d
1904     test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."