From: Brian Warner <warner@lothar.com>
Date: Wed, 31 Oct 2007 07:10:40 +0000 (-0700)
Subject: mutable slots: finish up basic coding on server-side containers, add some tests.... 
X-Git-Tag: allmydata-tahoe-0.7.0~342
X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/css/rgr-080307.php?a=commitdiff_plain;h=68d3d620026330bc14fc27218f1c81023188c657;p=tahoe-lafs%2Ftahoe-lafs.git

mutable slots: finish up basic coding on server-side containers, add some tests. Remove all caching from MutableShareFile.
---

diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 9125350a..a9dce100 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -111,7 +111,7 @@ class RIMutableSlot(RemoteInterface):
         The boolean return value is True if the write vector was applied,
         false if not.
 
-        If the write_enabler is wrong, this will raise BadWriterEnablerError.
+        If the write_enabler is wrong, this will raise BadWriteEnablerError.
         To enable share migration, the exception will have the nodeid used
         for the old write enabler embedded in it, in the following string::
 
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index 6a15edbb..dcd11554 100644
--- a/src/allmydata/storage.py
+++ b/src/allmydata/storage.py
@@ -8,7 +8,7 @@ from twisted.internet import defer
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
      RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
-     BadWriterEnablerError, RIMutableSlot
+     BadWriteEnablerError, RIMutableSlot
 from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition, _assert
 
@@ -224,15 +224,15 @@ class BucketReader(Referenceable):
 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
 # 2   32        32      write enabler's nodeid
 # 3   64        32      write enabler
-# 4   72        8       data size (actual share data present) (a)
-# 5   80        8       offset of (8) count of extra leases (after data)
-# 6   88        416     four leases, 104 bytes each
+# 4   96        8       data size (actual share data present) (a)
+# 5   104       8       offset of (8) count of extra leases (after data)
+# 6   112       416     four leases, 104 bytes each
 #                        0    4   ownerid (0 means "no lease here")
 #                        4    4   expiration timestamp
 #                        8   32   renewal token
 #                        40  32   cancel token
 #                        72  32   nodeid which accepted the tokens
-# 7   504       (a)     data
+# 7   528       (a)     data
 # 8   ??        4       count of extra leases
 # 9   ??        n*104    extra leases
 
@@ -249,9 +249,12 @@ class MutableShareFile(Referenceable):
     implements(RIMutableSlot)
 
     DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
+    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
     HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
     LEASE_SIZE = struct.calcsize(">LL32s32s32s")
+    assert LEASE_SIZE == 104
     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+    assert DATA_OFFSET == 528, DATA_OFFSET
     # our sharefiles share with a recognizable string, plus some random
     # binary data to reduce the chance that a regular text file will look
     # like a sharefile.
@@ -262,130 +265,143 @@ class MutableShareFile(Referenceable):
 
     def __init__(self, filename):
         self.home = filename
-        self._base_lease_offset = self.HEADER_SIZE
         if os.path.exists(self.home):
+            # we don't cache anything, just check the magic
             f = open(self.home, 'rb')
-            data = f.read(88)
+            data = f.read(self.HEADER_SIZE)
             (magic,
-             self._write_enabler_nodeid, self._write_enabler,
-             self._data_length, offset) = struct.unpack(">32s32s32sQQ", data)
+             write_enabler_nodeid, write_enabler,
+             data_length, extra_least_offset) = \
+             struct.unpack(">32s32s32sQQ", data)
             assert magic == self.MAGIC
-            self._extra_lease_offset = offset # points at (8)
-            f.seek(self._extra_lease_offset)
-            data = f.read(4)
-            self._num_extra_leases = struct.unpack(">L", data)
-            f.close()
 
 
     def create(self, my_nodeid, write_enabler):
         assert not os.path.exists(self.home)
-        self._write_enabler = write_enabler
-        self._data_length = 0
-        self._extra_lease_offset = (self.HEADER_SIZE
-                                    + 4 * self.LEASE_SIZE
-                                    + self._data_length)
-        assert self._extra_lease_offset == self.DATA_OFFSET # true at creation
-        self._num_extra_leases = 0
+        data_length = 0
+        extra_lease_offset = (self.HEADER_SIZE
+                              + 4 * self.LEASE_SIZE
+                              + data_length)
+        assert extra_lease_offset == self.DATA_OFFSET # true at creation
+        num_extra_leases = 0
         f = open(self.home, 'wb')
         header = struct.pack(">32s32s32sQQ",
                              self.MAGIC, my_nodeid, write_enabler,
-                             self._data_length, self._extra_lease_offset,
+                             data_length, extra_lease_offset,
                              )
