]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
storage: rewrite slot API, now use testv_and_readv_and_writev or readv
authorBrian Warner <warner@allmydata.com>
Tue, 6 Nov 2007 03:17:14 +0000 (20:17 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 6 Nov 2007 03:17:14 +0000 (20:17 -0700)
src/allmydata/interfaces.py
src/allmydata/storage.py
src/allmydata/test/test_storage.py

index b4c66787e77b55f5f130ae0f229bbe4e69326b10..51999b63742f4a4db5dfef452d89d37391b51eb4 100644 (file)
@@ -84,49 +84,19 @@ class RIBucketReader(RemoteInterface):
 
 TestVector = ListOf(TupleOf(int, int, str, str))
 # elements are (offset, length, operator, specimen)
-# operator is one of "lt, le, eq, ne, ge, gt, nop"
+# operator is one of "lt, le, eq, ne, ge, gt"
 # nop always passes and is used to fetch data while writing.
 # you should use length==len(specimen) for everything except nop
 DataVector = ListOf(TupleOf(int, ShareData))
 # (offset, data). This limits us to 30 writes of 1MiB each per call
+TestAndWriteVectorsForShares = DictOf(int,
+                                      TupleOf(TestVector,
+                                              DataVector,
+                                              ChoiceOf(None, int))) # new_length
 ReadVector = ListOf(TupleOf(int, int))
-TestResults = ListOf(str)
+ReadData = ListOf(ShareData)
 # returns data[offset:offset+length] for each element of TestVector
 
-class RIMutableSlot(RemoteInterface):
-    def testv_and_writev(write_enabler=Hash,
-                         testv=TestVector,
-                         datav=DataVector,
-                         new_length=ChoiceOf(None, int)):
-        """General-purpose test-and-set operation for mutable slots. Perform
-        the given comparisons. If they all pass, then apply the write vector.
-
-        If new_length is not None, use it to set the size of the container.
-        This can be used to pre-allocate space for a series of upcoming
-        writes, or truncate existing data. If the container is growing,
-        new_length will be applied before datav. If the container is
-        shrinking, it will be applied afterwards.
-
-        Return the old data that was used for the comparisons.
-
-        The boolean return value is True if the write vector was applied,
-        false if not.
-
-        If the write_enabler is wrong, this will raise BadWriteEnablerError.
-        To enable share migration, the exception will have the nodeid used
-        for the old write enabler embedded in it, in the following string::
-
-         The write enabler was recorded by nodeid '%s'.
-
-        """
-        return TupleOf(bool, TestResults)
-
-    def read(offset=int, length=int):
-        return ShareData
-
-    def get_length():
-        return int
-
 class RIStorageServer(RemoteInterface):
     def allocate_buckets(storage_index=StorageIndex,
                          renew_secret=LeaseRenewSecret,
@@ -170,13 +140,27 @@ class RIStorageServer(RemoteInterface):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
 
 
-    def allocate_mutable_slot(storage_index=StorageIndex,
-                              write_enabler=Hash,
-                              renew_secret=LeaseRenewSecret,
-                              cancel_secret=LeaseCancelSecret,
-                              sharenums=SetOf(int, maxLength=MAX_BUCKETS),
-                              allocated_size=int):
-        """
+
+    def slot_readv(storage_index=StorageIndex,
+                   shares=ListOf(int), readv=ReadVector):
+        """Read a vector from the numbered shares associated with the given
+        storage index. An empty shares list means to return data from all
+        known shares. Returns a dictionary with one key per share."""
+        return DictOf(int, DataVector) # shnum -> results
+
+    def slot_testv_and_readv_and_writev(storage_index=StorageIndex,
+                                        secrets=TupleOf(Hash, Hash, Hash),
+                                        tw_vectors=TestAndWriteVectorsForShares,
+                                        r_vector=ReadVector,
+                                        ):
+        """General-purpose test-and-set operation for mutable slots. Perform
+        a bunch of comparisons against the existing shares. If they all pass,
+        then apply a bunch of write vectors to those shares. Then use the
+        read vectors to extract data from all the shares and return the data.
+
+        This method is, um, large. The goal is to allow clients to update all
+        the shares associated with a mutable file in a single round trip.
+
         @param storage_index: the index of the bucket to be created or
                               increfed.
         @param write_enabler: a secret that is stored along with the slot.
@@ -188,32 +172,51 @@ class RIStorageServer(RemoteInterface):
                              stored for later comparison by the server. Each
                              server is given a different secret.
         @param cancel_secret: Like renew_secret, but protects bucket decref.
-        @param sharenums: these are the share numbers (probably between 0 and
-                          99) that the sender is proposing to store on this
-                          server.
-        @param allocated_size: all shares will pre-allocate this many bytes.
-                               Use this to a) confirm that you can claim as
-                               much space as you want before you actually
-                               send the data, and b) reduce the disk-IO cost
-                               of doing incremental writes.
 
