test/common.py: fix race condition waiting for the helper connection
diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py
index 9f37073a3ccddc292a5cc85831606dec91048abc..e9103a2a1365925366622412011f395faf1d29d9 100644
--- a/src/allmydata/test/common.py
+++ b/src/allmydata/test/common.py
@@ -6,15 +6,17 @@ from twisted.python import failure
 from twisted.application import service
 from twisted.web.error import Error as WebError
 from foolscap.api import flushEventualQueue, fireEventually
-from allmydata import uri, dirnode, client
+from allmydata import uri, client
 from allmydata.introducer.server import IntroducerNode
-from allmydata.interfaces import IMutableFileNode, IImmutableFileNode, \
-     FileTooLargeError, NotEnoughSharesError, ICheckable
+from allmydata.interfaces import IMutableFileNode, IImmutableFileNode, \
+                                 NotEnoughSharesError, ICheckable, \
+                                 IMutableUploadable, SDMF_VERSION, \
+                                 MDMF_VERSION
 from allmydata.check_results import CheckResults, CheckAndRepairResults, \
      DeepCheckResults, DeepCheckAndRepairResults
-from allmydata.mutable.common import CorruptShareError
+from allmydata.storage_client import StubServer
 from allmydata.mutable.layout import unpack_header
-from allmydata.storage.server import storage_index_to_dir
+from allmydata.mutable.publish import MutableData
 from allmydata.storage.mutable import MutableShareFile
 from allmydata.util import hashutil, log, fileutil, pollmixin
 from allmydata.util.assertutil import precondition
@@ -24,6 +26,7 @@ from allmydata.key_generator import KeyGeneratorService
 import allmydata.test.common_util as testutil
 from allmydata import immutable
 
+TEST_RSA_KEY_SIZE = 522
 
 def flush_but_dont_ignore(res):
     d = flushEventualQueue()
@@ -41,11 +44,10 @@ class FakeCHKFileNode:
     """I provide IImmutableFileNode, but all of my data is stored in a
     class-level dictionary."""
     implements(IImmutableFileNode)
-    all_contents = {}
-    bad_shares = {}
 
-    def __init__(self, filecap):
+    def __init__(self, filecap, all_contents):
         precondition(isinstance(filecap, (uri.CHKFileURI, uri.LiteralFileURI)), filecap)
+        self.all_contents = all_contents
         self.my_uri = filecap
         self.storage_index = self.my_uri.get_storage_index()
 
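The hunk above replaces the class-level `all_contents` dictionary, which leaked state between test cases, with a dictionary passed in per instance. A minimal sketch of driving the new signature (assuming a test supplies its own dict; `make_chk_file_cap` is the helper defined later in this file):

    # sketch: each test owns its backing store instead of sharing a class attribute
    contents = {}
    filecap = make_chk_file_cap(1000)
    node = FakeCHKFileNode(filecap, contents)
    contents[filecap.to_string()] = "x" * 1000
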
@@ -65,32 +67,27 @@ class FakeCHKFileNode:
         return self.storage_index
 
     def check(self, monitor, verify=False, add_lease=False):
-        r = CheckResults(self.my_uri, self.storage_index)
-        is_bad = self.bad_shares.get(self.storage_index, None)
-        data = {}
-        data["count-shares-needed"] = 3
-        data["count-shares-expected"] = 10
-        data["count-good-share-hosts"] = 10
-        data["count-wrong-shares"] = 0
-        nodeid = "\x00"*20
-        data["list-corrupt-shares"] = []
-        data["sharemap"] = {1: [nodeid]}
-        data["servers-responding"] = [nodeid]
-        data["count-recoverable-versions"] = 1
-        data["count-unrecoverable-versions"] = 0
-        if is_bad:
-            r.set_healthy(False)
-            r.set_recoverable(True)
-            data["count-shares-good"] = 9
-            data["list-corrupt-shares"] = [(nodeid, self.storage_index, 0)]
-            r.problems = failure.Failure(CorruptShareError(is_bad))
-        else:
-            r.set_healthy(True)
-            r.set_recoverable(True)
-            data["count-shares-good"] = 10
-            r.problems = []
-        r.set_data(data)
-        r.set_needs_rebalancing(False)
+        s = StubServer("\x00"*20)
+        r = CheckResults(self.my_uri, self.storage_index,
+                         healthy=True, recoverable=True,
+                         needs_rebalancing=False,
+                         count_shares_needed=3,
+                         count_shares_expected=10,
+                         count_shares_good=10,
+                         count_good_share_hosts=10,
+                         count_recoverable_versions=1,
+                         count_unrecoverable_versions=0,
+                         servers_responding=[s],
+                         sharemap={1: [s]},
+                         count_wrong_shares=0,
+                         list_corrupt_shares=[],
+                         count_corrupt_shares=0,
+                         list_incompatible_shares=[],
+                         count_incompatible_shares=0,
+                         summary="",
+                         report=[],
+                         share_problems=[],
+                         servermap=None)
         return defer.succeed(r)
     def check_and_repair(self, monitor, verify=False, add_lease=False):
         d = self.check(verify)
@@ -120,6 +117,8 @@ class FakeCHKFileNode:
         except KeyError, le:
             raise NotEnoughSharesError(le, 0, 3)
         return len(data)
+    def get_current_size(self):
+        return defer.succeed(self.get_size())
 
     def read(self, consumer, offset=0, size=None):
         # we don't bother to call registerProducer/unregisterProducer,
@@ -147,6 +146,22 @@ class FakeCHKFileNode:
         consumer.write(data[start:end])
         return consumer
 
+
+    def get_best_readable_version(self):
+        return defer.succeed(self)
+
+
+    def download_to_data(self):
+        return download_to_data(self)
+
+
+    download_best_version = download_to_data
+
+
+    def get_size_of_best_version(self):
+        return defer.succeed(self.get_size())
+
+
 def make_chk_file_cap(size):
     return uri.CHKFileURI(key=os.urandom(16),
                           uri_extension_hash=os.urandom(32),
@@ -156,10 +171,10 @@ def make_chk_file_cap(size):
 def make_chk_file_uri(size):
     return make_chk_file_cap(size).to_string()
 
-def create_chk_filenode(contents):
+def create_chk_filenode(contents, all_contents):
     filecap = make_chk_file_cap(len(contents))
-    n = FakeCHKFileNode(filecap)
-    FakeCHKFileNode.all_contents[filecap.to_string()] = contents
+    n = FakeCHKFileNode(filecap, all_contents)
+    all_contents[filecap.to_string()] = contents
     return n
 
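`create_chk_filenode` now threads the caller's dictionary through to the node, so the fake file and its contents live in the same test-local store. A hedged usage sketch:

    # sketch: build a fake immutable node backed by a local dict
    all_contents = {}
    n = create_chk_filenode("some file contents", all_contents)
    assert all_contents[n.my_uri.to_string()] == "some file contents"
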
 
@@ -169,33 +184,50 @@ class FakeMutableFileNode:
 
     implements(IMutableFileNode, ICheckable)
     MUTABLE_SIZELIMIT = 10000
-    all_contents = {}
-    bad_shares = {}
 
     def __init__(self, storage_broker, secret_holder,
-                 default_encoding_parameters, history):
+                 default_encoding_parameters, history, all_contents):
+        self.all_contents = all_contents
+        self.file_types = {} # storage index => MDMF_VERSION or SDMF_VERSION
         self.init_from_cap(make_mutable_file_cap())
-    def create(self, contents, key_generator=None, keysize=None):
+        self._k = default_encoding_parameters['k']
+        self._segsize = default_encoding_parameters['max_segment_size']
+    def create(self, contents, key_generator=None, keysize=None,
+               version=SDMF_VERSION):
+        if version == MDMF_VERSION and \
+           isinstance(self.my_uri, (uri.ReadonlySSKFileURI,
+                                    uri.WriteableSSKFileURI)):
+            self.init_from_cap(make_mdmf_mutable_file_cap())
+        self.file_types[self.storage_index] = version
         initial_contents = self._get_initial_contents(contents)
-        if len(initial_contents) > self.MUTABLE_SIZELIMIT:
-            raise FileTooLargeError("SDMF is limited to one segment, and "
-                                    "%d > %d" % (len(initial_contents),
-                                                 self.MUTABLE_SIZELIMIT))
-        self.all_contents[self.storage_index] = initial_contents
+        data = initial_contents.read(initial_contents.get_size())
+        data = "".join(data)
+        self.all_contents[self.storage_index] = data
         return defer.succeed(self)
     def _get_initial_contents(self, contents):
-        if isinstance(contents, str):
-            return contents
         if contents is None:
-            return ""
+            return MutableData("")
+
+        if IMutableUploadable.providedBy(contents):
+            return contents
+
         assert callable(contents), "%s should be callable, not %s" % \
                (contents, type(contents))
         return contents(self)
     def init_from_cap(self, filecap):
         assert isinstance(filecap, (uri.WriteableSSKFileURI,
-                                    uri.ReadonlySSKFileURI))
+                                    uri.ReadonlySSKFileURI,
+                                    uri.WriteableMDMFFileURI,
+                                    uri.ReadonlyMDMFFileURI))
         self.my_uri = filecap
         self.storage_index = self.my_uri.get_storage_index()
+        if isinstance(filecap, (uri.WriteableMDMFFileURI,
+                                uri.ReadonlyMDMFFileURI)):
+            self.file_types[self.storage_index] = MDMF_VERSION
+
+        else:
+            self.file_types[self.storage_index] = SDMF_VERSION
+
         return self
     def get_cap(self):
         return self.my_uri
@@ -213,6 +245,10 @@ class FakeMutableFileNode:
         return self.my_uri.get_readonly().to_string()
     def get_verify_cap(self):
         return self.my_uri.get_verify_cap()
+    def get_repair_cap(self):
+        if self.my_uri.is_readonly():
+            return None
+        return self.my_uri
     def is_readonly(self):
         return self.my_uri.is_readonly()
     def is_mutable(self):
@@ -235,34 +271,35 @@ class FakeMutableFileNode:
     def get_storage_index(self):
         return self.storage_index
 
+    def get_servermap(self, mode):
+        return defer.succeed(None)
+
+    def get_version(self):
+        assert self.storage_index in self.file_types
+        return self.file_types[self.storage_index]
+
     def check(self, monitor, verify=False, add_lease=False):
-        r = CheckResults(self.my_uri, self.storage_index)
-        is_bad = self.bad_shares.get(self.storage_index, None)
-        data = {}
-        data["count-shares-needed"] = 3
-        data["count-shares-expected"] = 10
-        data["count-good-share-hosts"] = 10
-        data["count-wrong-shares"] = 0
-        data["list-corrupt-shares"] = []
-        nodeid = "\x00"*20
-        data["sharemap"] = {"seq1-abcd-sh0": [nodeid]}
-        data["servers-responding"] = [nodeid]
-        data["count-recoverable-versions"] = 1
-        data["count-unrecoverable-versions"] = 0
-        if is_bad:
-            r.set_healthy(False)
-            r.set_recoverable(True)
-            data["count-shares-good"] = 9
-            r.problems = failure.Failure(CorruptShareError("peerid",
-                                                           0, # shnum
-                                                           is_bad))
-        else:
-            r.set_healthy(True)
-            r.set_recoverable(True)
-            data["count-shares-good"] = 10
-            r.problems = []
-        r.set_data(data)
-        r.set_needs_rebalancing(False)
+        s = StubServer("\x00"*20)
+        r = CheckResults(self.my_uri, self.storage_index,
+                         healthy=True, recoverable=True,
+                         needs_rebalancing=False,
+                         count_shares_needed=3,
+                         count_shares_expected=10,
+                         count_shares_good=10,
+                         count_good_share_hosts=10,
+                         count_recoverable_versions=1,
+                         count_unrecoverable_versions=0,
+                         servers_responding=[s],
+                         sharemap={"seq1-abcd-sh0": [s]},
+                         count_wrong_shares=0,
+                         list_corrupt_shares=[],
+                         count_corrupt_shares=0,
+                         list_incompatible_shares=[],
+                         count_incompatible_shares=0,
+                         summary="",
+                         report=[],
+                         share_problems=[],
+                         servermap=None)
         return defer.succeed(r)
 
     def check_and_repair(self, monitor, verify=False, add_lease=False):
@@ -293,19 +330,22 @@ class FakeMutableFileNode:
         return d
 
     def download_best_version(self):
+        return defer.succeed(self._download_best_version())
+
+
+    def _download_best_version(self, ignored=None):
         if isinstance(self.my_uri, uri.LiteralFileURI):
-            return defer.succeed(self.my_uri.data)
+            return self.my_uri.data
         if self.storage_index not in self.all_contents:
-            return defer.fail(NotEnoughSharesError(None, 0, 3))
-        return defer.succeed(self.all_contents[self.storage_index])
+            raise NotEnoughSharesError(None, 0, 3)
+        return self.all_contents[self.storage_index]
+
 
     def overwrite(self, new_contents):
-        if len(new_contents) > self.MUTABLE_SIZELIMIT:
-            raise FileTooLargeError("SDMF is limited to one segment, and "
-                                    "%d > %d" % (len(new_contents),
-                                                 self.MUTABLE_SIZELIMIT))
         assert not self.is_readonly()
-        self.all_contents[self.storage_index] = new_contents
+        new_data = new_contents.read(new_contents.get_size())
+        new_data = "".join(new_data)
+        self.all_contents[self.storage_index] = new_data
         return defer.succeed(None)
     def modify(self, modifier):
         # this does not implement FileTooLargeError, but the real one does
@@ -313,32 +353,87 @@ class FakeMutableFileNode:
     def _modify(self, modifier):
         assert not self.is_readonly()
         old_contents = self.all_contents[self.storage_index]
-        self.all_contents[self.storage_index] = modifier(old_contents, None, True)
+        new_data = modifier(old_contents, None, True)
+        self.all_contents[self.storage_index] = new_data
         return None
 
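`_modify` follows the modifier protocol of the real mutable file node: the callable receives the old contents, a servermap (always None in this fake), and a first_time flag, and returns the new contents as a string. A minimal sketch:

    # sketch: append a line via the modifier protocol
    # (filenode: a writeable FakeMutableFileNode)
    def appender(old_contents, servermap, first_time):
        return old_contents + "another line\n"
    d = filenode.modify(appender)
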
+    # As actually implemented, MutableFilenode and MutableFileVersion
+    # are distinct. However, nothing in the webapi uses (yet) that
+    # distinction -- it just uses the unified download interface
+    # provided by get_best_readable_version and read. When we start
+    # doing cooler things like LDMF, we will want to revise this code to
+    # be less simplistic.
+    def get_best_readable_version(self):
+        return defer.succeed(self)
+
+
+    def get_best_mutable_version(self):
+        return defer.succeed(self)
+
+    # Ditto for update() below, which is an implementation of IWriteable.
+    # XXX: declare formally that this class implements IWriteable.
+    def update(self, data, offset):
+        assert not self.is_readonly()
+        def modifier(old, servermap, first_time):
+            new = old[:offset] + "".join(data.read(data.get_size()))
+            new += old[len(new):]
+            return new
+        return self.modify(modifier)
+
+
+    def read(self, consumer, offset=0, size=None):
+        data = self._download_best_version()
+        if size:
+            data = data[offset:offset+size]
+        consumer.write(data)
+        return defer.succeed(consumer)
+
+
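The new `read` honors the simplified consumer interface described in the comment inside FakeCHKFileNode.read: no producer registration, just a single write of the selected byte range. A sketch of a consumer that suffices for it:

    # sketch: a minimal consumer for the simplified read() above
    # (filenode: a FakeMutableFileNode with contents already created)
    class ListConsumer:
        def __init__(self):
            self.chunks = []
        def write(self, data):
            self.chunks.append(data)

    d = filenode.read(ListConsumer(), offset=0, size=10)
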
 def make_mutable_file_cap():
     return uri.WriteableSSKFileURI(writekey=os.urandom(16),
                                    fingerprint=os.urandom(32))
-def make_mutable_file_uri():
-    return make_mutable_file_cap().to_string()
+
+def make_mdmf_mutable_file_cap():
+    return uri.WriteableMDMFFileURI(writekey=os.urandom(16),
+                                    fingerprint=os.urandom(32))
+
+def make_mutable_file_uri(mdmf=False):
+    if mdmf:
+        cap = make_mdmf_mutable_file_cap()
+    else:
+        cap = make_mutable_file_cap()
+
+    return cap.to_string()
 
 def make_verifier_uri():
     return uri.SSKVerifierURI(storage_index=os.urandom(16),
                               fingerprint=os.urandom(32)).to_string()
 
-class FakeDirectoryNode(dirnode.DirectoryNode):
-    """This offers IDirectoryNode, but uses a FakeMutableFileNode for the
-    backing store, so it doesn't go to the grid. The child data is still
-    encrypted and serialized, so this isn't useful for tests that want to
-    look inside the dirnodes and check their contents.
-    """
-    filenode_class = FakeMutableFileNode
+def create_mutable_filenode(contents, mdmf=False, all_contents=None):
+    # XXX: All of these arguments are kind of stupid. 
+    if mdmf:
+        cap = make_mdmf_mutable_file_cap()
+    else:
+        cap = make_mutable_file_cap()
+
+    encoding_params = {}
+    encoding_params['k'] = 3
+    encoding_params['max_segment_size'] = 128*1024
+
+    filenode = FakeMutableFileNode(None, None, encoding_params, None,
+                                   all_contents)
+    filenode.init_from_cap(cap)
+    if mdmf:
+        filenode.create(MutableData(contents), version=MDMF_VERSION)
+    else:
+        filenode.create(MutableData(contents), version=SDMF_VERSION)
+    return filenode
+
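`create_mutable_filenode` bundles the cap creation, the fixed test encoding parameters (k=3, 128 KiB segments), and the `create()` call that tests would otherwise repeat. A hedged usage sketch:

    # sketch: an MDMF fake node backed by a test-local dict
    all_contents = {}
    n = create_mutable_filenode("initial contents", mdmf=True,
                                all_contents=all_contents)
    assert n.get_version() == MDMF_VERSION
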
 
 class LoggingServiceParent(service.MultiService):
     def log(self, *args, **kwargs):
         return log.msg(*args, **kwargs)
 
-
 class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
 
     # SystemTestMixin tests tend to be a lot of work, and we have a few
@@ -375,9 +470,9 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
         iv_dir = self.getdir("introducer")
         if not os.path.isdir(iv_dir):
             fileutil.make_dirs(iv_dir)
-            f = open(os.path.join(iv_dir, "webport"), "w")
-            f.write("tcp:0:interface=127.0.0.1\n")
-            f.close()
+            fileutil.write(os.path.join(iv_dir, 'tahoe.cfg'),
+                           "[node]\n" +
+                           "web.port = tcp:0:interface=127.0.0.1\n")
             if SYSTEM_TEST_CERTS:
                 os.mkdir(os.path.join(iv_dir, "private"))
                 f = open(os.path.join(iv_dir, "private", "node.pem"), "w")
@@ -424,7 +519,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
 
         self.key_generator_svc = KeyGeneratorService(kgsdir,
                                                      display_furl=False,
-                                                     default_key_size=522)
+                                                     default_key_size=TEST_RSA_KEY_SIZE)
         self.key_generator_svc.key_generator.pool_size = 4
         self.key_generator_svc.key_generator.pool_refresh_delay = 60
         self.add_service(self.key_generator_svc)
@@ -453,24 +548,28 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
                 f.write(SYSTEM_TEST_CERTS[i+1])
                 f.close()
 
-            def write(name, value):
-                open(os.path.join(basedir, name), "w").write(value+"\n")
+            config = "[client]\n"
+            config += "introducer.furl = %s\n" % self.introducer_furl
+            if self.stats_gatherer_furl:
+                config += "stats_gatherer.furl = %s\n" % self.stats_gatherer_furl
+
             if i == 0:
                 # clients[0] runs a webserver and a helper, no key_generator
-                write("webport", "tcp:0:interface=127.0.0.1")
-                write("run_helper", "yes")
-                write("keepalive_timeout", "600")
+                config += "[node]\n"
+                config += "web.port = tcp:0:interface=127.0.0.1\n"
+                config += "timeout.keepalive = 600\n"
+                config += "[helper]\n"
+                config += "enabled = True\n"
             if i == 3:
                 # clients[3] runs a webserver and uses a helper, uses
                 # key_generator
-                write("webport", "tcp:0:interface=127.0.0.1")
-                write("disconnect_timeout", "1800")
                 if self.key_generator_furl:
-                    kgf = "%s\n" % (self.key_generator_furl,)
-                    write("key_generator.furl", kgf)
-            write("introducer.furl", self.introducer_furl)
-            if self.stats_gatherer_furl:
-                write("stats_gatherer.furl", self.stats_gatherer_furl)
+                    config += "key_generator.furl = %s\n" % self.key_generator_furl
+                config += "[node]\n"
+                config += "web.port = tcp:0:interface=127.0.0.1\n"
+                config += "timeout.disconnect = 1800\n"
+
+            fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
 
         # give subclasses a chance to append lines to the node's tahoe.cfg
         # files before they are launched.
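For reference, the config assembled above for clients[0] renders to a tahoe.cfg along these lines (the furl value is a placeholder):

    [client]
    introducer.furl = pb://example@127.0.0.1:1234/introducer
    [node]
    web.port = tcp:0:interface=127.0.0.1
    timeout.keepalive = 600
    [helper]
    enabled = True
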
@@ -480,7 +579,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
         # will have registered the helper furl).
         c = self.add_service(client.Client(basedir=basedirs[0]))
         self.clients.append(c)
-        c.set_default_mutable_keysize(522)
+        c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
         d = c.when_tub_ready()
         def _ready(res):
             f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
@@ -488,29 +587,27 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
             f.close()
             self.helper_furl = helper_furl
             if self.numclients >= 4:
-                f = open(os.path.join(basedirs[3],"helper.furl"), "w")
-                f.write(helper_furl)
+                f = open(os.path.join(basedirs[3], 'tahoe.cfg'), 'ab+')
+                f.write(
+                      "[client]\n"
+                      "helper.furl = %s\n" % helper_furl)
                 f.close()
 
             # this starts the rest of the clients
             for i in range(1, self.numclients):
                 c = self.add_service(client.Client(basedir=basedirs[i]))
                 self.clients.append(c)
-                c.set_default_mutable_keysize(522)
+                c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
             log.msg("STARTING")
             return self.wait_for_connections()
         d.addCallback(_ready)
         def _connected(res):
             log.msg("CONNECTED")
             # now find out where the web port was
-            l = self.clients[0].getServiceNamed("webish").listener
-            port = l._port.getHost().port
-            self.webish_url = "http://localhost:%d/" % port
+            self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
             if self.numclients >=4:
                 # and the helper-using webport
-                l = self.clients[3].getServiceNamed("webish").listener
-                port = l._port.getHost().port
-                self.helper_webish_url = "http://localhost:%d/" % port
+                self.helper_webish_url = self.clients[3].getServiceNamed("webish").getURL()
         d.addCallback(_connected)
         return d
 
@@ -533,7 +630,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
         def _stopped(res):
             new_c = client.Client(basedir=self.getdir("client%d" % num))
             self.clients[num] = new_c
-            new_c.set_default_mutable_keysize(522)
+            new_c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
             self.add_service(new_c)
             return new_c.when_tub_ready()
         d.addCallback(_stopped)
@@ -541,9 +638,7 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
         def _maybe_get_webport(res):
             if num == 0:
                 # now find out where the web port was
-                l = self.clients[0].getServiceNamed("webish").listener
-                port = l._port.getHost().port
-                self.webish_url = "http://localhost:%d/" % port
+                self.webish_url = self.clients[0].getServiceNamed("webish").getURL()
         d.addCallback(_maybe_get_webport)
         return d
 
@@ -555,15 +650,15 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
         basedir = self.getdir("client%d" % client_num)
         if not os.path.isdir(basedir):
             fileutil.make_dirs(basedir)
-        open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+        config = "[client]\n"
+        config += "introducer.furl = %s\n" % self.introducer_furl
         if helper_furl:
-            f = open(os.path.join(basedir, "helper.furl") ,"w")
-            f.write(helper_furl+"\n")
-            f.close()
+            config += "helper.furl = %s\n" % helper_furl
+        fileutil.write(os.path.join(basedir, 'tahoe.cfg'), config)
 
         c = client.Client(basedir=basedir)
         self.clients.append(c)
-        c.set_default_mutable_keysize(522)
+        c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
         self.numclients += 1
         if add_to_sparent:
             c.setServiceParent(self.sparent)
@@ -578,14 +673,14 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
             if not c.connected_to_introducer():
                 return False
             sb = c.get_storage_broker()
-            if len(sb.get_all_servers()) != self.numclients:
+            if len(sb.get_connected_servers()) != self.numclients:
+                return False
+            up = c.getServiceNamed("uploader")
+            if up._helper_furl and not up._helper:
                 return False
         return True
 
     def wait_for_connections(self, ignored=None):
-        # TODO: replace this with something that takes a list of peerids and
-        # fires when they've all been heard from, instead of using a count
-        # and a threshold
         return self.poll(self._check_connections, timeout=200)
 
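This predicate change is the race fix named in the commit message: the old check returned True as soon as every client knew about all storage servers, so a test could reach clients[3] (which is configured with helper.furl) and start an upload before its connection to the helper existed. Counting only connected servers, and waiting for `up._helper` whenever `up._helper_furl` is set, closes that window. A sketch of the ordering this guarantees, using the upload call shape seen elsewhere in this file:

    # sketch: set_up_nodes() polls wait_for_connections(), which now
    # cannot fire until helper-configured clients have a live helper
    d = self.set_up_nodes()
    def _upload_via_helper(ign):
        data = immutable.upload.Data("some data", convergence="")
        return self.clients[3].upload(data)  # helper is connected by now
    d.addCallback(_upload_via_helper)
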
 
@@ -923,152 +1018,6 @@ N8L+bvLd4BU9g6hRS8b59lQ6GNjryx2bUnCVtLcey4Jd
 
 TEST_DATA="\x02"*(immutable.upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
 
-class ShareManglingMixin(SystemTestMixin):
-
-    def setUp(self):
-        # Set self.basedir to a temp dir which has the name of the current
-        # test method in its name.
-        self.basedir = self.mktemp()
-
-        d = defer.maybeDeferred(SystemTestMixin.setUp, self)
-        d.addCallback(lambda x: self.set_up_nodes())
-
-        def _upload_a_file(ignored):
-            cl0 = self.clients[0]
-            # We need multiple segments to test crypttext hash trees that are
-            # non-trivial (i.e. they have more than just one hash in them).
-            cl0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
-            # Tests that need to test servers of happiness using this should
-            # set their own value for happy -- the default (7) breaks stuff.
-            cl0.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
-            d2 = cl0.upload(immutable.upload.Data(TEST_DATA, convergence=""))
-            def _after_upload(u):
-                filecap = u.uri
-                self.n = self.clients[1].create_node_from_uri(filecap)
-                self.uri = uri.CHKFileURI.init_from_string(filecap)
-                return cl0.create_node_from_uri(filecap)
-            d2.addCallback(_after_upload)
-            return d2
-        d.addCallback(_upload_a_file)
-
-        def _stash_it(filenode):
-            self.filenode = filenode
-        d.addCallback(_stash_it)
-        return d
-
-    def find_shares(self, unused=None):
-        """Locate shares on disk. Returns a dict that maps
-        (clientnum,sharenum) to a string that contains the share container
-        (copied directly from the disk, containing leases etc). You can
-        modify this dict and then call replace_shares() to modify the shares.
-        """
-        shares = {} # k: (i, sharenum), v: data
-
-        for i, c in enumerate(self.clients):
-            sharedir = c.getServiceNamed("storage").sharedir
-            for (dirp, dirns, fns) in os.walk(sharedir):
-                for fn in fns:
-                    try:
-                        sharenum = int(fn)
-                    except TypeError:
-                        # Whoops, I guess that's not a share file then.
-                        pass
-                    else:
-                        data = open(os.path.join(sharedir, dirp, fn), "rb").read()
-                        shares[(i, sharenum)] = data
-
-        return shares
-
-    def replace_shares(self, newshares, storage_index):
-        """Replace shares on disk. Takes a dictionary in the same form
-        as find_shares() returns."""
-
-        for i, c in enumerate(self.clients):
-            sharedir = c.getServiceNamed("storage").sharedir
-            for (dirp, dirns, fns) in os.walk(sharedir):
-                for fn in fns:
-                    try:
-                        sharenum = int(fn)
-                    except TypeError:
-                        # Whoops, I guess that's not a share file then.
-                        pass
-                    else:
-                        pathtosharefile = os.path.join(sharedir, dirp, fn)
-                        os.unlink(pathtosharefile)
-            for ((clientnum, sharenum), newdata) in newshares.iteritems():
-                if clientnum == i:
-                    fullsharedirp=os.path.join(sharedir, storage_index_to_dir(storage_index))
-                    fileutil.make_dirs(fullsharedirp)
-                    wf = open(os.path.join(fullsharedirp, str(sharenum)), "wb")
-                    wf.write(newdata)
-                    wf.close()
-
-    def _delete_a_share(self, unused=None, sharenum=None):
-        """ Delete one share. """
-
-        shares = self.find_shares()
-        ks = shares.keys()
-        if sharenum is not None:
-            k = [ key for key in shares.keys() if key[1] == sharenum ][0]
-        else:
-            k = random.choice(ks)
-        del shares[k]
-        self.replace_shares(shares, storage_index=self.uri.get_storage_index())
-
-        return unused
-
-    def _corrupt_a_share(self, unused, corruptor_func, sharenum):
-        shares = self.find_shares()
-        ks = [ key for key in shares.keys() if key[1] == sharenum ]
-        assert ks, (shares.keys(), sharenum)
-        k = ks[0]
-        shares[k] = corruptor_func(shares[k])
-        self.replace_shares(shares, storage_index=self.uri.get_storage_index())
-        return corruptor_func
-
-    def _corrupt_all_shares(self, unused, corruptor_func):
-        """ All shares on disk will be corrupted by corruptor_func. """
-        shares = self.find_shares()
-        for k in shares.keys():
-            self._corrupt_a_share(unused, corruptor_func, k[1])
-        return corruptor_func
-
-    def _corrupt_a_random_share(self, unused, corruptor_func):
-        """ Exactly one share on disk will be corrupted by corruptor_func. """
-        shares = self.find_shares()
-        ks = shares.keys()
-        k = random.choice(ks)
-        self._corrupt_a_share(unused, corruptor_func, k[1])
-        return k[1]
-
-    def _count_reads(self):
-        sum_of_read_counts = 0
-        for thisclient in self.clients:
-            counters = thisclient.stats_provider.get_stats()['counters']
-            sum_of_read_counts += counters.get('storage_server.read', 0)
-        return sum_of_read_counts
-
-    def _count_allocates(self):
-        sum_of_allocate_counts = 0
-        for thisclient in self.clients:
-            counters = thisclient.stats_provider.get_stats()['counters']
-            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
-        return sum_of_allocate_counts
-
-    def _count_writes(self):
-        sum_of_write_counts = 0
-        for thisclient in self.clients:
-            counters = thisclient.stats_provider.get_stats()['counters']
-            sum_of_write_counts += counters.get('storage_server.write', 0)
-        return sum_of_write_counts
-
-    def _download_and_check_plaintext(self, unused=None):
-        d = download_to_data(self.n)
-        def _after_download(result):
-            self.failUnlessEqual(result, TEST_DATA)
-        d.addCallback(_after_download)
-        return d
-
 class ShouldFailMixin:
     def shouldFail(self, expected_failure, which, substring,
                    callable, *args, **kwargs):
@@ -1102,8 +1051,8 @@ class ShouldFailMixin:
                 if substring:
                     message = repr(res.value.args[0])
                     self.failUnless(substring in message,
-                                    "substring '%s' not in '%s'"
-                                    % (substring, message))
+                                    "%s: substring '%s' not in '%s'"
+                                    % (which, substring, message))
             else:
                 self.fail("%s was supposed to raise %s, not get '%s'" %
                           (which, expected_failure, res))
@@ -1136,17 +1085,17 @@ class WebErrorMixin:
         assert callable
         def _validate(f):
             if code is not None:
-                self.failUnlessEqual(f.value.status, str(code))
+                self.failUnlessEqual(f.value.status, str(code), which)
             if substring:
                 code_string = str(f)
                 self.failUnless(substring in code_string,
-                                "substring '%s' not in '%s'"
-                                % (substring, code_string))
+                                "%s: substring '%s' not in '%s'"
+                                % (which, substring, code_string))
             response_body = f.value.response
             if response_substring:
                 self.failUnless(response_substring in response_body,
-                                "response substring '%s' not in '%s'"
-                                % (response_substring, response_body))
+                                "%s: response substring '%s' not in '%s'"
+                                % (which, response_substring, response_body))
             return response_body
         d = defer.maybeDeferred(callable, *args, **kwargs)
         d.addBoth(self._shouldHTTPError, which, _validate)
@@ -1349,6 +1298,22 @@ def _corrupt_share_data(data, debug=False):
 
         return corrupt_field(data, 0x0c+0x44, sharedatasize)
 
+def _corrupt_share_data_last_byte(data, debug=False):
+    """Scramble the file data -- flip all bits of the last byte."""
+    sharevernum = struct.unpack(">L", data[0x0c:0x0c+4])[0]
+    assert sharevernum in (1, 2), "This test is designed to corrupt immutable shares of v1 or v2 in specific ways, not v%d." % sharevernum
+    if sharevernum == 1:
+        sharedatasize = struct.unpack(">L", data[0x0c+0x08:0x0c+0x08+4])[0]
+        offset = 0x0c+0x24+sharedatasize-1
+    else:
+        sharedatasize = struct.unpack(">Q", data[0x0c+0x0c:0x0c+0x0c+8])[0]
+        offset = 0x0c+0x44+sharedatasize-1
+
+    newdata = data[:offset] + chr(ord(data[offset])^0xFF) + data[offset+1:]
+    if debug:
+        log.msg("testing: flipping all bits of byte at offset %d: %r, newdata: %r" % (offset, data[offset], newdata[offset]))
+    return newdata
+
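The offsets above match the other immutable-share corruptors in this file: share data starts at 0x0c+0x24 in a v1 container and 0x0c+0x44 in a v2 container, with the data-size field read from the share's own header. A hedged sketch that exercises the corruptor on a synthetic v1 share (not a valid share, just enough header bytes for these offsets; the block-size field at share offset 4 is an assumption):

    import struct

    sh = "\x00" * 0x0c                # container header
    sh += struct.pack(">L", 1)        # share format version 1
    sh += struct.pack(">L", 4096)     # (assumed) block size field
    sh += struct.pack(">L", 10)       # share data size = 10
    sh += "\x00" * (0x24 - 0x0c)      # rest of the v1 header/offset table
    sh += "D" * 10                    # the share data itself
    out = _corrupt_share_data_last_byte(sh)
    assert out[-1] == chr(ord("D") ^ 0xFF)
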
 def _corrupt_crypttext_hash_tree(data, debug=False):
     """Scramble the file data -- the field containing the crypttext hash tree
     will have one bit flipped or else will be changed to a random value.