-        f.write(header)
+        leases = ("\x00"*self.LEASE_SIZE) * 4
+        f.write(header + leases)
         # data goes here, empty after creation
-        f.write(struct.pack(">L", self._num_extra_leases))
+        f.write(struct.pack(">L", num_extra_leases))
         # extra leases go here, none at creation
         f.close()
 
+    def _read_data_length(self, f):
+        f.seek(self.DATA_LENGTH_OFFSET)
+        (data_length,) = struct.unpack(">Q", f.read(8))
+        return data_length
 
-    def read_share_data(self, offset, length):
+    def _write_data_length(self, f, data_length):
+        f.seek(self.DATA_LENGTH_OFFSET)
+        f.write(struct.pack(">Q", data_length))
+
+    def _read_share_data(self, f, offset, length):
         precondition(offset >= 0)
-        if offset+length > self._data_length:
+        data_length = self._read_data_length(f)
+        if offset+length > data_length:
             # reads beyond the end of the data are truncated. Reads that
             # start beyond the end of the data return an empty string.
-            length = max(0, self._data_length-offset)
+            length = max(0, data_length-offset)
         if length == 0:
             return ""
-        precondition(offset+length <= self._data_length)
-        f = open(self.home, 'rb')
+        precondition(offset+length <= data_length)
         f.seek(self.DATA_OFFSET+offset)
-        return f.read(length)
+        data = f.read(length)
+        return data
+
+    def _read_extra_lease_offset(self, f):
+        f.seek(self.EXTRA_LEASE_OFFSET)
+        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
+        return extra_lease_offset
+
+    def _write_extra_lease_offset(self, f, offset):
+        f.seek(self.EXTRA_LEASE_OFFSET)
+        f.write(struct.pack(">Q", offset))
 
-    def change_container_size(self, new_container_size):
+    def _read_num_extra_leases(self, f):
+        offset = self._read_extra_lease_offset(f)
+        f.seek(offset)
+        (num_extra_leases,) = struct.unpack(">L", f.read(4))
+        return num_extra_leases
+
+    def _write_num_extra_leases(self, f, num_leases):
+        extra_lease_offset = self._read_extra_lease_offset(f)
+        f.seek(extra_lease_offset)
+        f.write(struct.pack(">L", num_leases))
+
+    def _change_container_size(self, f, new_container_size):
         if new_container_size > self.MAX_SIZE:
             raise DataTooLargeError()
+        old_extra_lease_offset = self._read_extra_lease_offset(f)
         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
-        if new_extra_lease_offset < self._extra_lease_offset:
-            # TODO: allow containers to shrink
+        if new_extra_lease_offset < old_extra_lease_offset:
+            # TODO: allow containers to shrink. For now they remain large.
             return
-        f = open(self.home, 'rb+')
-        f.seek(self._extra_lease_offset)
-        extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE)
+        num_extra_leases = self._read_num_extra_leases(f)
+        f.seek(old_extra_lease_offset)
+        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
         f.seek(new_extra_lease_offset)
         f.write(extra_lease_data)
-        self._extra_lease_offset = new_extra_lease_offset
         # an interrupt here will corrupt the leases, iff the move caused the
         # extra leases to overlap.
-        f.seek(self.DATA_LENGTH_OFFSET+8)
-        f.write(struct.pack(">Q", new_extra_lease_offset))
-        f.close()
+        self._write_extra_lease_offset(f, new_extra_lease_offset)
 
-    def write_share_data(self, offset, data):
+    def _write_share_data(self, f, offset, data):
         length = len(data)
         precondition(offset >= 0)