-        @return: dict mapping sharenum to slot. The return value may include
-                 more sharenums than asked, if some shares already existed.
-                 New leases are added for all
-                 shares.
+        The 'secrets' argument is a tuple of (write_enabler, renew_secret,
+        cancel_secret). The first is required to perform any write. The
+        latter two are used when allocating new shares. To simply acquire a
+        new lease on existing shares, use an empty testv and an empty writev.
+
+        Each share can have a separate test vector (i.e. a list of
+        comparisons to perform). If all vectors for all shares pass, then all
+        writes for all shares are recorded. Each comparison is a 4-tuple of
+        (offset, length, operator, specimen), which effectively does a
+        read(offset, length) and then compares the result against the
+        specimen using the given equality/inequality operator. Reads from the
+        end of the container are truncated, and missing shares behave like
+        empty ones, so to assert that a share doesn't exist (for use when
+        creating a new share), use (0, 1, 'eq', '').
+
+        The write vector will be applied to the given share, expanding it if
+        necessary. A write vector applied to a share number that did not
+        exist previously will cause that share to be created.
+
+        Each write vector is accompanied by a 'new_length' argument. If
+        new_length is not None, use it to set the size of the container. This
+        can be used to pre-allocate space for a series of upcoming writes, or
+        truncate existing data. If the container is growing, new_length will
+        be applied before datav. If the container is shrinking, it will be
+        applied afterwards.
+
+        The read vector is used to extract data from all known shares,
+        *before* any writes have been applied. The same vector is used for
+        all shares. This captures the state that was tested by the test
+        vector.
+
+        This method returns two values: a boolean and a dict. The boolean is
+        True if the write vectors were applied, False if not. The dict is
+        keyed by share number, and each value contains a list of strings, one
+        for each element of the read vector.
 
-        """
-        return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+        If the write_enabler is wrong, this will raise BadWriteEnablerError.
+        To enable share migration, the exception will have the nodeid used
+        for the old write enabler embedded in it, in the following string::
 
-    def get_mutable_slot(storage_index=StorageIndex):
-        """This returns an empty dictionary if the server has no shares
-        of the slot mentioned."""
-        return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+         The write enabler was recorded by nodeid '%s'.
 
-    def readv_slots(storage_index=StorageIndex, readv=ReadVector):
-        """Read a vector from all shares associated with the given storage
-        index. Returns a dictionary with one key per share."""
-        return DictOf(int, DataVector) # shnum -> results
+        """
+        return TupleOf(bool, DictOf(int, ReadData))
 
 class IStorageBucketWriter(Interface):
     def put_block(segmentnum=int, data=ShareData):
index 8f05b302fe892332e23487c382e0a78f469629a8..bd9949caa5d11566e13e25e899634f6c2eb0d6e1 100644 (file)
@@ -8,7 +8,7 @@ from twisted.internet import defer
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
      RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
