]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_mutable.py
test_mutable: more enhancements
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_mutable.py
index 451793607112981e85a269ad0f810ec9c52868d0..c9711c6dc3ac5287a6406f746667ce0d5cc18bc5 100644 (file)
@@ -1,28 +1,30 @@
-
 import os, re, base64
 from cStringIO import StringIO
+
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
-from twisted.internet.interfaces import IConsumer
-from zope.interface import implements
+
 from allmydata import uri, client
 from allmydata.nodemaker import NodeMaker
 from allmydata.util import base32, consumer, fileutil, mathutil
+from allmydata.util.fileutil import abspath_expanduser_unicode
 from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
      ssk_pubkey_fingerprint_hash
+from allmydata.util.consumer import MemoryConsumer
 from allmydata.util.deferredutil import gatherResults
 from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
-     NotEnoughSharesError, SDMF_VERSION, MDMF_VERSION
+     NotEnoughSharesError, SDMF_VERSION, MDMF_VERSION, DownloadStopped
 from allmydata.monitor import Monitor
 from allmydata.test.common import ShouldFailMixin
 from allmydata.test.no_network import GridTestMixin
-from foolscap.api import eventually, fireEventually
+from foolscap.api import eventually, fireEventually, flushEventualQueue
 from foolscap.logging import log
 from allmydata.storage_client import StorageFarmBroker
 from allmydata.storage.common import storage_index_to_dir
+from allmydata.scripts import debug
 
 from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
-from allmydata.mutable.common import ResponseCache, \
+from allmydata.mutable.common import \
      MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
      NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
      NotEnoughServersError, CorruptShareError
@@ -36,6 +38,15 @@ from allmydata.mutable.repairer import MustForceRepairError
 
 import allmydata.test.common_util as testutil
 from allmydata.test.common import TEST_RSA_KEY_SIZE
+from allmydata.test.test_download import PausingConsumer, \
+     PausingAndStoppingConsumer, StoppingConsumer, \
+     ImmediatelyStoppingConsumer
+
+def eventuaaaaaly(res=None):
+    d = fireEventually(res)
+    d.addCallback(fireEventually)
+    d.addCallback(fireEventually)
+    return d
 
 
 # this "FakeStorage" exists to put the share data in RAM and avoid using real
@@ -67,11 +78,13 @@ class FakeStorage:
     def read(self, peerid, storage_index):
         shares = self._peers.get(peerid, {})
         if self._sequence is None:
-            return defer.succeed(shares)
+            return eventuaaaaaly(shares)
         d = defer.Deferred()
         if not self._pending:
             self._pending_timer = reactor.callLater(1.0, self._fire_readers)
-        self._pending[peerid] = (d, shares)
+        if peerid not in self._pending:
+            self._pending[peerid] = []
+        self._pending[peerid].append( (d, shares) )
         return d
 
     def _fire_readers(self):
@@ -80,10 +93,11 @@ class FakeStorage:
         self._pending = {}
         for peerid in self._sequence:
             if peerid in pending:
-                d, shares = pending.pop(peerid)
+                for (d, shares) in pending.pop(peerid):
+                    eventually(d.callback, shares)
+        for peerid in pending:
+            for (d, shares) in pending[peerid]:
                 eventually(d.callback, shares)
-        for (d, shares) in pending.values():
-            eventually(d.callback, shares)
 
     def write(self, peerid, storage_index, shnum, offset, data):
         if peerid not in self._peers:
@@ -184,7 +198,7 @@ def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
             reader = MDMFSlotReadProxy(None, None, shnum, data)
             # We need to get the offsets for the next part.
             d = reader.get_verinfo()
-            def _do_corruption(verinfo, data, shnum):
+            def _do_corruption(verinfo, data, shnum, shares):
                 (seqnum,
                  root_hash,
                  IV,
@@ -209,7 +223,7 @@ def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
                 else:
                     f = flip_bit
                 shares[shnum] = f(data, real_offset)
-            d.addCallback(_do_corruption, data, shnum)
+            d.addCallback(_do_corruption, data, shnum, shares)
             ds.append(d)
     dl = defer.DeferredList(ds)
     dl.addCallback(lambda ignored: res)
@@ -223,17 +237,20 @@ def make_storagebroker(s=None, num_peers=10):
     storage_broker = StorageFarmBroker(None, True)
     for peerid in peerids:
         fss = FakeStorageServer(peerid, s)
-        storage_broker.test_add_rref(peerid, fss)
+        ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
+               "permutation-seed-base32": base32.b2a(peerid) }
+        storage_broker.test_add_rref(peerid, fss, ann)
     return storage_broker
 
-def make_nodemaker(s=None, num_peers=10):
+def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE):
     storage_broker = make_storagebroker(s, num_peers)
     sh = client.SecretHolder("lease secret", "convergence secret")
     keygen = client.KeyGenerator()
-    keygen.set_default_keysize(TEST_RSA_KEY_SIZE)
+    if keysize:
+        keygen.set_default_keysize(keysize)
     nodemaker = NodeMaker(storage_broker, sh, None,
                           None, None,
-                          {"k": 3, "n": 10}, keygen)
+                          {"k": 3, "n": 10}, SDMF_VERSION, keygen)
     return nodemaker
 
 class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
@@ -277,7 +294,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         self.nodemaker.default_encoding_parameters['n'] = 1
         d = defer.succeed(None)
         for v in (SDMF_VERSION, MDMF_VERSION):
-            d.addCallback(lambda ignored:
+            d.addCallback(lambda ignored, v=v:
                 self.nodemaker.create_mutable_file(version=v))
             def _created(n):
                 self.failUnless(isinstance(n, MutableFileNode))
@@ -369,28 +386,6 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
 
-    def test_create_from_mdmf_writecap_with_extensions(self):
-        # Test that the nodemaker is capable of creating an MDMF
-        # filenode when given a writecap with extension parameters in
-        # them.
-        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
-        def _created(n):
-            self.failUnless(isinstance(n, MutableFileNode))
-            s = n.get_uri()
-            # We need to cheat a little and delete the nodemaker's
-            # cache, otherwise we'll get the same node instance back.
-            self.failUnlessIn(":3:131073", s)
-            n2 = self.nodemaker.create_from_cap(s)
-
-            self.failUnlessEqual(n2.get_storage_index(), n.get_storage_index())
-            self.failUnlessEqual(n.get_writekey(), n2.get_writekey())
-            hints = n2._downloader_hints
-            self.failUnlessEqual(hints['k'], 3)
-            self.failUnlessEqual(hints['segsize'], 131073)
-        d.addCallback(_created)
-        return d
-
-
     def test_create_from_mdmf_readcap(self):
         d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
         def _created(n):
@@ -405,26 +400,6 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
 
-    def test_create_from_mdmf_readcap_with_extensions(self):
-        # We should be able to create an MDMF filenode with the
-        # extension parameters without it breaking.
-        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
-        def _created(n):
-            self.failUnless(isinstance(n, MutableFileNode))
-            s = n.get_readonly_uri()
-            self.failUnlessIn(":3:131073", s)
-
-            n2 = self.nodemaker.create_from_cap(s)
-            self.failUnless(isinstance(n2, MutableFileNode))
-            self.failUnless(n2.is_readonly())
-            self.failUnlessEqual(n.get_storage_index(), n2.get_storage_index())
-            hints = n2._downloader_hints
-            self.failUnlessEqual(hints["k"], 3)
-            self.failUnlessEqual(hints["segsize"], 131073)
-        d.addCallback(_created)
-        return d
-
-
     def test_internal_version_from_cap(self):
         # MutableFileNodes and MutableFileVersions have an internal
         # switch that tells them whether they're dealing with an SDMF or
@@ -512,7 +487,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
                 dumped = servermap.dump(StringIO())
                 self.failUnlessIn("3-of-10", dumped.getvalue())
             d.addCallback(_then)
-            # Now overwrite the contents with some new contents. We want 
+            # Now overwrite the contents with some new contents. We want
             # to make them big enough to force the file to be uploaded
             # in more than one segment.
             big_contents = "contents1" * 100000 # about 900 KiB
@@ -527,7 +502,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
             # before, they need to be big enough to force multiple
             # segments, so that we make the downloader deal with
             # multiple segments.
-            bigger_contents = "contents2" * 1000000 # about 9MiB 
+            bigger_contents = "contents2" * 1000000 # about 9MiB
             bigger_contents_uploadable = MutableData(bigger_contents)
             d.addCallback(lambda ignored:
                 n.overwrite(bigger_contents_uploadable))
@@ -540,32 +515,69 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
 
-    def test_retrieve_pause(self):
-        # We should make sure that the retriever is able to pause
+    def test_retrieve_producer_mdmf(self):
+        # We should make sure that the retriever is able to pause and stop
         # correctly.
-        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
-        def _created(node):
-            self.node = node
+        data = "contents1" * 100000
+        d = self.nodemaker.create_mutable_file(MutableData(data),
+                                               version=MDMF_VERSION)
+        d.addCallback(lambda node: node.get_best_mutable_version())
+        d.addCallback(self._test_retrieve_producer, "MDMF", data)
+        return d
 
-            return node.overwrite(MutableData("contents1" * 100000))
-        d.addCallback(_created)
-        # Now we'll retrieve it into a pausing consumer.
-        d.addCallback(lambda ignored:
-            self.node.get_best_mutable_version())
-        def _got_version(version):
-            self.c = PausingConsumer()
-            return version.read(self.c)
-        d.addCallback(_got_version)
-        d.addCallback(lambda ignored:
-            self.failUnlessEqual(self.c.data, "contents1" * 100000))
+    # note: SDMF has only one big segment, so we can't use the usual
+    # after-the-first-write() trick to pause or stop the download.
+    # Disabled until we find a better approach.
+    def OFF_test_retrieve_producer_sdmf(self):
+        data = "contents1" * 100000
+        d = self.nodemaker.create_mutable_file(MutableData(data),
+                                               version=SDMF_VERSION)
+        d.addCallback(lambda node: node.get_best_mutable_version())
+        d.addCallback(self._test_retrieve_producer, "SDMF", data)
         return d
 
+    def _test_retrieve_producer(self, version, kind, data):
+        # Now we'll retrieve it into a pausing consumer.
+        c = PausingConsumer()
+        d = version.read(c)
+        d.addCallback(lambda ign: self.failUnlessEqual(c.size, len(data)))
+
+        c2 = PausingAndStoppingConsumer()
+        d.addCallback(lambda ign:
+                      self.shouldFail(DownloadStopped, kind+"_pause_stop",
+                                      "our Consumer called stopProducing()",
+                                      version.read, c2))
+
+        c3 = StoppingConsumer()
+        d.addCallback(lambda ign:
+                      self.shouldFail(DownloadStopped, kind+"_stop",
+                                      "our Consumer called stopProducing()",
+                                      version.read, c3))
+
+        c4 = ImmediatelyStoppingConsumer()
+        d.addCallback(lambda ign:
+                      self.shouldFail(DownloadStopped, kind+"_stop_imm",
+                                      "our Consumer called stopProducing()",
+                                      version.read, c4))
+
+        def _then(ign):
+            c5 = MemoryConsumer()
+            d1 = version.read(c5)
+            c5.producer.stopProducing()
+            return self.shouldFail(DownloadStopped, kind+"_stop_imm2",
+                                   "our Consumer called stopProducing()",
+                                   lambda: d1)
+        d.addCallback(_then)
+        return d
 
     def test_download_from_mdmf_cap(self):
         # We should be able to download an MDMF file given its cap
         d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
         def _created(node):
             self.uri = node.get_uri()
+            # also confirm that the cap has no extension fields
+            pieces = self.uri.split(":")
+            self.failUnlessEqual(len(pieces), 4)
 
             return node.overwrite(MutableData("contents1" * 100000))
         def _then(ignored):
@@ -579,37 +591,6 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
 
-    def test_create_and_download_from_bare_mdmf_cap(self):
-        # MDMF caps have extension parameters on them by default. We
-        # need to make sure that they work without extension parameters.
-        contents = MutableData("contents" * 100000)
-        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION,
-                                               contents=contents)
-        def _created(node):
-            uri = node.get_uri()
-            self._created = node
-            self.failUnlessIn(":3:131073", uri)
-            # Now strip that off the end of the uri, then try creating
-            # and downloading the node again.
-            bare_uri = uri.replace(":3:131073", "")
-            assert ":3:131073" not in bare_uri
-
-            return self.nodemaker.create_from_cap(bare_uri)
-        d.addCallback(_created)
-        def _created_bare(node):
-            self.failUnlessEqual(node.get_writekey(),
-                                 self._created.get_writekey())
-            self.failUnlessEqual(node.get_readkey(),
-                                 self._created.get_readkey())
-            self.failUnlessEqual(node.get_storage_index(),
-                                 self._created.get_storage_index())
-            return node.download_best_version()
-        d.addCallback(_created_bare)
-        d.addCallback(lambda data:
-            self.failUnlessEqual(data, "contents" * 100000))
-        return d
-
-
     def test_mdmf_write_count(self):
         # Publishing an MDMF file should only cause one write for each
         # share that is to be published. Otherwise, we introduce