-        if offset+length < self._data_length:
-            # they are not expanding their data size
-            f = open(self.home, 'rb+')
-            f.seek(self.DATA_OFFSET+offset)
-            f.write(data)
-            f.close()
-            return
-        if self.DATA_OFFSET+offset+length <= self._extra_lease_offset:
-            # they are expanding their data size, but not the container size
-            f = open(self.home, 'rb+')
-            self._data_length = offset+length
-            f.seek(self.DATA_LENGTH_OFFSET)
-            f.write(struct.pack(">Q", self._data_length))
+        data_length = self._read_data_length(f)
+        extra_lease_offset = self._read_extra_lease_offset(f)
+
+        if offset+length >= data_length:
+            # They are expanding their data size.
+            if self.DATA_OFFSET+offset+length > extra_lease_offset:
+                # Their new data won't fit in the current container, so we
+                # have to move the leases. With luck, they're expanding it
+                # more than the size of the extra lease block, which will
+                # minimize the corrupt-the-share window
+                self._change_container_size(f, offset+length)
+                extra_lease_offset = self._read_extra_lease_offset(f)
+
+                # an interrupt here is ok.. the container has been enlarged
+                # but the data remains untouched
+
+            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
+            # Their data now fits in the current container. We must write
+            # their new data and modify the recorded data size.
+            new_data_length = offset+length
+            self._write_data_length(f, new_data_length)
             # an interrupt here will result in a corrupted share
-            f.seek(self.DATA_OFFSET+offset)
-            f.write(data)
-            f.close()
-            return
-
-        # they are expanding the container, so we have to move the leases.
-        # With luck, they're expanding it more than the size of the extra
-        # lease block, which will minimize the corrupt-the-share window
-
-        self.change_container_size(offset+length)
-
-        # an interrupt here is ok.. the container has been enlarged but the
-        # data remains untouched
 
-        self._data_length = offset+length
-
-        f = open(self.home, 'rb+')
+        # now all that's left to do is write out their data
         f.seek(self.DATA_OFFSET+offset)
         f.write(data)
-        # an interrupt here will result in a corrupted share
-        f.seek(self.DATA_LENGTH_OFFSET)
-        f.write(struct.pack(">Q", self._data_length))
-        f.close()
         return
 
     def _write_lease_record(self, f, lease_number, lease_info):
         (ownerid, expiration_time,
          renew_secret, cancel_secret, nodeid) = lease_info
+        extra_lease_offset = self._read_extra_lease_offset(f)
+        num_extra_leases = self._read_num_extra_leases(f)
         if lease_number < 4:
             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
