Overhaul IFilesystemNode handling, to simplify tests and use POLA internally.

diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index 212cd5dafff0a9bdab002c1f1592ed4404c3382e..ce36d12cd74616dfbd65cc106fdcfc4b31867cba 100644
--- a/src/allmydata/test/test_mutable.py
+++ b/src/allmydata/test/test_mutable.py
@@ -1,20 +1,20 @@
 
-import os, struct
+import struct
 from cStringIO import StringIO
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
-from twisted.python import failure
-from allmydata import uri
-from allmydata.storage.server import StorageServer
+from allmydata import uri, client
+from allmydata.nodemaker import NodeMaker
 from allmydata.immutable import download
-from allmydata.util import base32, idlib
+from allmydata.util import base32
 from allmydata.util.idlib import shortnodeid_b2a
-from allmydata.util.hashutil import tagged_hash
-from allmydata.util.fileutil import make_dirs
-from allmydata.interfaces import IURI, IMutableFileURI, IUploadable, \
-     NotEnoughSharesError, IRepairResults, ICheckAndRepairResults
+from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
+     ssk_pubkey_fingerprint_hash
+from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
+     NotEnoughSharesError
 from allmydata.monitor import Monitor
 from allmydata.test.common import ShouldFailMixin
+from allmydata.test.no_network import GridTestMixin
 from foolscap.api import eventually, fireEventually
 from foolscap.logging import log
 from allmydata.storage_client import StorageFarmBroker
@@ -32,13 +32,6 @@ from allmydata.mutable.repairer import MustForceRepairError
 
 import common_util as testutil
 
-# this "FastMutableFileNode" exists solely to speed up tests by using smaller
-# public/private keys. Once we switch to fast DSA-based keys, we can get rid
-# of this.
-
-class FastMutableFileNode(MutableFileNode):
-    SIGNATURE_KEY_SIZE = 522
-
 # this "FakeStorage" exists to put the share data in RAM and avoid using real
 # network connections, both to speed up the tests and to reduce the amount of
 # non-mutable.py code being exercised.
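
FakeStorage's share layout is a plain nested dict: judging from read() below and
from the tests that later poke at _storage._peers directly, it is
_peers[peerid][shnum] -> share data. A hedged sketch of seeding it by hand (the
peerid and share bytes are made up; with _sequence left at None, read() fires
immediately):

    s = FakeStorage()
    peerid = "\x00" * 20                  # fake 20-byte peerid
    s._peers[peerid] = {0: "share 0"}     # shnum -> share data
    d = s.read(peerid, "si")              # read() ignores the storage index
    d.addCallback(lambda shares: shares)  # fires with {0: "share 0"}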
@@ -64,18 +57,9 @@ class FakeStorage:
         self._sequence = None
         self._pending = {}
         self._pending_timer = None
-        self._special_answers = {}
 
     def read(self, peerid, storage_index):
         shares = self._peers.get(peerid, {})
-        if self._special_answers.get(peerid, []):
-            mode = self._special_answers[peerid].pop(0)
-            if mode == "fail":
-                shares = failure.Failure(IntentionalError())
-            elif mode == "none":
-                shares = {}
-            elif mode == "normal":
-                pass
         if self._sequence is None:
             return defer.succeed(shares)
         d = defer.Deferred()
@@ -162,73 +146,6 @@ class FakeStorageServer:
         return fireEventually(answer)
 
 
-# our "FakeClient" has just enough functionality of the real Client to let
-# the tests run.
-
-class FakeClient:
-    mutable_file_node_class = FastMutableFileNode
-
-    def __init__(self, num_peers=10):
-        self._storage = FakeStorage()
-        self._num_peers = num_peers
-        peerids = [tagged_hash("peerid", "%d" % i)[:20]
-                   for i in range(self._num_peers)]
-        self.nodeid = "fakenodeid"
-        self.storage_broker = StorageFarmBroker(None, True)
-        for peerid in peerids:
-            fss = FakeStorageServer(peerid, self._storage)
-            self.storage_broker.test_add_server(peerid, fss)
-
-    def get_storage_broker(self):
-        return self.storage_broker
-    def debug_break_connection(self, peerid):
-        self.storage_broker.test_servers[peerid].broken = True
-    def debug_remove_connection(self, peerid):
-        self.storage_broker.test_servers.pop(peerid)
-    def debug_get_connection(self, peerid):
-        return self.storage_broker.test_servers[peerid]
-
-    def get_encoding_parameters(self):
-        return {"k": 3, "n": 10}
-
-    def log(self, msg, **kw):
-        return log.msg(msg, **kw)
-
-    def get_renewal_secret(self):
-        return "I hereby permit you to renew my files"
-    def get_cancel_secret(self):
-        return "I hereby permit you to cancel my leases"
-
-    def create_mutable_file(self, contents=""):
-        n = self.mutable_file_node_class(self)
-        d = n.create(contents)
-        d.addCallback(lambda res: n)
-        return d
-
-    def get_history(self):
-        return None
-
-    def create_node_from_uri(self, u, readcap=None):
-        if not u:
-            u = readcap
-        u = IURI(u)
-        assert IMutableFileURI.providedBy(u), u
-        res = self.mutable_file_node_class(self).init_from_uri(u)
-        return res
-
-    def upload(self, uploadable):
-        assert IUploadable.providedBy(uploadable)
-        d = uploadable.get_size()
-        d.addCallback(lambda length: uploadable.read(length))
-        #d.addCallback(self.create_mutable_file)
-        def _got_data(datav):
-            data = "".join(datav)
-            #newnode = FastMutableFileNode(self)
-            return uri.LiteralFileURI(data)
-        d.addCallback(_got_data)
-        return d
-
-
 def flip_bit(original, byte_offset):
     return (original[:byte_offset] +
             chr(ord(original[byte_offset]) ^ 0x01) +
@@ -266,28 +183,50 @@ def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
             shares[shnum] = flip_bit(data, real_offset)
     return res
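
flip_bit (truncated above at the hunk boundary; the elided remainder presumably
appends original[byte_offset+1:]) flips the least-significant bit of one byte,
which corrupt() uses to damage a chosen field of a share. For example:

    assert flip_bit("ab", 0) == "`b"   # ord('a') ^ 0x01 == ord('`')
    assert flip_bit("ab", 1) == "ac"   # ord('b') ^ 0x01 == ord('c')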
 
+def make_storagebroker(s=None, num_peers=10):
+    if not s:
+        s = FakeStorage()
+    peerids = [tagged_hash("peerid", "%d" % i)[:20]
+               for i in range(num_peers)]
+    storage_broker = StorageFarmBroker(None, True)
+    for peerid in peerids:
+        fss = FakeStorageServer(peerid, s)
+        storage_broker.test_add_server(peerid, fss)
+    return storage_broker
+
+def make_nodemaker(s=None, num_peers=10):
+    storage_broker = make_storagebroker(s, num_peers)
+    sh = client.SecretHolder("lease secret")
+    keygen = client.KeyGenerator()
+    keygen.set_default_keysize(522)
+    nodemaker = NodeMaker(storage_broker, sh, None,
+                          None, None, None,
+                          {"k": 3, "n": 10}, keygen)
+    return nodemaker
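
The helper pair above replaces the deleted FakeClient: tests build a NodeMaker
over a broker of FakeStorageServers, and the 522-bit KeyGenerator stands in for
the old FastMutableFileNode speedup. A minimal usage sketch, Deferred-style as
in the tests below:

    s = FakeStorage()
    nm = make_nodemaker(s, num_peers=10)
    d = nm.create_mutable_file("initial contents")
    d.addCallback(lambda node: node.download_best_version())
    # d fires with "initial contents"; shares live in s._peers, not on disk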
+
 class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
     # this used to be in Publish, but we removed the limit. Some of
     # these tests test whether the new code correctly allows files
     # larger than the limit.
     OLD_MAX_SEGMENT_SIZE = 3500000
     def setUp(self):
-        self.client = FakeClient()
+        self._storage = s = FakeStorage()
+        self.nodemaker = make_nodemaker(s)
 
     def test_create(self):
-        d = self.client.create_mutable_file()
+        d = self.nodemaker.create_mutable_file()
         def _created(n):
-            self.failUnless(isinstance(n, FastMutableFileNode))
+            self.failUnless(isinstance(n, MutableFileNode))
             self.failUnlessEqual(n.get_storage_index(), n._storage_index)
-            sb = self.client.get_storage_broker()
+            sb = self.nodemaker.storage_broker
             peer0 = sorted(sb.get_all_serverids())[0]
-            shnums = self.client._storage._peers[peer0].keys()
+            shnums = self._storage._peers[peer0].keys()
             self.failUnlessEqual(len(shnums), 1)
         d.addCallback(_created)
         return d
 
     def test_serialize(self):
-        n = MutableFileNode(self.client)
+        n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
         calls = []
         def _callback(*args, **kwargs):
             self.failUnlessEqual(args, (4,) )
@@ -308,7 +247,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
     def test_upload_and_download(self):
-        d = self.client.create_mutable_file()
+        d = self.nodemaker.create_mutable_file()
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_READ))
@@ -349,7 +288,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
     def test_create_with_initial_contents(self):
-        d = self.client.create_mutable_file("contents 1")
+        d = self.nodemaker.create_mutable_file("contents 1")
         def _created(n):
             d = n.download_best_version()
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
@@ -362,7 +301,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
 
     def test_create_with_too_large_contents(self):
         BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
-        d = self.client.create_mutable_file(BIG)
+        d = self.nodemaker.create_mutable_file(BIG)
         def _created(n):
             d = n.overwrite(BIG)
             return d
@@ -402,7 +341,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
                 raise UncoordinatedWriteError("simulated")
             return old_contents
 
-        d = self.client.create_mutable_file("line1")
+        d = self.nodemaker.create_mutable_file("line1")
         def _created(n):
             d = n.modify(_modifier)
             d.addCallback(lambda res: n.download_best_version())
@@ -484,7 +423,7 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         giveuper._delay = 0.1
         giveuper.factor = 1
 
-        d = self.client.create_mutable_file("line1")
+        d = self.nodemaker.create_mutable_file("line1")
         def _created(n):
             d = n.modify(_modifier)
             d.addCallback(lambda res: n.download_best_version())
@@ -526,8 +465,8 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
     def test_upload_and_download_full_size_keys(self):
-        self.client.mutable_file_node_class = MutableFileNode
-        d = self.client.create_mutable_file()
+        self.nodemaker.key_generator = client.KeyGenerator()
+        d = self.nodemaker.create_mutable_file()
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_READ))
@@ -559,12 +498,11 @@ class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
 
 class MakeShares(unittest.TestCase):
     def test_encrypt(self):
-        c = FakeClient()
-        fn = FastMutableFileNode(c)
+        nm = make_nodemaker()
         CONTENTS = "some initial contents"
-        d = fn.create(CONTENTS)
-        def _created(res):
-            p = Publish(fn, None)
+        d = nm.create_mutable_file(CONTENTS)
+        def _created(fn):
+            p = Publish(fn, nm.storage_broker, None)
             p.salt = "SALT" * 4
             p.readkey = "\x00" * 16
             p.newdata = CONTENTS
@@ -584,12 +522,12 @@ class MakeShares(unittest.TestCase):
         return d
 
     def test_generate(self):
-        c = FakeClient()
-        fn = FastMutableFileNode(c)
+        nm = make_nodemaker()
         CONTENTS = "some initial contents"
-        d = fn.create(CONTENTS)
-        def _created(res):
-            p = Publish(fn, None)
+        d = nm.create_mutable_file(CONTENTS)
+        def _created(fn):
+            self._fn = fn
+            p = Publish(fn, nm.storage_broker, None)
             self._p = p
             p.newdata = CONTENTS
             p.required_shares = 3
@@ -641,7 +579,7 @@ class MakeShares(unittest.TestCase):
                 self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
                 self.failUnlessEqual(IV, "SALT"*4)
                 self.failUnlessEqual(len(share_data), len("%07d" % 1))
-                self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
+                self.failUnlessEqual(enc_privkey, self._fn.get_encprivkey())
         d.addCallback(_generated)
         return d
 
@@ -655,14 +593,16 @@ class PublishMixin:
         # later.
         self.CONTENTS = "New contents go here" * 1000
         num_peers = 20
-        self._client = FakeClient(num_peers)
-        self._storage = self._client._storage
-        d = self._client.create_mutable_file(self.CONTENTS)
+        self._storage = FakeStorage()
+        self._nodemaker = make_nodemaker(self._storage)
+        self._storage_broker = self._nodemaker.storage_broker
+        d = self._nodemaker.create_mutable_file(self.CONTENTS)
         def _created(node):
             self._fn = node
-            self._fn2 = self._client.create_node_from_uri(node.get_uri())
+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
         d.addCallback(_created)
         return d
+
     def publish_multiple(self):
         self.CONTENTS = ["Contents 0",
                          "Contents 1",
@@ -671,9 +611,9 @@ class PublishMixin:
                          "Contents 3b"]
         self._copied_shares = {}
         num_peers = 20
-        self._client = FakeClient(num_peers)
-        self._storage = self._client._storage
-        d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1
+        self._storage = FakeStorage()
+        self._nodemaker = make_nodemaker(self._storage)
+        d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
         def _created(node):
             self._fn = node
             # now create multiple versions of the same file, and accumulate
@@ -698,7 +638,7 @@ class PublishMixin:
         return d
 
     def _copy_shares(self, ignored, index):
-        shares = self._client._storage._peers
+        shares = self._storage._peers
         # we need a deep copy
         new_shares = {}
         for peerid in shares:
@@ -711,7 +651,7 @@ class PublishMixin:
         # versionmap maps shnums to which version (0,1,2,3,4) we want the
         # share to be at. Any shnum which is left out of the map will stay at
         # its current version.
-        shares = self._client._storage._peers
+        shares = self._storage._peers
         oldshares = self._copied_shares
         for peerid in shares:
             for shnum in shares[peerid]:
@@ -724,15 +664,19 @@ class Servermap(unittest.TestCase, PublishMixin):
     def setUp(self):
         return self.publish_one()
 
-    def make_servermap(self, mode=MODE_CHECK, fn=None):
+    def make_servermap(self, mode=MODE_CHECK, fn=None, sb=None):
         if fn is None:
             fn = self._fn
-        smu = ServermapUpdater(fn, Monitor(), ServerMap(), mode)
+        if sb is None:
+            sb = self._storage_broker
+        smu = ServermapUpdater(fn, sb, Monitor(),
+                               ServerMap(), mode)
         d = smu.update()
         return d
 
     def update_servermap(self, oldmap, mode=MODE_CHECK):
-        smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+        smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
+                               oldmap, mode)
         d = smu.update()
         return d
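
This signature change recurs throughout the patch: ServermapUpdater now takes
the storage broker as an explicit argument, rather than reaching it through a
client attribute on the filenode (the POLA simplification named in the commit
message). In sketch form:

    # old: ServermapUpdater(fn, Monitor(), ServerMap(), mode)
    # new: the broker is passed in explicitly
    smu = ServermapUpdater(fn, storage_broker, Monitor(), ServerMap(), MODE_CHECK)
    d = smu.update()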
 
@@ -792,9 +736,9 @@ class Servermap(unittest.TestCase, PublishMixin):
         # create a new file, which is large enough to knock the privkey out
         # of the early part of the file
         LARGE = "These are Larger contents" * 200 # about 5KB
-        d.addCallback(lambda res: self._client.create_mutable_file(LARGE))
+        d.addCallback(lambda res: self._nodemaker.create_mutable_file(LARGE))
         def _created(large_fn):
-            large_fn2 = self._client.create_node_from_uri(large_fn.get_uri())
+            large_fn2 = self._nodemaker.create_from_cap(large_fn.get_uri())
             return self.make_servermap(MODE_WRITE, large_fn2)
         d.addCallback(_created)
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
@@ -843,7 +787,7 @@ class Servermap(unittest.TestCase, PublishMixin):
         self.failUnlessEqual(len(sm.shares_available()), 0)
 
     def test_no_shares(self):
-        self._client._storage._peers = {} # delete all shares
+        self._storage._peers = {} # delete all shares
         ms = self.make_servermap
         d = defer.succeed(None)
 
@@ -871,7 +815,7 @@ class Servermap(unittest.TestCase, PublishMixin):
         return sm
 
     def test_not_quite_enough_shares(self):
-        s = self._client._storage
+        s = self._storage
         ms = self.make_servermap
         num_shares = len(s._peers)
         for peerid in s._peers:
@@ -903,10 +847,12 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
     def setUp(self):
         return self.publish_one()
 
-    def make_servermap(self, mode=MODE_READ, oldmap=None):
+    def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
         if oldmap is None:
             oldmap = ServerMap()
-        smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+        if sb is None:
+            sb = self._storage_broker
+        smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
         d = smu.update()
         return d
 
@@ -979,11 +925,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
         return d
 
     def test_no_servers(self):
-        c2 = FakeClient(0)
-        self._fn._client = c2
+        sb2 = make_storagebroker(num_peers=0)
         # if there are no servers, then a MODE_READ servermap should come
         # back empty
-        d = self.make_servermap()
+        d = self.make_servermap(sb=sb2)
         def _check_servermap(servermap):
             self.failUnlessEqual(servermap.best_recoverable_version(), None)
             self.failIf(servermap.recoverable_versions())
@@ -994,8 +939,8 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
     test_no_servers.timeout = 15
 
     def test_no_servers_download(self):
-        c2 = FakeClient(0)
-        self._fn._client = c2
+        sb2 = make_storagebroker(num_peers=0)
+        self._fn._storage_broker = sb2
         d = self.shouldFail(UnrecoverableFileError,
                             "test_no_servers_download",
                             "no recoverable versions",
@@ -1005,7 +950,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
             # anybody should not prevent a subsequent download from working.
             # This isn't quite the webapi-driven test that #463 wants, but it
             # should be close enough.
-            self._fn._client = self._client
+            self._fn._storage_broker = self._storage_broker
             return self._fn.download_best_version()
         def _retrieved(new_contents):
             self.failUnlessEqual(new_contents, self.CONTENTS)
@@ -1489,13 +1434,17 @@ class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
         d.addCallback(_check_results)
         return d
 
+class DevNullDictionary(dict):
+    def __setitem__(self, key, value):
+        return
+
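
DevNullDictionary silently discards every write; handing it to the nodemaker as
its node cache (as _encode does below) guarantees that create_from_cap builds a
fresh node on each call instead of returning a cached one. Its behavior in
isolation:

    cache = DevNullDictionary()
    cache["si"] = "node"       # __setitem__ drops the write on the floor
    assert "si" not in cache
    assert len(cache) == 0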
 class MultipleEncodings(unittest.TestCase):
     def setUp(self):
         self.CONTENTS = "New contents go here"
-        num_peers = 20
-        self._client = FakeClient(num_peers)
-        self._storage = self._client._storage
-        d = self._client.create_mutable_file(self.CONTENTS)
+        self._storage = FakeStorage()
+        self._nodemaker = make_nodemaker(self._storage, num_peers=20)
+        self._storage_broker = self._nodemaker.storage_broker
+        d = self._nodemaker.create_mutable_file(self.CONTENTS)
         def _created(node):
             self._fn = node
         d.addCallback(_created)
@@ -1504,11 +1453,11 @@ class MultipleEncodings(unittest.TestCase):
     def _encode(self, k, n, data):
         # encode 'data' into a peerid->shares dict.
 
-        fn2 = FastMutableFileNode(self._client)
-        # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
-        # and _fingerprint
         fn = self._fn
-        fn2.init_from_uri(fn.get_uri())
+        # disable the nodecache, since for these tests we explicitly need
+        # multiple nodes pointing at the same file
+        self._nodemaker._node_cache = DevNullDictionary()
+        fn2 = self._nodemaker.create_from_cap(fn.get_uri())
         # then we copy over other fields that are normally fetched from the
         # existing shares
         fn2._pubkey = fn._pubkey
@@ -1518,9 +1467,9 @@ class MultipleEncodings(unittest.TestCase):
         fn2._required_shares = k
         fn2._total_shares = n
 
-        s = self._client._storage
+        s = self._storage
         s._peers = {} # clear existing storage
-        p2 = Publish(fn2, None)
+        p2 = Publish(fn2, self._storage_broker, None)
         d = p2.publish(data)
         def _published(res):
             shares = s._peers
@@ -1532,7 +1481,8 @@ class MultipleEncodings(unittest.TestCase):
     def make_servermap(self, mode=MODE_READ, oldmap=None):
         if oldmap is None:
             oldmap = ServerMap()
-        smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+        smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
+                               oldmap, mode)
         d = smu.update()
         return d
 
@@ -1547,8 +1497,7 @@ class MultipleEncodings(unittest.TestCase):
 
         # we make a retrieval object that doesn't know what encoding
         # parameters to use
-        fn3 = FastMutableFileNode(self._client)
-        fn3.init_from_uri(self._fn.get_uri())
+        fn3 = self._nodemaker.create_from_cap(self._fn.get_uri())
 
         # now we upload a file through fn1, and grab its shares
         d = self._encode(3, 10, contents1)
@@ -1591,7 +1540,7 @@ class MultipleEncodings(unittest.TestCase):
             places = [2, 2, 3, 2, 1, 1, 1, 2]
 
             sharemap = {}
-            sb = self._client.get_storage_broker()
+            sb = self._storage_broker
 
             for peerid in sorted(sb.get_all_serverids()):
                 peerid_s = shortnodeid_b2a(peerid)
@@ -1600,7 +1549,7 @@ class MultipleEncodings(unittest.TestCase):
                         which = places[shnum]
                     else:
                         which = "x"
-                    self._client._storage._peers[peerid] = peers = {}
+                    self._storage._peers[peerid] = peers = {}
                     in_1 = shnum in self._shares1[peerid]
                     in_2 = shnum in self._shares2.get(peerid, {})
                     in_3 = shnum in self._shares3.get(peerid, {})
@@ -1622,7 +1571,7 @@ class MultipleEncodings(unittest.TestCase):
             # now sort the sequence so that share 0 is returned first
             new_sequence = [sharemap[shnum]
                             for shnum in sorted(sharemap.keys())]
-            self._client._storage._sequence = new_sequence
+            self._storage._sequence = new_sequence
             log.msg("merge done")
         d.addCallback(_merge)
         d.addCallback(lambda res: fn3.download_best_version())
@@ -1789,50 +1738,43 @@ class Exceptions(unittest.TestCase):
         ucwe = UncoordinatedWriteError()
         self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
 
-# we can't do this test with a FakeClient, since it uses FakeStorageServer
-# instances which always succeed. So we need a less-fake one.
-
-class IntentionalError(Exception):
-    pass
-
-class LocalWrapper:
-    def __init__(self, original):
-        self.original = original
-        self.broken = False
-        self.post_call_notifier = None
-    def callRemote(self, methname, *args, **kwargs):
-        def _call():
-            if self.broken:
-                raise IntentionalError("I was asked to break")
-            meth = getattr(self.original, "remote_" + methname)
-            return meth(*args, **kwargs)
-        d = fireEventually()
-        d.addCallback(lambda res: _call())
-        if self.post_call_notifier:
-            d.addCallback(self.post_call_notifier, methname)
-        return d
-
-class LessFakeClient(FakeClient):
-
-    def __init__(self, basedir, num_peers=10):
-        self._num_peers = num_peers
-        peerids = [tagged_hash("peerid", "%d" % i)[:20]
-                   for i in range(self._num_peers)]
-        self.storage_broker = StorageFarmBroker(None, True)
-        for peerid in peerids:
-            peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
-            make_dirs(peerdir)
-            ss = StorageServer(peerdir, peerid)
-            lw = LocalWrapper(ss)
-            self.storage_broker.test_add_server(peerid, lw)
-        self.nodeid = "fakenodeid"
-
-
-class Problems(unittest.TestCase, testutil.ShouldFailMixin):
+class SameKeyGenerator:
+    def __init__(self, pubkey, privkey):
+        self.pubkey = pubkey
+        self.privkey = privkey
+    def generate(self, keysize=None):
+        return defer.succeed( (self.pubkey, self.privkey) )
+
+class FirstServerGetsKilled:
+    done = False
+    def notify(self, retval, wrapper, methname):
+        if not self.done:
+            wrapper.broken = True
+            self.done = True
+        return retval
+
+class FirstServerGetsDeleted:
+    def __init__(self):
+        self.done = False
+        self.silenced = None
+    def notify(self, retval, wrapper, methname):
+        if not self.done:
+            # this query will work, but later queries should think the share
+            # has been deleted
+            self.done = True
+            self.silenced = wrapper
+            return retval
+        if wrapper == self.silenced:
+            assert methname == "slot_testv_and_readv_and_writev"
+            return (True, {})
+        return retval
+
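
Both helpers plug into the no_network grid's post_call_notifier hook: judging
from their notify() signatures, the server wrapper invokes
notifier(retval, wrapper, methname) after each remote call and passes the
(possibly substituted) return value onward. The privkey tests below wire them
up like this:

    killer = FirstServerGetsKilled()
    for (serverid, ss) in nm.storage_broker.get_all_servers():
        ss.post_call_notifier = killer.notify
    # the first remote call succeeds but marks its server broken; every later
    # call to that server then fails, exercising the error-handling path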
+class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
     def test_publish_surprise(self):
-        basedir = os.path.join("mutable/CollidingWrites/test_surprise")
-        self.client = LessFakeClient(basedir)
-        d = self.client.create_mutable_file("contents 1")
+        self.basedir = "mutable/Problems/test_publish_surprise"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+        d = nm.create_mutable_file("contents 1")
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
@@ -1857,9 +1799,10 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         return d
 
     def test_retrieve_surprise(self):
-        basedir = os.path.join("mutable/CollidingWrites/test_retrieve")
-        self.client = LessFakeClient(basedir)
-        d = self.client.create_mutable_file("contents 1")
+        self.basedir = "mutable/Problems/test_retrieve_surprise"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+        d = nm.create_mutable_file("contents 1")
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_READ))
@@ -1893,9 +1836,10 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # upload using the old servermap. The last upload should fail with an
         # UncoordinatedWriteError, because of the shares that didn't appear
         # in the servermap.
-        basedir = os.path.join("mutable/CollidingWrites/test_unexpexted_shares")
-        self.client = LessFakeClient(basedir)
-        d = self.client.create_mutable_file("contents 1")
+        self.basedir = "mutable/Problems/test_unexpected_shares"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+        d = nm.create_mutable_file("contents 1")
         def _created(n):
             d = defer.succeed(None)
             d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
@@ -1904,7 +1848,7 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
                 self.old_map = smap
                 # now shut down one of the servers
                 peer0 = list(smap.make_sharemap()[0])[0]
-                self.client.debug_remove_connection(peer0)
+                self.g.remove_server(peer0)
                 # then modify the file, leaving the old map untouched
                 log.msg("starting winning write")
                 return n.overwrite("contents 2")
@@ -1926,43 +1870,57 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # Break one server, then create the file: the initial publish should
         # complete with an alternate server. Breaking a second server should
         # not prevent an update from succeeding either.
-        basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
-        self.client = LessFakeClient(basedir, 20)
+        self.basedir = "mutable/Problems/test_bad_server"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+
         # to make sure that one of the initial peers is broken, we have to
-        # get creative. We create the keys, so we can figure out the storage
-        # index, but we hold off on doing the initial publish until we've
-        # broken the server on which the first share wants to be stored.
-        n = FastMutableFileNode(self.client)
-        d = defer.succeed(None)
-        d.addCallback(n._generate_pubprivkeys, keysize=522)
-        d.addCallback(n._generated)
+        # get creative. We create an RSA key and compute its storage-index.
+        # Then we make a KeyGenerator that always returns that one key, and
+        # use it to create the mutable file. This will get easier when we can
+        # use #467 static-server-selection to disable permutation and force
+        # the choice of server for share[0].
+
+        d = nm.key_generator.generate(522)
+        def _got_key( (pubkey, privkey) ):
+            nm.key_generator = SameKeyGenerator(pubkey, privkey)
+            pubkey_s = pubkey.serialize()
+            privkey_s = privkey.serialize()
+            u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey_s),
+                                        ssk_pubkey_fingerprint_hash(pubkey_s))
+            self._storage_index = u.storage_index
+        d.addCallback(_got_key)
         def _break_peer0(res):
-            si = n.get_storage_index()
-            peerlist = self.client.storage_broker.get_servers_for_index(si)
+            si = self._storage_index
+            peerlist = nm.storage_broker.get_servers_for_index(si)
             peerid0, connection0 = peerlist[0]
             peerid1, connection1 = peerlist[1]
             connection0.broken = True
             self.connection1 = connection1
         d.addCallback(_break_peer0)
-        # now let the initial publish finally happen
-        d.addCallback(lambda res: n._upload("contents 1", None))
+        # now "create" the file, using the pre-established key, and let the
+        # initial publish finally happen
+        d.addCallback(lambda res: nm.create_mutable_file("contents 1"))
         # that ought to work
-        d.addCallback(lambda res: n.download_best_version())
-        d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
-        # now break the second peer
-        def _break_peer1(res):
-            self.connection1.broken = True
-        d.addCallback(_break_peer1)
-        d.addCallback(lambda res: n.overwrite("contents 2"))
-        # that ought to work too
-        d.addCallback(lambda res: n.download_best_version())
-        d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
-        def _explain_error(f):
-            print f
-            if f.check(NotEnoughServersError):
-                print "first_error:", f.value.first_error
-            return f
-        d.addErrback(_explain_error)
+        def _got_node(n):
+            d = n.download_best_version()
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+            # now break the second peer
+            def _break_peer1(res):
+                self.connection1.broken = True
+            d.addCallback(_break_peer1)
+            d.addCallback(lambda res: n.overwrite("contents 2"))
+            # that ought to work too
+            d.addCallback(lambda res: n.download_best_version())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+            def _explain_error(f):
+                print f
+                if f.check(NotEnoughServersError):
+                    print "first_error:", f.value.first_error
+                return f
+            d.addErrback(_explain_error)
+            return d
+        d.addCallback(_got_node)
         return d
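
The trick above bears restating: because the writecap is derived
deterministically from the keypair, generating the key once and pinning the
nodemaker to it with SameKeyGenerator lets the test compute the storage index,
and hence the permuted server list, before the file is ever published.
Condensed from the code above:

    d = nm.key_generator.generate(522)
    def _got_key((pubkey, privkey)):
        nm.key_generator = SameKeyGenerator(pubkey, privkey)
        u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey.serialize()),
                                    ssk_pubkey_fingerprint_hash(pubkey.serialize()))
        return u.storage_index   # known before nm.create_mutable_file() runs
    d.addCallback(_got_key)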
 
     def test_bad_server_overlap(self):
@@ -1975,20 +1933,21 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
         # Break one server, then create the file: the initial publish should
         # complete with an alternate server. Breaking a second server should
         # not prevent an update from succeeding either.
-        basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
-        self.client = LessFakeClient(basedir, 10)
-        sb = self.client.get_storage_broker()
+        self.basedir = "mutable/Problems/test_bad_server_overlap"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+        sb = nm.storage_broker
 
-        peerids = list(sb.get_all_serverids())
-        self.client.debug_break_connection(peerids[0])
+        peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
+        self.g.break_server(peerids[0])
 
-        d = self.client.create_mutable_file("contents 1")
+        d = nm.create_mutable_file("contents 1")
         def _created(n):
             d = n.download_best_version()
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
             # now break one of the remaining servers
             def _break_second_server(res):
-                self.client.debug_break_connection(peerids[1])
+                self.g.break_server(peerids[1])
             d.addCallback(_break_second_server)
             d.addCallback(lambda res: n.overwrite("contents 2"))
             # that ought to work too
@@ -2000,25 +1959,28 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
 
     def test_publish_all_servers_bad(self):
         # Break all servers: the publish should fail
-        basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
-        self.client = LessFakeClient(basedir, 20)
-        sb = self.client.get_storage_broker()
-        for peerid in sb.get_all_serverids():
-            self.client.debug_break_connection(peerid)
+        self.basedir = "mutable/Problems/test_publish_all_servers_bad"
+        self.set_up_grid()
+        nm = self.g.clients[0].nodemaker
+        for (serverid,ss) in nm.storage_broker.get_all_servers():
+            ss.broken = True
+
         d = self.shouldFail(NotEnoughServersError,
                             "test_publish_all_servers_bad",
                             "Ran out of non-bad servers",
-                            self.client.create_mutable_file, "contents")
+                            nm.create_mutable_file, "contents")
         return d
 
     def test_publish_no_servers(self):
         # no servers at all: the publish should fail
-        basedir = os.path.join("mutable/CollidingWrites/publish_no_servers")
-        self.client = LessFakeClient(basedir, 0)
+        self.basedir = "mutable/Problems/test_publish_no_servers"
+        self.set_up_grid(num_servers=0)
+        nm = self.g.clients[0].nodemaker
+
         d = self.shouldFail(NotEnoughServersError,
                             "test_publish_no_servers",
                             "Ran out of non-bad servers",
-                            self.client.create_mutable_file, "contents")
+                            nm.create_mutable_file, "contents")
         return d
     test_publish_no_servers.timeout = 30
 
@@ -2026,68 +1988,61 @@ class Problems(unittest.TestCase, testutil.ShouldFailMixin):
     def test_privkey_query_error(self):
         # when a servermap is updated with MODE_WRITE, it tries to get the
         # privkey. Something might go wrong during this query attempt.
-        self.client = FakeClient(20)
+        # Exercise the code in _privkey_query_failed which tries to handle
+        # such an error.
+        self.basedir = "mutable/Problems/test_privkey_query_error"
+        self.set_up_grid(num_servers=20)
+        nm = self.g.clients[0].nodemaker
+        nm._node_cache = DevNullDictionary() # disable the nodecache
+
         # we need some contents that are large enough to push the privkey out
         # of the early part of the file
-        LARGE = "These are Larger contents" * 200 # about 5KB
-        d = self.client.create_mutable_file(LARGE)
+        LARGE = "These are Larger contents" * 2000 # about 50KB
+        d = nm.create_mutable_file(LARGE)
         def _created(n):
             self.uri = n.get_uri()
-            self.n2 = self.client.create_node_from_uri(self.uri)
-            # we start by doing a map update to figure out which is the first
-            # server.
-            return n.get_servermap(MODE_WRITE)
+            self.n2 = nm.create_from_cap(self.uri)
+
+            # When a mapupdate is performed on a node that doesn't yet know
+            # the privkey, a short read is sent to a batch of servers, to get
+            # the verinfo and (hopefully, if the file is short enough) the
+            # encprivkey. Our file is too large to let this first read
+            # contain the encprivkey. Each non-encprivkey-bearing response
+            # that arrives (until the node gets the encprivkey) will trigger
+            # a second read to specifically read the encprivkey.
+            #
+            # So, to exercise this case:
+            #  1. notice which server gets a read() call first
+            #  2. tell that server to start throwing errors
+            killer = FirstServerGetsKilled()
+            for (serverid,ss) in nm.storage_broker.get_all_servers():
+                ss.post_call_notifier = killer.notify
         d.addCallback(_created)
-        d.addCallback(lambda res: fireEventually(res))
-        def _got_smap1(smap):
-            peer0 = list(smap.make_sharemap()[0])[0]
-            # we tell the server to respond to this peer first, so that it
-            # will be asked for the privkey first
-            self.client._storage._sequence = [peer0]
-            # now we make the peer fail their second query
-            self.client._storage._special_answers[peer0] = ["normal", "fail"]
-        d.addCallback(_got_smap1)
+
         # now we update a servermap from a new node (which doesn't have the
-        # privkey yet, forcing it to use a separate privkey query). Each
-        # query response will trigger a privkey query, and since we're using
-        # _sequence to make the peer0 response come back first, we'll send it
-        # a privkey query first, and _sequence will again ensure that the
-        # peer0 query will also come back before the others, and then
-        # _special_answers will make sure that the query raises an exception.
-        # The whole point of these hijinks is to exercise the code in
-        # _privkey_query_failed. Note that the map-update will succeed, since
-        # we'll just get a copy from one of the other shares.
+        # privkey yet, forcing it to use a separate privkey query). Note that
+        # the map-update will succeed, since we'll just get a copy from one
+        # of the other shares.
         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
-        # Using FakeStorage._sequence means there will be read requests still
-        # floating around.. wait for them to retire
-        def _cancel_timer(res):
-            if self.client._storage._pending_timer:
-                self.client._storage._pending_timer.cancel()
-            return res
-        d.addBoth(_cancel_timer)
+
         return d
 
     def test_privkey_query_missing(self):
         # like test_privkey_query_error, but the shares are deleted by the
         # second query, instead of raising an exception.
-        self.client = FakeClient(20)
-        LARGE = "These are Larger contents" * 200 # about 5KB
-        d = self.client.create_mutable_file(LARGE)
+        self.basedir = "mutable/Problems/test_privkey_query_missing"
+        self.set_up_grid(num_servers=20)
+        nm = self.g.clients[0].nodemaker
+        LARGE = "These are Larger contents" * 2000 # about 50KB
+        nm._node_cache = DevNullDictionary() # disable the nodecache
+
+        d = nm.create_mutable_file(LARGE)
         def _created(n):
             self.uri = n.get_uri()
-            self.n2 = self.client.create_node_from_uri(self.uri)
-            return n.get_servermap(MODE_WRITE)
+            self.n2 = nm.create_from_cap(self.uri)
+            deleter = FirstServerGetsDeleted()
+            for (serverid,ss) in nm.storage_broker.get_all_servers():
+                ss.post_call_notifier = deleter.notify
         d.addCallback(_created)
-        d.addCallback(lambda res: fireEventually(res))
-        def _got_smap1(smap):
-            peer0 = list(smap.make_sharemap()[0])[0]
-            self.client._storage._sequence = [peer0]
-            self.client._storage._special_answers[peer0] = ["normal", "none"]
-        d.addCallback(_got_smap1)
         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
-        def _cancel_timer(res):
-            if self.client._storage._pending_timer:
-                self.client._storage._pending_timer.cancel()
-            return res
-        d.addBoth(_cancel_timer)
         return d