Add assertions to make sure that set_default_encoding_parameters is always called...
diff --git a/src/allmydata/test/test_encode.py b/src/allmydata/test/test_encode.py
index a70bfd6ecc8aee9b771a001f98116f7735cbf1b8..c908e97490db81b71e2949e4918ea0d5afcd84bc 100644
--- a/src/allmydata/test/test_encode.py
+++ b/src/allmydata/test/test_encode.py
@@ -1,14 +1,15 @@
-
 from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.python.failure import Failure
-from allmydata import encode, download, hashtree
+from foolscap.api import fireEventually
+from allmydata import uri
+from allmydata.immutable import encode, upload, checker
 from allmydata.util import hashutil
-from allmydata.uri import pack_uri
-from allmydata.Crypto.Cipher import AES
+from allmydata.util.assertutil import _assert
+from allmydata.util.consumer import download_to_data
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
-from cStringIO import StringIO
+from allmydata.test.no_network import GridTestMixin
 
 class LostPeerError(Exception):
     pass
@@ -16,24 +17,35 @@ class LostPeerError(Exception):
 def flip_bit(good): # flips the last bit
     return good[:-1] + chr(ord(good[-1]) ^ 0x01)
 
-class FakeBucketWriterProxy:
+class FakeBucketReaderWriterProxy:
     implements(IStorageBucketWriter, IStorageBucketReader)
     # these are used for both reading and writing
-    def __init__(self, mode="good"):
+    def __init__(self, mode="good", peerid="peer"):
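+        # "mode" selects which fault this fake bucket injects ("good",
+        # "lost-early", "bad block", "bad blockhash", "bad sharehash",
+        # "bad uri_extension", ...); the get_*/put_* methods below consult it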
         self.mode = mode
         self.blocks = {}
-        self.plaintext_hashes = None
-        self.crypttext_hashes = None
+        self.plaintext_hashes = []
+        self.crypttext_hashes = []
         self.block_hashes = None
         self.share_hashes = None
         self.closed = False
+        self.peerid = peerid
 
-    def startIfNecessary(self):
-        return defer.succeed(self)
-    def start(self):
+    def get_peerid(self):
+        return self.peerid
+
+    def _start(self):
+        if self.mode == "lost-early":
+            f = Failure(LostPeerError("I went away early"))
+            return fireEventually(f)
         return defer.succeed(self)
 
+    def put_header(self):
+        return self._start()
+
     def put_block(self, segmentnum, data):
+        if self.mode == "lost-early":
+            f = Failure(LostPeerError("I went away early"))
+            return fireEventually(f)
         def _try():
             assert not self.closed
             assert segmentnum not in self.blocks
@@ -42,17 +54,10 @@ class FakeBucketWriterProxy:
             self.blocks[segmentnum] = data
         return defer.maybeDeferred(_try)
 
-    def put_plaintext_hashes(self, hashes):
-        def _try():
-            assert not self.closed
-            assert self.plaintext_hashes is None
-            self.plaintext_hashes = hashes
-        return defer.maybeDeferred(_try)
-
     def put_crypttext_hashes(self, hashes):
         def _try():
             assert not self.closed
-            assert self.crypttext_hashes is None
+            assert not self.crypttext_hashes
             self.crypttext_hashes = hashes
         return defer.maybeDeferred(_try)
 
@@ -62,7 +67,7 @@ class FakeBucketWriterProxy:
             assert self.block_hashes is None
             self.block_hashes = blockhashes
         return defer.maybeDeferred(_try)
-        
+
     def put_share_hashes(self, sharehashes):
         def _try():
             assert not self.closed
@@ -82,45 +87,53 @@ class FakeBucketWriterProxy:
             self.closed = True
         return defer.maybeDeferred(_try)
 
-    def get_block(self, blocknum):
-        def _try():
+    def abort(self):
+        return defer.succeed(None)
+
+    def get_block_data(self, blocknum, blocksize, size):
+        d = self._start()
+        def _try(unused=None):
             assert isinstance(blocknum, (int, long))
             if self.mode == "bad block":
                 return flip_bit(self.blocks[blocknum])
             return self.blocks[blocknum]
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
     def get_plaintext_hashes(self):
-        def _try():
+        d = self._start()
+        def _try(unused=None):
             hashes = self.plaintext_hashes[:]
-            if self.mode == "bad plaintext hashroot":
-                hashes[0] = flip_bit(hashes[0])
-            if self.mode == "bad plaintext hash":
-                hashes[1] = flip_bit(hashes[1])
             return hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
     def get_crypttext_hashes(self):
-        def _try():
+        d = self._start()
+        def _try(unused=None):
             hashes = self.crypttext_hashes[:]
             if self.mode == "bad crypttext hashroot":
                 hashes[0] = flip_bit(hashes[0])
             if self.mode == "bad crypttext hash":
                 hashes[1] = flip_bit(hashes[1])
             return hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
-    def get_block_hashes(self):
-        def _try():
+    def get_block_hashes(self, at_least_these=()):
+        d = self._start()
+        def _try(unused=None):
             if self.mode == "bad blockhash":
                 hashes = self.block_hashes[:]
                 hashes[1] = flip_bit(hashes[1])
                 return hashes
             return self.block_hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
-    def get_share_hashes(self):
-        def _try():
+    def get_share_hashes(self, at_least_these=()):
+        d = self._start()
+        def _try(unused=None):
             if self.mode == "bad sharehash":
                 hashes = self.share_hashes[:]
                 hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
@@ -131,14 +144,17 @@ class FakeBucketWriterProxy:
                 # download.py is supposed to guard against this case.
                 return []
             return self.share_hashes
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
     def get_uri_extension(self):
-        def _try():
+        d = self._start()
+        def _try(unused=None):
             if self.mode == "bad uri_extension":
                 return flip_bit(self.uri_extension)
             return self.uri_extension
-        return defer.maybeDeferred(_try)
+        d.addCallback(_try)
+        return d
 
 
 def make_data(length):
@@ -146,30 +162,143 @@ def make_data(length):
     assert length <= len(data)
     return data[:length]
 
+class ValidatedExtendedURIProxy(unittest.TestCase):
+    timeout = 240 # It takes longer than 120 seconds on Francois's ARM box.
+    K = 4
+    M = 10
+    SIZE = 200
+    SEGSIZE = 72
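+    # TAIL_SEGSIZE is the size of the final (possibly short) segment, rounded
+    # up to a multiple of K so it splits evenly into K shares; NUM_SEGMENTS
+    # is ceil(SIZE / SEGSIZE).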
+    _TMP = SIZE%SEGSIZE
+    if _TMP == 0:
+        _TMP = SEGSIZE
+    if _TMP % K != 0:
+        _TMP += (K - (_TMP % K))
+    TAIL_SEGSIZE = _TMP
+    _TMP = SIZE / SEGSIZE
+    if SIZE % SEGSIZE != 0:
+        _TMP += 1
+    NUM_SEGMENTS = _TMP
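+    # mindict holds the fields a URI extension block must contain.
+    # optional_consistent adds optional fields whose values agree with
+    # mindict, while optional_inconsistent maps each optional field to
+    # values that contradict it (or have the wrong type).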
+    mindict = { 'segment_size': SEGSIZE,
+                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
+                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
+    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
+                            'codec_name': "crs",
+                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
+                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
+                            'num_segments': NUM_SEGMENTS,
+                            'size': SIZE,
+                            'needed_shares': K,
+                            'total_shares': M,
+                            'plaintext_hash': "anything",
+                            'plaintext_root_hash': "anything", }
+    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
+    optional_inconsistent = { 'crypttext_hash': (77,),
+                              'codec_name': ("digital fountain", ""),
+                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
+                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
+                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
+                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
+                                               "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
+                                               "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
+                              'num_segments': (NUM_SEGMENTS-1,),
+                              'size': (SIZE-1,),
+                              'needed_shares': (K-1,),
+                              'total_shares': (M-1,), }
+
+    def _test(self, uebdict):
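+        # pack the dict into a URI-extension string, hand it to a fake
+        # bucket, and build a verify cap that commits to the string's hash;
+        # ValidatedExtendedURIProxy.start() should then accept or reject it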
+        uebstring = uri.pack_extension(uebdict)
+        uebhash = hashutil.uri_extension_hash(uebstring)
+        fb = FakeBucketReaderWriterProxy()
+        fb.put_uri_extension(uebstring)
+        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE)
+        vup = checker.ValidatedExtendedURIProxy(fb, verifycap)
+        return vup.start()
+
+    def _test_accept(self, uebdict):
+        return self._test(uebdict)
+
+    def _should_fail(self, res, expected_failures):
+        if isinstance(res, Failure):
+            res.trap(*expected_failures)
+        else:
+            self.fail("was supposed to raise %s, not get '%s'" % (expected_failures, res))
+
+    def _test_reject(self, uebdict):
+        d = self._test(uebdict)
+        d.addBoth(self._should_fail, (KeyError, checker.BadURIExtension))
+        return d
+
+    def test_accept_minimal(self):
+        return self._test_accept(self.mindict)
+
+    def test_reject_insufficient(self):
+        dl = []
+        for k in self.mindict.iterkeys():
+            insuffdict = self.mindict.copy()
+            del insuffdict[k]
+            d = self._test_reject(insuffdict)
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def test_accept_optional(self):
+        dl = []
+        for k in self.optional_consistent.iterkeys():
+            mydict = self.mindict.copy()
+            mydict[k] = self.optional_consistent[k]
+            d = self._test_accept(mydict)
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def test_reject_optional(self):
+        dl = []
+        for k in self.optional_inconsistent.iterkeys():
+            for v in self.optional_inconsistent[k]:
+                mydict = self.mindict.copy()
+                mydict[k] = v
+                d = self._test_reject(mydict)
+                dl.append(d)
+        return defer.DeferredList(dl)
+
 class Encode(unittest.TestCase):
+    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
 
     def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
                   expected_block_hashes, expected_share_hashes):
         data = make_data(datalen)
         # force use of multiple segments
-        options = {"max_segment_size": max_segment_size}
-        e = encode.Encoder(options)
-        nonkey = "\x00" * 16
-        e.setup(StringIO(data), nonkey)
-        assert e.num_shares == NUM_SHARES # else we'll be completely confused
-        e.setup_codec() # need to rebuild the codec for that change
-        assert (NUM_SEGMENTS-1)*e.segment_size < len(data) <= NUM_SEGMENTS*e.segment_size
-        shareholders = {}
+        e = encode.Encoder()
+        u = upload.Data(data, convergence="some convergence string")
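+        # encoding parameters are now supplied through the uploadable via
+        # set_default_encoding_parameters() (see the commit message above)
+        # rather than being passed to the Encoder constructor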
+        u.set_default_encoding_parameters({'max_segment_size': max_segment_size,
+                                           'k': 25, 'happy': 75, 'n': 100})
+        eu = upload.EncryptAnUploadable(u)
+        d = e.set_encrypted_uploadable(eu)
+
         all_shareholders = []
-        for shnum in range(NUM_SHARES):
-            peer = FakeBucketWriterProxy()
-            shareholders[shnum] = peer
-            all_shareholders.append(peer)
-        e.set_shareholders(shareholders)
-        d = e.start()
-        def _check(roothash):
-            self.failUnless(isinstance(roothash, str))
-            self.failUnlessEqual(len(roothash), 32)
+        def _ready(res):
+            k,happy,n = e.get_param("share_counts")
+            _assert(n == NUM_SHARES) # else we'll be completely confused
+            numsegs = e.get_param("num_segments")
+            _assert(numsegs == NUM_SEGMENTS, numsegs, NUM_SEGMENTS)
+            segsize = e.get_param("segment_size")
+            _assert( (NUM_SEGMENTS-1)*segsize < len(data) <= NUM_SEGMENTS*segsize,
+                     NUM_SEGMENTS, segsize,
+                     (NUM_SEGMENTS-1)*segsize, len(data), NUM_SEGMENTS*segsize)
+
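+            # give the encoder one fake bucket per share; the servermap maps
+            # each share number to the peerid that will hold it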
+            shareholders = {}
+            servermap = {}
+            for shnum in range(NUM_SHARES):
+                peer = FakeBucketReaderWriterProxy()
+                shareholders[shnum] = peer
+                servermap.setdefault(shnum, set()).add(peer.get_peerid())
+                all_shareholders.append(peer)
+            e.set_shareholders(shareholders, servermap)
+            return e.start()
+        d.addCallback(_ready)
+
+        def _check(res):
+            verifycap = res
+            self.failUnless(isinstance(verifycap.uri_extension_hash, str))
+            self.failUnlessEqual(len(verifycap.uri_extension_hash), 32)
             for i,peer in enumerate(all_shareholders):
                 self.failUnless(peer.closed)
                 self.failUnlessEqual(len(peer.blocks), NUM_SEGMENTS)
@@ -190,30 +319,6 @@ class Encode(unittest.TestCase):
 
         return d
 
-    # a series of 3*3 tests to check out edge conditions. One axis is how the
-    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
-    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
-    # might test 74 bytes, 75 bytes, and 76 bytes.
-
-    # on the other axis is how many leaves in the block hash tree we wind up
-    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
-    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
-    # segments, and 5 segments.
-
-    # that results in the following series of data lengths:
-    #  3 segs: 74, 75, 51
-    #  4 segs: 99, 100, 76
-    #  5 segs: 124, 125, 101
-
-    # all tests encode to 100 shares, which means the share hash tree will
-    # have 128 leaves, which means that buckets will be given an 8-long share
-    # hash chain
-    
-    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
-    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
-    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
-    # trees, which get 15 blockhashes.
-
     def test_send_74(self):
         # 3 segments (25, 25, 24)
         return self.do_encode(25, 74, 100, 3, 7, 8)
@@ -234,10 +339,6 @@ class Encode(unittest.TestCase):
         # 4 segments: 25,25,25,25
         return self.do_encode(25, 100, 100, 4, 7, 8)
 
-    def test_send_101(self):
-        # encode a 101 byte file (in 5 segments: 25,25,25,25,1) to 100 shares
-        return self.do_encode(25, self.make_data(101), 100, 5, 15, 8)
-
     def test_send_124(self):
         # 5 segments: 25, 25, 25, 25, 24
         return self.do_encode(25, 124, 100, 5, 15, 8)
@@ -248,381 +349,62 @@ class Encode(unittest.TestCase):
         # 5 segments: 25, 25, 25, 25, 1
         return self.do_encode(25, 101, 100, 5, 15, 8)
 
-class Roundtrip(unittest.TestCase):
-    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
-                         AVAILABLE_SHARES=None,
-                         datalen=76,
-                         max_segment_size=25,
-                         bucket_modes={},
-                         recover_mode="recover",
-                         ):
-        if AVAILABLE_SHARES is None:
-            AVAILABLE_SHARES = k_and_happy_and_n[2]
-        data = make_data(datalen)
-        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
-                      max_segment_size, bucket_modes, data)
-        # that fires with (uri_extension_hash, e, shareholders)
-        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode)
-        # that fires with newdata
-        def _downloaded((newdata, fd)):
-            self.failUnless(newdata == data)
-            return fd
-        d.addCallback(_downloaded)
-        return d
-
-    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
-             bucket_modes, data):
-        NUM_SHARES = k_and_happy_and_n[2]
-        if AVAILABLE_SHARES is None:
-            AVAILABLE_SHARES = NUM_SHARES
-        # force use of multiple segments
-        options = {"max_segment_size": max_segment_size,
-                   "needed_and_happy_and_total_shares": k_and_happy_and_n}
-        e = encode.Encoder(options)
-        nonkey = "\x00" * 16
-        e.setup(StringIO(data), nonkey)
-
-        assert e.num_shares == NUM_SHARES # else we'll be completely confused
-        e.setup_codec() # need to rebuild the codec for that change
-
-        shareholders = {}
-        all_peers = []
-        for shnum in range(NUM_SHARES):
-            mode = bucket_modes.get(shnum, "good")
-            peer = FakeBucketWriterProxy(mode)
-            shareholders[shnum] = peer
-        e.set_shareholders(shareholders)
-        plaintext_hasher = hashutil.plaintext_hasher()
-        plaintext_hasher.update(data)
-        cryptor = AES.new(key=nonkey, mode=AES.MODE_CTR,
-                          counterstart="\x00"*16)
-        crypttext_hasher = hashutil.crypttext_hasher()
-        crypttext_hasher.update(cryptor.encrypt(data))
-
-        e.set_uri_extension_data({'crypttext_hash': crypttext_hasher.digest(),
-                                  'plaintext_hash': plaintext_hasher.digest(),
-                                  })
-        d = e.start()
-        def _sent(uri_extension_hash):
-            return (uri_extension_hash, e, shareholders)
-        d.addCallback(_sent)
-        return d
-
-    def recover(self, (uri_extension_hash, e, shareholders), AVAILABLE_SHARES,
-                recover_mode):
-        key = e.key
-        if "corrupt_key" in recover_mode:
-            key = flip_bit(key)
-
-        URI = pack_uri(storage_index="S" * 32,
-                       key=key,
-                       uri_extension_hash=uri_extension_hash,
-                       needed_shares=e.required_shares,
-                       total_shares=e.num_shares,
-                       size=e.file_size)
-        client = None
-        target = download.Data()
-        fd = download.FileDownloader(client, URI, target)
-
-        # we manually cycle the FileDownloader through a number of steps that
-        # would normally be sequenced by a Deferred chain in
-        # FileDownloader.start(), to give us more control over the process.
-        # In particular, by bypassing _get_all_shareholders, we skip
-        # permuted-peerlist selection.
-        for shnum, bucket in shareholders.items():
-            if shnum < AVAILABLE_SHARES and bucket.closed:
-                fd.add_share_bucket(shnum, bucket)
-        fd._got_all_shareholders(None)
-
-        # Make it possible to obtain uri_extension from the shareholders.
-        # Arrange for shareholders[0] to be the first, so we can selectively
-        # corrupt the data it returns.
-        fd._uri_extension_sources = shareholders.values()
-        fd._uri_extension_sources.remove(shareholders[0])
-        fd._uri_extension_sources.insert(0, shareholders[0])
-
-        d = defer.succeed(None)
-
-        # have the FileDownloader retrieve a copy of uri_extension itself
-        d.addCallback(fd._obtain_uri_extension)
-
-        if "corrupt_crypttext_hashes" in recover_mode:
-            # replace everybody's crypttext hash trees with a different one
-            # (computed over a different file), then modify our uri_extension
-            # to reflect the new crypttext hash tree root
-            def _corrupt_crypttext_hashes(uri_extension):
-                assert isinstance(uri_extension, dict)
-                assert 'crypttext_root_hash' in uri_extension
-                badhash = hashutil.tagged_hash("bogus", "data")
-                bad_crypttext_hashes = [badhash] * uri_extension['num_segments']
-                badtree = hashtree.HashTree(bad_crypttext_hashes)
-                for bucket in shareholders.values():
-                    bucket.crypttext_hashes = list(badtree)
-                uri_extension['crypttext_root_hash'] = badtree[0]
-                return uri_extension
-            d.addCallback(_corrupt_crypttext_hashes)
-
-        d.addCallback(fd._got_uri_extension)
-
-        # also have the FileDownloader ask for hash trees
-        d.addCallback(fd._get_hashtrees)
-
-        d.addCallback(fd._create_validated_buckets)
-        d.addCallback(fd._download_all_segments)
-        d.addCallback(fd._done)
-        def _done(newdata):
-            return (newdata, fd)
-        d.addCallback(_done)
-        return d
-
-    def test_not_enough_shares(self):
-        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(download.NotEnoughPeersError))
-        d.addBoth(_done)
-        return d
 
-    def test_one_share_per_peer(self):
-        return self.send_and_recover()
-
-    def test_74(self):
-        return self.send_and_recover(datalen=74)
-    def test_75(self):
-        return self.send_and_recover(datalen=75)
-    def test_51(self):
-        return self.send_and_recover(datalen=51)
-
-    def test_99(self):
-        return self.send_and_recover(datalen=99)
-    def test_100(self):
-        return self.send_and_recover(datalen=100)
-    def test_76(self):
-        return self.send_and_recover(datalen=76)
-
-    def test_124(self):
-        return self.send_and_recover(datalen=124)
-    def test_125(self):
-        return self.send_and_recover(datalen=125)
-    def test_101(self):
-        return self.send_and_recover(datalen=101)
-
-    # the following tests all use 4-out-of-10 encoding
-
-    def test_bad_blocks(self):
-        # the first 6 servers have bad blocks, which will be caught by the
-        # blockhashes
-        modemap = dict([(i, "bad block")
-                        for i in range(6)]
-                       + [(i, "good")
-                          for i in range(6, 10)])
-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
-
-    def test_bad_blocks_failure(self):
-        # the first 7 servers have bad blocks, which will be caught by the
-        # blockhashes, and the download will fail
-        modemap = dict([(i, "bad block")
-                        for i in range(7)]
-                       + [(i, "good")
-                          for i in range(7, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(download.NotEnoughPeersError))
-        d.addBoth(_done)
-        return d
-
-    def test_bad_blockhashes(self):
-        # the first 6 servers have bad block hashes, so the blockhash tree
-        # will not validate
-        modemap = dict([(i, "bad blockhash")
-                        for i in range(6)]
-                       + [(i, "good")
-                          for i in range(6, 10)])
-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
-
-    def test_bad_blockhashes_failure(self):
-        # the first 7 servers have bad block hashes, so the blockhash tree
-        # will not validate, and the download will fail
-        modemap = dict([(i, "bad blockhash")
-                        for i in range(7)]
-                       + [(i, "good")
-                          for i in range(7, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(download.NotEnoughPeersError))
-        d.addBoth(_done)
-        return d
-
-    def test_bad_sharehashes(self):
-        # the first 6 servers have bad block hashes, so the sharehash tree
-        # will not validate
-        modemap = dict([(i, "bad sharehash")
-                        for i in range(6)]
-                       + [(i, "good")
-                          for i in range(6, 10)])
-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
-
-    def assertFetchFailureIn(self, fd, where):
-        expected = {"uri_extension": 0,
-                    "plaintext_hashroot": 0,
-                    "plaintext_hashtree": 0,
-                    "crypttext_hashroot": 0,
-                    "crypttext_hashtree": 0,
-                    }
-        if where is not None:
-            expected[where] += 1
-        self.failUnlessEqual(fd._fetch_failures, expected)
-
-    def test_good(self):
-        # just to make sure the test harness works when we aren't
-        # intentionally causing failures
-        modemap = dict([(i, "good") for i in range(0, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        d.addCallback(self.assertFetchFailureIn, None)
-        return d
-
-    def test_bad_uri_extension(self):
-        # the first server has a bad uri_extension block, so we will fail
-        # over to a different server.
-        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
-                       [(i, "good") for i in range(1, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        d.addCallback(self.assertFetchFailureIn, "uri_extension")
-        return d
-
-    def test_bad_plaintext_hashroot(self):
-        # the first server has a bad plaintext hashroot, so we will fail over
-        # to a different server.
-        modemap = dict([(i, "bad plaintext hashroot") for i in range(1)] +
-                       [(i, "good") for i in range(1, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        d.addCallback(self.assertFetchFailureIn, "plaintext_hashroot")
-        return d
-
-    def test_bad_crypttext_hashroot(self):
-        # the first server has a bad crypttext hashroot, so we will fail
-        # over to a different server.
-        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
-                       [(i, "good") for i in range(1, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        d.addCallback(self.assertFetchFailureIn, "crypttext_hashroot")
-        return d
-
-    def test_bad_plaintext_hashes(self):
-        # the first server has a bad plaintext hash block, so we will fail
-        # over to a different server.
-        modemap = dict([(i, "bad plaintext hash") for i in range(1)] +
-                       [(i, "good") for i in range(1, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        d.addCallback(self.assertFetchFailureIn, "plaintext_hashtree")
-        return d
-
-    def test_bad_crypttext_hashes(self):
-        # the first server has a bad crypttext hash block, so we will fail
-        # over to a different server.
-        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
-                       [(i, "good") for i in range(1, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        d.addCallback(self.assertFetchFailureIn, "crypttext_hashtree")
-        return d
-
-    def test_bad_crypttext_hashes_failure(self):
-        # to test that the crypttext merkle tree is really being applied, we
-        # sneak into the download process and corrupt two things: we replace
-        # everybody's crypttext hashtree with a bad version (computed over
-        # bogus data), and we modify the supposedly-validated uri_extension
-        # block to match the new crypttext hashtree root. The download
-        # process should notice that the crypttext coming out of FEC doesn't
-        # match the tree, and fail.
-
-        modemap = dict([(i, "good") for i in range(0, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
-                                  recover_mode=("corrupt_crypttext_hashes"))
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(hashtree.BadHashError), res)
-        d.addBoth(_done)
-        return d
+class Roundtrip(GridTestMixin, unittest.TestCase):
 
+    # a series of 3*3 tests to check out edge conditions. One axis is how the
+    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
+    # this is n%k == -1 or 0 or 1. For example, for 25-byte segments, we
+    # might test 74 bytes, 75 bytes, and 76 bytes.
 
-    def test_bad_plaintext(self):
-        # faking a decryption failure is easier: just corrupt the key
-        modemap = dict([(i, "good") for i in range(0, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
-                                  recover_mode=("corrupt_key"))
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(hashtree.BadHashError))
-        d.addBoth(_done)
-        return d
+    # on the other axis is how many leaves in the block hash tree we wind up
+    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
+    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
+    # segments, and 5 segments.
 
-    def test_bad_sharehashes_failure(self):
-        # the first 7 servers have bad block hashes, so the sharehash tree
-        # will not validate, and the download will fail
-        modemap = dict([(i, "bad sharehash")
-                        for i in range(7)]
-                       + [(i, "good")
-                          for i in range(7, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(download.NotEnoughPeersError))
-        d.addBoth(_done)
-        return d
+    # that results in the following series of data lengths:
+    #  3 segs: 74, 75, 51
+    #  4 segs: 99, 100, 76
+    #  5 segs: 124, 125, 101
 
-    def test_missing_sharehashes(self):
-        # the first 6 servers are missing their sharehashes, so the
-        # sharehash tree will not validate
-        modemap = dict([(i, "missing sharehash")
-                        for i in range(6)]
-                       + [(i, "good")
-                          for i in range(6, 10)])
-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
-
-    def test_missing_sharehashes_failure(self):
-        # the first 7 servers are missing their sharehashes, so the
-        # sharehash tree will not validate, and the download will fail
-        modemap = dict([(i, "missing sharehash")
-                        for i in range(7)]
-                       + [(i, "good")
-                          for i in range(7, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(download.NotEnoughPeersError))
-        d.addBoth(_done)
-        return d
+    # all tests encode to 100 shares, which means the share hash tree will
+    # have 128 leaves, which means that buckets will be given an 8-long share
+    # hash chain
 
-    def test_lost_one_shareholder(self):
-        # we have enough shareholders when we start, but one segment in we
-        # lose one of them. The upload should still succeed, as long as we
-        # still have 'shares_of_happiness' peers left.
-        modemap = dict([(i, "good") for i in range(9)] +
-                       [(i, "lost") for i in range(9, 10)])
-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
-
-    def test_lost_many_shareholders(self):
-        # we have enough shareholders when we start, but one segment in we
-        # lose all but one of them. The upload should fail.
-        modemap = dict([(i, "good") for i in range(1)] +
-                       [(i, "lost") for i in range(1, 10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(encode.NotEnoughPeersError))
-        d.addBoth(_done)
+    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
+    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
+    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
+    # trees, which get 15 blockhashes.
+
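+    # To make the arithmetic above concrete: the block hash tree stored in
+    # each bucket is a complete binary tree over the segment count rounded
+    # up to a power of two, so it holds 2*leaves - 1 hashes:
+    #   3 or 4 segments -> 4 leaves -> 2*4 - 1 = 7 blockhashes
+    #   5 segments      -> 8 leaves -> 2*8 - 1 = 15 blockhashes
+    # Likewise, 100 shares round up to a 128-leaf (2**7) share hash tree,
+    # which is where the 8-long share hash chain comes from.
+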
+    def test_74(self): return self.do_test_size(74)
+    def test_75(self): return self.do_test_size(75)
+    def test_51(self): return self.do_test_size(51)
+    def test_99(self): return self.do_test_size(99)
+    def test_100(self): return self.do_test_size(100)
+    def test_76(self): return self.do_test_size(76)
+    def test_124(self): return self.do_test_size(124)
+    def test_125(self): return self.do_test_size(125)
+    def test_101(self): return self.do_test_size(101)
+
+    def upload(self, data):
+        u = upload.Data(data, None)
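+        # convergence=None means no convergence secret, so a random
+        # encryption key is used; the attributes below override the client's
+        # default encoding parameters (25-byte segments, 25-of-100 shares,
+        # happiness threshold of 1)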
+        u.max_segment_size = 25
+        u.encoding_param_k = 25
+        u.encoding_param_happy = 1
+        u.encoding_param_n = 100
+        d = self.c0.upload(u)
+        d.addCallback(lambda ur: self.c0.create_node_from_uri(ur.get_uri()))
+        # returns a FileNode
         return d
 
-    def test_lost_all_shareholders(self):
-        # we have enough shareholders when we start, but one segment in we
-        # lose all of them. The upload should fail.
-        modemap = dict([(i, "lost") for i in range(10)])
-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
-        def _done(res):
-            self.failUnless(isinstance(res, Failure))
-            self.failUnless(res.check(encode.NotEnoughPeersError))
-        d.addBoth(_done)
+    def do_test_size(self, size):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        DATA = "p"*size
+        d = self.upload(DATA)
+        d.addCallback(lambda n: download_to_data(n))
+        def _downloaded(newdata):
+            self.failUnlessEqual(newdata, DATA)
+        d.addCallback(_downloaded)
         return d
-