@@ -661,25 +642,6 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(_created)
         return d
 
-
-    def test_response_cache_memory_leak(self):
-        d = self.nodemaker.create_mutable_file("contents")
-        def _created(n):
-            d = n.download_best_version()
-            d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
-            d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
-
-            def _check_cache(expected):
-                # The total size of cache entries should not increase on the second download;
-                # in fact the cache contents should be identical.
-                d2 = n.download_best_version()
-                d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
-                return d2
-            d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
-            return d
-        d.addCallback(_created)
-        return d
-
     def test_create_with_initial_contents_function(self):
         data = "initial contents"
         def _make_contents(n):
@@ -954,11 +916,13 @@ class PublishMixin:
         d.addCallback(_created)
         return d
 
-    def publish_mdmf(self):
+    def publish_mdmf(self, data=None):
         # like publish_one, except that the result is guaranteed to be
         # an MDMF file.
         # self.CONTENTS should have more than one segment.
-        self.CONTENTS = "This is an MDMF file" * 100000
+        if data is None:
+            data = "This is an MDMF file" * 100000
+        self.CONTENTS = data
         self.uploadable = MutableData(self.CONTENTS)
         self._storage = FakeStorage()
         self._nodemaker = make_nodemaker(self._storage)
@@ -971,10 +935,12 @@ class PublishMixin:
         return d
 
 
-    def publish_sdmf(self):
+    def publish_sdmf(self, data=None):
         # like publish_one, except that the result is guaranteed to be
         # an SDMF file
-        self.CONTENTS = "This is an SDMF file" * 1000
+        if data is None:
+            data = "This is an SDMF file" * 1000
+        self.CONTENTS = data
         self.uploadable = MutableData(self.CONTENTS)
         self._storage = FakeStorage()
         self._nodemaker = make_nodemaker(self._storage)
@@ -1044,30 +1010,6 @@ class PublishMixin:
                     index = versionmap[shnum]
                     shares[peerid][shnum] = oldshares[index][peerid][shnum]
 
-class PausingConsumer:
-    implements(IConsumer)
-    def __init__(self):
-        self.data = ""
-        self.already_paused = False
-
-    def registerProducer(self, producer, streaming):
-        self.producer = producer
-        self.producer.resumeProducing()
-
-    def unregisterProducer(self):
-        self.producer = None
-
-    def _unpause(self, ignored):
-        self.producer.resumeProducing()
-
-    def write(self, data):
-        self.data += data
-        if not self.already_paused:
-           self.producer.pauseProducing()
-           self.already_paused = True
-           reactor.callLater(15, self._unpause, None)
-
-
 class Servermap(unittest.TestCase, PublishMixin):
     def setUp(self):
         return self.publish_one()
@@ -1097,10 +1039,10 @@ class Servermap(unittest.TestCase, PublishMixin):
         self.failUnlessEqual(sm.recoverable_versions(), set([best]))
         self.failUnlessEqual(len(sm.shares_available()), 1)
         self.failUnlessEqual(sm.shares_available()[best], (num_shares, 3, 10))
-        shnum, peerids = sm.make_sharemap().items()[0]
-        peerid = list(peerids)[0]
-        self.failUnlessEqual(sm.version_on_peer(peerid, shnum), best)
-        self.failUnlessEqual(sm.version_on_peer(peerid, 666), None)
+        shnum, servers = sm.make_sharemap().items()[0]
+        server = list(servers)[0]
+        self.failUnlessEqual(sm.version_on_server(server, shnum), best)
+        self.failUnlessEqual(sm.version_on_server(server, 666), None)
         return sm
 
     def test_basic(self):
@@ -1170,10 +1112,10 @@ class Servermap(unittest.TestCase, PublishMixin):
             # mark the first 5 shares as corrupt, then update the servermap.
             # The map should not have the marked shares it in any more, and
             # new shares should be found to replace the missing ones.
-            for (shnum, peerid, timestamp) in shares:
+            for (shnum, server, timestamp) in shares:
                 if shnum < 5:
-                    self._corrupted.add( (peerid, shnum) )
-                    sm.mark_bad_share(peerid, shnum, "")
+                    self._corrupted.add( (server, shnum) )
+                    sm.mark_bad_share(server, shnum, "")
             return self.update_servermap(sm, MODE_WRITE)
         d.addCallback(_made_map)
         def _check_map(sm):
@@ -1181,10 +1123,10 @@ class Servermap(unittest.TestCase, PublishMixin):
             v = sm.best_recoverable_version()
             vm = sm.make_versionmap()
             shares = list(vm[v])
-            for (peerid, shnum) in self._corrupted:
-                peer_shares = sm.shares_on_peer(peerid)
-                self.failIf(shnum in peer_shares,
-                            "%d was in %s" % (shnum, peer_shares))
+            for (server, shnum) in self._corrupted:
+                server_shares = sm.debug_shares_on_server(server)
+                self.failIf(shnum in server_shares,
+                            "%d was in %s" % (shnum, server_shares))
             self.failUnlessEqual(len(shares), 5)
         d.addCallback(_check_map)
         return d
@@ -1334,7 +1276,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
     def do_download(self, servermap, version=None):
         if version is None:
             version = servermap.best_recoverable_version()
-        r = Retrieve(self._fn, servermap, version)
+        r = Retrieve(self._fn, self._storage_broker, servermap, version)
         c = consumer.MemoryConsumer()
         d = r.download(consumer=c)
         d.addCallback(lambda mc: "".join(mc.chunks))
@@ -1375,12 +1317,30 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
                 shares.clear()
             d1 = self.shouldFail(NotEnoughSharesError,
                                  "test_all_shares_vanished",
-                                 "ran out of peers",
+                                 "ran out of servers",
                                  self.do_download, servermap)
             return d1
         d.addCallback(_remove_shares)
         return d
 
+    def test_all_but_two_shares_vanished_updated_servermap(self):
+        # tests error reporting for ticket #1742
+        d = self.make_servermap()
+        def _remove_shares(servermap):
+            self._version = servermap.best_recoverable_version()
+            for shares in self._storage._peers.values()[2:]:
+                shares.clear()
+            return self.make_servermap(servermap)
+        d.addCallback(_remove_shares)
+        def _check(updated_servermap):
+            d1 = self.shouldFail(NotEnoughSharesError,
+                                 "test_all_but_two_shares_vanished_updated_servermap",
+                                 "ran out of servers",
+                                 self.do_download, updated_servermap, version=self._version)
+            return d1
+        d.addCallback(_check)
+        return d
+
     def test_no_servers(self):
         sb2 = make_storagebroker(num_peers=0)
         # if there are no servers, then a MODE_READ servermap should come
@@ -1390,7 +1350,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
             self.failUnlessEqual(servermap.best_recoverable_version(), None)
             self.failIf(servermap.recoverable_versions())
             self.failIf(servermap.unrecoverable_versions())
-            self.failIf(servermap.all_peers())
+            self.failIf(servermap.all_servers())
         d.addCallback(_check_servermap)
         return d
 
@@ -1432,7 +1392,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
                 # no recoverable versions == not succeeding. The problem
                 # should be noted in the servermap's list of problems.
                 if substring:
-                    allproblems = [str(f) for f in servermap.problems]
+                    allproblems = [str(f) for f in servermap.get_problems()]
                     self.failUnlessIn(substring, "".join(allproblems))
                 return servermap
             if should_succeed:
@@ -1533,7 +1493,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
 
 
     def test_corrupt_all_encprivkey_late(self):
-        # this should work for the same reason as above, but we corrupt 
+        # this should work for the same reason as above, but we corrupt
         # after the servermap update to exercise the error handling
         # code.
         # We need to remove the privkey from the node, or the retrieve
@@ -1546,23 +1506,17 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
                                       fetch_privkey=True)
 
 