-     BadWriteEnablerError, RIMutableSlot
+     BadWriteEnablerError
 from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition, _assert
 
@@ -240,13 +240,7 @@ class BucketReader(Referenceable):
 assert struct.calcsize("L"), 4
 assert struct.calcsize("Q"), 8
 
-class MutableShareFile(Referenceable):
-    # note: at any given time, there should only be a single instance of this
-    # class per filename. More than one is likely to corrupt the container,
-    # because of state we cache in instance variables. This requires the
-    # StorageServer to use a WeakValueDictionary, indexed by filename. This
-    # could be improved by cacheing less and doing more IO.
-    implements(RIMutableSlot)
+class MutableShareFile:
 
     DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
@@ -548,12 +542,6 @@ class MutableShareFile(Referenceable):
         assert magic == self.MAGIC
         return (write_enabler, write_enabler_nodeid)
 
-    def remote_read(self, offset, length):
-        f = open(self.home, 'rb')
-        data = self._read_share_data(f, offset, length)
-        f.close()
-        return data
-
     def readv(self, readv):
         datav = []
         f = open(self.home, 'rb')
@@ -562,36 +550,37 @@ class MutableShareFile(Referenceable):
         f.close()
         return datav
 
-    def remote_get_length(self):
-        f = open(self.home, 'rb')
-        data_length = self._read_data_length(f)
-        f.close()
-        return data_length
+#    def remote_get_length(self):
+#        f = open(self.home, 'rb')
+#        data_length = self._read_data_length(f)
+#        f.close()
+#        return data_length
 
-    def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
+    def check_write_enabler(self, write_enabler):
         f = open(self.home, 'rb+')
         (real_write_enabler, write_enabler_nodeid) = \
                              self._read_write_enabler_and_nodeid(f)
+        f.close()
         if write_enabler != real_write_enabler:
             # accomodate share migration by reporting the nodeid used for the
             # old write enabler.
-            f.close()
             msg = "The write enabler was recorded by nodeid '%s'." % \
                   (idlib.b2a(write_enabler_nodeid),)
             raise BadWriteEnablerError(msg)
 
-        # check testv
-        test_results_v = []
-        test_failed = False
+    def check_testv(self, testv):
+        test_good = True
+        f = open(self.home, 'rb+')
         for (offset, length, operator, specimen) in testv:
             data = self._read_share_data(f, offset, length)
-            test_results_v.append(data)
             if not self.compare(data, operator, specimen):
-                test_failed = True
-        if test_failed:
-            f.close()
-            return (False, test_results_v)
-        # now apply the write vector
+                test_good = False
+                break
+        f.close()
+        return test_good
+
+    def writev(self, datav, new_length):
+        f = open(self.home, 'rb+')
         for (offset, data) in datav:
             self._write_share_data(f, offset, data)
         if new_length is not None:
@@ -599,12 +588,36 @@ class MutableShareFile(Referenceable):
             f.seek(self.DATA_LENGTH_OFFSET)
             f.write(struct.pack(">Q", new_length))
         f.close()
-        return (True, test_results_v)
 
     def compare(self, a, op, b):
-        assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt")
-        if op == "nop":
-            return True
+        assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+        if op == "lt":
+            return a < b
+        if op == "le":
+            return a <= b
+        if op == "eq":
+            return a == b
+        if op == "ne":
+            return a != b
+        if op == "ge":
+            return a >= b
+        if op == "gt":
+            return a > b
+        # never reached
+
+class EmptyShare:
+
+    def check_testv(self, testv):
+        test_good = True
+        for (offset, length, operator, specimen) in testv:
+            data = ""
+            if not self.compare(data, operator, specimen):
+                test_good = False
+                break
+        return test_good
+
+    def compare(self, a, op, b):
+        assert op in ("lt", "le", "eq", "ne", "ge", "gt")
         if op == "lt":
             return a < b
         if op == "le":
