From: Brian Warner Date: Wed, 31 Oct 2007 07:10:40 +0000 (-0700) Subject: mutable slots: finish up basic coding on server-side containers, add some tests.... X-Git-Tag: allmydata-tahoe-0.7.0~342 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/%22doc.html/running.html?a=commitdiff_plain;h=68d3d620026330bc14fc27218f1c81023188c657;p=tahoe-lafs%2Ftahoe-lafs.git mutable slots: finish up basic coding on server-side containers, add some tests. Remove all caching from MutableShareFile. --- diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 9125350a..a9dce100 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -111,7 +111,7 @@ class RIMutableSlot(RemoteInterface): The boolean return value is True if the write vector was applied, false if not. - If the write_enabler is wrong, this will raise BadWriterEnablerError. + If the write_enabler is wrong, this will raise BadWriteEnablerError. To enable share migration, the exception will have the nodeid used for the old write enabler embedded in it, in the following string:: diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 6a15edbb..dcd11554 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -8,7 +8,7 @@ from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \ - BadWriterEnablerError, RIMutableSlot + BadWriteEnablerError, RIMutableSlot from allmydata.util import fileutil, idlib, mathutil from allmydata.util.assertutil import precondition, _assert @@ -224,15 +224,15 @@ class BucketReader(Referenceable): # 1 0 32 magic verstr "tahoe mutable container v1" plus binary # 2 32 32 write enabler's nodeid # 3 64 32 write enabler -# 4 72 8 data size (actual share data present) (a) -# 5 80 8 offset of (8) count of extra leases (after data) -# 6 88 416 four leases, 104 bytes each +# 4 96 8 data size (actual share data present) (a) +# 5 104 8 offset of (8) count of extra leases (after data) +# 6 112 416 four leases, 104 bytes each # 0 4 ownerid (0 means "no lease here") # 4 4 expiration timestamp # 8 32 renewal token # 40 32 cancel token # 72 32 nodeid which accepted the tokens -# 7 504 (a) data +# 7 528 (a) data # 8 ?? 4 count of extra leases # 9 ?? n*104 extra leases @@ -249,9 +249,12 @@ class MutableShareFile(Referenceable): implements(RIMutableSlot) DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s") + EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8 HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases LEASE_SIZE = struct.calcsize(">LL32s32s32s") + assert LEASE_SIZE == 104 DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE + assert DATA_OFFSET == 528, DATA_OFFSET # our sharefiles share with a recognizable string, plus some random # binary data to reduce the chance that a regular text file will look # like a sharefile. @@ -262,130 +265,143 @@ class MutableShareFile(Referenceable): def __init__(self, filename): self.home = filename - self._base_lease_offset = self.HEADER_SIZE if os.path.exists(self.home): + # we don't cache anything, just check the magic f = open(self.home, 'rb') - data = f.read(88) + data = f.read(self.HEADER_SIZE) (magic, - self._write_enabler_nodeid, self._write_enabler, - self._data_length, offset) = struct.unpack(">32s32s32sQQ", data) + write_enabler_nodeid, write_enabler, + data_length, extra_least_offset) = \ + struct.unpack(">32s32s32sQQ", data) assert magic == self.MAGIC - self._extra_lease_offset = offset # points at (8) - f.seek(self._extra_lease_offset) - data = f.read(4) - self._num_extra_leases = struct.unpack(">L", data) - f.close() def create(self, my_nodeid, write_enabler): assert not os.path.exists(self.home) - self._write_enabler = write_enabler - self._data_length = 0 - self._extra_lease_offset = (self.HEADER_SIZE - + 4 * self.LEASE_SIZE - + self._data_length) - assert self._extra_lease_offset == self.DATA_OFFSET # true at creation - self._num_extra_leases = 0 + data_length = 0 + extra_lease_offset = (self.HEADER_SIZE + + 4 * self.LEASE_SIZE + + data_length) + assert extra_lease_offset == self.DATA_OFFSET # true at creation + num_extra_leases = 0 f = open(self.home, 'wb') header = struct.pack(">32s32s32sQQ", self.MAGIC, my_nodeid, write_enabler, - self._data_length, self._extra_lease_offset, + data_length, extra_lease_offset, ) - f.write(header) + leases = ("\x00"*self.LEASE_SIZE) * 4 + f.write(header + leases) # data goes here, empty after creation - f.write(struct.pack(">L", self._num_extra_leases)) + f.write(struct.pack(">L", num_extra_leases)) # extra leases go here, none at creation f.close() + def _read_data_length(self, f): + f.seek(self.DATA_LENGTH_OFFSET) + (data_length,) = struct.unpack(">Q", f.read(8)) + return data_length - def read_share_data(self, offset, length): + def _write_data_length(self, f, data_length): + f.seek(self.DATA_LENGTH_OFFSET) + f.write(struct.pack(">Q", data_length)) + + def _read_share_data(self, f, offset, length): precondition(offset >= 0) - if offset+length > self._data_length: + data_length = self._read_data_length(f) + if offset+length > data_length: # reads beyond the end of the data are truncated. Reads that # start beyond the end of the data return an empty string. - length = max(0, self._data_length-offset) + length = max(0, data_length-offset) if length == 0: return "" - precondition(offset+length <= self._data_length) - f = open(self.home, 'rb') + precondition(offset+length <= data_length) f.seek(self.DATA_OFFSET+offset) - return f.read(length) + data = f.read(length) + return data + + def _read_extra_lease_offset(self, f): + f.seek(self.EXTRA_LEASE_OFFSET) + (extra_lease_offset,) = struct.unpack(">Q", f.read(8)) + return extra_lease_offset + + def _write_extra_lease_offset(self, f, offset): + f.seek(self.EXTRA_LEASE_OFFSET) + f.write(struct.pack(">Q", offset)) - def change_container_size(self, new_container_size): + def _read_num_extra_leases(self, f): + offset = self._read_extra_lease_offset(f) + f.seek(offset) + (num_extra_leases,) = struct.unpack(">L", f.read(4)) + return num_extra_leases + + def _write_num_extra_leases(self, f, num_leases): + extra_lease_offset = self._read_extra_lease_offset(f) + f.seek(extra_lease_offset) + f.write(struct.pack(">L", num_leases)) + + def _change_container_size(self, f, new_container_size): if new_container_size > self.MAX_SIZE: raise DataTooLargeError() + old_extra_lease_offset = self._read_extra_lease_offset(f) new_extra_lease_offset = self.DATA_OFFSET + new_container_size - if new_extra_lease_offset < self._extra_lease_offset: - # TODO: allow containers to shrink + if new_extra_lease_offset < old_extra_lease_offset: + # TODO: allow containers to shrink. For now they remain large. return - f = open(self.home, 'rb+') - f.seek(self._extra_lease_offset) - extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE) + num_extra_leases = self._read_num_extra_leases(f) + f.seek(old_extra_lease_offset) + extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE) f.seek(new_extra_lease_offset) f.write(extra_lease_data) - self._extra_lease_offset = new_extra_lease_offset # an interrupt here will corrupt the leases, iff the move caused the # extra leases to overlap. - f.seek(self.DATA_LENGTH_OFFSET+8) - f.write(struct.pack(">Q", new_extra_lease_offset)) - f.close() + self._write_extra_lease_offset(f, new_extra_lease_offset) - def write_share_data(self, offset, data): + def _write_share_data(self, f, offset, data): length = len(data) precondition(offset >= 0) - if offset+length < self._data_length: - # they are not expanding their data size - f = open(self.home, 'rb+') - f.seek(self.DATA_OFFSET+offset) - f.write(data) - f.close() - return - if self.DATA_OFFSET+offset+length <= self._extra_lease_offset: - # they are expanding their data size, but not the container size - f = open(self.home, 'rb+') - self._data_length = offset+length - f.seek(self.DATA_LENGTH_OFFSET) - f.write(struct.pack(">Q", self._data_length)) + data_length = self._read_data_length(f) + extra_lease_offset = self._read_extra_lease_offset(f) + + if offset+length >= data_length: + # They are expanding their data size. + if self.DATA_OFFSET+offset+length > extra_lease_offset: + # Their new data won't fit in the current container, so we + # have to move the leases. With luck, they're expanding it + # more than the size of the extra lease block, which will + # minimize the corrupt-the-share window + self._change_container_size(f, offset+length) + extra_lease_offset = self._read_extra_lease_offset(f) + + # an interrupt here is ok.. the container has been enlarged + # but the data remains untouched + + assert self.DATA_OFFSET+offset+length <= extra_lease_offset + # Their data now fits in the current container. We must write + # their new data and modify the recorded data size. + new_data_length = offset+length + self._write_data_length(f, new_data_length) # an interrupt here will result in a corrupted share - f.seek(self.DATA_OFFSET+offset) - f.write(data) - f.close() - return - - # they are expanding the container, so we have to move the leases. - # With luck, they're expanding it more than the size of the extra - # lease block, which will minimize the corrupt-the-share window - - self.change_container_size(offset+length) - - # an interrupt here is ok.. the container has been enlarged but the - # data remains untouched - self._data_length = offset+length - - f = open(self.home, 'rb+') + # now all that's left to do is write out their data f.seek(self.DATA_OFFSET+offset) f.write(data) - # an interrupt here will result in a corrupted share - f.seek(self.DATA_LENGTH_OFFSET) - f.write(struct.pack(">Q", self._data_length)) - f.close() return def _write_lease_record(self, f, lease_number, lease_info): (ownerid, expiration_time, renew_secret, cancel_secret, nodeid) = lease_info + extra_lease_offset = self._read_extra_lease_offset(f) + num_extra_leases = self._read_num_extra_leases(f) if lease_number < 4: offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE - elif (lease_number-4) < self._num_extra_leases: - offset = (self._extra_lease_offset + elif (lease_number-4) < num_extra_leases: + offset = (extra_lease_offset + 4 + (lease_number-4)*self.LEASE_NUMBER) else: - f.seek(self._extra_lease_offset) - f.write(struct.pack(">L", self._num_extra_leases+1)) - self._num_extra_leases += 1 - offset = (self._extra_lease_offset + # must add an extra lease record + self._write_num_extra_leases(f, num_extra_leases+1) + offset = (extra_lease_offset + 4 + (lease_number-4)*self.LEASE_NUMBER) f.seek(offset) @@ -396,10 +412,12 @@ class MutableShareFile(Referenceable): def _read_lease_record(self, f, lease_number): # returns a 5-tuple of lease info, or None + extra_lease_offset = self._read_extra_lease_offset(f) + num_extra_leases = self._read_num_extra_leases(f) if lease_number < 4: offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE - elif (lease_number-4) < self._num_extra_leases: - offset = (self._extra_lease_offset + elif (lease_number-4) < num_extra_leases: + offset = (extra_lease_offset + 4 + (lease_number-4)*self.LEASE_NUMBER) else: @@ -414,27 +432,25 @@ class MutableShareFile(Referenceable): return None return lease_info - def _read_num_leases(self, f): - f.seek(self.HEADER_SIZE) - leasedata = f.read(4*self.LEASE_SIZE) - num_leases = 0 - for i in range(4): - base = i*self.LEASE_SIZE - (ownerid,) = struct.unpack(">L", leasedata[base:base+4]) - if ownerid != 0: - num_leases += 1 - return num_leases + self._num_extra_leases - - def pack_leases(self): - pass + def _get_num_lease_slots(self, f): + # how many places do we have allocated for leases? Not all of them + # are filled. + num_extra_leases = self._read_num_extra_leases(f) + return 4+num_extra_leases - def _truncate_leases(self, f, num_leases): - f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) + def _get_first_empty_lease_slot(self, f): + # return an int with the index of an empty slot, or None if we do not + # currently have an empty slot + + for i in range(self._get_num_lease_slots(f)): + if self._read_lease_record(f, i) is None: + return i + return None def enumerate_leases(self, f): """Yields (leasenum, (ownerid, expiration_time, renew_secret, cancel_secret, accepting_nodeid)) for all leases.""" - for i in range(self._read_num_leases(f)): + for i in range(self._get_num_lease_slots(f)): try: data = self._read_lease_record(f, i) if data is not None: @@ -444,9 +460,12 @@ class MutableShareFile(Referenceable): def add_lease(self, lease_info): f = open(self.home, 'rb+') - num_leases = self._read_num_leases(f) - self._write_lease_record(f, num_leases, lease_info) - self._write_num_leases(f, num_leases+1) + num_lease_slots = self._get_num_lease_slots(f) + empty_slot = self._get_first_empty_lease_slot(f) + if empty_slot is not None: + self._write_lease_record(f, empty_slot, lease_info) + else: + self._write_lease_record(f, num_lease_slots, lease_info) f.close() def renew_lease(self, renew_secret, new_expire_time): @@ -463,10 +482,15 @@ class MutableShareFile(Referenceable): return accepting_nodeids.add(anid) f.close() - # TODO: return the accepting_nodeids set, to give the client a chance - # to update the leases on a share which has been migrated from its + # Return the accepting_nodeids set, to give the client a chance to + # update the leases on a share which has been migrated from its # original server to a new one. - raise IndexError("unable to renew non-existent lease") + msg = ("Unable to renew non-existent lease. I have leases accepted by" + " nodeids: ") + msg += ",".join([("'%s'" % idlib.b2a(anid)) + for anid in accepting_nodeids]) + msg += " ." + raise IndexError(msg) def add_or_renew_lease(self, lease_info): ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info @@ -480,12 +504,14 @@ class MutableShareFile(Referenceable): (num_remaining_leases, space_freed). Raise IndexError if there was no lease with the given cancel_secret.""" + accepting_nodeids = set() modified = 0 remaining = 0 blank = "\x00"*32 blank_lease = (0, 0, blank, blank, blank) f = open(self.home, 'rb+') for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f): + accepting_nodeids.add(anid) if cs == cancel_secret: self._write_lease_record(f, leasenum, blank_lease) modified += 1 @@ -493,44 +519,72 @@ class MutableShareFile(Referenceable): remaining += 1 if modified: freed_space = self._pack_leases(f) - f.close() - return (freed_space, remaining) + f.close() + return (freed_space, remaining) + msg = ("Unable to cancel non-existent lease. I have leases " + "accepted by nodeids: ") + msg += ",".join([("'%s'" % idlib.b2a(anid)) + for anid in accepting_nodeids]) + msg += " ." + raise IndexError(msg) def _pack_leases(self, f): # TODO: reclaim space from cancelled leases return 0 + def _read_write_enabler_and_nodeid(self, f): + f.seek(0) + data = f.read(self.HEADER_SIZE) + (magic, + write_enabler_nodeid, write_enabler, + data_length, extra_least_offset) = \ + struct.unpack(">32s32s32sQQ", data) + assert magic == self.MAGIC + return (write_enabler, write_enabler_nodeid) + def remote_read(self, offset, length): - return self.read_share_data(offset, length) + f = open(self.home, 'rb') + data = self._read_share_data(f, offset, length) + f.close() + return data + def remote_get_length(self): - return self._data_length + f = open(self.home, 'rb') + data_length = self._read_data_length(f) + f.close() + return data_length + def remote_testv_and_writev(self, write_enabler, testv, datav, new_length): - if write_enabler != self._write_enabler: + f = open(self.home, 'rb+') + (real_write_enabler, write_enabler_nodeid) = \ + self._read_write_enabler_and_nodeid(f) + if write_enabler != real_write_enabler: # accomodate share migration by reporting the nodeid used for the # old write enabler. + f.close() msg = "The write enabler was recorded by nodeid '%s'." % \ - (idlib.b2a(self._write_enabler_nodeid),) - raise BadWriterEnablerError(msg) + (idlib.b2a(write_enabler_nodeid),) + raise BadWriteEnablerError(msg) + # check testv test_results_v = [] test_failed = False for (offset, length, operator, specimen) in testv: - data = self.read_share_data(offset, length) + data = self._read_share_data(f, offset, length) test_results_v.append(data) if not self.compare(data, operator, specimen): - test_failed = False + test_failed = True if test_failed: + f.close() return (False, test_results_v) # now apply the write vector for (offset, data) in datav: - self.write_share_data(offset, data) + self._write_share_data(f, offset, data) if new_length is not None: - self.change_container_size(new_length) - self._data_length = new_length - f = open(self.home, "rb+") + self._change_container_size(f, new_length) f.seek(self.DATA_LENGTH_OFFSET) - f.write(struct.pack(">Q", self._data_length)) - f.close() + f.write(struct.pack(">Q", new_length)) + f.close() return (True, test_results_v) def compare(self, a, op, b): @@ -538,9 +592,9 @@ class MutableShareFile(Referenceable): if op == "nop": return True if op == "lt": - return a <= b - if op == "le": return a < b + if op == "le": + return a <= b if op == "eq": return a == b if op == "ne": @@ -574,9 +628,21 @@ class StorageServer(service.MultiService, Referenceable): self._clean_incomplete() fileutil.make_dirs(self.incomingdir) self._active_writers = weakref.WeakKeyDictionary() - self.measure_size() + def setNodeID(self, nodeid): + # somebody must set this before any slots can be created or leases + # added + self.my_nodeid = nodeid + + def startService(self): + service.MultiService.startService(self) + if self.parent: + nodeid = self.parent.nodeid # 20 bytes, binary + assert len(nodeid) == 20 + self.setNodeID(nodeid + "\x00"*12) # make it 32 bytes + # TODO: review this 20-vs-32 thing, settle on one or the other + def _clean_incomplete(self): fileutil.rm_dir(self.incomingdir) @@ -740,6 +806,50 @@ class StorageServer(service.MultiService, Referenceable): except StopIteration: return iter([]) + def remote_allocate_mutable_slot(self, storage_index, + write_enabler, + renew_secret, cancel_secret, + sharenums, + allocated_size): + my_nodeid = self.my_nodeid + sharenums = set(sharenums) + shares = self.remote_get_mutable_slot(storage_index) + existing_shnums = set(shares.keys()) + si_s = idlib.b2a(storage_index) + bucketdir = os.path.join(self.sharedir, si_s) + fileutil.make_dirs(bucketdir) + for shnum in (sharenums - existing_shnums): + filename = os.path.join(bucketdir, "%d" % shnum) + shares[shnum] = create_mutable_sharefile(filename, my_nodeid, + write_enabler) + + # update the lease on everything + ownerid = 1 # TODO + expire_time = time.time() + 31*24*60*60 # one month + anid = my_nodeid + lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid) + for share in shares.values(): + share.add_or_renew_lease(lease_info) + return shares + + def remote_get_mutable_slot(self, storage_index): + """This returns an empty dictionary if the server has no shares + of the slot mentioned.""" + si_s = idlib.b2a(storage_index) + # shares exist if there is a file for them + bucketdir = os.path.join(self.sharedir, si_s) + if not os.path.isdir(bucketdir): + return {} + slots = {} + for sharenum_s in os.listdir(bucketdir): + try: + sharenum = int(sharenum_s) + except ValueError: + continue + filename = os.path.join(bucketdir, sharenum_s) + slots[sharenum] = MutableShareFile(filename) + return slots + # the code before here runs on the storage server, not the client # the code beyond here runs on the client, not the storage server diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index e8d81dba..1bb7ae7b 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -10,6 +10,7 @@ from allmydata import interfaces from allmydata.util import fileutil, hashutil from allmydata.storage import BucketWriter, BucketReader, \ WriteBucketProxy, ReadBucketProxy, StorageServer +from allmydata.interfaces import BadWriteEnablerError class Bucket(unittest.TestCase): def make_workdir(self, name): @@ -456,6 +457,7 @@ class MutableServer(unittest.TestCase): workdir = self.workdir(name) ss = StorageServer(workdir, sizelimit) ss.setServiceParent(self.sparent) + ss.setNodeID("\x00" * 32) return ss def test_create(self): @@ -478,26 +480,201 @@ class MutableServer(unittest.TestCase): shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100) self.failUnlessEqual(len(shares), 3) self.failUnlessEqual(set(shares.keys()), set([0,1,2])) - shares2 = ss.get_mutable_slot("si1") + shares2 = ss.remote_get_mutable_slot("si1") self.failUnlessEqual(len(shares2), 3) self.failUnlessEqual(set(shares2.keys()), set([0,1,2])) - # the actual RIMutableSlot objects are required to be singtons (one - # per SI+shnum), so each get_mutable_slot() call should return the - # same RemoteReferences - self.failUnlessEqual(set(shares.values()), set(shares2.values())) s0 = shares[0] self.failUnlessEqual(s0.remote_read(0, 10), "") self.failUnlessEqual(s0.remote_read(100, 10), "") # try writing to one + WE = self.write_enabler("we1") data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - answer = s0.remote_testv_and_writev(self.write_enabler("we1"), + answer = s0.remote_testv_and_writev(WE, [], [(0, data),], new_length=None) - self.failUnlessEqual(answer, []) + self.failUnlessEqual(answer, (True, [])) self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111") self.failUnlessEqual(s0.remote_read(95, 10), "99999") self.failUnlessEqual(s0.remote_get_length(), 100) + self.failUnlessRaises(BadWriteEnablerError, + s0.remote_testv_and_writev, + "bad write enabler", + [], [], None) + # this testv should fail + answer = s0.remote_testv_and_writev(WE, + [(0, 12, "eq", "444444444444"), + (20, 5, "eq", "22222"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["000000000011", + "22222"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + + # as should this one + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "lt", "11111"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + + + def test_operators(self): + # test operators, the data we're comparing is '11111' in all cases. + # test both fail+pass, reset data after each one. + ss = self.create("test_operators") + shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100) + s0 = shares[0] + WE = self.write_enabler("we1") + data = "".join([ ("%d" % i) * 10 for i in range(10) ]) + answer = s0.remote_testv_and_writev(WE, + [], + [(0, data),], + new_length=None) + + # nop + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "nop", "11111"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "x"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + # lt + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "lt", "11110"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "lt", "11111"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "lt", "11112"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + # le + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "le", "11110"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "le", "11111"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "le", "11112"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + # eq + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "eq", "11112"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "eq", "11111"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + # ne + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "ne", "11111"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "ne", "11112"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + # ge + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "ge", "11110"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "ge", "11111"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "ge", "11112"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + # gt + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "gt", "11110"), + ], + [(0, "y"*100)], None) + self.failUnlessEqual(answer, (True, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "gt", "11111"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None) + + answer = s0.remote_testv_and_writev(WE, + [(10, 5, "gt", "11112"), + ], + [(0, "x"*100)], None) + self.failUnlessEqual(answer, (False, ["11111"])) + self.failUnlessEqual(s0.remote_read(0, 100), data) + s0.remote_testv_and_writev(WE, [], [(0,data)], None)