-    def test_corrupt_all_seqnum_late(self):
+    # disabled until retrieve tests checkstring on each blockfetch. I didn't
+    # just use a .todo because the failing-but-ignored test emits about 30kB
+    # of noise.
+    def OFF_test_corrupt_all_seqnum_late(self):
         # corrupting the seqnum between mapupdate and retrieve should result
         # in NotEnoughSharesError, since each share will look invalid
         def _check(res):
             f = res[0]
             self.failUnless(f.check(NotEnoughSharesError))
             self.failUnless("uncoordinated write" in str(f))
-        return self._test_corrupt_all(1, "ran out of peers",
-                                      corrupt_early=False,
-                                      failure_checker=_check)
-
-    def test_corrupt_all_block_hash_tree_late(self):
-        def _check(res):
-            f = res[0]
-            self.failUnless(f.check(NotEnoughSharesError))
-        return self._test_corrupt_all("block_hash_tree",
-                                      "block hash tree failure",
+        return self._test_corrupt_all(1, "ran out of servers",
                                       corrupt_early=False,
                                       failure_checker=_check)
 
@@ -1591,11 +1545,11 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
                       shnums_to_corrupt=range(0, N-k))
         d.addCallback(lambda res: self.make_servermap())
         def _do_retrieve(servermap):
-            self.failUnless(servermap.problems)
+            self.failUnless(servermap.get_problems())
             self.failUnless("pubkey doesn't match fingerprint"
-                            in str(servermap.problems[0]))
+                            in str(servermap.get_problems()[0]))
             ver = servermap.best_recoverable_version()
-            r = Retrieve(self._fn, servermap, ver)
+            r = Retrieve(self._fn, self._storage_broker, servermap, ver)
             c = consumer.MemoryConsumer()
             return r.download(c)
         d.addCallback(_do_retrieve)
@@ -1647,17 +1601,20 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
         d.addCallback(lambda ignored:
             self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                    "block hash tree failure",
-                                   corrupt_early=False,
+                                   corrupt_early=True,
                                    should_succeed=False))
         return d
 
 
     def test_corrupt_mdmf_block_hash_tree_late(self):
+        # Note - there is no SDMF counterpart to this test, as the SDMF
+        # files are guaranteed to have exactly one block, and therefore
+        # the block hash tree fits within the initial read (#1240).
         d = self.publish_mdmf()
         d.addCallback(lambda ignored:
             self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                    "block hash tree failure",
-                                   corrupt_early=True,
+                                   corrupt_early=False,
                                    should_succeed=False))
         return d
 
@@ -1689,14 +1646,14 @@ class CheckerMixin:
         return r
 
     def check_expected_failure(self, r, expected_exception, substring, where):
-        for (peerid, storage_index, shnum, f) in r.problems:
+        for (peerid, storage_index, shnum, f) in r.get_share_problems():
             if f.check(expected_exception):
                 self.failUnless(substring in str(f),
                                 "%s: substring '%s' not in '%s'" %
                                 (where, substring, str(f)))
                 return
         self.fail("%s: didn't see expected exception %s in problems %s" %
-                  (where, expected_exception, r.problems))
+                  (where, expected_exception, r.get_share_problems()))
 
 
 class Checker(unittest.TestCase, CheckerMixin, PublishMixin):
@@ -1773,6 +1730,33 @@ class Checker(unittest.TestCase, CheckerMixin, PublishMixin):
         d.addCallback(self.check_bad, "test_check_mdmf_all_bad_sig")
         return d
 
+    def test_verify_mdmf_all_bad_sharedata(self):
+        d = self.publish_mdmf()
+        # On 8 of the shares, corrupt the beginning of the share data.
+        # The signature check during the servermap update won't catch this.
+        d.addCallback(lambda ignored:
+            corrupt(None, self._storage, "share_data", range(8)))
+        # On 2 of the shares, corrupt the end of the share data.
+        # The signature check during the servermap update won't catch
+        # this either, and the retrieval process will have to process
+        # all of the segments before it notices.
+        d.addCallback(lambda ignored:
+            # the block hash tree comes right after the share data, so if we
+            # corrupt a little before the block hash tree, we'll corrupt in the
+            # last block of each share.
+            corrupt(None, self._storage, "block_hash_tree", [8, 9], -5))
+        d.addCallback(lambda ignored:
+            self._fn.check(Monitor(), verify=True))
+        # The verifier should flag the file as unhealthy, and should
+        # list all 10 shares as bad.
+        d.addCallback(self.check_bad, "test_verify_mdmf_all_bad_sharedata")
+        def _check_num_bad(r):
+            self.failIf(r.is_recoverable())
+            smap = r.get_servermap()
+            self.failUnlessEqual(len(smap.get_bad_shares()), 10)
+        d.addCallback(_check_num_bad)
+        return d
+
     def test_check_all_bad_blocks(self):
         d = corrupt(None, self._storage, "share_data", [9]) # bad blocks
         # the Checker won't notice this.. it doesn't look at actual data
@@ -1909,6 +1893,19 @@ class Checker(unittest.TestCase, CheckerMixin, PublishMixin):
                       "test_verify_mdmf_bad_encprivkey_uncheckable")
         return d
 
+    def test_verify_sdmf_empty(self):
+        d = self.publish_sdmf("")
+        d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True))
+        d.addCallback(self.check_good, "test_verify_sdmf")
+        d.addCallback(flushEventualQueue)
+        return d
+
+    def test_verify_mdmf_empty(self):
+        d = self.publish_mdmf("")
+        d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True))
+        d.addCallback(self.check_good, "test_verify_mdmf")
+        d.addCallback(flushEventualQueue)
+        return d
 
 class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
 
@@ -1976,102 +1973,78 @@ class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
         self.failUnlessEqual(old_shares, current_shares)
 
 
-    def test_unrepairable_0shares(self):
-        d = self.publish_one()
-        def _delete_all_shares(ign):
+    def _test_whether_repairable(self, publisher, nshares, expected_result):
+        d = publisher()
+        def _delete_some_shares(ign):
             shares = self._storage._peers
             for peerid in shares:
-                shares[peerid] = {}
-        d.addCallback(_delete_all_shares)
+                for shnum in list(shares[peerid]):
+                    if shnum >= nshares:
+                        del shares[peerid][shnum]
+        d.addCallback(_delete_some_shares)
         d.addCallback(lambda ign: self._fn.check(Monitor()))
-        d.addCallback(lambda check_results: self._fn.repair(check_results))
-        def _check(crr):
-            self.failUnlessEqual(crr.get_successful(), False)
+        def _check(cr):
+            self.failIf(cr.is_healthy())
+            self.failUnlessEqual(cr.is_recoverable(), expected_result)
+            return cr
         d.addCallback(_check)
-        return d
-
-    def test_mdmf_unrepairable_0shares(self):
-        d = self.publish_mdmf()
-        def _delete_all_shares(ign):
-            shares = self._storage._peers
-            for peerid in shares:
-                shares[peerid] = {}
-        d.addCallback(_delete_all_shares)
-        d.addCallback(lambda ign: self._fn.check(Monitor()))
         d.addCallback(lambda check_results: self._fn.repair(check_results))
-        d.addCallback(lambda crr: self.failIf(crr.get_successful()))
+        d.addCallback(lambda crr: self.failUnlessEqual(crr.get_successful(), expected_result))
         return d
 
+    def test_unrepairable_0shares(self):
+        return self._test_whether_repairable(self.publish_one, 0, False)
+
+    def test_mdmf_unrepairable_0shares(self):
+        return self._test_whether_repairable(self.publish_mdmf, 0, False)
 
     def test_unrepairable_1share(self):
-        d = self.publish_one()
-        def _delete_all_shares(ign):
-            shares = self._storage._peers
-            for peerid in shares:
-                for shnum in list(shares[peerid]):
-                    if shnum > 0:
-                        del shares[peerid][shnum]
-        d.addCallback(_delete_all_shares)
-        d.addCallback(lambda ign: self._fn.check(Monitor()))
-        d.addCallback(lambda check_results: self._fn.repair(check_results))
-        def _check(crr):
-            self.failUnlessEqual(crr.get_successful(), False)
-        d.addCallback(_check)
-        return d
+        return self._test_whether_repairable(self.publish_one, 1, False)
 
     def test_mdmf_unrepairable_1share(self):
-        d = self.publish_mdmf()
-        def _delete_all_shares(ign):
-            shares = self._storage._peers
-            for peerid in shares:
-                for shnum in list(shares[peerid]):
-                    if shnum > 0:
-                        del shares[peerid][shnum]
-        d.addCallback(_delete_all_shares)
-        d.addCallback(lambda ign: self._fn.check(Monitor()))
-        d.addCallback(lambda check_results: self._fn.repair(check_results))
-        def _check(crr):
-            self.failUnlessEqual(crr.get_successful(), False)
-        d.addCallback(_check)
-        return d
+        return self._test_whether_repairable(self.publish_mdmf, 1, False)
 
     def test_repairable_5shares(self):
-        d = self.publish_mdmf()
-        def _delete_all_shares(ign):
-            shares = self._storage._peers
-            for peerid in shares:
-                for shnum in list(shares[peerid]):
-                    if shnum > 4:
-                        del shares[peerid][shnum]
-        d.addCallback(_delete_all_shares)
-        d.addCallback(lambda ign: self._fn.check(Monitor()))
-        d.addCallback(lambda check_results: self._fn.repair(check_results))
-        def _check(crr):
-            self.failUnlessEqual(crr.get_successful(), True)
-        d.addCallback(_check)
-        return d
+        return self._test_whether_repairable(self.publish_one, 5, True)
 
     def test_mdmf_repairable_5shares(self):
-        d = self.publish_mdmf()
+        return self._test_whether_repairable(self.publish_mdmf, 5, True)
+
+    def _test_whether_checkandrepairable(self, publisher, nshares, expected_result):
+        """
+        Like the _test_whether_repairable tests, but invoking check_and_repair
+        instead of invoking check and then invoking repair.
+        """
+        d = publisher()
         def _delete_some_shares(ign):
             shares = self._storage._peers
             for peerid in shares:
                 for shnum in list(shares[peerid]):
-                    if shnum > 5:
+                    if shnum >= nshares:
                         del shares[peerid][shnum]
         d.addCallback(_delete_some_shares)