@@ -842,51 +855,83 @@ class StorageServer(service.MultiService, Referenceable):
         except StopIteration:
             return iter([])
 
-    def remote_allocate_mutable_slot(self, storage_index,
-                                     write_enabler,
-                                     renew_secret, cancel_secret,
-                                     sharenums,
-                                     allocated_size):
-        my_nodeid = self.my_nodeid
-        sharenums = set(sharenums)
-        shares = self.remote_get_mutable_slot(storage_index)
-        existing_shnums = set(shares.keys())
-        si_s = idlib.b2a(storage_index)
-        bucketdir = os.path.join(self.sharedir, si_s)
-        fileutil.make_dirs(bucketdir)
-        for shnum in (sharenums - existing_shnums):
-            filename = os.path.join(bucketdir, "%d" % shnum)
-            shares[shnum] = create_mutable_sharefile(filename, my_nodeid,
-                                                     write_enabler)
-
-        # update the lease on everything
-        ownerid = 1 # TODO
-        expire_time = time.time() + 31*24*60*60   # one month
-        anid = my_nodeid
-        lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid)
-        for share in shares.values():
-            share.add_or_renew_lease(lease_info)
-        return shares
-
-    def remote_get_mutable_slot(self, storage_index):
-        """This returns an empty dictionary if the server has no shares
-        of the slot mentioned."""
+    def remote_slot_testv_and_readv_and_writev(self, storage_index,
+                                               secrets,
+                                               test_and_write_vectors,
+                                               read_vector):
         si_s = idlib.b2a(storage_index)
+        (write_enabler, renew_secret, cancel_secret) = secrets
         # shares exist if there is a file for them
         bucketdir = os.path.join(self.sharedir, si_s)
-        if not os.path.isdir(bucketdir):
-            return {}
-        slots = {}
-        for sharenum_s in os.listdir(bucketdir):
-            try:
-                sharenum = int(sharenum_s)
-            except ValueError:
-                continue
-            filename = os.path.join(bucketdir, sharenum_s)
-            slots[sharenum] = MutableShareFile(filename)
-        return slots
+        shares = {}
+        if os.path.isdir(bucketdir):
+            for sharenum_s in os.listdir(bucketdir):
+                try:
+                    sharenum = int(sharenum_s)
+                except ValueError:
+                    continue
+                filename = os.path.join(bucketdir, sharenum_s)
+                msf = MutableShareFile(filename)
+                msf.check_write_enabler(write_enabler)
+                shares[sharenum] = msf
+        # write_enabler is good for all existing shares.
+
+        # Now evaluate test vectors.
+        testv_is_good = True
+        for sharenum in test_and_write_vectors:
+            (testv, datav, new_length) = test_and_write_vectors[sharenum]
+            if sharenum in shares:
+                if not shares[sharenum].check_testv(testv):
+                    testv_is_good = False
+                    break
+            else:
+                # compare the vectors against an empty share, in which all
+                # reads return empty strings.
+                if not EmptyShare().check_testv(testv):
+                    testv_is_good = False
+                    break
+
+        # now gather the read vectors, before we do any writes
+        read_data = {}
+        for sharenum, share in shares.items():
+            read_data[sharenum] = share.readv(read_vector)
+
+        if testv_is_good:
+            # now apply the write vectors
+            for sharenum in test_and_write_vectors:
+                (testv, datav, new_length) = test_and_write_vectors[sharenum]
+                if sharenum not in shares:
+                    # allocate a new share
+                    allocated_size = 2000 # arbitrary, really
+                    share = self._allocate_slot_share(bucketdir, secrets,
+                                                      sharenum,
+                                                      allocated_size,
+                                                      owner_num=0)
+                    shares[sharenum] = share
+                shares[sharenum].writev(datav, new_length)
+            # and update the leases on all shares
+            ownerid = 1 # TODO
+            expire_time = time.time() + 31*24*60*60   # one month
+            my_nodeid = self.my_nodeid
+            anid = my_nodeid
+            lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
+                          anid)
+            for share in shares.values():
+                share.add_or_renew_lease(lease_info)
+
+        # all done
+        return (testv_is_good, read_data)
+
+    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
+                             allocated_size, owner_num=0):
+        (write_enabler, renew_secret, cancel_secret) = secrets
+        my_nodeid = self.my_nodeid
+        fileutil.make_dirs(bucketdir)
+        filename = os.path.join(bucketdir, "%d" % sharenum)
+        share = create_mutable_sharefile(filename, my_nodeid, write_enabler)
+        return share
 
