from base64 import b32encode
import os, random, struct, sys, time, re, simplejson
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
import allmydata
from allmydata import uri, storage, offloaded
from allmydata.immutable import download, upload, filenode
from allmydata.util import idlib, mathutil, testutil
from allmydata.util import log, base32
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from foolscap import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error

from allmydata.test.common import SystemTestMixin

LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
enough to not fit inside a LIT uri.
"""
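# (A LIT URI embeds the file's contents inline in the URI itself; only data
# larger than a small threshold -- on the order of 55 bytes -- is uploaded
# as real shares, which is what this test needs.)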

class CountingDataUploadable(upload.Data):
    bytes_read = 0
    interrupt_after = None
    interrupt_after_d = None

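    # Usage sketch (values are illustrative, borrowed from the resumable-
    # upload test below): attach a Deferred that fires once roughly 5kB has
    # been read, e.g. to bounce a node mid-upload:
    #
    #   u1 = CountingDataUploadable(DATA, convergence=None)
    #   u1.interrupt_after = 5000
    #   u1.interrupt_after_d = defer.Deferred()
    #   u1.interrupt_after_d.addCallback(lambda res: self.bounce_client(0))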
    def read(self, length):
        self.bytes_read += length
        if self.interrupt_after is not None:
            if self.bytes_read > self.interrupt_after:
                self.interrupt_after = None
                self.interrupt_after_d.callback(self)
        return upload.Data.read(self, length)


class SystemTest(SystemTestMixin, unittest.TestCase):

    def test_connections(self):
        self.basedir = "system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(self.numclients))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients+1)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients+1)

        d.addCallback(_check)
        def _shutdown_extra_node(res):
            if self.extra_node:
                return self.extra_node.stopService()
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300
    # test_connections is subsumed by test_upload_and_download, and takes
    # quite a while to run on a slow machine (because of all the TLS
    # connections that must be established). If we ever rework the introducer
    # code to such an extent that we're not sure if it works anymore, we can
    # reinstate this test until it does.
    del test_connections

    def test_upload_and_download_random_key(self):
        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
        return self._test_upload_and_download(convergence=None)
    test_upload_and_download_random_key.timeout = 4800

    def test_upload_and_download_convergent(self):
        self.basedir = "system/SystemTest/test_upload_and_download_convergent"
        return self._test_upload_and_download(convergence="some convergence string")
    test_upload_and_download_convergent.timeout = 4800

    def _test_upload_and_download(self, convergence):
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
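        # ("Some data to upload\n" is 20 bytes, so this is 20 * 200 = 4000.)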
        d = self.set_up_nodes()
        def _check_connections(res):
            for c in self.clients:
                all_peerids = list(c.get_all_peerids())
                self.failUnlessEqual(len(all_peerids), self.numclients)
                permuted_peers = list(c.get_permuted_peers("storage", "a"))
                self.failUnlessEqual(len(permuted_peers), self.numclients)
        d.addCallback(_check_connections)

        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            self.uploader = u
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that the data length is not a multiple of the segment size, so
            # that the tail segment is not the same length as the others. The
            # 1024 actually gets rounded up to 1025, to be a multiple of the
            # number of required shares (since we use 25-out-of-100 FEC).
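            # (Illustration of the rounding, not something this test asserts:
            # mathutil.next_multiple(1024, 25) == 1025.)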
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = u.upload(up)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(results):
            uri = results.uri
            log.msg("upload finished: uri is %s" % (uri,))
            self.uri = uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
        d.addCallback(_upload_done)

        def _upload_again(res):
            # Upload again. If using convergent encryption then this ought to
            # be short-circuited; however, with the way we currently generate
            # URIs (i.e. because they include the roothash), we have to do all
            # of the encoding work, and only get to save on the upload part.
            log.msg("UPLOADING AGAIN")
            up = upload.Data(DATA, convergence=convergence)
            up.max_segment_size = 1024
            d1 = self.uploader.upload(up)
            return d1
        d.addCallback(_upload_again)

        def _download_to_data(res):
            log.msg("DOWNLOADING")
            return self.downloader.download_to_data(self.uri)
        d.addCallback(_download_to_data)
        def _download_to_data_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_to_data_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = open(target_filename, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            return self.downloader.download_to_filehandle(self.uri, fh)
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(fh):
            fh.close()
            newdata = open(target_filename2, "rb").read()
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        def _download_nonexistent_uri(res):
            baduri = self.mangle_uri(self.uri)
            log.msg("about to download non-existent URI", level=log.UNUSUAL,
                    facility="tahoe.tests")
            d1 = self.downloader.download_to_data(baduri)
            def _baduri_should_fail(res):
                log.msg("finished downloading non-existent URI",
                        level=log.UNUSUAL, facility="tahoe.tests")
                self.failUnless(isinstance(res, Failure))
                self.failUnless(res.check(download.NotEnoughSharesError),
                                "expected NotEnoughSharesError, got %s" % res)
                # TODO: files that have zero peers should get a special kind
                # of NotEnoughSharesError, which can be used to suggest that
                # the URI might be wrong or that they've never uploaded the
                # file in the first place.
            d1.addBoth(_baduri_should_fail)
            return d1
        d.addCallback(_download_nonexistent_uri)

        # add a new node, which doesn't accept shares, and only uses the
        # helper for upload.
        d.addCallback(lambda res: self.add_extra_node(self.numclients,
                                                      self.helper_furl,
                                                      add_to_sparent=True))
        def _added(extra_node):
            self.extra_node = extra_node
            extra_node.getServiceNamed("storage").sizelimit = 0
        d.addCallback(_added)

        HELPER_DATA = "Data that needs help to upload" * 1000
        def _upload_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
            d.addCallback(_check)
            return d
        d.addCallback(_upload_with_helper)

        def _upload_duplicate_with_helper(res):
            u = upload.Data(HELPER_DATA, convergence=convergence)
            u.debug_stash_RemoteEncryptedUploadable = True
            d = self.extra_node.upload(u)
            def _uploaded(results):
                uri = results.uri
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)
            def _check(newdata):
                self.failUnlessEqual(newdata, HELPER_DATA)
                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
                            "uploadable started uploading, should have been avoided")
            d.addCallback(_check)
            return d
        if convergence is not None:
            d.addCallback(_upload_duplicate_with_helper)

        def _upload_resumable(res):
            DATA = "Data that needs help to upload and gets interrupted" * 1000
            u1 = CountingDataUploadable(DATA, convergence=convergence)
            u2 = CountingDataUploadable(DATA, convergence=convergence)

            # we interrupt the connection after about 5kB by shutting down
            # the helper, then restarting it.
            u1.interrupt_after = 5000
            u1.interrupt_after_d = defer.Deferred()
            u1.interrupt_after_d.addCallback(lambda res:
                                             self.bounce_client(0))

            # sneak into the helper and reduce its chunk size, so that our
            # debug_interrupt will sever the connection on about the fifth
            # chunk fetched. This makes sure that we've started to write the
            # new shares before we abandon them, which exercises the
            # abort/delete-partial-share code. TODO: find a cleaner way to do
            # this. I know that this will affect later uses of the helper in
            # this same test run, but I'm not currently worried about it.
            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000

            d = self.extra_node.upload(u1)

            def _should_not_finish(res):
                self.fail("interrupted upload should have failed, not finished"
                          " with result %s" % (res,))
            def _interrupted(f):
                f.trap(ConnectionLost, ConnectionDone, DeadReferenceError)

                # make sure we actually interrupted it before finishing the
                # file
                self.failUnless(u1.bytes_read < len(DATA),
                                "read %d out of %d total" % (u1.bytes_read,
                                                             len(DATA)))

                log.msg("waiting for reconnect", level=log.NOISY,
                        facility="tahoe.test.test_system")
                # now, we need to give the nodes a chance to notice that this
                # connection has gone away. When this happens, the storage
                # servers will be told to abort their uploads, removing the
                # partial shares. Unfortunately this involves TCP messages
                # going through the loopback interface, and we can't easily
                # predict how long that will take. If it were all local, we
                # could use fireEventually() to stall. Since we don't have
                # the right introduction hooks, the best we can do is use a
                # fixed delay. TODO: this is fragile.
                u1.interrupt_after_d.addCallback(self.stall, 2.0)
                return u1.interrupt_after_d
            d.addCallbacks(_should_not_finish, _interrupted)

            def _disconnected(res):
                # check to make sure the storage servers aren't still hanging
                # on to the partial share: their incoming/ directories should
                # now be empty.
                log.msg("disconnected", level=log.NOISY,
                        facility="tahoe.test.test_system")
                for i in range(self.numclients):
                    incdir = os.path.join(self.getdir("client%d" % i),
                                          "storage", "shares", "incoming")
                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
            d.addCallback(_disconnected)

            # then we need to give the reconnector a chance to
            # reestablish the connection to the helper.
            d.addCallback(lambda res:
                          log.msg("wait_for_connections", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.wait_for_connections())


            d.addCallback(lambda res:
                          log.msg("uploading again", level=log.NOISY,
                                  facility="tahoe.test.test_system"))
            d.addCallback(lambda res: self.extra_node.upload(u2))

            def _uploaded(results):
                uri = results.uri
                log.msg("Second upload complete", level=log.NOISY,
                        facility="tahoe.test.test_system")

                # this is really bytes received rather than sent, but it's
                # convenient and basically measures the same thing
                bytes_sent = results.ciphertext_fetched

                # We currently don't support resumption of upload if the data is
                # encrypted with a random key.  (Because that would require us
                # to store the key locally and re-use it on the next upload of
                # this file, which isn't a bad thing to do, but we currently
                # don't do it.)
                if convergence is not None:
                    # Make sure we did not have to read the whole file the
                    # second time around.
                    self.failUnless(bytes_sent < len(DATA),
                                    "resumption didn't save us any work:"
                                    " read %d bytes out of %d total" %
                                    (bytes_sent, len(DATA)))
                else:
                    # Make sure we did have to read the whole file the second
                    # time around -- because the one that we partially uploaded
                    # earlier was encrypted with a different random key.
                    self.failIf(bytes_sent < len(DATA),
                                "resumption saved us some work even though we were using random keys:"
                                " read %d bytes out of %d total" %
                                (bytes_sent, len(DATA)))
                return self.downloader.download_to_data(uri)
            d.addCallback(_uploaded)

            def _check(newdata):
                self.failUnlessEqual(newdata, DATA)
                # If using convergent encryption, then also check that the
                # helper has removed the temp file from its directories.
                if convergence is not None:
                    basedir = os.path.join(self.getdir("client0"), "helper")
                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
                    self.failUnlessEqual(files, [])
                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
                    self.failUnlessEqual(files, [])
            d.addCallback(_check)
            return d
        d.addCallback(_upload_resumable)

        return d

    def _find_shares(self, basedir):
        shares = []
        for (dirpath, dirnames, filenames) in os.walk(basedir):
            if "storage" not in dirpath:
                continue
            if not filenames:
                continue
            pieces = dirpath.split(os.sep)
            if pieces[-4] == "storage" and pieces[-3] == "shares":
                # we're sitting in .../storage/shares/$START/$SINDEX, and
                # there are sharefiles here
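                # (a hypothetical example: dirpath might be
                #  .../client0/storage/shares/aa/aaaa..., with the files
                #  inside named "0", "1", ... for the individual share
                #  numbers)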
                assert pieces[-5].startswith("client")
                client_num = int(pieces[-5][-1])
                storage_index_s = pieces[-1]
                storage_index = storage.si_a2b(storage_index_s)
                for sharename in filenames:
                    shnum = int(sharename)
                    filename = os.path.join(dirpath, sharename)
                    data = (client_num, storage_index, filename, shnum)
                    shares.append(data)
        if not shares:
            self.fail("unable to find any share files in %s" % basedir)
        return shares

    def _corrupt_mutable_share(self, filename, which):
        msf = storage.MutableShareFile(filename)
        datav = msf.readv([(0, 1000000)])
        final_share = datav[0]
        assert len(final_share) < 1000000 # ought to be truncated
        pieces = mutable_layout.unpack_share(final_share)
        (seqnum, root_hash, IV, k, N, segsize, datalen,
         verification_key, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = pieces

        if which == "seqnum":
            seqnum = seqnum + 15
        elif which == "R":
            root_hash = self.flip_bit(root_hash)
        elif which == "IV":
            IV = self.flip_bit(IV)
        elif which == "segsize":
            segsize = segsize + 15
        elif which == "pubkey":
            verification_key = self.flip_bit(verification_key)
        elif which == "signature":
            signature = self.flip_bit(signature)
        elif which == "share_hash_chain":
            nodenum = share_hash_chain.keys()[0]
            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
        elif which == "block_hash_tree":
            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
        elif which == "share_data":
            share_data = self.flip_bit(share_data)
        elif which == "encprivkey":
            enc_privkey = self.flip_bit(enc_privkey)

        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
                                            segsize, datalen)
        final_share = mutable_layout.pack_share(prefix,
                                                verification_key,
                                                signature,
                                                share_hash_chain,
                                                block_hash_tree,
                                                share_data,
                                                enc_privkey)
        msf.writev([(0, final_share)], None)


    def test_mutable(self):
        self.basedir = "system/SystemTest/test_mutable"
        DATA = "initial contents go here."  # 25 bytes % 3 != 0
        NEWDATA = "new contents yay"
        NEWERDATA = "this is getting old"

        d = self.set_up_nodes(use_key_generator=True)

        def _create_mutable(res):
            c = self.clients[0]
            log.msg("starting create_mutable_file")
            d1 = c.create_mutable_file(DATA)
            def _done(res):
                log.msg("DONE: %s" % (res,))
                self._mutable_node_1 = res
                uri = res.get_uri()
            d1.addCallback(_done)
            return d1
        d.addCallback(_create_mutable)

        def _test_debug(res):
            # find a share. It is important to run this while there is only
            # one slot in the grid.
            shares = self._find_shares(self.basedir)
            (client_num, storage_index, filename, shnum) = shares[0]
            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
                    % filename)
            log.msg(" for clients[%d]" % client_num)

            out, err = StringIO(), StringIO()
            rc = runner.runner(["dump-share",
                                filename],
                               stdout=out, stderr=err)
            output = out.getvalue()
            self.failUnlessEqual(rc, 0)
            try:
                self.failUnless("Mutable slot found:\n" in output)
                self.failUnless("share_type: SDMF\n" in output)
                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
                self.failUnless(" num_extra_leases: 0\n" in output)
                # the pubkey size can vary by a byte, so the container might
                # be a bit larger on some runs.
                m = re.search(r'^ container_size: (\d+)$', output, re.M)
                self.failUnless(m)
                container_size = int(m.group(1))
                self.failUnless(2037 <= container_size <= 2049, container_size)
                m = re.search(r'^ data_length: (\d+)$', output, re.M)
                self.failUnless(m)
                data_length = int(m.group(1))
                self.failUnless(2037 <= data_length <= 2049, data_length)
                self.failUnless("  secrets are for nodeid: %s\n" % peerid
                                in output)
                self.failUnless(" SDMF contents:\n" in output)
                self.failUnless("  seqnum: 1\n" in output)
                self.failUnless("  required_shares: 3\n" in output)
                self.failUnless("  total_shares: 10\n" in output)
                self.failUnless("  segsize: 27\n" in output, (output, filename))
                self.failUnless("  datalen: 25\n" in output)
                # the exact share_hash_chain nodes depend upon the sharenum,
                # and are more of a hassle to compute than I want to deal
                # with now
                self.failUnless("  share_hash_chain: " in output)
                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
                expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
                            base32.b2a(storage_index))
                self.failUnless(expected in output)
            except unittest.FailTest:
                print
                print "dump-share output was:"
                print output
                raise
        d.addCallback(_test_debug)

        # test retrieval

        # first, let's see if we can use the existing node to retrieve the
        # contents. This allows it to use the cached pubkey and maybe the
        # latest-known sharemap.

        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
        def _check_download_1(res):
            self.failUnlessEqual(res, DATA)
            # now we see if we can retrieve the data from a new node,
            # constructed using the URI of the original one. We do this test
            # on the same client that uploaded the data.
            uri = self._mutable_node_1.get_uri()
            log.msg("starting retrieve1")
            newnode = self.clients[0].create_node_from_uri(uri)
            newnode_2 = self.clients[0].create_node_from_uri(uri)
            self.failUnlessIdentical(newnode, newnode_2)
            return newnode.download_best_version()
        d.addCallback(_check_download_1)

        def _check_download_2(res):
            self.failUnlessEqual(res, DATA)
            # same thing, but with a different client
            uri = self._mutable_node_1.get_uri()
            newnode = self.clients[1].create_node_from_uri(uri)
            log.msg("starting retrieve2")
            d1 = newnode.download_best_version()
            d1.addCallback(lambda res: (res, newnode))
            return d1
        d.addCallback(_check_download_2)

        def _check_download_3((res, newnode)):
            self.failUnlessEqual(res, DATA)
            # replace the data
            log.msg("starting replace1")
            d1 = newnode.overwrite(NEWDATA)
            d1.addCallback(lambda res: newnode.download_best_version())
            return d1
        d.addCallback(_check_download_3)

        def _check_download_4(res):
            self.failUnlessEqual(res, NEWDATA)
            # now create an even newer node and replace the data on it. This
            # new node has never been used for download before.
            uri = self._mutable_node_1.get_uri()
            newnode1 = self.clients[2].create_node_from_uri(uri)
            newnode2 = self.clients[3].create_node_from_uri(uri)
            self._newnode3 = self.clients[3].create_node_from_uri(uri)
            log.msg("starting replace2")
            d1 = newnode1.overwrite(NEWERDATA)
            d1.addCallback(lambda res: newnode2.download_best_version())
            return d1
        d.addCallback(_check_download_4)

        def _check_download_5(res):
            log.msg("finished replace2")
            self.failUnlessEqual(res, NEWERDATA)
        d.addCallback(_check_download_5)

        def _corrupt_shares(res):
            # run around and flip bits in all but k of the shares, to test
            # the hash checks
            shares = self._find_shares(self.basedir)
            ## sort by share number
            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
            where = dict([ (shnum, filename)
                           for (client_num, storage_index, filename, shnum)
                           in shares ])
            assert len(where) == 10 # this test is designed for 3-of-10
            for shnum, filename in where.items():
                # shares 7,8,9 are left alone. read will check
                # (share_hash_chain, block_hash_tree, share_data). New
                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
                # segsize, signature).
                if shnum == 0:
                    # read: this will trigger "pubkey doesn't match
                    # fingerprint".
                    self._corrupt_mutable_share(filename, "pubkey")
                    self._corrupt_mutable_share(filename, "encprivkey")
                elif shnum == 1:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "seqnum")
                elif shnum == 2:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "R")
                elif shnum == 3:
                    # triggers "signature is invalid"
                    self._corrupt_mutable_share(filename, "segsize")
                elif shnum == 4:
                    self._corrupt_mutable_share(filename, "share_hash_chain")
                elif shnum == 5:
                    self._corrupt_mutable_share(filename, "block_hash_tree")
                elif shnum == 6:
                    self._corrupt_mutable_share(filename, "share_data")
                # other things to corrupt: IV, signature
                # 7,8,9 are left alone

                # note that initial_query_count=5 means that we'll hit the
                # first 5 servers in effectively random order (based upon
                # response time), so we won't necessarily ever get a "pubkey
                # doesn't match fingerprint" error (if we hit shnum>=1 before
                # shnum=0, we pull the pubkey from there). To get repeatable
                # specific failures, we need to set initial_query_count=1,
                # but of course that will change the sequencing behavior of
                # the retrieval process. TODO: find a reasonable way to make
                # this a parameter, probably when we expand this test to test
                # for one failure mode at a time.
                # when we retrieve this, we should get three signature
                # failures (where we've mangled seqnum, R, and segsize). The
                # pubkey mangling may not be detected at all, for the reasons
                # given above.
        d.addCallback(_corrupt_shares)

        d.addCallback(lambda res: self._newnode3.download_best_version())
        d.addCallback(_check_download_5)

        def _check_empty_file(res):
            # make sure we can create empty files, this usually screws up the
            # segsize math
            d1 = self.clients[2].create_mutable_file("")
            d1.addCallback(lambda newnode: newnode.download_best_version())
            d1.addCallback(lambda res: self.failUnlessEqual("", res))
            return d1
        d.addCallback(_check_empty_file)

        d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
        def _created_dirnode(dnode):
            log.msg("_created_dirnode(%s)" % (dnode,))
            d1 = dnode.list()
            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
            d1.addCallback(lambda res: dnode.build_manifest())
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 1))
            return d1
        d.addCallback(_created_dirnode)

        def wait_for_c3_kg_conn():
            return self.clients[3]._key_generator is not None
        d.addCallback(lambda junk: self.poll(wait_for_c3_kg_conn))

        def check_kg_poolsize(junk, size_delta):
            self.failUnlessEqual(len(self.key_generator_svc.key_generator.keypool),
                                 self.key_generator_svc.key_generator.pool_size + size_delta)

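        # each key-using operation below (creating a mutable file or a
        # dirnode) should consume exactly one key from the generator's pool,
        # so the pool shrinks by one per operation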
        d.addCallback(check_kg_poolsize, 0)
        d.addCallback(lambda junk: self.clients[3].create_mutable_file('hello, world'))
        d.addCallback(check_kg_poolsize, -1)
        d.addCallback(lambda junk: self.clients[3].create_empty_dirnode())
        d.addCallback(check_kg_poolsize, -2)
        # use_helper induces use of clients[3], which is the client that
        # uses the key generator
        d.addCallback(lambda junk: self.POST("uri", use_helper=True, t="mkdir", name='george'))
        d.addCallback(check_kg_poolsize, -3)

        return d
    # The default 120 second timeout went off when running it under valgrind
    # on my old Windows laptop, so I'm bumping up the timeout.
    test_mutable.timeout = 240

    def flip_bit(self, good):
        return good[:-1] + chr(ord(good[-1]) ^ 0x01)
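        # (worked example: flip_bit("abc") == "abb", because
        # chr(ord("c") ^ 0x01) == "b"; only the low bit of the final byte
        # changes, so the corruption is minimal but guaranteed)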

    def mangle_uri(self, gooduri):
        # change the key, which changes the storage index, which means we'll
        # be asking about the wrong file, so nobody will have any shares
        u = IFileURI(gooduri)
        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
                            uri_extension_hash=u.uri_extension_hash,
                            needed_shares=u.needed_shares,
                            total_shares=u.total_shares,
                            size=u.size)
        return u2.to_string()

    # TODO: add a test which mangles the uri_extension_hash instead, and
    # should fail due to not being able to get a valid uri_extension block.
    # Also a test which sneakily mangles the uri_extension block to change
    # some of the validation data, so it will fail in the post-download phase
    # when the file's crypttext integrity check fails. Do the same thing for
    # the key, which should cause the download to fail the post-download
    # plaintext_hash check.

    def test_vdrive(self):
        self.basedir = "system/SystemTest/test_vdrive"
        self.data = LARGE_DATA
        d = self.set_up_nodes(use_stats_gatherer=True)
        d.addCallback(self._test_introweb)
        d.addCallback(self.log, "starting publish")
        d.addCallback(self._do_publish1)
        d.addCallback(self._test_runner)
        d.addCallback(self._do_publish2)
        # at this point, we have the following filesystem (where "R" denotes
        # self._root_directory_uri):
        # R
        # R/subdir1
        # R/subdir1/mydata567
        # R/subdir1/subdir2/
        # R/subdir1/subdir2/mydata992

        d.addCallback(lambda res: self.bounce_client(0))
        d.addCallback(self.log, "bounced client0")

        d.addCallback(self._check_publish1)
        d.addCallback(self.log, "did _check_publish1")
        d.addCallback(self._check_publish2)
        d.addCallback(self.log, "did _check_publish2")
        d.addCallback(self._do_publish_private)
        d.addCallback(self.log, "did _do_publish_private")
        # now we also have (where "P" denotes a new dir):
        #  P/personal/sekrit data
        #  P/s2-rw -> /subdir1/subdir2/
        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
        d.addCallback(self._check_publish_private)
        d.addCallback(self.log, "did _check_publish_private")
        d.addCallback(self._test_web)
        d.addCallback(self._test_control)
        d.addCallback(self._test_cli)
        # P now has four top-level children:
        # P/personal/sekrit data
        # P/s2-ro/
        # P/s2-rw/
        # P/test_put/  (empty)
        d.addCallback(self._test_checker)
        d.addCallback(self._grab_stats)
        return d
    test_vdrive.timeout = 1100

    def _test_introweb(self, res):
        d = getPage(self.introweb_url, method="GET", followRedirect=True)
        def _check(res):
            try:
                self.failUnless("allmydata: %s" % str(allmydata.__version__)
                                in res)
                self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
                self.failUnless("Subscription Summary: storage: 5" in res)
            except unittest.FailTest:
                print
                print "GET %s output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check)
        d.addCallback(lambda res:
                      getPage(self.introweb_url + "?t=json",
                              method="GET", followRedirect=True))
        def _check_json(res):
            data = simplejson.loads(res)
            try:
                self.failUnlessEqual(data["subscription_summary"],
                                     {"storage": 5})
                self.failUnlessEqual(data["announcement_summary"],
                                     {"storage": 5, "stub_client": 5})
            except unittest.FailTest:
                print
                print "GET %s?t=json output was:" % self.introweb_url
                print res
                raise
        d.addCallback(_check_json)
        return d

    def _do_publish1(self, res):
        ut = upload.Data(self.data, convergence=None)
        c0 = self.clients[0]
        d = c0.create_empty_dirnode()
        def _made_root(new_dirnode):
            self._root_directory_uri = new_dirnode.get_uri()
            return c0.create_node_from_uri(self._root_directory_uri)
        d.addCallback(_made_root)
        d.addCallback(lambda root: root.create_empty_directory(u"subdir1"))
        def _made_subdir1(subdir1_node):
            self._subdir1_node = subdir1_node
            d1 = subdir1_node.add_file(u"mydata567", ut)
            d1.addCallback(self.log, "publish finished")
            def _stash_uri(filenode):
                self.uri = filenode.get_uri()
            d1.addCallback(_stash_uri)
            return d1
        d.addCallback(_made_subdir1)
        return d

    def _do_publish2(self, res):
        ut = upload.Data(self.data, convergence=None)
        d = self._subdir1_node.create_empty_directory(u"subdir2")
        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
        return d

    def log(self, res, msg, **kwargs):
        # print "MSG: %s  RES: %s" % (msg, res)
        log.msg(msg, **kwargs)
        return res

    def _do_publish_private(self, res):
        self.smalldata = "sssh, very secret stuff"
        ut = upload.Data(self.smalldata, convergence=None)
        d = self.clients[0].create_empty_dirnode()
        d.addCallback(self.log, "GOT private directory")
        def _got_new_dir(privnode):
            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
            d1 = privnode.create_empty_directory(u"personal")
            d1.addCallback(self.log, "made P/personal")
            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
            d1.addCallback(self.log, "made P/personal/sekrit data")
            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
            def _got_s2(s2node):
                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri())
                d2.addCallback(lambda node: privnode.set_uri(u"s2-ro", s2node.get_readonly_uri()))
                return d2
            d1.addCallback(_got_s2)
            d1.addCallback(lambda res: privnode)
            return d1
        d.addCallback(_got_new_dir)
        return d

    def _check_publish1(self, res):
        # this one uses the iterative API
        c1 = self.clients[1]
        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
        d.addCallback(self.log, "check_publish1 got /")
        d.addCallback(lambda root: root.get(u"subdir1"))
        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(self.log, "get finished")
        def _get_done(data):
            self.failUnlessEqual(data, self.data)
        d.addCallback(_get_done)
        return d

    def _check_publish2(self, res):
        # this one uses the path-based API
        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
        d = rootnode.get_child_at_path(u"subdir1")
        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))

        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
        def _got_filenode(filenode):
            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
            assert fnode == filenode
        d.addCallback(_got_filenode)
        return d

    def _check_publish_private(self, resnode):
        # this one uses the path-based API
        self._private_node = resnode

        d = self._private_node.get_child_at_path(u"personal")
        def _got_personal(personal):
            self._personal_node = personal
            return personal
        d.addCallback(_got_personal)

        d.addCallback(lambda dirnode:
                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
        def get_path(path):
            return self._private_node.get_child_at_path(path)

        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
        d.addCallback(lambda filenode: filenode.download_to_data())
        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
        d.addCallback(lambda res: get_path(u"s2-rw"))
        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
        d.addCallback(lambda res: get_path(u"s2-ro"))
        def _got_s2ro(dirnode):
            self.failUnless(dirnode.is_mutable(), dirnode)
            self.failUnless(dirnode.is_readonly(), dirnode)
            d1 = defer.succeed(None)
            d1.addCallback(lambda res: dirnode.list())
            d1.addCallback(self.log, "dirnode.list")

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mkdir(nope)", None, dirnode.create_empty_directory, u"nope"))

            d1.addCallback(self.log, "doing add_file(ro)")
            ut = upload.Data("I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence="99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))

            d1.addCallback(self.log, "doing get(ro)")
            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
            d1.addCallback(lambda filenode:
                           self.failUnless(IFileNode.providedBy(filenode)))

            d1.addCallback(self.log, "doing delete(ro)")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "delete(nope)", None, dirnode.delete, u"mydata992"))

            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri))

            d1.addCallback(lambda res: self.shouldFail2(KeyError, "get(missing)", "'missing'", dirnode.get, u"missing"))

            personal = self._personal_node
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))

            d1.addCallback(self.log, "doing move_child_to(ro)2")
            d1.addCallback(lambda res: self.shouldFail2(NotMutableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))

            d1.addCallback(self.log, "finished with _got_s2ro")
            return d1
        d.addCallback(_got_s2ro)
        def _got_home(dummy):
            home = self._private_node
            personal = self._personal_node
            d1 = defer.succeed(None)
            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
            d1.addCallback(lambda res:
                           personal.move_child_to(u"sekrit data", home, u"sekrit"))

            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit", home, u"sekrit data"))

            d1.addCallback(self.log, "mv 'P/sekrit data' P/personal/")
            d1.addCallback(lambda res:
                           home.move_child_to(u"sekrit data", personal))

            d1.addCallback(lambda res: home.build_manifest())
            d1.addCallback(self.log, "manifest")
            #  four items:
            # P/personal/
            # P/personal/sekrit data
            # P/s2-rw  (same as P/s2-ro)
            # P/s2-rw/mydata992 (same as P/s2-ro/mydata992)
            d1.addCallback(lambda manifest:
                           self.failUnlessEqual(len(manifest), 4))
            d1.addCallback(lambda res: home.deep_stats())
            def _check_stats(stats):
                expected = {"count-immutable-files": 1,
                            "count-mutable-files": 0,
                            "count-literal-files": 1,
                            "count-files": 2,
                            "count-directories": 3,
                            "size-immutable-files": 112,
                            "size-literal-files": 23,
                            #"size-directories": 616, # varies
                            #"largest-directory": 616,
                            "largest-directory-children": 3,
                            "largest-immutable-file": 112,
                            }
                for k, v in expected.iteritems():
                    self.failUnlessEqual(stats[k], v,
                                         "stats[%s] was %s, not %s" %
                                         (k, stats[k], v))
                self.failUnless(stats["size-directories"] > 1300,
                                stats["size-directories"])
                self.failUnless(stats["largest-directory"] > 800,
                                stats["largest-directory"])
                self.failUnlessEqual(stats["size-files-histogram"],
                                     [ (11, 31, 1), (101, 316, 1) ])
            d1.addCallback(_check_stats)
            return d1
        d.addCallback(_got_home)
        return d

    def shouldFail(self, res, expected_failure, which, substring=None):
        if isinstance(res, Failure):
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "substring '%s' not in '%s'"
                                % (substring, str(res)))
        else:
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))

    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d
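    # shouldFail is meant to be attached to a Deferred with addBoth, while
    # shouldFail2 wraps the call itself; e.g., from the read-only directory
    # checks above:
    #   self.shouldFail2(NotMutableError, "mkdir(nope)", None,
    #                    dirnode.create_empty_directory, u"nope")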

    def PUT(self, urlpath, data):
        url = self.webish_url + urlpath
        return getPage(url, method="PUT", postdata=data)

    def GET(self, urlpath, followRedirect=False):
        url = self.webish_url + urlpath
        return getPage(url, method="GET", followRedirect=followRedirect)

    def POST(self, urlpath, followRedirect=False, use_helper=False, **fields):
        if use_helper:
            url = self.helper_webish_url + urlpath
        else:
            url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename.encode("utf-8")))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(str(value))
            form.append(sep)
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return getPage(url, method="POST", postdata=body,
                       headers=headers, followRedirect=followRedirect)
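    # For reference, with a single field t="upload" the body built above
    # serializes roughly as follows (every line CRLF-terminated):
    #   --boogabooga
    #   Content-Disposition: form-data; name="_charset"
    #
    #   UTF-8
    #   --boogabooga
    #   Content-Disposition: form-data; name="t"
    #
    #   upload
    #   --boogabooga--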
1004
1005     def _test_web(self, res):
1006         base = self.webish_url
1007         public = "uri/" + self._root_directory_uri
1008         d = getPage(base)
1009         def _got_welcome(page):
1010             expected = "Connected Storage Servers: <span>%d</span>" % (self.numclients)
1011             self.failUnless(expected in page,
1012                             "I didn't see the right 'connected storage servers'"
1013                             " message in: %s" % page
1014                             )
1015             expected = "My nodeid: <span>%s</span>" % (b32encode(self.clients[0].nodeid).lower(),)
1016             self.failUnless(expected in page,
1017                             "I didn't see the right 'My nodeid' message "
1018                             "in: %s" % page)
1019             self.failUnless("Helper: 0 active uploads" in page)
1020         d.addCallback(_got_welcome)
1021         d.addCallback(self.log, "done with _got_welcome")
1022
1023         # get the welcome page from the node that uses the helper too
1024         d.addCallback(lambda res: getPage(self.helper_webish_url))
1025         def _got_welcome_helper(page):
1026             self.failUnless("Connected to helper?: <span>yes</span>" in page,
1027                             page)
1028             self.failUnless("Not running helper" in page)
1029         d.addCallback(_got_welcome_helper)
1030
1031         d.addCallback(lambda res: getPage(base + public))
1032         d.addCallback(lambda res: getPage(base + public + "/subdir1"))
1033         def _got_subdir1(page):
1034             # there ought to be an href for our file
1035             self.failUnless(("<td>%d</td>" % len(self.data)) in page)
1036             self.failUnless(">mydata567</a>" in page)
1037         d.addCallback(_got_subdir1)
1038         d.addCallback(self.log, "done with _got_subdir1")
1039         d.addCallback(lambda res:
1040                       getPage(base + public + "/subdir1/mydata567"))
1041         def _got_data(page):
1042             self.failUnlessEqual(page, self.data)
1043         d.addCallback(_got_data)
1044
1045         # download from a URI embedded in a URL
1046         d.addCallback(self.log, "_get_from_uri")
1047         def _get_from_uri(res):
1048             return getPage(base + "uri/%s?filename=%s"
1049                            % (self.uri, "mydata567"))
1050         d.addCallback(_get_from_uri)
1051         def _got_from_uri(page):
1052             self.failUnlessEqual(page, self.data)
1053         d.addCallback(_got_from_uri)
1054
1055         # download from a URI embedded in a URL, second form
1056         d.addCallback(self.log, "_get_from_uri2")
1057         def _get_from_uri2(res):
1058             return getPage(base + "uri?uri=%s" % (self.uri,))
1059         d.addCallback(_get_from_uri2)
1060         d.addCallback(_got_from_uri)
1061
1062         # download from a bogus URI, make sure we get a reasonable error
1063         d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1064         def _get_from_bogus_uri(res):
1065             d1 = getPage(base + "uri/%s?filename=%s"
1066                          % (self.mangle_uri(self.uri), "mydata567"))
1067             d1.addBoth(self.shouldFail, Error, "downloading bogus URI",
1068                        "410")
1069             return d1
1070         d.addCallback(_get_from_bogus_uri)
1071         d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1072
1073         # upload a file with PUT
1074         d.addCallback(self.log, "about to try PUT")
1075         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1076                                            "new.txt contents"))
1077         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1078         d.addCallback(self.failUnlessEqual, "new.txt contents")
1079         # and again with something large enough to use multiple segments,
1080         # and hopefully trigger pauseProducing too
1081         d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1082                                            "big" * 500000)) # 1.5MB
1083         d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1084         d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1085
1086         # can we replace files in place?
1087         d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1088                                            "NEWER contents"))
1089         d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1090         d.addCallback(self.failUnlessEqual, "NEWER contents")
1091
1092         # test unlinked POST
1093         d.addCallback(lambda res: self.POST("uri", t="upload",
1094                                             file=("new.txt", "data" * 10000)))
1095         # and again using the helper, which exercises different upload-status
1096         # display code
1097         d.addCallback(lambda res: self.POST("uri", use_helper=True, t="upload",
1098                                             file=("foo.txt", "data2" * 10000)))
1099
1100         # check that the status page exists
1101         d.addCallback(lambda res: self.GET("status", followRedirect=True))
1102         def _got_status(res):
1103             # find an interesting upload and download to look at. LIT files
1104             # are not interesting.
1105             for ds in self.clients[0].list_all_download_statuses():
1106                 if ds.get_size() > 200:
1107                     self._down_status = ds.get_counter()
1108             for us in self.clients[0].list_all_upload_statuses():
1109                 if us.get_size() > 200:
1110                     self._up_status = us.get_counter()
1111             rs = self.clients[0].list_all_retrieve_statuses()[0]
1112             self._retrieve_status = rs.get_counter()
1113             ps = self.clients[0].list_all_publish_statuses()[0]
1114             self._publish_status = ps.get_counter()
1115             us = self.clients[0].list_all_mapupdate_statuses()[0]
1116             self._update_status = us.get_counter()
1117
1115             # and that there are some upload- and download-status pages
1119             return self.GET("status/up-%d" % self._up_status)
1120         d.addCallback(_got_status)
1121         def _got_up(res):
1122             return self.GET("status/down-%d" % self._down_status)
1123         d.addCallback(_got_up)
1124         def _got_down(res):
1125             return self.GET("status/mapupdate-%d" % self._update_status)
1126         d.addCallback(_got_down)
1127         def _got_update(res):
1128             return self.GET("status/publish-%d" % self._publish_status)
1129         d.addCallback(_got_update)
1130         def _got_publish(res):
1131             return self.GET("status/retrieve-%d" % self._retrieve_status)
1132         d.addCallback(_got_publish)
1133
1134         # check that the helper status page exists
1135         d.addCallback(lambda res:
1136                       self.GET("helper_status", followRedirect=True))
1137         def _got_helper_status(res):
1138             self.failUnless("Bytes Fetched:" in res)
1139             # touch a couple of files in the helper's working directory to
1140             # exercise more code paths
1141             workdir = os.path.join(self.getdir("client0"), "helper")
1142             incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1143             f = open(incfile, "wb")
1144             f.write("small file")
1145             f.close()
1146             then = time.time() - 86400*3
1147             now = time.time()
1148             os.utime(incfile, (now, then))
1149             encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1150             f = open(encfile, "wb")
1151             f.write("less small file")
1152             f.close()
1153             os.utime(encfile, (now, then))
1154         d.addCallback(_got_helper_status)
1155         # and that the json form exists
1156         d.addCallback(lambda res:
1157                       self.GET("helper_status?t=json", followRedirect=True))
1158         def _got_helper_status_json(res):
1159             data = simplejson.loads(res)
1160             self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1161                                  1)
1162             self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1163             self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1164             self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1165                                  10)
1166             self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1167             self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1168             self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1169                                  15)
1170         d.addCallback(_got_helper_status_json)
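        # (the sizes asserted above appear to follow directly from the two
        # spurious files written in _got_helper_status: len("small file") is
        # 10 bytes in CHK_incoming and len("less small file") is 15 bytes in
        # CHK_encoding; the *_old variants count the same bytes because both
        # files' timestamps were backdated by three days)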
1171
1172         # and check that client[3] (which uses a helper but does not run one
1173         # itself) doesn't explode when you ask for its status
1174         d.addCallback(lambda res: getPage(self.helper_webish_url + "status/"))
1175         def _got_non_helper_status(res):
1176             self.failUnless("Upload and Download Status" in res)
1177         d.addCallback(_got_non_helper_status)
1178
1179         # or for helper status with t=json
1180         d.addCallback(lambda res:
1181                       getPage(self.helper_webish_url + "helper_status?t=json"))
1182         def _got_non_helper_status_json(res):
1183             data = simplejson.loads(res)
1184             self.failUnlessEqual(data, {})
1185         d.addCallback(_got_non_helper_status_json)
1186
1187         # see if the statistics page exists
1188         d.addCallback(lambda res: self.GET("statistics"))
1189         def _got_stats(res):
1190             self.failUnless("Node Statistics" in res)
1191             self.failUnless("  'downloader.files_downloaded': 5," in res, res)
1192         d.addCallback(_got_stats)
1193         d.addCallback(lambda res: self.GET("statistics?t=json"))
1194         def _got_stats_json(res):
1195             data = simplejson.loads(res)
1196             self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1197             self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1198         d.addCallback(_got_stats_json)
1199
1200         # TODO: mangle the second segment of a file, to test errors that
1201         # occur after we've already sent some good data, which uses a
1202         # different error path.
1203
1204         # TODO: download a URI with a form
1205         # TODO: create a directory by using a form
1206         # TODO: upload by using a form on the directory page
1207         #    url = base + "somedir/subdir1/freeform_post!!upload"
1208         # TODO: delete a file by using a button on the directory page
1209
1210         return d
1211
1212     def _test_runner(self, res):
1213         # exercise some of the diagnostic tools in runner.py
1214
1215         # find a share
1216         for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1217             if "storage" not in dirpath:
1218                 continue
1219             if not filenames:
1220                 continue
1221             pieces = dirpath.split(os.sep)
1222             if pieces[-4] == "storage" and pieces[-3] == "shares":
1223                 # we're sitting in .../storage/shares/$START/$SINDEX , and there
1224                 # are sharefiles here
1225                 filename = os.path.join(dirpath, filenames[0])
1226                 # peek at the magic to see if it is a chk share
1227                 magic = open(filename, "rb").read(4)
1228                 if magic == '\x00\x00\x00\x01':
1229                     break
1230         else:
1231             self.fail("unable to find any uri_extension files in %s"
1232                       % self.basedir)
1233         log.msg("test_system.SystemTest._test_runner using %s" % filename)
1234
1235         out,err = StringIO(), StringIO()
1236         rc = runner.runner(["dump-share",
1237                             filename],
1238                            stdout=out, stderr=err)
1239         output = out.getvalue()
1240         self.failUnlessEqual(rc, 0)
1241
1242         # we only upload a single file, so we can assert some things about
1243         # its size and shares.
1244         self.failUnless(("share filename: %s" % filename) in output)
1245         self.failUnless("size: %d\n" % len(self.data) in output)
1246         self.failUnless("num_segments: 1\n" in output)
1247         # segment_size is always a multiple of needed_shares
1248         self.failUnless("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3) in output)
1249         self.failUnless("total_shares: 10\n" in output)
1250         # keys which are supposed to be present
1251         for key in ("size", "num_segments", "segment_size",
1252                     "needed_shares", "total_shares",
1253                     "codec_name", "codec_params", "tail_codec_params",
1254                     #"plaintext_hash", "plaintext_root_hash",
1255                     "crypttext_hash", "crypttext_root_hash",
1256                     "share_root_hash", "UEB_hash"):
1257             self.failUnless("%s: " % key in output, key)
1258         self.failUnless("  verify-cap: URI:CHK-Verifier:" in output)
1259
1260         # now use its storage index to find the other shares using the
1261         # 'find-shares' tool
1262         sharedir, shnum = os.path.split(filename)
1263         storagedir, storage_index_s = os.path.split(sharedir)
1264         out,err = StringIO(), StringIO()
1265         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1266         cmd = ["find-shares", storage_index_s] + nodedirs
1267         rc = runner.runner(cmd, stdout=out, stderr=err)
1268         self.failUnlessEqual(rc, 0)
1269         out.seek(0)
1270         sharefiles = [sfn.strip() for sfn in out.readlines()]
1271         self.failUnlessEqual(len(sharefiles), 10)
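        # (find-shares prints one share filename per line; the count of 10
        # matches the "total_shares: 10" asserted in the dump-share output
        # above)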
1272
1273         # also exercise the 'catalog-shares' tool
1274         out,err = StringIO(), StringIO()
1275         nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1276         cmd = ["catalog-shares"] + nodedirs
1277         rc = runner.runner(cmd, stdout=out, stderr=err)
1278         self.failUnlessEqual(rc, 0)
1279         out.seek(0)
1280         descriptions = [sfn.strip() for sfn in out.readlines()]
1281         self.failUnlessEqual(len(descriptions), 30)
1282         matching = [line
1283                     for line in descriptions
1284                     if line.startswith("CHK %s " % storage_index_s)]
1285         self.failUnlessEqual(len(matching), 10)
1286
1287     def _test_control(self, res):
1288         # exercise the remote-control-the-client foolscap interfaces in
1289         # allmydata.control (mostly used for performance tests)
1290         c0 = self.clients[0]
1291         control_furl_file = os.path.join(c0.basedir, "private", "control.furl")
1292         control_furl = open(control_furl_file, "r").read().strip()
1293         # it doesn't really matter which Tub we use to connect to the client,
1294         # so let's just use our IntroducerNode's
1295         d = self.introducer.tub.getReference(control_furl)
1296         d.addCallback(self._test_control2, control_furl_file)
1297         return d
1298     def _test_control2(self, rref, filename):
1299         d = rref.callRemote("upload_from_file_to_uri", filename, convergence=None)
1300         downfile = os.path.join(self.basedir, "control.downfile")
1301         d.addCallback(lambda uri:
1302                       rref.callRemote("download_from_uri_to_file",
1303                                       uri, downfile))
1304         def _check(res):
1305             self.failUnlessEqual(res, downfile)
1306             data = open(downfile, "r").read()
1307             expected_data = open(filename, "r").read()
1308             self.failUnlessEqual(data, expected_data)
1309         d.addCallback(_check)
1310         d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
1311         if sys.platform == "linux2":
1312             d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
1313         d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
1314         return d
1315
1316     def _test_cli(self, res):
1317         # run various CLI commands (in a thread, since they use blocking
1318         # network calls)
1319
1320         private_uri = self._private_node.get_uri()
1321         some_uri = self._root_directory_uri
1322         client0_basedir = self.getdir("client0")
1323
1324         nodeargs = [
1325             "--node-directory", client0_basedir,
1326             ]
1327         TESTDATA = "I will not write the same thing over and over.\n" * 100
1328
1329         d = defer.succeed(None)
1330
1331         # for compatibility with earlier versions, private/root_dir.cap is
1332         # supposed to be treated as an alias named "tahoe:". Start by making
1333         # sure that works, before we add other aliases.
1334
1335         root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1336         f = open(root_file, "w")
1337         f.write(private_uri)
1338         f.close()
1339
1340         def run(ignored, verb, *args, **kwargs):
1341             stdin = kwargs.get("stdin", "")
1342             newargs = [verb] + nodeargs + list(args)
1343             return self._run_cli(newargs, stdin=stdin)
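        # usage sketch: each CLI step below is chained as, e.g.,
        #     d.addCallback(run, "ls", "subdir")
        # and the deferred fires with the (out, err) pair of captured
        # stdout/stderr strings returned by _run_cli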
1344
1345         def _check_ls((out,err), expected_children, unexpected_children=[]):
1346             self.failUnlessEqual(err, "")
1347             for s in expected_children:
1348                 self.failUnless(s in out, (s,out))
1349             for s in unexpected_children:
1350                 self.failIf(s in out, (s,out))
1351
1352         def _check_ls_root((out,err)):
1353             self.failUnless("personal" in out)
1354             self.failUnless("s2-ro" in out)
1355             self.failUnless("s2-rw" in out)
1356             self.failUnlessEqual(err, "")
1357
1358         # this should reference private_uri
1359         d.addCallback(run, "ls")
1360         d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1361
1362         d.addCallback(run, "list-aliases")
1363         def _check_aliases_1((out,err)):
1364             self.failUnlessEqual(err, "")
1365             self.failUnlessEqual(out, "tahoe: %s\n" % private_uri)
1366         d.addCallback(_check_aliases_1)
1367
1368         # now that that's out of the way, remove root_dir.cap and work with
1369         # new files
1370         d.addCallback(lambda res: os.unlink(root_file))
1371         d.addCallback(run, "list-aliases")
1372         def _check_aliases_2((out,err)):
1373             self.failUnlessEqual(err, "")
1374             self.failUnlessEqual(out, "")
1375         d.addCallback(_check_aliases_2)
1376
1377         d.addCallback(run, "mkdir")
1378         def _got_dir((out,err)):
1379             self.failUnless(uri.from_string_dirnode(out.strip()))
1380             return out.strip()
1381         d.addCallback(_got_dir)
1382         d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1383
1384         d.addCallback(run, "list-aliases")
1385         def _check_aliases_3((out,err)):
1386             self.failUnlessEqual(err, "")
1387             self.failUnless("tahoe: " in out)
1388         d.addCallback(_check_aliases_3)
1389
1390         def _check_empty_dir((out,err)):
1391             self.failUnlessEqual(out, "")
1392             self.failUnlessEqual(err, "")
1393         d.addCallback(run, "ls")
1394         d.addCallback(_check_empty_dir)
1395
1396         def _check_missing_dir((out,err)):
1397             # TODO: check that rc==2
1398             self.failUnlessEqual(out, "")
1399             self.failUnlessEqual(err, "No such file or directory\n")
1400         d.addCallback(run, "ls", "bogus")
1401         d.addCallback(_check_missing_dir)
1402
1403         files = []
1404         datas = []
1405         for i in range(10):
1406             fn = os.path.join(self.basedir, "file%d" % i)
1407             files.append(fn)
1408             data = "data to be uploaded: file%d\n" % i
1409             datas.append(data)
1410             open(fn,"wb").write(data)
1411
1412         def _check_stdout_against((out,err), filenum=None, data=None):
1413             self.failUnlessEqual(err, "")
1414             if filenum is not None:
1415                 self.failUnlessEqual(out, datas[filenum])
1416             if data is not None:
1417                 self.failUnlessEqual(out, data)
1418
1419         # test all four forms of put: from a file or from stdin, each to
1419         # a bare name or to a tahoe: alias
1420         #  tahoe put bar FOO
1421         d.addCallback(run, "put", files[0], "tahoe-file0")
1422         def _put_out((out,err)):
1423             self.failUnless("URI:LIT:" in out, out)
1424             self.failUnless("201 Created" in err, err)
1425             uri0 = out.strip()
1426             return run(None, "get", uri0)
1427         d.addCallback(_put_out)
1428         d.addCallback(lambda (out,err): self.failUnlessEqual(out, datas[0]))
1429
1430         d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1431         #  tahoe put bar tahoe:FOO
1432         d.addCallback(run, "put", files[2], "tahoe:file2")
1433         d.addCallback(run, "put", "--mutable", files[3], "tahoe:file3")
1434         def _check_put_mutable((out,err)):
1435             self._mutable_file3_uri = out.strip()
1436         d.addCallback(_check_put_mutable)
1437         d.addCallback(run, "get", "tahoe:file3")
1438         d.addCallback(_check_stdout_against, 3)
1439
1440         #  tahoe put FOO
1441         STDIN_DATA = "This is the file to upload from stdin."
1442         d.addCallback(run, "put", "tahoe-file-stdin", stdin=STDIN_DATA)
1443         #  tahoe put tahoe:FOO
1444         d.addCallback(run, "put", "tahoe:from-stdin",
1445                       stdin="Other file from stdin.")
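        # from a shell, the four basic forms exercised above (ignoring the
        # subdir/ and --mutable variants) look roughly like:
        #     tahoe put local.txt remote.txt
        #     tahoe put local.txt tahoe:remote.txt
        #     tahoe put remote.txt < local.txt
        #     tahoe put tahoe:remote.txt < local.txt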
1446
1447         d.addCallback(run, "ls")
1448         d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1449                                   "tahoe-file-stdin", "from-stdin"])
1450         d.addCallback(run, "ls", "subdir")
1451         d.addCallback(_check_ls, ["tahoe-file1"])
1452
1453         # tahoe mkdir FOO
1454         d.addCallback(run, "mkdir", "subdir2")
1455         d.addCallback(run, "ls")
1456         # TODO: extract the URI, set an alias with it
1457         d.addCallback(_check_ls, ["subdir2"])
1458
1459         # tahoe get: (to stdout and to a file)
1460         d.addCallback(run, "get", "tahoe-file0")
1461         d.addCallback(_check_stdout_against, 0)
1462         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1463         d.addCallback(_check_stdout_against, 1)
1464         outfile0 = os.path.join(self.basedir, "outfile0")
1465         d.addCallback(run, "get", "file2", outfile0)
1466         def _check_outfile0((out,err)):
1467             data = open(outfile0,"rb").read()
1468             self.failUnlessEqual(data, "data to be uploaded: file2\n")
1469         d.addCallback(_check_outfile0)
1470         outfile1 = os.path.join(self.basedir, "outfile1")
1471         d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1472         def _check_outfile1((out,err)):
1473             data = open(outfile1,"rb").read()
1474             self.failUnlessEqual(data, "data to be uploaded: file1\n")
1475         d.addCallback(_check_outfile1)
1476
1477         d.addCallback(run, "rm", "tahoe-file0")
1478         d.addCallback(run, "rm", "tahoe:file2")
1479         d.addCallback(run, "ls")
1480         d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1481
1482         d.addCallback(run, "ls", "-l")
1483         def _check_ls_l((out,err)):
1484             lines = out.split("\n")
1485             for l in lines:
1486                 if "tahoe-file-stdin" in l:
1487                     self.failUnless(l.startswith("-r-- "), l)
1488                     self.failUnless(" %d " % len(STDIN_DATA) in l)
1489                 if "file3" in l:
1490                     self.failUnless(l.startswith("-rw- "), l) # mutable
1491         d.addCallback(_check_ls_l)
1492
1493         d.addCallback(run, "ls", "--uri")
1494         def _check_ls_uri((out,err)):
1495             lines = out.split("\n")
1496             for l in lines:
1497                 if "file3" in l:
1498                     self.failUnless(self._mutable_file3_uri in l)
1499         d.addCallback(_check_ls_uri)
1500
1501         d.addCallback(run, "ls", "--readonly-uri")
1502         def _check_ls_rouri((out,err)):
1503             lines = out.split("\n")
1504             for l in lines:
1505                 if "file3" in l:
1506                     rw_uri = self._mutable_file3_uri
1507                     u = uri.from_string_mutable_filenode(rw_uri)
1508                     ro_uri = u.get_readonly().to_string()
1509                     self.failUnless(ro_uri in l)
1510         d.addCallback(_check_ls_rouri)
1511
1512
1513         d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1514         d.addCallback(run, "ls")
1515         d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1516
1517         d.addCallback(run, "ln", "tahoe-moved", "newlink")
1518         d.addCallback(run, "ls")
1519         d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1520
1521         d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1522         d.addCallback(run, "ls")
1523         d.addCallback(_check_ls, ["file3", "file3-copy"])
1524         d.addCallback(run, "get", "tahoe:file3-copy")
1525         d.addCallback(_check_stdout_against, 3)
1526
1527         # copy from disk into tahoe
1528         d.addCallback(run, "cp", files[4], "tahoe:file4")
1529         d.addCallback(run, "ls")
1530         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1531         d.addCallback(run, "get", "tahoe:file4")
1532         d.addCallback(_check_stdout_against, 4)
1533
1534         # copy from tahoe into disk
1535         target_filename = os.path.join(self.basedir, "file-out")
1536         d.addCallback(run, "cp", "tahoe:file4", target_filename)
1537         def _check_cp_out((out,err)):
1538             self.failUnless(os.path.exists(target_filename))
1539             got = open(target_filename,"rb").read()
1540             self.failUnlessEqual(got, datas[4])
1541         d.addCallback(_check_cp_out)
1542
1543         # copy from disk to disk (silly case)
1544         target2_filename = os.path.join(self.basedir, "file-out-copy")
1545         d.addCallback(run, "cp", target_filename, target2_filename)
1546         def _check_cp_out2((out,err)):
1547             self.failUnless(os.path.exists(target2_filename))
1548             got = open(target2_filename,"rb").read()
1549             self.failUnlessEqual(got, datas[4])
1550         d.addCallback(_check_cp_out2)
1551
1552         # copy from tahoe into disk, overwriting an existing file
1553         d.addCallback(run, "cp", "tahoe:file3", target_filename)
1554         def _check_cp_out3((out,err)):
1555             self.failUnless(os.path.exists(target_filename))
1556             got = open(target_filename,"rb").read()
1557             self.failUnlessEqual(got, datas[3])
1558         d.addCallback(_check_cp_out3)
1559
1560         # copy from disk into tahoe, overwriting an existing immutable file
1561         d.addCallback(run, "cp", files[5], "tahoe:file4")
1562         d.addCallback(run, "ls")
1563         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1564         d.addCallback(run, "get", "tahoe:file4")
1565         d.addCallback(_check_stdout_against, 5)
1566
1567         # copy from disk into tahoe, overwriting an existing mutable file
1568         d.addCallback(run, "cp", files[5], "tahoe:file3")
1569         d.addCallback(run, "ls")
1570         d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1571         d.addCallback(run, "get", "tahoe:file3")
1572         d.addCallback(_check_stdout_against, 5)
1573
1574         # recursive copy: setup
1575         dn = os.path.join(self.basedir, "dir1")
1576         os.makedirs(dn)
1577         open(os.path.join(dn, "rfile1"), "wb").write("rfile1")
1578         open(os.path.join(dn, "rfile2"), "wb").write("rfile2")
1579         open(os.path.join(dn, "rfile3"), "wb").write("rfile3")
1580         sdn2 = os.path.join(dn, "subdir2")
1581         os.makedirs(sdn2)
1582         open(os.path.join(sdn2, "rfile4"), "wb").write("rfile4")
1583         open(os.path.join(sdn2, "rfile5"), "wb").write("rfile5")
1584
1585         # from disk into tahoe
1586         d.addCallback(run, "cp", "-r", dn, "tahoe:dir1")
1587         d.addCallback(run, "ls")
1588         d.addCallback(_check_ls, ["dir1"])
1589         d.addCallback(run, "ls", "dir1")
1590         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1591                       ["rfile4", "rfile5"])
1592         d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1593         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1594                       ["rfile1", "rfile2", "rfile3"])
1595         d.addCallback(run, "get", "dir1/subdir2/rfile4")
1596         d.addCallback(_check_stdout_against, data="rfile4")
1597
1598         # and back out again
1599         dn_copy = os.path.join(self.basedir, "dir1-copy")
1600         d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1601         def _check_cp_r_out((out,err)):
1602             def _cmp(name):
1603                 old = open(os.path.join(dn, name), "rb").read()
1604                 newfn = os.path.join(dn_copy, name)
1605                 self.failUnless(os.path.exists(newfn))
1606                 new = open(newfn, "rb").read()
1607                 self.failUnlessEqual(old, new)
1608             _cmp("rfile1")
1609             _cmp("rfile2")
1610             _cmp("rfile3")
1611             _cmp(os.path.join("subdir2", "rfile4"))
1612             _cmp(os.path.join("subdir2", "rfile5"))
1613         d.addCallback(_check_cp_r_out)
1614
1615         # and copy it a second time, which ought to overwrite the same files
1616         d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1617
1618         # and tahoe-to-tahoe
1619         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1620         d.addCallback(run, "ls")
1621         d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1622         d.addCallback(run, "ls", "dir1-copy")
1623         d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1624                       ["rfile4", "rfile5"])
1625         d.addCallback(run, "ls", "tahoe:dir1-copy/subdir2")
1626         d.addCallback(_check_ls, ["rfile4", "rfile5"],
1627                       ["rfile1", "rfile2", "rfile3"])
1628         d.addCallback(run, "get", "dir1-copy/subdir2/rfile4")
1629         d.addCallback(_check_stdout_against, data="rfile4")
1630
1631         # and copy it a second time, which ought to overwrite the same files
1632         d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1633
1634         # tahoe_ls doesn't currently handle the error correctly: it tries to
1635         # JSON-parse a traceback.
1636 ##         def _ls_missing(res):
1637 ##             argv = ["ls"] + nodeargs + ["bogus"]
1638 ##             return self._run_cli(argv)
1639 ##         d.addCallback(_ls_missing)
1640 ##         def _check_ls_missing((out,err)):
1641 ##             print "OUT", out
1642 ##             print "ERR", err
1643 ##             self.failUnlessEqual(err, "")
1644 ##         d.addCallback(_check_ls_missing)
1645
1646         return d
1647
1648     def _run_cli(self, argv, stdin=""):
1649         #print "CLI:", argv
1650         stdout, stderr = StringIO(), StringIO()
1651         d = threads.deferToThread(runner.runner, argv, run_by_human=False,
1652                                   stdin=StringIO(stdin),
1653                                   stdout=stdout, stderr=stderr)
1654         def _done(res):
1655             return stdout.getvalue(), stderr.getvalue()
1656         d.addCallback(_done)
1657         return d
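        # deferToThread runs the blocking runner.runner call on a reactor
        # threadpool thread, so the CLI's synchronous network I/O cannot
        # stall the reactor; the returned deferred fires with the captured
        # (stdout, stderr) strings once the command finishes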
1658
1659     def _test_checker(self, res):
1660         ut = upload.Data("too big to be literal" * 200, convergence=None)
1661         d = self._personal_node.add_file(u"big file", ut)
1662
1663         d.addCallback(lambda res: self._personal_node.check())
1664         def _check_dirnode_results(r):
1665             self.failUnless(r.is_healthy())
1666         d.addCallback(_check_dirnode_results)
1667         d.addCallback(lambda res: self._personal_node.check(verify=True))
1668         d.addCallback(_check_dirnode_results)
1669
1670         d.addCallback(lambda res: self._personal_node.get(u"big file"))
1671         def _got_chk_filenode(n):
1672             self.failUnless(isinstance(n, filenode.FileNode))
1673             d = n.check()
1674             def _check_filenode_results(r):
1675                 self.failUnless(r.is_healthy())
1676             d.addCallback(_check_filenode_results)
1677             d.addCallback(lambda res: n.check(verify=True))
1678             d.addCallback(_check_filenode_results)
1679             return d
1680         d.addCallback(_got_chk_filenode)
1681
1682         d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1683         def _got_lit_filenode(n):
1684             self.failUnless(isinstance(n, filenode.LiteralFileNode))
1685             d = n.check()
1686             def _check_filenode_results(r):
1687                 self.failUnless(r.is_healthy())
1688             d.addCallback(_check_filenode_results)
1689             d.addCallback(lambda res: n.check(verify=True))
1690             d.addCallback(_check_filenode_results)
1691             return d
1692         d.addCallback(_got_lit_filenode)
1693         return d
1694
1695
1696 class Checker(SystemTestMixin, unittest.TestCase):
1697     def setUp(self):
1698         # Set self.basedir to a temp dir whose name includes the name of
1699         # the current test method.
1700         self.basedir = self.mktemp()
1701         TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
1702
1703         d = defer.maybeDeferred(SystemTestMixin.setUp, self)
1704         d.addCallback(lambda x: self.set_up_nodes())
1705
1706         def _upload_a_file(ignored):
1707             d2 = self.clients[0].upload(upload.Data(TEST_DATA, convergence=""))
1708             d2.addCallback(lambda u: self.clients[0].create_node_from_uri(u.uri))
1709             return d2
1710         d.addCallback(_upload_a_file)
1711
1712         def _stash_it(filenode):
1713             self.filenode = filenode
1714         d.addCallback(_stash_it)
1715         return d
1716
1717     def _find_shares(self, unused=None):
1718         shares = {} # k: (i, sharenum), v: data
1719
1720         for i, c in enumerate(self.clients):
1721             sharedir = c.getServiceNamed("storage").sharedir
1722             for (dirp, dirns, fns) in os.walk(sharedir):
1723                 for fn in fns:
1724                     try:
1725                         sharenum = int(fn)
1726                     except ValueError:
1727                         # Whoops, I guess that's not a share file then.
1728                         pass
1729                     else:
1730                         data = open(os.path.join(sharedir, dirp, fn), "rb").read()
1731                         shares[(i, sharenum)] = data
1732
1733         return shares
1734
1735     def _replace_shares(self, newshares):
1736         for i, c in enumerate(self.clients):
1737             sharedir = c.getServiceNamed("storage").sharedir
1738             for (dirp, dirns, fns) in os.walk(sharedir):
1739                 for fn in fns:
1740                     try:
1741                         sharenum = int(fn)
1742                     except ValueError:
1743                         # Whoops, I guess that's not a share file then.
1744                         pass
1745                     else:
1746                         pathtosharefile = os.path.join(sharedir, dirp, fn)
1747                         os.unlink(pathtosharefile)
1748                         newdata = newshares.get((i, sharenum))
1749                         if newdata is not None:
1750                             open(pathtosharefile, "wb").write(newdata)
1751
1752     def _corrupt_a_share(self, unused=None):
1753         """ Exactly one bit of exactly one share on disk will be flipped (randomly selected from
1754         among the bits of the 'share data' -- the verifiable bits)."""
1755
1756         shares = self._find_shares()
1757         ks = shares.keys()
1758         k = random.choice(ks)
1759         data = shares[k]
1760
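        # as read here, a v1 immutable share file starts with a 12-byte
        # header of three big-endian uint32s:
        #   [0:4]   version number
        #   [4:8]   length of the share data
        #   [8:12]  number of leases
        # followed by 'size' bytes of share data -- the verifiable bits that
        # this method flips one of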
1761         (version, size, num_leases) = struct.unpack(">LLL", data[:0xc])
1762         sharedata = data[0xc:0xc+size]
1763
1764         corruptedsharedata = testutil.flip_one_bit(sharedata)
1765         corrupteddata = data[:0xc]+corruptedsharedata+data[0xc+size:]
1766         shares[k] = corrupteddata
1767
1768         self._replace_shares(shares)
1769
1770     def test_test_code(self):
1771         # The following process of stashing the shares, running
1772         # _replace_shares, and asserting that the new set of shares equals the
1773         # old is more to test this test code than to test the Tahoe code...
1774         d = defer.succeed(None)
1775         d.addCallback(self._find_shares)
1776         stash = [None]
1777         def _stash_it(res):
1778             stash[0] = res
1779             return res
1780
1781         d.addCallback(_stash_it)
1782         d.addCallback(self._replace_shares)
1783
1784         def _compare(res):
1785             oldshares = stash[0]
1786             self.failUnless(isinstance(oldshares, dict), oldshares)
1787             self.failUnlessEqual(oldshares, res)
1788
1789         d.addCallback(self._find_shares)
1790         d.addCallback(_compare)
1791
1792         d.addCallback(lambda ignore: self._replace_shares({}))
1793         d.addCallback(self._find_shares)
1794         d.addCallback(lambda x: self.failUnlessEqual(x, {}))
1795
1796         return d
1797
1798     def _count_reads(self):
1799         sum_of_read_counts = 0
1800         for client in self.clients:
1801             counters = client.stats_provider.get_stats()['counters']
1802             sum_of_read_counts += counters.get('storage_server.read', 0)
1803         return sum_of_read_counts
1804
1805     def test_check_without_verify(self):
1806         """ Check says the file is healthy when none of the shares have been
1807         touched.  It says that the file is unhealthy when all of them have
1808         been removed. It says that the file is healthy if one bit of one share
1809         has been flipped."""
1810         d = defer.succeed(self.filenode)
1811         def _check1(filenode):
1812             before_check_reads = self._count_reads()
1813
1814             d2 = filenode.check(verify=False, repair=False)
1815             def _after_check(checkresults):
1816                 after_check_reads = self._count_reads()
1817                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1818                 self.failUnless(checkresults.is_healthy())
1819
1820             d2.addCallback(_after_check)
1821             return d2
1822         d.addCallback(_check1)
1823
1824         d.addCallback(self._corrupt_a_share)
1825         def _check2(ignored):
1826             before_check_reads = self._count_reads()
1827             d2 = self.filenode.check(verify=False, repair=False)
1828
1829             def _after_check(checkresults):
1830                 after_check_reads = self._count_reads()
1831                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
                # a check without verify=True only queries share presence,
                # so the flipped bit goes unnoticed and the file still
                # looks healthy, as the docstring promises
                self.failUnless(checkresults.is_healthy())
1832
1833             d2.addCallback(_after_check)
1834             return d2
1835         d.addCallback(_check2)
1837
1838         d.addCallback(lambda ignore: self._replace_shares({}))
1839         def _check3(ignored):
1840             before_check_reads = self._count_reads()
1841             d2 = self.filenode.check(verify=False, repair=False)
1842
1843             def _after_check(checkresults):
1844                 after_check_reads = self._count_reads()
1845                 self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
1846                 self.failIf(checkresults.is_healthy())
1847
1848             d2.addCallback(_after_check)
1849             return d2
1850         d.addCallback(_check3)
1851
1852         return d
1853
1854     def test_check_with_verify(self):
1855         """ Check says the file is healthy when none of the shares have been touched.  It says
1856         that the file is unhealthy if one bit of one share has been flipped."""
1857         DELTA_READS = 10 * 2 # N == 10
1858         d = defer.succeed(self.filenode)
1859         def _check1(filenode):
1860             before_check_reads = self._count_reads()
1861
1862             d2 = filenode.check(verify=True, repair=False)
1863             def _after_check(checkresults):
1864                 after_check_reads = self._count_reads()
1865                 # print "delta was ", after_check_reads - before_check_reads
1866                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
1867                 self.failUnless(checkresults.is_healthy())
1868
1869             d2.addCallback(_after_check)
1870             return d2
1871         d.addCallback(_check1)
1872
1873         d.addCallback(self._corrupt_a_share)
1874         def _check2(ignored):
1875             before_check_reads = self._count_reads()
1876             d2 = self.filenode.check(verify=True, repair=False)
1877
1878             def _after_check(checkresults):
1879                 after_check_reads = self._count_reads()
1880                 # print "delta was ", after_check_reads - before_check_reads
1881                 self.failIf(after_check_reads - before_check_reads > DELTA_READS)
1882                 self.failIf(checkresults.is_healthy())
1883
1884             d2.addCallback(_after_check)
1885             return d2
1886         d.addCallback(_check2)
1887         return d
1888     test_check_with_verify.todo = "We haven't implemented a verifier this thorough yet."