-        d.addCallback(lambda ign: self._fn.check(Monitor()))
-        def _check(cr):
-            self.failIf(cr.is_healthy())
-            self.failUnless(cr.is_recoverable())
-            return cr
-        d.addCallback(_check)
-        d.addCallback(lambda check_results: self._fn.repair(check_results))
-        def _check1(crr):
-            self.failUnlessEqual(crr.get_successful(), True)
-        d.addCallback(_check1)
+        d.addCallback(lambda ign: self._fn.check_and_repair(Monitor()))
+        d.addCallback(lambda crr: self.failUnlessEqual(crr.get_repair_successful(), expected_result))
         return d
 
+    def test_unrepairable_0shares_checkandrepair(self):
+        return self._test_whether_checkandrepairable(self.publish_one, 0, False)
+
+    def test_mdmf_unrepairable_0shares_checkandrepair(self):
+        return self._test_whether_checkandrepairable(self.publish_mdmf, 0, False)
+
+    def test_unrepairable_1share_checkandrepair(self):
+        return self._test_whether_checkandrepairable(self.publish_one, 1, False)
+
+    def test_mdmf_unrepairable_1share_checkandrepair(self):
+        return self._test_whether_checkandrepairable(self.publish_mdmf, 1, False)
+
+    def test_repairable_5shares_checkandrepair(self):
+        return self._test_whether_checkandrepairable(self.publish_one, 5, True)
+
+    def test_mdmf_repairable_5shares_checkandrepair(self):
+        return self._test_whether_checkandrepairable(self.publish_mdmf, 5, True)
+
 
     def test_merge(self):
         self.old_shares = []
@@ -2180,6 +2153,26 @@ class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
         d.addCallback(_check_results)
         return d
 
+    def test_repair_empty(self):
+        # bug 1689: delete one share of an empty mutable file, then repair.
+        # In the buggy version, the check that precedes the retrieve+publish
+        # cycle uses MODE_READ, instead of MODE_REPAIR, and fails to get the
+        # privkey that repair needs.
+        d = self.publish_sdmf("")
+        def _delete_one_share(ign):
+            shares = self._storage._peers
+            for peerid in shares:
+                for shnum in list(shares[peerid]):
+                    if shnum == 0:
+                        del shares[peerid][shnum]
+        d.addCallback(_delete_one_share)
+        d.addCallback(lambda ign: self._fn2.check(Monitor()))
+        d.addCallback(lambda check_results: self._fn2.repair(check_results))
+        def _check(crr):
+            self.failUnlessEqual(crr.get_successful(), True)
+        d.addCallback(_check)
+        return d
+
 class DevNullDictionary(dict):
     def __setitem__(self, key, value):
         return
@@ -2239,9 +2232,9 @@ class MultipleEncodings(unittest.TestCase):
         # then mix up the shares, to make sure that download survives seeing
         # a variety of encodings. This is actually kind of tricky to set up.
 
-        contents1 = "Contents for encoding 1 (3-of-10) go here"
-        contents2 = "Contents for encoding 2 (4-of-9) go here"
-        contents3 = "Contents for encoding 3 (4-of-7) go here"
+        contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
+        contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
+        contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
 
         # we make a retrieval object that doesn't know what encoding
         # parameters to use
@@ -2409,39 +2402,6 @@ class MultipleVersions(unittest.TestCase, PublishMixin, CheckerMixin):
         return d
 
 
-class Utils(unittest.TestCase):
-    def test_cache(self):
-        c = ResponseCache()
-        # xdata = base62.b2a(os.urandom(100))[:100]
-        xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
-        ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
-        c.add("v1", 1, 0, xdata)
-        c.add("v1", 1, 2000, ydata)
-        self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
-        self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
-        self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
-        self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
-        self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
-        self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
-        self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
-        self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
-        self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
-        self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
-
-        # test joining fragments
-        c = ResponseCache()
-        c.add("v1", 1, 0, xdata[:10])
-        c.add("v1", 1, 10, xdata[10:20])
-        self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
-
 class Exceptions(unittest.TestCase):
     def test_repr(self):
         nmde = NeedMoreDataError(100, 50, 100)
@@ -2449,6 +2409,7 @@ class Exceptions(unittest.TestCase):
         ucwe = UncoordinatedWriteError()
         self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
 
+
 class SameKeyGenerator:
     def __init__(self, pubkey, privkey):
         self.pubkey = pubkey
@@ -2481,11 +2442,12 @@ class FirstServerGetsDeleted:
         return retval
 
 class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
-    def test_publish_surprise(self):
-        self.basedir = "mutable/Problems/test_publish_surprise"
+    def do_publish_surprise(self, version):
+        self.basedir = "mutable/Problems/test_publish_surprise_%s" % version
         self.set_up_grid()
         nm = self.g.clients[0].nodemaker
-        d = nm.create_mutable_file(MutableData("contents 1"))
+        d = nm.create_mutable_file(MutableData("contents 1"),
+                                    version=version)
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
@@ -2509,11 +2471,17 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(_created)
         return d
 
+    def test_publish_surprise_sdmf(self):
+        return self.do_publish_surprise(SDMF_VERSION)
+
+    def test_publish_surprise_mdmf(self):
+        return self.do_publish_surprise(MDMF_VERSION)
+
     def test_retrieve_surprise(self):
         self.basedir = "mutable/Problems/test_retrieve_surprise"
         self.set_up_grid()
         nm = self.g.clients[0].nodemaker
-        d = nm.create_mutable_file(MutableData("contents 1"))
+        d = nm.create_mutable_file(MutableData("contents 1"*4000))
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_READ))
@@ -2527,12 +2495,11 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             # now attempt to retrieve the old version with the old servermap.
             # This will look like someone has changed the file since we
             # updated the servermap.