-    def remote_readv_slots(self, storage_index, readv):
+    def remote_slot_readv(self, storage_index, shares, readv):
         si_s = idlib.b2a(storage_index)
         # shares exist if there is a file for them
         bucketdir = os.path.join(self.sharedir, si_s)
@@ -898,9 +943,10 @@ class StorageServer(service.MultiService, Referenceable):
                 sharenum = int(sharenum_s)
             except ValueError:
                 continue
-            filename = os.path.join(bucketdir, sharenum_s)
-            msf = MutableShareFile(filename)
-            datavs[sharenum] = msf.readv(readv)
+            if sharenum in shares or not shares:
+                filename = os.path.join(bucketdir, sharenum_s)
+                msf = MutableShareFile(filename)
+                datavs[sharenum] = msf.readv(readv)
         return datavs
 
 
index 4716a389506e0c5ac74173b5738b5cc9ff01a9fe..6e73439078d7ac781ec68b4f2ba46f76afcaba3a 100644 (file)
@@ -9,7 +9,7 @@ import itertools
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil, idlib
 from allmydata.storage import BucketWriter, BucketReader, \
-     WriteBucketProxy, ReadBucketProxy, StorageServer
+     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile
 from allmydata.interfaces import BadWriteEnablerError
 
 class Bucket(unittest.TestCase):
@@ -476,228 +476,275 @@ class MutableServer(unittest.TestCase):
         write_enabler = self.write_enabler(we_tag)
         renew_secret = self.renew_secret(lease_tag)
         cancel_secret = self.cancel_secret(lease_tag)
-        return ss.remote_allocate_mutable_slot(storage_index,
-                                               write_enabler,
-                                               renew_secret, cancel_secret,
-                                               sharenums, size)
+        rstaraw = ss.remote_slot_testv_and_readv_and_writev
+        testandwritev = dict( [ (shnum, ([], [], None) )
+                         for shnum in sharenums ] )
+        readv = []
+        rc = rstaraw(storage_index,
+                     (write_enabler, renew_secret, cancel_secret),
+                     testandwritev,
+                     readv)
+        (did_write, readv_data) = rc
+        self.failUnless(did_write)
+        self.failUnless(isinstance(readv_data, dict))
+        self.failUnlessEqual(len(readv_data), 0)
 
     def test_allocate(self):
         ss = self.create("test_allocate")
-        shares = self.allocate(ss, "si1", "we1", self._secret.next(),
+        self.allocate(ss, "si1", "we1", self._secret.next(),
                                set([0,1,2]), 100)
-        self.failUnlessEqual(len(shares), 3)
-        self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
-        shares2 = ss.remote_get_mutable_slot("si1")
-        self.failUnlessEqual(len(shares2), 3)
-        self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
-
-        s0 = shares[0]
-        self.failUnlessEqual(s0.remote_read(0, 10), "")
-        self.failUnlessEqual(s0.remote_read(100, 10), "")
-        # try writing to one
-        WE = self.write_enabler("we1")
-        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
-        answer = s0.remote_testv_and_writev(WE,
-                                            [],
-                                            [(0, data),],
-                                            new_length=None)
-        self.failUnlessEqual(answer, (True, []))
 
-        self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
-        self.failUnlessEqual(s0.remote_read(95, 10), "99999")
-        self.failUnlessEqual(s0.remote_get_length(), 100)
+        read = ss.remote_slot_readv
+        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
+                             {0: [""]})
+        self.failUnlessEqual(read("si1", [], [(0, 10)]),
+                             {0: [""], 1: [""], 2: [""]})
+        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
+                             {0: [""]})
 