-        elif (lease_number-4) < self._num_extra_leases:
-            offset = (self._extra_lease_offset
+        elif (lease_number-4) < num_extra_leases:
+            offset = (extra_lease_offset
                       + 4
                       + (lease_number-4)*self.LEASE_NUMBER)
         else:
-            f.seek(self._extra_lease_offset)
-            f.write(struct.pack(">L", self._num_extra_leases+1))
-            self._num_extra_leases += 1
-            offset = (self._extra_lease_offset
+            # must add an extra lease record
+            self._write_num_extra_leases(f, num_extra_leases+1)
+            offset = (extra_lease_offset
                       + 4
                       + (lease_number-4)*self.LEASE_NUMBER)
         f.seek(offset)
@@ -396,10 +412,12 @@ class MutableShareFile(Referenceable):
 
     def _read_lease_record(self, f, lease_number):
         # returns a 5-tuple of lease info, or None
+        extra_lease_offset = self._read_extra_lease_offset(f)
+        num_extra_leases = self._read_num_extra_leases(f)
         if lease_number < 4:
             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
-        elif (lease_number-4) < self._num_extra_leases:
-            offset = (self._extra_lease_offset
+        elif (lease_number-4) < num_extra_leases:
+            offset = (extra_lease_offset
                       + 4
                       + (lease_number-4)*self.LEASE_NUMBER)
         else:
@@ -414,27 +432,25 @@ class MutableShareFile(Referenceable):
             return None
         return lease_info
 
-    def _read_num_leases(self, f):
-        f.seek(self.HEADER_SIZE)
-        leasedata = f.read(4*self.LEASE_SIZE)
-        num_leases = 0
-        for i in range(4):
-            base = i*self.LEASE_SIZE
-            (ownerid,) = struct.unpack(">L", leasedata[base:base+4])
-            if ownerid != 0:
-                num_leases += 1
-        return num_leases + self._num_extra_leases
-
-    def pack_leases(self):
-        pass
+    def _get_num_lease_slots(self, f):
+        # how many places do we have allocated for leases? Not all of them
+        # are filled.
+        num_extra_leases = self._read_num_extra_leases(f)
+        return 4+num_extra_leases
 
-    def _truncate_leases(self, f, num_leases):
-        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
+    def _get_first_empty_lease_slot(self, f):
+        # return an int with the index of an empty slot, or None if we do not
+        # currently have an empty slot
+
+        for i in range(self._get_num_lease_slots(f)):
+            if self._read_lease_record(f, i) is None:
+                return i
+        return None
 
     def enumerate_leases(self, f):
         """Yields (leasenum, (ownerid, expiration_time, renew_secret,
         cancel_secret, accepting_nodeid)) for all leases."""
-        for i in range(self._read_num_leases(f)):
+        for i in range(self._get_num_lease_slots(f)):
             try:
                 data = self._read_lease_record(f, i)
                 if data is not None:
@@ -444,9 +460,12 @@ class MutableShareFile(Referenceable):
 
     def add_lease(self, lease_info):
         f = open(self.home, 'rb+')
-        num_leases = self._read_num_leases(f)
-        self._write_lease_record(f, num_leases, lease_info)
-        self._write_num_leases(f, num_leases+1)
+        num_lease_slots = self._get_num_lease_slots(f)
+        empty_slot = self._get_first_empty_lease_slot(f)
+        if empty_slot is not None:
+            self._write_lease_record(f, empty_slot, lease_info)
+        else:
+            self._write_lease_record(f, num_lease_slots, lease_info)
         f.close()
 
     def renew_lease(self, renew_secret, new_expire_time):
@@ -463,10 +482,15 @@ class MutableShareFile(Referenceable):
                 return
             accepting_nodeids.add(anid)
         f.close()
-        # TODO: return the accepting_nodeids set, to give the client a chance
-        # to update the leases on a share which has been migrated from its
+        # Return the accepting_nodeids set, to give the client a chance to
+        # update the leases on a share which has been migrated from its
         # original server to a new one.
-        raise IndexError("unable to renew non-existent lease")
+        msg = ("Unable to renew non-existent lease. I have leases accepted by"
+               " nodeids: ")
+        msg += ",".join([("'%s'" % idlib.b2a(anid))
+                         for anid in accepting_nodeids])
+        msg += " ."
+        raise IndexError(msg)
 
     def add_or_renew_lease(self, lease_info):
         ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
@@ -480,12 +504,14 @@ class MutableShareFile(Referenceable):
         (num_remaining_leases, space_freed). Raise IndexError if there was no
         lease with the given cancel_secret."""
 
+        accepting_nodeids = set()
         modified = 0
         remaining = 0
         blank = "\x00"*32
         blank_lease = (0, 0, blank, blank, blank)
         f = open(self.home, 'rb+')
         for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
+            accepting_nodeids.add(anid)
             if cs == cancel_secret:
                 self._write_lease_record(f, leasenum, blank_lease)
                 modified += 1
@@ -493,44 +519,72 @@ class MutableShareFile(Referenceable):
                 remaining += 1
         if modified:
             freed_space = self._pack_leases(f)
-        f.close()
-        return (freed_space, remaining)
+            f.close()
+            return (freed_space, remaining)
+        msg = ("Unable to cancel non-existent lease. I have leases "
+               "accepted by nodeids: ")
+        msg += ",".join([("'%s'" % idlib.b2a(anid))
+                         for anid in accepting_nodeids])
+        msg += " ."
+        raise IndexError(msg)
 
     def _pack_leases(self, f):
         # TODO: reclaim space from cancelled leases
         return 0
 
+    def _read_write_enabler_and_nodeid(self, f):
+        f.seek(0)
+        data = f.read(self.HEADER_SIZE)
+        (magic,
+         write_enabler_nodeid, write_enabler,
+         data_length, extra_least_offset) = \
+         struct.unpack(">32s32s32sQQ", data)
+        assert magic == self.MAGIC
+        return (write_enabler, write_enabler_nodeid)
+
     def remote_read(self, offset, length):
-        return self.read_share_data(offset, length)
+        f = open(self.home, 'rb')
+        data = self._read_share_data(f, offset, length)
+        f.close()
+        return data
+
     def remote_get_length(self):
-        return self._data_length
+        f = open(self.home, 'rb')
+        data_length = self._read_data_length(f)
+        f.close()
+        return data_length
+
     def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
-        if write_enabler != self._write_enabler:
+        f = open(self.home, 'rb+')
+        (real_write_enabler, write_enabler_nodeid) = \
+                             self._read_write_enabler_and_nodeid(f)
+        if write_enabler != real_write_enabler:
             # accomodate share migration by reporting the nodeid used for the
             # old write enabler.
+            f.close()
             msg = "The write enabler was recorded by nodeid '%s'." % \
-                  (idlib.b2a(self._write_enabler_nodeid),)
-            raise BadWriterEnablerError(msg)
+                  (idlib.b2a(write_enabler_nodeid),)
+            raise BadWriteEnablerError(msg)
+
         # check testv
         test_results_v = []
         test_failed = False
         for (offset, length, operator, specimen) in testv:
-            data = self.read_share_data(offset, length)
+            data = self._read_share_data(f, offset, length)
             test_results_v.append(data)
             if not self.compare(data, operator, specimen):
-                test_failed = False
+                test_failed = True
         if test_failed:
+            f.close()
             return (False, test_results_v)
         # now apply the write vector
         for (offset, data) in datav:
-            self.write_share_data(offset, data)
+            self._write_share_data(f, offset, data)
         if new_length is not None:
-            self.change_container_size(new_length)
-            self._data_length = new_length
-            f = open(self.home, "rb+")
+            self._change_container_size(f, new_length)
             f.seek(self.DATA_LENGTH_OFFSET)
-            f.write(struct.pack(">Q", self._data_length))
-            f.close()
+            f.write(struct.pack(">Q", new_length))
+        f.close()
         return (True, test_results_v)
 
     def compare(self, a, op, b):
@@ -538,9 +592,9 @@ class MutableShareFile(Referenceable):
         if op == "nop":
             return True
         if op == "lt":
-            return a <= b
-        if op == "le":
             return a < b
+        if op == "le":
+            return a <= b
         if op == "eq":
             return a == b
         if op == "ne":
@@ -574,9 +628,21 @@ class StorageServer(service.MultiService, Referenceable):
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
         self._active_writers = weakref.WeakKeyDictionary()
-
         self.measure_size()
 
+    def setNodeID(self, nodeid):
+        # somebody must set this before any slots can be created or leases
+        # added
+        self.my_nodeid = nodeid
+
+    def startService(self):
+        service.MultiService.startService(self)
+        if self.parent:
+            nodeid = self.parent.nodeid # 20 bytes, binary
+            assert len(nodeid) == 20
+            self.setNodeID(nodeid + "\x00"*12) # make it 32 bytes
+            # TODO: review this 20-vs-32 thing, settle on one or the other
+
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
@@ -740,6 +806,50 @@ class StorageServer(service.MultiService, Referenceable):
         except StopIteration:
             return iter([])
 
+    def remote_allocate_mutable_slot(self, storage_index,
+                                     write_enabler,
+                                     renew_secret, cancel_secret,
+                                     sharenums,
+                                     allocated_size):
+        my_nodeid = self.my_nodeid
+        sharenums = set(sharenums)
+        shares = self.remote_get_mutable_slot(storage_index)
+        existing_shnums = set(shares.keys())
+        si_s = idlib.b2a(storage_index)
+        bucketdir = os.path.join(self.sharedir, si_s)
+        fileutil.make_dirs(bucketdir)
+        for shnum in (sharenums - existing_shnums):
+            filename = os.path.join(bucketdir, "%d" % shnum)
+            shares[shnum] = create_mutable_sharefile(filename, my_nodeid,
+                                                     write_enabler)
+
+        # update the lease on everything
+        ownerid = 1 # TODO
+        expire_time = time.time() + 31*24*60*60   # one month
+        anid = my_nodeid
+        lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid)
+        for share in shares.values():
+            share.add_or_renew_lease(lease_info)
+        return shares
+
+    def remote_get_mutable_slot(self, storage_index):
+        """This returns an empty dictionary if the server has no shares
+        of the slot mentioned."""
+        si_s = idlib.b2a(storage_index)
+        # shares exist if there is a file for them
+        bucketdir = os.path.join(self.sharedir, si_s)
+        if not os.path.isdir(bucketdir):
+            return {}
+        slots = {}
+        for sharenum_s in os.listdir(bucketdir):
+            try:
+                sharenum = int(sharenum_s)
+            except ValueError:
+                continue
+            filename = os.path.join(bucketdir, sharenum_s)
+            slots[sharenum] = MutableShareFile(filename)
+        return slots
+
 
 # the code before here runs on the storage server, not the client
 # the code beyond here runs on the client, not the storage server
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index e8d81dba..1bb7ae7b 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -10,6 +10,7 @@ from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      WriteBucketProxy, ReadBucketProxy, StorageServer
+from allmydata.interfaces import BadWriteEnablerError
 
 class Bucket(unittest.TestCase):
     def make_workdir(self, name):
@@ -456,6 +457,7 @@ class MutableServer(unittest.TestCase):
         workdir = self.workdir(name)
         ss = StorageServer(workdir, sizelimit)
         ss.setServiceParent(self.sparent)
+        ss.setNodeID("\x00" * 32)
         return ss
 
     def test_create(self):
@@ -478,26 +480,201 @@ class MutableServer(unittest.TestCase):
         shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
         self.failUnlessEqual(len(shares), 3)
         self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
-        shares2 = ss.get_mutable_slot("si1")
+        shares2 = ss.remote_get_mutable_slot("si1")
         self.failUnlessEqual(len(shares2), 3)
         self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
-        # the actual RIMutableSlot objects are required to be singtons (one
-        # per SI+shnum), so each get_mutable_slot() call should return the
-        # same RemoteReferences
-        self.failUnlessEqual(set(shares.values()), set(shares2.values()))
 
         s0 = shares[0]
         self.failUnlessEqual(s0.remote_read(0, 10), "")
         self.failUnlessEqual(s0.remote_read(100, 10), "")
         # try writing to one
+        WE = self.write_enabler("we1")
         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
-        answer = s0.remote_testv_and_writev(self.write_enabler("we1"),
+        answer = s0.remote_testv_and_writev(WE,
                                             [],
                                             [(0, data),],
                                             new_length=None)
-        self.failUnlessEqual(answer, [])
+        self.failUnlessEqual(answer, (True, []))
 
         self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
         self.failUnlessEqual(s0.remote_read(95, 10), "99999")
         self.failUnlessEqual(s0.remote_get_length(), 100)
 
+        self.failUnlessRaises(BadWriteEnablerError,
+                              s0.remote_testv_and_writev,
+                              "bad write enabler",
+                              [], [], None)
+        # this testv should fail
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(0, 12, "eq", "444444444444"),
+                                             (20, 5, "eq", "22222"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["000000000011",
+                                              "22222"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+
+        # as should this one
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "lt", "11111"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+
+
+    def test_operators(self):
+        # test operators, the data we're comparing is '11111' in all cases.
+        # test both fail+pass, reset data after each one.
+        ss = self.create("test_operators")
+        shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
+        s0 = shares[0]
+        WE = self.write_enabler("we1")
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        answer = s0.remote_testv_and_writev(WE,
+                                            [],
+                                            [(0, data),],
+                                            new_length=None)
+
+        #  nop
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "nop", "11111"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        #  lt
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "lt", "11110"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "lt", "11111"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "lt", "11112"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        #  le
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "le", "11110"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "le", "11111"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "le", "11112"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        #  eq
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "eq", "11112"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "eq", "11111"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        #  ne
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "ne", "11111"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "ne", "11112"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        #  ge
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "ge", "11110"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "ge", "11111"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "ge", "11112"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        #  gt
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "gt", "11110"),
+                                             ],
+                                            [(0, "y"*100)], None)
+        self.failUnlessEqual(answer, (True, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "gt", "11111"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+
+        answer = s0.remote_testv_and_writev(WE,
+                                            [(10, 5, "gt", "11112"),
+                                             ],
+                                            [(0, "x"*100)], None)
+        self.failUnlessEqual(answer, (False, ["11111"]))
+        self.failUnlessEqual(s0.remote_read(0, 100), data)
+        s0.remote_testv_and_writev(WE, [], [(0,data)], None)