-            d.addCallback(lambda res: n._cache._clear())
             d.addCallback(lambda res: log.msg("starting doomed read"))
             d.addCallback(lambda res:
                           self.shouldFail(NotEnoughSharesError,
                                           "test_retrieve_surprise",
-                                          "ran out of peers: have 0 of 1",
+                                          "ran out of servers: have 0 of 1",
                                           n.download_version,
                                           self.old_map,
                                           self.old_map.best_recoverable_version(),
@@ -2559,7 +2526,7 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
                 # stash the old state of the file
                 self.old_map = smap
                 # now shut down one of the servers
-                peer0 = list(smap.make_sharemap()[0])[0]
+                peer0 = list(smap.make_sharemap()[0])[0].get_serverid()
                 self.g.remove_server(peer0)
                 # then modify the file, leaving the old map untouched
                 log.msg("starting winning write")
@@ -2578,6 +2545,44 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         d.addCallback(_created)
         return d
 
+    def test_multiply_placed_shares(self):
+        self.basedir = "mutable/Problems/test_multiply_placed_shares"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+        d = nm.create_mutable_file(MutableData("contents 1"))
+        # remove one of the servers and reupload the file.
+        def _created(n):
+            self._node = n
+
+            servers = self.g.get_all_serverids()
+            self.ss = self.g.remove_server(servers[len(servers)-1])
+
+            new_server = self.g.make_server(len(servers)-1)
+            self.g.add_server(len(servers)-1, new_server)
+
+            return self._node.download_best_version()
+        d.addCallback(_created)
+        d.addCallback(lambda data: MutableData(data))
+        d.addCallback(lambda data: self._node.overwrite(data))
+
+        # restore the server we removed earlier, then download+upload
+        # the file again
+        def _overwritten(ign):
+            self.g.add_server(len(self.g.servers_by_number), self.ss)
+            return self._node.download_best_version()
+        d.addCallback(_overwritten)
+        d.addCallback(lambda data: MutableData(data))
+        d.addCallback(lambda data: self._node.overwrite(data))
+        d.addCallback(lambda ignored:
+            self._node.get_servermap(MODE_CHECK))
+        def _overwritten_again(smap):
+            # Make sure that all shares were updated by making sure that
+            # there aren't any other versions in the sharemap.
+            self.failUnlessEqual(len(smap.recoverable_versions()), 1)
+            self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
+        d.addCallback(_overwritten_again)
+        return d
+
     def test_bad_server(self):
         # Break one server, then create the file: the initial publish should
         # complete with an alternate server. Breaking a second server should
@@ -2796,6 +2801,155 @@ class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
             self.failUnlessEqual(data, CONTENTS))
         return d
 
+    def test_1654(self):
+        # test that the Retrieve object unconditionally verifies the block
+        # hash tree root for mutable shares. The failure mode is that
+        # carefully crafted shares can cause undetected corruption (the
+        # retrieve appears to finish successfully, but the result is
+        # corrupted). When fixed, these shares always cause a
+        # CorruptShareError, which results in NotEnoughSharesError in this
+        # 2-of-2 file.
+        self.basedir = "mutable/Problems/test_1654"
+        self.set_up_grid(num_servers=2)
+        cap = uri.from_string(TEST_1654_CAP)
+        si = cap.get_storage_index()
+
+        for share, shnum in [(TEST_1654_SH0, 0), (TEST_1654_SH1, 1)]:
+            sharedata = base64.b64decode(share)
+            storedir = self.get_serverdir(shnum)
+            storage_path = os.path.join(storedir, "shares",
+                                        storage_index_to_dir(si))
+            fileutil.make_dirs(storage_path)
+            fileutil.write(os.path.join(storage_path, "%d" % shnum),
+                           sharedata)
+
+        nm = self.g.clients[0].nodemaker
+        n = nm.create_from_cap(TEST_1654_CAP)
+        # to exercise the problem correctly, we must ensure that sh0 is
+        # processed first, and sh1 second. NoNetworkGrid has facilities to
+        # stall the first request from a single server, but it's not
+        # currently easy to extend that to stall the second request (mutable
+        # retrievals will see two: first the mapupdate, then the fetch).
+        # However, repeated executions of this run without the #1654 fix
+        # suggests that we're failing reliably even without explicit stalls,
+        # probably because the servers are queried in a fixed order. So I'm
+        # ok with relying upon that.
+        d = self.shouldFail(NotEnoughSharesError, "test #1654 share corruption",
+                            "ran out of servers",
+                            n.download_best_version)
+        return d
+
+
+TEST_1654_CAP = "URI:SSK:6jthysgozssjnagqlcxjq7recm:yxawei54fmf2ijkrvs2shs6iey4kpdp6joi7brj2vrva6sp5nf3a"
+
+TEST_1654_SH0 = """\
+VGFob2UgbXV0YWJsZSBjb250YWluZXIgdjEKdQlEA46m9s5j6lnzsOHytBTs2JOo
+AkWe8058hyrDa8igfBSqZMKO3aDOrFuRVt0ySYZ6oihFqPJRAAAAAAAAB8YAAAAA
+AAAJmgAAAAFPNgDkK8brSCzKz6n8HFqzbnAlALvnaB0Qpa1Bjo9jiZdmeMyneHR+
+UoJcDb1Ls+lVLeUqP2JitBEXdCzcF/X2YMDlmKb2zmPqWfOw4fK0FOzYk6gCRZ7z
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABCDwr
+uIlhFlv21pDqyMeA9X1wHp98a1CKY4qfC7gn5exyODAcnhZKHCV18XBerbZLAgIA
+AAAAAAAAJgAAAAAAAAAmAAABjwAAAo8AAALTAAAC8wAAAAAAAAMGAAAAAAAAB8Yw
+ggEgMA0GCSqGSIb3DQEBAQUAA4IBDQAwggEIAoIBAQCXKMor062nfxHVutMbqNcj
+vVC92wXTcQulenNWEX+0huK54igTAG60p0lZ6FpBJ9A+dlStT386bn5I6qe50ky5
+CFodQSsQX+1yByMFlzqPDo4rclk/6oVySLypxnt/iBs3FPZ4zruhYXcITc6zaYYU
+Xqaw/C86g6M06MWQKsGev7PS3tH7q+dtovWzDgU13Q8PG2whGvGNfxPOmEX4j0wL
+FCBavpFnLpo3bJrj27V33HXxpPz3NP+fkaG0pKH03ANd/yYHfGf74dC+eD5dvWBM
+DU6fZQN4k/T+cth+qzjS52FPPTY9IHXIb4y+1HryVvxcx6JDifKoOzpFc3SDbBAP
+AgERKDjOFxVClH81DF/QkqpP0glOh6uTsFNx8Nes02q0d7iip2WqfG9m2+LmiWy8
+Pg7RlQQy2M45gert1EDsH4OI69uxteviZP1Mo0wD6HjmWUbGIQRmsT3DmYEZCCMA
+/KjhNmlov2+OhVxIaHwE7aN840IfkGdJ/JssB6Z/Ym3+ou4+jAYKhifPQGrpBVjd
+73oH6w9StnoGYIrEEQw8LFc4jnAFYciKlPuo6E6E3zDseE7gwkcOpCtVVksZu6Ii
+GQgIV8vjFbNz9M//RMXOBTwKFDiG08IAPh7fv2uKzFis0TFrR7sQcMQ/kZZCLPPi
+ECIX95NRoFRlxK/1kZ1+FuuDQgABz9+5yd/pjkVybmvc7Jr70bOVpxvRoI2ZEgh/
++QdxfcwAAm5iDnzPtsVdcbuNkKprfI8N4n+QmUOSMbAJ7M8r1cp4z9+5yd/pjkVy
+bmvc7Jr70bOVpxvRoI2ZEgh/+QdxfcxGzRV0shAW86irr5bDQOyyknYk0p2xw2Wn
+z6QccyXyobXPOFLO3ZBPnKaE58aaN7x3srQZYUKafet5ZMDX8fsQf2mbxnaeG5NF
+eO6wG++WBUo9leddnzKBnRcMGRAtJEjwfKMVPE8SmuTlL6kRc7n8wvY2ygClWlRm
+d7o95tZfoO+mexB/DLEpWLtlAiqh8yJ8cWaC5rYz4ZC2+z7QkeKXCHWAN3i4C++u
+dfZoD7qWnyAldYTydADwL885dVY7WN6NX9YtQrG3JGrp3wZvFrX5x9Jv7hls0A6l
+2xI4NlcSSrgWIjzrGdwQEjIUDyfc7DWroEpJEfIaSnjkeTT0D8WV5NqzWH8UwWoF
+wjwDltaQ3Y8O/wJPGBqBAJEob+p6QxvP5T2W1jnOvbgsMZLNDuY6FF1XcuR7yvNF
+sXKP6aXMV8BKSlrehFlpBMTu4HvJ1rZlKuxgR1A9njiaKD2U0NitCKMIpIXQxT6L
+eZn9M8Ky68m0Zjdw/WCsKz22GTljSM5Nfme32BrW+4G+R55ECwZ1oh08nrnWjXmw
+PlSHj2lwpnsuOG2fwJkyMnIIoIUII31VLATeLERD9HfMK8/+uZqJ2PftT2fhHL/u
+CDCIdEWSUBBHpA7p8BbgiZKCpYzf+pbS2/EJGL8gQAvSH1atGv/o0BiAd10MzTXC
+Xn5xDB1Yh+FtYPYloBGAwmxKieDMnsjy6wp5ovdmOc2y6KBr27DzgEGchLyOxHV4
+Q7u0Hkm7Om33ir1TUgK6bdPFL8rGNDOZq/SR4yn4qSsQTPD6Y/HQSK5GzkU4dGLw
+tU6GNpu142QE36NfWkoUWHKf1YgIYrlAGJWlj93et54ZGUZGVN7pAspZ+mvoMnDU
+Jh46nrQsEJiQz8AqgREck4Fi4S7Rmjh/AhXmzFWFca3YD0BmuYU6fxGTRPZ70eys
+LV5qPTmTGpX+bpvufAp0vznkiOdqTn1flnxdslM2AukiD6OwkX1dBH8AvzObhbz0
+ABhx3c+cAhAnYhJmsYaAwbpWpp8CM5opmsRgwgaz8f8lxiRfXbrWD8vdd4dm2B9J
+jaiGCR8/UXHFBGZhCgLB2S+BNXKynIeP+POGQtMIIERUtwOIKt1KfZ9jZwf/ulJK
+fv/VmBPmGu+CHvFIlHAzlxwJeUz8wSltUeeHjADZ9Wag5ESN3R6hsmJL+KL4av5v
+DFobNPiNWbc+4H+3wg1R0oK/uTQb8u1S7uWIGVmi5fJ4rVVZ/VKKtHGVwm/8OGKF
+tcrJFJcJADFVkgpsqN8UINsMJLxfJRoBgABEWih5DTRwNXK76Ma2LjDBrEvxhw8M
+7SLKhi5vH7/Cs7jfLZFgh2T6flDV4VM/EA7CYEHgEb8MFmioFGOmhUpqifkA3SdX
+jGi2KuZZ5+O+sHFWXsUjiFPEzUJF+syPEzH1aF5R+F8pkhifeYh0KP6OHd6Sgn8s
+TStXB+q0MndBXw5ADp/Jac1DVaSWruVAdjemQ+si1olk8xH+uTMXU7PgV9WkpIiy
+4BhnFU9IbCr/m7806c13xfeelaffP2pr7EDdgwz5K89VWCa3k9OSDnMtj2CQXlC7
+bQHi/oRGA1aHSn84SIt+HpAfRoVdr4N90bYWmYQNqfKoyWCbEr+dge/GSD1nddAJ
+72mXGlqyLyWYuAAAAAA="""
+
+TEST_1654_SH1 = """\
+VGFob2UgbXV0YWJsZSBjb250YWluZXIgdjEKdQlEA45R4Y4kuV458rSTGDVTqdzz
+9Fig3NQ3LermyD+0XLeqbC7KNgvv6cNzMZ9psQQ3FseYsIR1AAAAAAAAB8YAAAAA
+AAAJmgAAAAFPNgDkd/Y9Z+cuKctZk9gjwF8thT+fkmNCsulILsJw5StGHAA1f7uL
+MG73c5WBcesHB2epwazfbD3/0UZTlxXWXotywVHhjiS5XjnytJMYNVOp3PP0WKDc
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
+AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABCDwr
+uIlhFlv21pDqyMeA9X1wHp98a1CKY4qfC7gn5exyODAcnhZKHCV18XBerbZLAgIA
+AAAAAAAAJgAAAAAAAAAmAAABjwAAAo8AAALTAAAC8wAAAAAAAAMGAAAAAAAAB8Yw
+ggEgMA0GCSqGSIb3DQEBAQUAA4IBDQAwggEIAoIBAQCXKMor062nfxHVutMbqNcj
+vVC92wXTcQulenNWEX+0huK54igTAG60p0lZ6FpBJ9A+dlStT386bn5I6qe50ky5
+CFodQSsQX+1yByMFlzqPDo4rclk/6oVySLypxnt/iBs3FPZ4zruhYXcITc6zaYYU
+Xqaw/C86g6M06MWQKsGev7PS3tH7q+dtovWzDgU13Q8PG2whGvGNfxPOmEX4j0wL
+FCBavpFnLpo3bJrj27V33HXxpPz3NP+fkaG0pKH03ANd/yYHfGf74dC+eD5dvWBM
+DU6fZQN4k/T+cth+qzjS52FPPTY9IHXIb4y+1HryVvxcx6JDifKoOzpFc3SDbBAP
+AgERKDjOFxVClH81DF/QkqpP0glOh6uTsFNx8Nes02q0d7iip2WqfG9m2+LmiWy8
+Pg7RlQQy2M45gert1EDsH4OI69uxteviZP1Mo0wD6HjmWUbGIQRmsT3DmYEZCCMA
+/KjhNmlov2+OhVxIaHwE7aN840IfkGdJ/JssB6Z/Ym3+ou4+jAYKhifPQGrpBVjd
+73oH6w9StnoGYIrEEQw8LFc4jnAFYciKlPuo6E6E3zDseE7gwkcOpCtVVksZu6Ii
+GQgIV8vjFbNz9M//RMXOBTwKFDiG08IAPh7fv2uKzFis0TFrR7sQcMQ/kZZCLPPi
+ECIX95NRoFRlxK/1kZ1+FuuDQgABz9+5yd/pjkVybmvc7Jr70bOVpxvRoI2ZEgh/
++QdxfcwAAm5iDnzPtsVdcbuNkKprfI8N4n+QmUOSMbAJ7M8r1cp40cTBnAw+rMKC
+98P4pURrotx116Kd0i3XmMZu81ew57H3Zb73r+syQCXZNOP0xhMDclIt0p2xw2Wn
+z6QccyXyobXPOFLO3ZBPnKaE58aaN7x3srQZYUKafet5ZMDX8fsQf2mbxnaeG5NF
+eO6wG++WBUo9leddnzKBnRcMGRAtJEjwfKMVPE8SmuTlL6kRc7n8wvY2ygClWlRm
+d7o95tZfoO+mexB/DLEpWLtlAiqh8yJ8cWaC5rYz4ZC2+z7QkeKXCHWAN3i4C++u
+dfZoD7qWnyAldYTydADwL885dVY7WN6NX9YtQrG3JGrp3wZvFrX5x9Jv7hls0A6l
+2xI4NlcSSrgWIjzrGdwQEjIUDyfc7DWroEpJEfIaSnjkeTT0D8WV5NqzWH8UwWoF
+wjwDltaQ3Y8O/wJPGBqBAJEob+p6QxvP5T2W1jnOvbgsMZLNDuY6FF1XcuR7yvNF
+sXKP6aXMV8BKSlrehFlpBMTu4HvJ1rZlKuxgR1A9njiaKD2U0NitCKMIpIXQxT6L
+eZn9M8Ky68m0Zjdw/WCsKz22GTljSM5Nfme32BrW+4G+R55ECwZ1oh08nrnWjXmw
+PlSHj2lwpnsuOG2fwJkyMnIIoIUII31VLATeLERD9HfMK8/+uZqJ2PftT2fhHL/u
+CDCIdEWSUBBHpA7p8BbgiZKCpYzf+pbS2/EJGL8gQAvSH1atGv/o0BiAd10MzTXC
+Xn5xDB1Yh+FtYPYloBGAwmxKieDMnsjy6wp5ovdmOc2y6KBr27DzgEGchLyOxHV4
+Q7u0Hkm7Om33ir1TUgK6bdPFL8rGNDOZq/SR4yn4qSsQTPD6Y/HQSK5GzkU4dGLw
+tU6GNpu142QE36NfWkoUWHKf1YgIYrlAGJWlj93et54ZGUZGVN7pAspZ+mvoMnDU
+Jh46nrQsEJiQz8AqgREck4Fi4S7Rmjh/AhXmzFWFca3YD0BmuYU6fxGTRPZ70eys
+LV5qPTmTGpX+bpvufAp0vznkiOdqTn1flnxdslM2AukiD6OwkX1dBH8AvzObhbz0
+ABhx3c+cAhAnYhJmsYaAwbpWpp8CM5opmsRgwgaz8f8lxiRfXbrWD8vdd4dm2B9J
+jaiGCR8/UXHFBGZhCgLB2S+BNXKynIeP+POGQtMIIERUtwOIKt1KfZ9jZwf/ulJK
+fv/VmBPmGu+CHvFIlHAzlxwJeUz8wSltUeeHjADZ9Wag5ESN3R6hsmJL+KL4av5v
+DFobNPiNWbc+4H+3wg1R0oK/uTQb8u1S7uWIGVmi5fJ4rVVZ/VKKtHGVwm/8OGKF
+tcrJFJcJADFVkgpsqN8UINsMJLxfJRoBgABEWih5DTRwNXK76Ma2LjDBrEvxhw8M
+7SLKhi5vH7/Cs7jfLZFgh2T6flDV4VM/EA7CYEHgEb8MFmioFGOmhUpqifkA3SdX
+jGi2KuZZ5+O+sHFWXsUjiFPEzUJF+syPEzH1aF5R+F8pkhifeYh0KP6OHd6Sgn8s
+TStXB+q0MndBXw5ADp/Jac1DVaSWruVAdjemQ+si1olk8xH+uTMXU7PgV9WkpIiy
+4BhnFU9IbCr/m7806c13xfeelaffP2pr7EDdgwz5K89VWCa3k9OSDnMtj2CQXlC7
+bQHi/oRGA1aHSn84SIt+HpAfRoVdr4N90bYWmYQNqfKoyWCbEr+dge/GSD1nddAJ
+72mXGlqyLyWYuAAAAAA="""
+
 
 class FileHandle(unittest.TestCase):
     def setUp(self):
@@ -2917,33 +3071,105 @@ class Version(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin, \
         self.c = self.g.clients[0]
         self.nm = self.c.nodemaker
         self.data = "test data" * 100000 # about 900 KiB; MDMF
-        self.small_data = "test data" * 10 # about 90 B; SDMF
+        self.small_data = "test data" * 10 # 90 B; SDMF
 
 
-    def do_upload_mdmf(self):
-        d = self.nm.create_mutable_file(MutableData(self.data),
+    def do_upload_mdmf(self, data=None):
+        if data is None:
+            data = self.data
+        d = self.nm.create_mutable_file(MutableData(data),
                                         version=MDMF_VERSION)
         def _then(n):
             assert isinstance(n, MutableFileNode)
+            assert n._protocol_version == MDMF_VERSION
             self.mdmf_node = n
             return n
         d.addCallback(_then)
         return d
 
-    def do_upload_sdmf(self):
-        d = self.nm.create_mutable_file(MutableData(self.small_data))
+    def do_upload_sdmf(self, data=None):
+        if data is None:
+            data = self.small_data
+        d = self.nm.create_mutable_file(MutableData(data))
         def _then(n):
             assert isinstance(n, MutableFileNode)
+            assert n._protocol_version == SDMF_VERSION
             self.sdmf_node = n
             return n
         d.addCallback(_then)
         return d
 
+    def do_upload_empty_sdmf(self):
+        d = self.nm.create_mutable_file(MutableData(""))
+        def _then(n):
+            assert isinstance(n, MutableFileNode)
+            self.sdmf_zero_length_node = n
+            assert n._protocol_version == SDMF_VERSION
+            return n
+        d.addCallback(_then)
+        return d
+
     def do_upload(self):
         d = self.do_upload_mdmf()
         d.addCallback(lambda ign: self.do_upload_sdmf())
         return d
 
+    def test_debug(self):
+        d = self.do_upload_mdmf()
+        def _debug(n):
+            fso = debug.FindSharesOptions()
+            storage_index = base32.b2a(n.get_storage_index())
+            fso.si_s = storage_index
+            fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(unicode(storedir)))
+                            for (i,ss,storedir)
+                            in self.iterate_servers()]
+            fso.stdout = StringIO()
+            fso.stderr = StringIO()
+            debug.find_shares(fso)
+            sharefiles = fso.stdout.getvalue().splitlines()
+            expected = self.nm.default_encoding_parameters["n"]
+            self.failUnlessEqual(len(sharefiles), expected)
+
+            do = debug.DumpOptions()
+            do["filename"] = sharefiles[0]
+            do.stdout = StringIO()
+            debug.dump_share(do)
+            output = do.stdout.getvalue()
+            lines = set(output.splitlines())
+            self.failUnless("Mutable slot found:" in lines, output)
+            self.failUnless(" share_type: MDMF" in lines, output)
+            self.failUnless(" num_extra_leases: 0" in lines, output)
+            self.failUnless(" MDMF contents:" in lines, output)
+            self.failUnless("  seqnum: 1" in lines, output)
+            self.failUnless("  required_shares: 3" in lines, output)
+            self.failUnless("  total_shares: 10" in lines, output)
+            self.failUnless("  segsize: 131073" in lines, output)
+            self.failUnless("  datalen: %d" % len(self.data) in lines, output)
+            vcap = n.get_verify_cap().to_string()
+            self.failUnless("  verify-cap: %s" % vcap in lines, output)
+
+            cso = debug.CatalogSharesOptions()
+            cso.nodedirs = fso.nodedirs
+            cso.stdout = StringIO()
+            cso.stderr = StringIO()
+            debug.catalog_shares(cso)
+            shares = cso.stdout.getvalue().splitlines()
+            oneshare = shares[0] # all shares should be MDMF
+            self.failIf(oneshare.startswith("UNKNOWN"), oneshare)
+            self.failUnless(oneshare.startswith("MDMF"), oneshare)
+            fields = oneshare.split()
+            self.failUnlessEqual(fields[0], "MDMF")
+            self.failUnlessEqual(fields[1], storage_index)
+            self.failUnlessEqual(fields[2], "3/10")
+            self.failUnlessEqual(fields[3], "%d" % len(self.data))
+            self.failUnless(fields[4].startswith("#1:"), fields[3])
+            # the rest of fields[4] is the roothash, which depends upon
+            # encryption salts and is not constant. fields[5] is the
+            # remaining time on the longest lease, which is timing dependent.
+            # The rest of the line is the quoted pathname to the share.
+        d.addCallback(_debug)
+        return d
+
     def test_get_sequence_number(self):
         d = self.do_upload()
         d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
@@ -2974,62 +3200,6 @@ class Version(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin, \
         return d
 
 
-    def test_version_extension_api(self):
-        # We need to define an API by which an uploader can set the
-        # extension parameters, and by which a downloader can retrieve
-        # extensions.
-        d = self.do_upload_mdmf()
-        d.addCallback(lambda ign: self.mdmf_node.get_best_mutable_version())
-        def _got_version(version):
-            hints = version.get_downloader_hints()
-            # Should be empty at this point.
-            self.failUnlessIn("k", hints)
-            self.failUnlessEqual(hints['k'], 3)
-            self.failUnlessIn('segsize', hints)
-            self.failUnlessEqual(hints['segsize'], 131073)
-        d.addCallback(_got_version)
-        return d
-
-
-    def test_extensions_from_cap(self):
-        # If we initialize a mutable file with a cap that has extension
-        # parameters in it and then grab the extension parameters using
-        # our API, we should see that they're set correctly.
-        d = self.do_upload_mdmf()
-        def _then(ign):
-            mdmf_uri = self.mdmf_node.get_uri()
-            new_node = self.nm.create_from_cap(mdmf_uri)
-            return new_node.get_best_mutable_version()
-        d.addCallback(_then)
-        def _got_version(version):
-            hints = version.get_downloader_hints()
-            self.failUnlessIn("k", hints)
-            self.failUnlessEqual(hints["k"], 3)
-            self.failUnlessIn("segsize", hints)
-            self.failUnlessEqual(hints["segsize"], 131073)
-        d.addCallback(_got_version)
-        return d
-
-
-    def test_extensions_from_upload(self):
-        # If we create a new mutable file with some contents, we should
-        # get back an MDMF cap with the right hints in place.
-        contents = "foo bar baz" * 100000
-        d = self.nm.create_mutable_file(contents, version=MDMF_VERSION)
-        def _got_mutable_file(n):
-            rw_uri = n.get_uri()
-            expected_k = str(self.c.DEFAULT_ENCODING_PARAMETERS['k'])
-            self.failUnlessIn(expected_k, rw_uri)
-            # XXX: Get this more intelligently.
-            self.failUnlessIn("131073", rw_uri)
-
-            ro_uri = n.get_readonly_uri()
-            self.failUnlessIn(expected_k, ro_uri)
-            self.failUnlessIn("131073", ro_uri)
-        d.addCallback(_got_mutable_file)
-        return d
-
-
     def test_cap_after_upload(self):
         # If we create a new mutable file and upload things to it, and
         # it's an MDMF file, we should get an MDMF cap back from that
@@ -3196,73 +3366,146 @@ class Version(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin, \
         return d
 
 
-    def test_partial_read(self):
-        # read only a few bytes at a time, and see that the results are
-        # what we expect.
-        d = self.do_upload_mdmf()
-        d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
+    def _test_partial_read(self, node, expected, modes, step):
+        d = node.get_best_readable_version()
+        for (name, offset, length) in modes:
+            d.addCallback(self._do_partial_read, name, expected, offset, length)
+        # then read the whole thing, but only a few bytes at a time, and see
+        # that the results are what we expect.
         def _read_data(version):
             c = consumer.MemoryConsumer()
             d2 = defer.succeed(None)
-            for i in xrange(0, len(self.data), 10000):
-                d2.addCallback(lambda ignored, i=i: version.read(c, i, 10000))
+            for i in xrange(0, len(expected), step):
+                d2.addCallback(lambda ignored, i=i: version.read(c, i, step))
             d2.addCallback(lambda ignored:
-                self.failUnlessEqual(self.data, "".join(c.chunks)))
+                self.failUnlessEqual(expected, "".join(c.chunks)))
             return d2
         d.addCallback(_read_data)
         return d
 
-    def _test_partial_read(self, offset, length):
-        d = self.do_upload_mdmf()
-        d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
+    def _do_partial_read(self, version, name, expected, offset, length):
         c = consumer.MemoryConsumer()
-        d.addCallback(lambda version:
-            version.read(c, offset, length))
-        expected = self.data[offset:offset+length]
+        d = version.read(c, offset, length)
+        if length is None:
+            expected_range = expected[offset:]
+        else:
+            expected_range = expected[offset:offset+length]
         d.addCallback(lambda ignored: "".join(c.chunks))
         def _check(results):
-            if results != expected:
-                print
+            if results != expected_range:
+                print "read([%d]+%s) got %d bytes, not %d" % \
+                      (offset, length, len(results), len(expected_range))
                 print "got: %s ... %s" % (results[:20], results[-20:])
-                print "exp: %s ... %s" % (expected[:20], expected[-20:])
-                self.fail("results != expected")
+                print "exp: %s ... %s" % (expected_range[:20], expected_range[-20:])
+                self.fail("results[%s] != expected_range" % name)
+            return version # daisy-chained to next call
         d.addCallback(_check)
         return d
 
-    def test_partial_read_ending_on_segment_boundary(self):
-        d = self.mdmf_node.get_best_readable_version()
-        c = consumer.MemoryConsumer()
-        offset = mathutil.next_multiple(128 * 1024, 3)
-        start = offset - 50
-        d.addCallback(lambda version:
-            version.read(c, start, 51))
-        expected = self.data[offset-50:offset+1]
-        d.addCallback(lambda ignored:
-            self.failUnlessEqual(expected, "".join(c.chunks)))
-        return d
-
-    def test_read(self):
+    def test_partial_read_mdmf_0(self):
+        data = ""
+        d = self.do_upload_mdmf(data=data)
+        modes = [("all1",    0,0),
+                 ("all2",    0,None),
+                 ]
+        d.addCallback(self._test_partial_read, data, modes, 1)
+        return d
+
+    def test_partial_read_mdmf_large(self):
+        segment_boundary = mathutil.next_multiple(128 * 1024, 3)
+        modes = [("start_on_segment_boundary",              segment_boundary, 50),
+                 ("ending_one_byte_after_segment_boundary", segment_boundary-50, 51),
+                 ("zero_length_at_start",                   0, 0),
+                 ("zero_length_in_middle",                  50, 0),
+                 ("zero_length_at_segment_boundary",        segment_boundary, 0),
+                 ("complete_file1",                         0, len(self.data)),
+                 ("complete_file2",                         0, None),
+                 ]
         d = self.do_upload_mdmf()
-        d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
+        d.addCallback(self._test_partial_read, self.data, modes, 10000)
+        return d
+
+    def test_partial_read_sdmf_0(self):
+        data = ""
+        modes = [("all1",    0,0),
+                 ("all2",    0,None),
+                 ]
+        d = self.do_upload_sdmf(data=data)
+        d.addCallback(self._test_partial_read, data, modes, 1)
+        return d
+
+    def test_partial_read_sdmf_2(self):
+        data = "hi"
+        modes = [("one_byte",                  0, 1),
+                 ("last_byte",                 1, 1),
+                 ("last_byte2",                1, None),
+                 ("complete_file",             0, 2),
+                 ("complete_file2",            0, None),
+                 ]
+        d = self.do_upload_sdmf(data=data)
+        d.addCallback(self._test_partial_read, data, modes, 1)
+        return d
+
+    def test_partial_read_sdmf_90(self):
+        modes = [("start_at_middle",           50, 40),
+                 ("start_at_middle2",          50, None),
+                 ("zero_length_at_start",      0, 0),
+                 ("zero_length_in_middle",     50, 0),
+                 ("zero_length_at_end",        90, 0),
+                 ("complete_file1",            0, None),
+                 ("complete_file2",            0, 90),
+                 ]
+        d = self.do_upload_sdmf()
+        d.addCallback(self._test_partial_read, self.small_data, modes, 10)
+        return d
+
+    def test_partial_read_sdmf_100(self):
+        data = "test data "*10
+        modes = [("start_at_middle",           50, 50),
+                 ("start_at_middle2",          50, None),
+                 ("zero_length_at_start",      0, 0),
+                 ("zero_length_in_middle",     50, 0),
+                 ("complete_file1",            0, 100),
+                 ("complete_file2",            0, None),
+                 ]
+        d = self.do_upload_sdmf(data=data)
+        d.addCallback(self._test_partial_read, data, modes, 10)
+        return d
+
+
+    def _test_read_and_download(self, node, expected):
+        d = node.get_best_readable_version()
         def _read_data(version):
             c = consumer.MemoryConsumer()
+            c2 = consumer.MemoryConsumer()
             d2 = defer.succeed(None)
             d2.addCallback(lambda ignored: version.read(c))
             d2.addCallback(lambda ignored:
-                self.failUnlessEqual("".join(c.chunks), self.data))
+                self.failUnlessEqual(expected, "".join(c.chunks)))
+
+            d2.addCallback(lambda ignored: version.read(c2, offset=0,
+                                                        size=len(expected)))
+            d2.addCallback(lambda ignored:
+                self.failUnlessEqual(expected, "".join(c2.chunks)))
             return d2
         d.addCallback(_read_data)
+        d.addCallback(lambda ignored: node.download_best_version())
+        d.addCallback(lambda data: self.failUnlessEqual(expected, data))
         return d
 
-    def test_download_best_version(self):
-        d = self.do_upload()
-        d.addCallback(lambda ign: self.mdmf_node.download_best_version())
-        d.addCallback(lambda data:
-            self.failUnlessEqual(data, self.data))
-        d.addCallback(lambda ignored:
-            self.sdmf_node.download_best_version())
-        d.addCallback(lambda data:
-            self.failUnlessEqual(data, self.small_data))
+    def test_read_and_download_mdmf(self):
+        d = self.do_upload_mdmf()
+        d.addCallback(self._test_read_and_download, self.data)
+        return d
+
+    def test_read_and_download_sdmf(self):
+        d = self.do_upload_sdmf()
+        d.addCallback(self._test_read_and_download, self.small_data)
+        return d
+
+    def test_read_and_download_sdmf_zero_length(self):
+        d = self.do_upload_empty_sdmf()
+        d.addCallback(self._test_read_and_download, "")
         return d
 
 
@@ -3272,63 +3515,70 @@ class Update(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
     def setUp(self):
         GridTestMixin.setUp(self)
         self.basedir = self.mktemp()
-        self.set_up_grid()
+        self.set_up_grid(num_servers=13)
         self.c = self.g.clients[0]
         self.nm = self.c.nodemaker
         self.data = "testdata " * 100000 # about 900 KiB; MDMF
-        self.small_data = "test data" * 10 # about 90 B; SDMF
-        return self.do_upload()
+        self.small_data = "test data" * 10 # 90 B; SDMF
 
 
-    def do_upload(self):
-        d1 = self.nm.create_mutable_file(MutableData(self.data),
-                                         version=MDMF_VERSION)
-        d2 = self.nm.create_mutable_file(MutableData(self.small_data))
-        dl = gatherResults([d1, d2])
-        def _then((n1, n2)):
-            assert isinstance(n1, MutableFileNode)
-            assert isinstance(n2, MutableFileNode)
-
-            self.mdmf_node = n1
-            self.sdmf_node = n2
-        dl.addCallback(_then)
-        # Make SDMF and MDMF mutable file nodes that have 255 shares.
-        def _make_max_shares(ign):
+    def do_upload_sdmf(self):
+        d = self.nm.create_mutable_file(MutableData(self.small_data))
+        def _then(n):
+            assert isinstance(n, MutableFileNode)
+            self.sdmf_node = n
+            # Make SDMF node that has 255 shares.
             self.nm.default_encoding_parameters['n'] = 255
             self.nm.default_encoding_parameters['k'] = 127
-            d1 = self.nm.create_mutable_file(MutableData(self.data),
-                                             version=MDMF_VERSION)
-            d2 = \
-                self.nm.create_mutable_file(MutableData(self.small_data))
-            return gatherResults([d1, d2])
-        dl.addCallback(_make_max_shares)
-        def _stash((n1, n2)):
-            assert isinstance(n1, MutableFileNode)
-            assert isinstance(n2, MutableFileNode)
-
-            self.mdmf_max_shares_node = n1
-            self.sdmf_max_shares_node = n2
-        dl.addCallback(_stash)
-        return dl
+            return self.nm.create_mutable_file(MutableData(self.small_data))
+        d.addCallback(_then)
+        def _then2(n):
+            assert isinstance(n, MutableFileNode)
+            self.sdmf_max_shares_node = n
+        d.addCallback(_then2)
+        return d
 
+    def do_upload_mdmf(self):
+        d = self.nm.create_mutable_file(MutableData(self.data),
+                                        version=MDMF_VERSION)
+        def _then(n):
+            assert isinstance(n, MutableFileNode)
+            self.mdmf_node = n
+            # Make MDMF node that has 255 shares.
+            self.nm.default_encoding_parameters['n'] = 255
+            self.nm.default_encoding_parameters['k'] = 127
+            return self.nm.create_mutable_file(MutableData(self.data),
+                                               version=MDMF_VERSION)
+        d.addCallback(_then)
+        def _then2(n):
+            assert isinstance(n, MutableFileNode)
+            self.mdmf_max_shares_node = n
+        d.addCallback(_then2)
+        return d
 
     def _test_replace(self, offset, new_data):
         expected = self.data[:offset]+new_data+self.data[offset+len(new_data):]
-        for node in (self.mdmf_node, self.mdmf_max_shares_node):
-            d = node.get_best_mutable_version()
-            d.addCallback(lambda mv:
-                mv.update(MutableData(new_data), offset))
-            # close around node.
-            d.addCallback(lambda ignored, node=node:
-                node.download_best_version())
-            def _check(results):
-                if results != expected:
-                    print
-                    print "got: %s ... %s" % (results[:20], results[-20:])
-                    print "exp: %s ... %s" % (expected[:20], expected[-20:])
-                    self.fail("results != expected")
-            d.addCallback(_check)
-        return d
+        d0 = self.do_upload_mdmf()
+        def _run(ign):
+            d = defer.succeed(None)
+            for node in (self.mdmf_node, self.mdmf_max_shares_node):
+                # close over 'node'.
+                d.addCallback(lambda ign, node=node:
+                              node.get_best_mutable_version())
+                d.addCallback(lambda mv:
+                              mv.update(MutableData(new_data), offset))
+                d.addCallback(lambda ign, node=node:
+                              node.download_best_version())
+                def _check(results):
+                    if results != expected:
+                        print
+                        print "got: %s ... %s" % (results[:20], results[-20:])
+                        print "exp: %s ... %s" % (expected[:20], expected[-20:])
+                        self.fail("results != expected")
+                d.addCallback(_check)
+            return d
+        d0.addCallback(_run)
+        return d0
 
     def test_append(self):
         # We should be able to append data to a mutable file and get
@@ -3397,47 +3647,55 @@ class Update(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
 
     def test_replace_locations(self):
         # exercise fencepost conditions
-        expected = self.data
         SEGSIZE = 128*1024
         suspects = range(SEGSIZE-3, SEGSIZE+1)+range(2*SEGSIZE-3, 2*SEGSIZE+1)
         letters = iter("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
-        d = defer.succeed(None)
-        for offset in suspects:
-            new_data = letters.next()*2 # "AA", then "BB", etc
-            expected = expected[:offset]+new_data+expected[offset+2:]
-            d.addCallback(lambda ign:
-                          self.mdmf_node.get_best_mutable_version())
-            def _modify(mv, offset=offset, new_data=new_data):
-                # close over 'offset','new_data'
-                md = MutableData(new_data)
-                return mv.update(md, offset)
-            d.addCallback(_modify)
-            d.addCallback(lambda ignored:
-                          self.mdmf_node.download_best_version())
-            d.addCallback(self._check_differences, expected)
-        return d
+        d0 = self.do_upload_mdmf()
+        def _run(ign):
+            expected = self.data
+            d = defer.succeed(None)
+            for offset in suspects:
+                new_data = letters.next()*2 # "AA", then "BB", etc
+                expected = expected[:offset]+new_data+expected[offset+2:]
+                d.addCallback(lambda ign:
+                              self.mdmf_node.get_best_mutable_version())
+                def _modify(mv, offset=offset, new_data=new_data):
+                    # close over 'offset','new_data'
+                    md = MutableData(new_data)
+                    return mv.update(md, offset)
+                d.addCallback(_modify)
+                d.addCallback(lambda ignored:
+                              self.mdmf_node.download_best_version())
+                d.addCallback(self._check_differences, expected)
+            return d
+        d0.addCallback(_run)
+        return d0
 
     def test_replace_locations_max_shares(self):
         # exercise fencepost conditions
-        expected = self.data
         SEGSIZE = 128*1024
         suspects = range(SEGSIZE-3, SEGSIZE+1)+range(2*SEGSIZE-3, 2*SEGSIZE+1)
         letters = iter("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
-        d = defer.succeed(None)
-        for offset in suspects:
-            new_data = letters.next()*2 # "AA", then "BB", etc
-            expected = expected[:offset]+new_data+expected[offset+2:]
-            d.addCallback(lambda ign:
-                          self.mdmf_max_shares_node.get_best_mutable_version())
-            def _modify(mv, offset=offset, new_data=new_data):
-                # close over 'offset','new_data'
-                md = MutableData(new_data)
-                return mv.update(md, offset)
-            d.addCallback(_modify)
-            d.addCallback(lambda ignored:
-                          self.mdmf_max_shares_node.download_best_version())
-            d.addCallback(self._check_differences, expected)
-        return d
+        d0 = self.do_upload_mdmf()
+        def _run(ign):
+            expected = self.data
+            d = defer.succeed(None)
+            for offset in suspects:
+                new_data = letters.next()*2 # "AA", then "BB", etc
+                expected = expected[:offset]+new_data+expected[offset+2:]
+                d.addCallback(lambda ign:
+                              self.mdmf_max_shares_node.get_best_mutable_version())
+                def _modify(mv, offset=offset, new_data=new_data):
+                    # close over 'offset','new_data'
+                    md = MutableData(new_data)
+                    return mv.update(md, offset)
+                d.addCallback(_modify)
+                d.addCallback(lambda ignored:
+                              self.mdmf_max_shares_node.download_best_version())
+                d.addCallback(self._check_differences, expected)
+            return d
+        d0.addCallback(_run)
+        return d0
 
 
     def test_append_power_of_two(self):
@@ -3451,29 +3709,42 @@ class Update(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         # power-of-two boundary.
         segment = "a" * DEFAULT_MAX_SEGMENT_SIZE
         new_data = self.data + (segment * 2)
-        for node in (self.mdmf_node, self.mdmf_max_shares_node):
-            d = node.get_best_mutable_version()
-            d.addCallback(lambda mv:
-                mv.update(MutableData(segment * 2), len(self.data)))
-            d.addCallback(lambda ignored, node=node:
-                node.download_best_version())
-            d.addCallback(lambda results:
-                self.failUnlessEqual(results, new_data))
-        return d
-
+        d0 = self.do_upload_mdmf()
+        def _run(ign):
+            d = defer.succeed(None)
+            for node in (self.mdmf_node, self.mdmf_max_shares_node):
+                # close over 'node'.
+                d.addCallback(lambda ign, node=node:
+                              node.get_best_mutable_version())
+                d.addCallback(lambda mv:
+                              mv.update(MutableData(segment * 2), len(self.data)))
+                d.addCallback(lambda ign, node=node:
+                              node.download_best_version())
+                d.addCallback(lambda results:
+                              self.failUnlessEqual(results, new_data))
+            return d
+        d0.addCallback(_run)
+        return d0
 
     def test_update_sdmf(self):
         # Running update on a single-segment file should still work.
         new_data = self.small_data + "appended"
-        for node in (self.sdmf_node, self.sdmf_max_shares_node):
-            d = node.get_best_mutable_version()
-            d.addCallback(lambda mv:
-                mv.update(MutableData("appended"), len(self.small_data)))
-            d.addCallback(lambda ignored, node=node:
-                node.download_best_version())
-            d.addCallback(lambda results:
-                self.failUnlessEqual(results, new_data))
-        return d
+        d0 = self.do_upload_sdmf()
+        def _run(ign):
+            d = defer.succeed(None)
+            for node in (self.sdmf_node, self.sdmf_max_shares_node):
+                # close over 'node'.
+                d.addCallback(lambda ign, node=node:
+                              node.get_best_mutable_version())
+                d.addCallback(lambda mv:
+                              mv.update(MutableData("appended"), len(self.small_data)))
+                d.addCallback(lambda ign, node=node:
+                              node.download_best_version())
+                d.addCallback(lambda results:
+                              self.failUnlessEqual(results, new_data))
+            return d
+        d0.addCallback(_run)
+        return d0
 
     def test_replace_in_last_segment(self):
         # The wrapper should know how to handle the tail segment
@@ -3482,16 +3753,22 @@ class Update(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         new_data = self.data[:replace_offset] + "replaced"
         rest_offset = replace_offset + len("replaced")
         new_data += self.data[rest_offset:]
-        for node in (self.mdmf_node, self.mdmf_max_shares_node):
-            d = node.get_best_mutable_version()
-            d.addCallback(lambda mv:
-                mv.update(MutableData("replaced"), replace_offset))
-            d.addCallback(lambda ignored, node=node:
-                node.download_best_version())
-            d.addCallback(lambda results:
-                self.failUnlessEqual(results, new_data))
-        return d
-
+        d0 = self.do_upload_mdmf()
+        def _run(ign):
+            d = defer.succeed(None)
+            for node in (self.mdmf_node, self.mdmf_max_shares_node):
+                # close over 'node'.
+                d.addCallback(lambda ign, node=node:
+                              node.get_best_mutable_version())
+                d.addCallback(lambda mv:
+                              mv.update(MutableData("replaced"), replace_offset))
+                d.addCallback(lambda ign, node=node:
+                              node.download_best_version())
+                d.addCallback(lambda results:
+                              self.failUnlessEqual(results, new_data))
+            return d
+        d0.addCallback(_run)
+        return d0
 
     def test_multiple_segment_replace(self):
         replace_offset = 2 * DEFAULT_MAX_SEGMENT_SIZE
@@ -3501,16 +3778,23 @@ class Update(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
         new_data += "replaced"
         rest_offset = len(new_data)
         new_data += self.data[rest_offset:]
-        for node in (self.mdmf_node, self.mdmf_max_shares_node):
-            d = node.get_best_mutable_version()
-            d.addCallback(lambda mv:
-                mv.update(MutableData((2 * new_segment) + "replaced"),
-                          replace_offset))
-            d.addCallback(lambda ignored, node=node:
-                node.download_best_version())
-            d.addCallback(lambda results:
-                self.failUnlessEqual(results, new_data))
-        return d
+        d0 = self.do_upload_mdmf()
+        def _run(ign):
+            d = defer.succeed(None)
+            for node in (self.mdmf_node, self.mdmf_max_shares_node):
+                # close over 'node'.
+                d.addCallback(lambda ign, node=node:
+                              node.get_best_mutable_version())
+                d.addCallback(lambda mv:
+                              mv.update(MutableData((2 * new_segment) + "replaced"),
+                                        replace_offset))
+                d.addCallback(lambda ignored, node=node:
+                              node.download_best_version())
+                d.addCallback(lambda results:
+                              self.failUnlessEqual(results, new_data))
+            return d
+        d0.addCallback(_run)
+        return d0
 
 class Interoperability(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
     sdmf_old_shares = {}