+        # try writing to one
+        secrets = ( self.write_enabler("we1"),
+                    self.renew_secret("we1"),
+                    self.cancel_secret("we1") )
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        write = ss.remote_slot_testv_and_readv_and_writev
+        answer = write("si1", secrets,
+                       {0: ([], [(0,data)], None)},
+                       [])
+        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
+        self.failUnlessEqual(read("si1", [0], [(0,20)]),
+                             {0: ["00000000001111111111"]})
+        self.failUnlessEqual(read("si1", [0], [(95,10)]),
+                             {0: ["99999"]})
+        #self.failUnlessEqual(s0.remote_get_length(), 100)
+
+        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
         self.failUnlessRaises(BadWriteEnablerError,
-                              s0.remote_testv_and_writev,
-                              "bad write enabler",
-                              [], [], None)
+                              write, "si1", bad_secrets,
+                              {}, [])
+
         # this testv should fail
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(0, 12, "eq", "444444444444"),
-                                             (20, 5, "eq", "22222"),
-                                             ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["000000000011",
-                                              "22222"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        answer = write("si1", secrets,
+                       {0: ([(0, 12, "eq", "444444444444"),
+                             (20, 5, "eq", "22222"),
+                             ],
+                            [(0, "x"*100)],
+                            None),
+                        },
+                       [(0,12), (20,5)],
+                       )
+        self.failUnlessEqual(answer, (False,
+                                      {0: ["000000000011", "22222"],
+                                       1: ["", ""],
+                                       2: ["", ""],
+                                       }))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
 
         # as should this one
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "lt", "11111"),
-                                             ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        answer = write("si1", secrets,
+                       {0: ([(10, 5, "lt", "11111"),
+                             ],
+                            [(0, "x"*100)],
+                            None),
+                        },
+                       [(10,5)],
+                       )
+        self.failUnlessEqual(answer, (False,
+                                      {0: ["11111"],
+                                       1: [""],
+                                       2: [""]},
+                                      ))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
 
 
     def test_operators(self):
         # test operators, the data we're comparing is '11111' in all cases.
         # test both fail+pass, reset data after each one.
         ss = self.create("test_operators")
-        shares = self.allocate(ss, "si1", "we1", self._secret.next(),
-                               set([0,1,2]), 100)
-        s0 = shares[0]
-        WE = self.write_enabler("we1")
+
+        secrets = ( self.write_enabler("we1"),
+                    self.renew_secret("we1"),
+                    self.cancel_secret("we1") )
         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
-        answer = s0.remote_testv_and_writev(WE,
-                                            [],
-                                            [(0, data),],
-                                            new_length=None)
-
-        #  nop
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "nop", "11111"),
-                                             ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+        write = ss.remote_slot_testv_and_readv_and_writev
+        read = ss.remote_slot_readv
+
+        def reset():
+            write("si1", secrets,
+                  {0: ([], [(0,data)], None)},
+                  [])
+
+        reset()
 
         #  lt
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "lt", "11110"),
+        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "lt", "11111"),
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "lt", "11112"),
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
 
         #  le
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "le", "11110"),
+        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "le", "11111"),
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "le", "11112"),
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
 
         #  eq
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "eq", "11112"),
+        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "eq", "11111"),
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
 
         #  ne
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "ne", "11111"),
+        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "ne", "11112"),
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
 
         #  ge
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "ge", "11110"),
+        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "ge", "11111"),
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "ge", "11112"),
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
 
         #  gt
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "gt", "11110"),
+        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                              ],
-                                            [(0, "y"*100)], None)
-        self.failUnlessEqual(answer, (True, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "gt", "11111"),
+                                            [(0, "y"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
-        answer = s0.remote_testv_and_writev(WE,
-                                            [(10, 5, "gt", "11112"),
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
+
+        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                              ],
-                                            [(0, "x"*100)], None)
-        self.failUnlessEqual(answer, (False, ["11111"]))
-        self.failUnlessEqual(s0.remote_read(0, 100), data)
-        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+                                            [(0, "x"*100)],
+                                            None,
+                                            )}, [(10,5)])
+        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+        reset()
 
     def test_readv(self):
-        ss = self.create("test_allocate")
-        shares = self.allocate(ss, "si1", "we1", self._secret.next(),
-                               set([0,1,2]), 100)
-        WE = self.write_enabler("we1")
+        ss = self.create("test_readv")
+        secrets = ( self.write_enabler("we1"),
+                    self.renew_secret("we1"),
+                    self.cancel_secret("we1") )
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        write = ss.remote_slot_testv_and_readv_and_writev
+        read = ss.remote_slot_readv
         data = [("%d" % i) * 100 for i in range(3)]
-        for i in range(3):
-            rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])],
-                                                   new_length=None)
-            self.failUnlessEqual(rc, (True, []))
-        answer = ss.remote_readv_slots("si1", [(0, 10)])
+        rc = write("si1", secrets,
+                   {0: ([], [(0,data[0])], None),
+                    1: ([], [(0,data[1])], None),
+                    2: ([], [(0,data[2])], None),
+                    }, [])
+        self.failUnlessEqual(rc, (True, {}))
+
+        answer = read("si1", [], [(0, 10)])
         self.failUnlessEqual(answer, {0: ["0"*10],
                                       1: ["1"*10],
                                       2: ["2"*10]})
@@ -716,15 +763,15 @@ class MutableServer(unittest.TestCase):
 
     def test_leases(self):
         ss = self.create("test_leases")
-        secret = 14
-        shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
-        s0 = shares[0]
-        WE = self.write_enabler("we1")
+        def secrets(n):
+            return ( self.write_enabler("we1"),
+                     self.renew_secret("we1-%d" % n),
+                     self.cancel_secret("we1-%d" % n) )
         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
-        answer = s0.remote_testv_and_writev(WE,
-                                            [],
-                                            [(0, data),],
-                                            new_length=None)
+        write = ss.remote_slot_testv_and_readv_and_writev
+        read = ss.remote_slot_readv
+        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
+        self.failUnlessEqual(rc, (True, {}))
 
         # create a random non-numeric file in the bucket directory, to
         # exercise the code that's supposed to ignore those.
@@ -736,40 +783,41 @@ class MutableServer(unittest.TestCase):
 
         # re-allocate the slots and use the same secrets, that should update
         # the lease
-        shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
+        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
 
         # renew it directly
-        ss.remote_renew_lease("si1", self.renew_secret(secret))
+        ss.remote_renew_lease("si1", secrets(0)[1])
 
         # now allocate them with a bunch of different secrets, to trigger the
         # extended lease code
-        shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100)
-        shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100)
-        shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100)
-        shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100)
-        shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100)
+        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
+        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
+        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
+        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
+        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
+
         # cancel one of them
-        ss.remote_cancel_lease("si1", self.cancel_secret(secret+5))
+        ss.remote_cancel_lease("si1", secrets(5)[2])
 
+        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
         all_leases = s0.debug_get_leases()
         self.failUnlessEqual(len(all_leases), 5)
 
         # and write enough data to expand the container, forcing the server
         # to move the leases
-        answer = s0.remote_testv_and_writev(WE,
-                                            [],
-                                            [(0, data),],
-                                            new_length=200)
+        write("si1", secrets(0),
+              {0: ([], [(0,data)], 200), },
+              [])
 
         # read back the leases, make sure they're still intact.
         self.compare_leases_without_timestamps(all_leases,
                                                s0.debug_get_leases())
 
-        ss.remote_renew_lease("si1", self.renew_secret(secret))
-        ss.remote_renew_lease("si1", self.renew_secret(secret+1))
-        ss.remote_renew_lease("si1", self.renew_secret(secret+2))
-        ss.remote_renew_lease("si1", self.renew_secret(secret+3))
-        ss.remote_renew_lease("si1", self.renew_secret(secret+4))
+        ss.remote_renew_lease("si1", secrets(0)[1])
+        ss.remote_renew_lease("si1", secrets(1)[1])
+        ss.remote_renew_lease("si1", secrets(2)[1])
+        ss.remote_renew_lease("si1", secrets(3)[1])
+        ss.remote_renew_lease("si1", secrets(4)[1])
         self.compare_leases_without_timestamps(all_leases,
                                                s0.debug_get_leases())
         # get a new copy of the leases, with the current timestamps. Reading
@@ -782,40 +830,40 @@ class MutableServer(unittest.TestCase):
         # is present, to provide for share migration
         self.failUnlessRaises(IndexError,
                               ss.remote_renew_lease, "si1",
-                              self.renew_secret(secret+20))
+                              secrets(20)[1])
         # same for cancelling
         self.failUnlessRaises(IndexError,
                               ss.remote_cancel_lease, "si1",
-                              self.cancel_secret(secret+20))
+                              secrets(20)[2])
         self.failUnlessEqual(all_leases, s0.debug_get_leases())
-        s0.remote_read(0, 200)
+
+        # reading shares should not modify the timestamp
+        read("si1", [], [(0,200)])
         self.failUnlessEqual(all_leases, s0.debug_get_leases())
 
-        answer = s0.remote_testv_and_writev(WE,
-                                            [],
-                                            [(200, "make me bigger"),],
-                                            new_length=None)
+        write("si1", secrets(0),
+              {0: ([], [(200, "make me bigger")], None)}, [])
         self.compare_leases_without_timestamps(all_leases,
                                                s0.debug_get_leases())
 
-        answer = s0.remote_testv_and_writev(WE,
-                                            [],
-                                            [(500, "make me really bigger"),],
-                                            new_length=None)
+        write("si1", secrets(0),
+              {0: ([], [(500, "make me really bigger")], None)}, [])
         self.compare_leases_without_timestamps(all_leases,
                                                s0.debug_get_leases())
 
         # now cancel them all
-        ss.remote_cancel_lease("si1", self.cancel_secret(secret))
-        ss.remote_cancel_lease("si1", self.cancel_secret(secret+1))
-        ss.remote_cancel_lease("si1", self.cancel_secret(secret+2))
-        ss.remote_cancel_lease("si1", self.cancel_secret(secret+3))
+        ss.remote_cancel_lease("si1", secrets(0)[2])
+        ss.remote_cancel_lease("si1", secrets(1)[2])
+        ss.remote_cancel_lease("si1", secrets(2)[2])
+        ss.remote_cancel_lease("si1", secrets(3)[2])
+
         # the slot should still be there
-        shares3 = ss.remote_get_mutable_slot("si1")
-        self.failUnlessEqual(len(shares3), 3)
+        remaining_shares = read("si1", [], [(0,10)])
+        self.failUnlessEqual(len(remaining_shares), 1)
         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
 
-        ss.remote_cancel_lease("si1", self.cancel_secret(secret+4))
+        ss.remote_cancel_lease("si1", secrets(4)[2])
         # now the slot should be gone
-        self.failUnlessEqual(ss.remote_get_mutable_slot("si1"), {})
+        no_shares = read("si1", [], [(0,10)])
+        self.failUnlessEqual(no_shares, {})