From: Brian Warner Date: Tue, 6 Nov 2007 03:17:14 +0000 (-0700) Subject: storage: rewrite slot API, now use testv_and_readv_and_writev or readv X-Git-Tag: allmydata-tahoe-0.7.0~286 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/...?a=commitdiff_plain;h=e08b091d9f3ca2980f260b5c7447f29e1f7b1aa9;p=tahoe-lafs%2Ftahoe-lafs.git storage: rewrite slot API, now use testv_and_readv_and_writev or readv --- diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index b4c66787..51999b63 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -84,49 +84,19 @@ class RIBucketReader(RemoteInterface): TestVector = ListOf(TupleOf(int, int, str, str)) # elements are (offset, length, operator, specimen) -# operator is one of "lt, le, eq, ne, ge, gt, nop" +# operator is one of "lt, le, eq, ne, ge, gt" # nop always passes and is used to fetch data while writing. # you should use length==len(specimen) for everything except nop DataVector = ListOf(TupleOf(int, ShareData)) # (offset, data). This limits us to 30 writes of 1MiB each per call +TestAndWriteVectorsForShares = DictOf(int, + TupleOf(TestVector, + DataVector, + ChoiceOf(None, int))) # new_length ReadVector = ListOf(TupleOf(int, int)) -TestResults = ListOf(str) +ReadData = ListOf(ShareData) # returns data[offset:offset+length] for each element of TestVector -class RIMutableSlot(RemoteInterface): - def testv_and_writev(write_enabler=Hash, - testv=TestVector, - datav=DataVector, - new_length=ChoiceOf(None, int)): - """General-purpose test-and-set operation for mutable slots. Perform - the given comparisons. If they all pass, then apply the write vector. - - If new_length is not None, use it to set the size of the container. - This can be used to pre-allocate space for a series of upcoming - writes, or truncate existing data. If the container is growing, - new_length will be applied before datav. If the container is - shrinking, it will be applied afterwards. - - Return the old data that was used for the comparisons. - - The boolean return value is True if the write vector was applied, - false if not. - - If the write_enabler is wrong, this will raise BadWriteEnablerError. - To enable share migration, the exception will have the nodeid used - for the old write enabler embedded in it, in the following string:: - - The write enabler was recorded by nodeid '%s'. - - """ - return TupleOf(bool, TestResults) - - def read(offset=int, length=int): - return ShareData - - def get_length(): - return int - class RIStorageServer(RemoteInterface): def allocate_buckets(storage_index=StorageIndex, renew_secret=LeaseRenewSecret, @@ -170,13 +140,27 @@ class RIStorageServer(RemoteInterface): return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) - def allocate_mutable_slot(storage_index=StorageIndex, - write_enabler=Hash, - renew_secret=LeaseRenewSecret, - cancel_secret=LeaseCancelSecret, - sharenums=SetOf(int, maxLength=MAX_BUCKETS), - allocated_size=int): - """ + + def slot_readv(storage_index=StorageIndex, + shares=ListOf(int), readv=ReadVector): + """Read a vector from the numbered shares associated with the given + storage index. An empty shares list means to return data from all + known shares. Returns a dictionary with one key per share.""" + return DictOf(int, DataVector) # shnum -> results + + def slot_testv_and_readv_and_writev(storage_index=StorageIndex, + secrets=TupleOf(Hash, Hash, Hash), + tw_vectors=TestAndWriteVectorsForShares, + r_vector=ReadVector, + ): + """General-purpose test-and-set operation for mutable slots. Perform + a bunch of comparisons against the existing shares. If they all pass, + then apply a bunch of write vectors to those shares. Then use the + read vectors to extract data from all the shares and return the data. + + This method is, um, large. The goal is to allow clients to update all + the shares associated with a mutable file in a single round trip. + @param storage_index: the index of the bucket to be created or increfed. @param write_enabler: a secret that is stored along with the slot. @@ -188,32 +172,51 @@ class RIStorageServer(RemoteInterface): stored for later comparison by the server. Each server is given a different secret. @param cancel_secret: Like renew_secret, but protects bucket decref. - @param sharenums: these are the share numbers (probably between 0 and - 99) that the sender is proposing to store on this - server. - @param allocated_size: all shares will pre-allocate this many bytes. - Use this to a) confirm that you can claim as - much space as you want before you actually - send the data, and b) reduce the disk-IO cost - of doing incremental writes. - @return: dict mapping sharenum to slot. The return value may include - more sharenums than asked, if some shares already existed. - New leases are added for all - shares. + The 'secrets' argument is a tuple of (write_enabler, renew_secret, + cancel_secret). The first is required to perform any write. The + latter two are used when allocating new shares. To simply acquire a + new lease on existing shares, use an empty testv and an empty writev. + + Each share can have a separate test vector (i.e. a list of + comparisons to perform). If all vectors for all shares pass, then all + writes for all shares are recorded. Each comparison is a 4-tuple of + (offset, length, operator, specimen), which effectively does a + read(offset, length) and then compares the result against the + specimen using the given equality/inequality operator. Reads from the + end of the container are truncated, and missing shares behave like + empty ones, so to assert that a share doesn't exist (for use when + creating a new share), use (0, 1, 'eq', ''). + + The write vector will be applied to the given share, expanding it if + necessary. A write vector applied to a share number that did not + exist previously will cause that share to be created. + + Each write vector is accompanied by a 'new_length' argument. If + new_length is not None, use it to set the size of the container. This + can be used to pre-allocate space for a series of upcoming writes, or + truncate existing data. If the container is growing, new_length will + be applied before datav. If the container is shrinking, it will be + applied afterwards. + + The read vector is used to extract data from all known shares, + *before* any writes have been applied. The same vector is used for + all shares. This captures the state that was tested by the test + vector. + + This method returns two values: a boolean and a dict. The boolean is + True if the write vectors were applied, False if not. The dict is + keyed by share number, and each value contains a list of strings, one + for each element of the read vector. - """ - return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS) + If the write_enabler is wrong, this will raise BadWriteEnablerError. + To enable share migration, the exception will have the nodeid used + for the old write enabler embedded in it, in the following string:: - def get_mutable_slot(storage_index=StorageIndex): - """This returns an empty dictionary if the server has no shares - of the slot mentioned.""" - return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS) + The write enabler was recorded by nodeid '%s'. - def readv_slots(storage_index=StorageIndex, readv=ReadVector): - """Read a vector from all shares associated with the given storage - index. Returns a dictionary with one key per share.""" - return DictOf(int, DataVector) # shnum -> results + """ + return TupleOf(bool, DictOf(int, ReadData)) class IStorageBucketWriter(Interface): def put_block(segmentnum=int, data=ShareData): diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 8f05b302..bd9949ca 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -8,7 +8,7 @@ from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \ - BadWriteEnablerError, RIMutableSlot + BadWriteEnablerError from allmydata.util import fileutil, idlib, mathutil from allmydata.util.assertutil import precondition, _assert @@ -240,13 +240,7 @@ class BucketReader(Referenceable): assert struct.calcsize("L"), 4 assert struct.calcsize("Q"), 8 -class MutableShareFile(Referenceable): - # note: at any given time, there should only be a single instance of this - # class per filename. More than one is likely to corrupt the container, - # because of state we cache in instance variables. This requires the - # StorageServer to use a WeakValueDictionary, indexed by filename. This - # could be improved by cacheing less and doing more IO. - implements(RIMutableSlot) +class MutableShareFile: DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s") EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8 @@ -548,12 +542,6 @@ class MutableShareFile(Referenceable): assert magic == self.MAGIC return (write_enabler, write_enabler_nodeid) - def remote_read(self, offset, length): - f = open(self.home, 'rb') - data = self._read_share_data(f, offset, length) - f.close() - return data - def readv(self, readv): datav = [] f = open(self.home, 'rb') @@ -562,36 +550,37 @@ class MutableShareFile(Referenceable): f.close() return datav - def remote_get_length(self): - f = open(self.home, 'rb') - data_length = self._read_data_length(f) - f.close() - return data_length +# def remote_get_length(self): +# f = open(self.home, 'rb') +# data_length = self._read_data_length(f) +# f.close() +# return data_length - def remote_testv_and_writev(self, write_enabler, testv, datav, new_length): + def check_write_enabler(self, write_enabler): f = open(self.home, 'rb+') (real_write_enabler, write_enabler_nodeid) = \ self._read_write_enabler_and_nodeid(f) + f.close() if write_enabler != real_write_enabler: # accomodate share migration by reporting the nodeid used for the # old write enabler. - f.close() msg = "The write enabler was recorded by nodeid '%s'." % \ (idlib.b2a(write_enabler_nodeid),) raise BadWriteEnablerError(msg) - # check testv - test_results_v = [] - test_failed = False + def check_testv(self, testv): + test_good = True + f = open(self.home, 'rb+') for (offset, length, operator, specimen) in testv: data = self._read_share_data(f, offset, length) - test_results_v.append(data) if not self.compare(data, operator, specimen): - test_failed = True - if test_failed: - f.close() - return (False, test_results_v) - # now apply the write vector + test_good = False + break + f.close() + return test_good + + def writev(self, datav, new_length): + f = open(self.home, 'rb+') for (offset, data) in datav: self._write_share_data(f, offset, data) if new_length is not None: @@ -599,12 +588,36 @@ class MutableShareFile(Referenceable): f.seek(self.DATA_LENGTH_OFFSET) f.write(struct.pack(">Q", new_length)) f.close() - return (True, test_results_v) def compare(self, a, op, b): - assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt") - if op == "nop": - return True + assert op in ("lt", "le", "eq", "ne", "ge", "gt") + if op == "lt": + return a < b + if op == "le": + return a <= b + if op == "eq": + return a == b + if op == "ne": + return a != b + if op == "ge": + return a >= b + if op == "gt": + return a > b + # never reached + +class EmptyShare: + + def check_testv(self, testv): + test_good = True + for (offset, length, operator, specimen) in testv: + data = "" + if not self.compare(data, operator, specimen): + test_good = False + break + return test_good + + def compare(self, a, op, b): + assert op in ("lt", "le", "eq", "ne", "ge", "gt") if op == "lt": return a < b if op == "le": @@ -842,51 +855,83 @@ class StorageServer(service.MultiService, Referenceable): except StopIteration: return iter([]) - def remote_allocate_mutable_slot(self, storage_index, - write_enabler, - renew_secret, cancel_secret, - sharenums, - allocated_size): - my_nodeid = self.my_nodeid - sharenums = set(sharenums) - shares = self.remote_get_mutable_slot(storage_index) - existing_shnums = set(shares.keys()) - si_s = idlib.b2a(storage_index) - bucketdir = os.path.join(self.sharedir, si_s) - fileutil.make_dirs(bucketdir) - for shnum in (sharenums - existing_shnums): - filename = os.path.join(bucketdir, "%d" % shnum) - shares[shnum] = create_mutable_sharefile(filename, my_nodeid, - write_enabler) - - # update the lease on everything - ownerid = 1 # TODO - expire_time = time.time() + 31*24*60*60 # one month - anid = my_nodeid - lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid) - for share in shares.values(): - share.add_or_renew_lease(lease_info) - return shares - - def remote_get_mutable_slot(self, storage_index): - """This returns an empty dictionary if the server has no shares - of the slot mentioned.""" + def remote_slot_testv_and_readv_and_writev(self, storage_index, + secrets, + test_and_write_vectors, + read_vector): si_s = idlib.b2a(storage_index) + (write_enabler, renew_secret, cancel_secret) = secrets # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_s) - if not os.path.isdir(bucketdir): - return {} - slots = {} - for sharenum_s in os.listdir(bucketdir): - try: - sharenum = int(sharenum_s) - except ValueError: - continue - filename = os.path.join(bucketdir, sharenum_s) - slots[sharenum] = MutableShareFile(filename) - return slots + shares = {} + if os.path.isdir(bucketdir): + for sharenum_s in os.listdir(bucketdir): + try: + sharenum = int(sharenum_s) + except ValueError: + continue + filename = os.path.join(bucketdir, sharenum_s) + msf = MutableShareFile(filename) + msf.check_write_enabler(write_enabler) + shares[sharenum] = msf + # write_enabler is good for all existing shares. + + # Now evaluate test vectors. + testv_is_good = True + for sharenum in test_and_write_vectors: + (testv, datav, new_length) = test_and_write_vectors[sharenum] + if sharenum in shares: + if not shares[sharenum].check_testv(testv): + testv_is_good = False + break + else: + # compare the vectors against an empty share, in which all + # reads return empty strings. + if not EmptyShare().check_testv(testv): + testv_is_good = False + break + + # now gather the read vectors, before we do any writes + read_data = {} + for sharenum, share in shares.items(): + read_data[sharenum] = share.readv(read_vector) + + if testv_is_good: + # now apply the write vectors + for sharenum in test_and_write_vectors: + (testv, datav, new_length) = test_and_write_vectors[sharenum] + if sharenum not in shares: + # allocate a new share + allocated_size = 2000 # arbitrary, really + share = self._allocate_slot_share(bucketdir, secrets, + sharenum, + allocated_size, + owner_num=0) + shares[sharenum] = share + shares[sharenum].writev(datav, new_length) + # and update the leases on all shares + ownerid = 1 # TODO + expire_time = time.time() + 31*24*60*60 # one month + my_nodeid = self.my_nodeid + anid = my_nodeid + lease_info = (ownerid, expire_time, renew_secret, cancel_secret, + anid) + for share in shares.values(): + share.add_or_renew_lease(lease_info) + + # all done + return (testv_is_good, read_data) + + def _allocate_slot_share(self, bucketdir, secrets, sharenum, + allocated_size, owner_num=0): + (write_enabler, renew_secret, cancel_secret) = secrets + my_nodeid = self.my_nodeid + fileutil.make_dirs(bucketdir) + filename = os.path.join(bucketdir, "%d" % sharenum) + share = create_mutable_sharefile(filename, my_nodeid, write_enabler) + return share - def remote_readv_slots(self, storage_index, readv): + def remote_slot_readv(self, storage_index, shares, readv): si_s = idlib.b2a(storage_index) # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_s) @@ -898,9 +943,10 @@ class StorageServer(service.MultiService, Referenceable): sharenum = int(sharenum_s) except ValueError: continue - filename = os.path.join(bucketdir, sharenum_s) - msf = MutableShareFile(filename) - datavs[sharenum] = msf.readv(readv) + if sharenum in shares or not shares: + filename = os.path.join(bucketdir, sharenum_s) + msf = MutableShareFile(filename) + datavs[sharenum] = msf.readv(readv) return datavs diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 4716a389..6e734390 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -9,7 +9,7 @@ import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil, idlib from allmydata.storage import BucketWriter, BucketReader, \ - WriteBucketProxy, ReadBucketProxy, StorageServer + WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile from allmydata.interfaces import BadWriteEnablerError class Bucket(unittest.TestCase): @@ -476,228 +476,275 @@ class MutableServer(unittest.TestCase): write_enabler = self.write_enabler(we_tag) renew_secret = self.renew_secret(lease_tag) cancel_secret = self.cancel_secret(lease_tag) - return ss.remote_allocate_mutable_slot(storage_index, - write_enabler, - renew_secret, cancel_secret, - sharenums, size) + rstaraw = ss.remote_slot_testv_and_readv_and_writev + testandwritev = dict( [ (shnum, ([], [], None) ) + for shnum in sharenums ] ) + readv = [] + rc = rstaraw(storage_index, + (write_enabler, renew_secret, cancel_secret), + testandwritev, + readv) + (did_write, readv_data) = rc + self.failUnless(did_write) + self.failUnless(isinstance(readv_data, dict)) + self.failUnlessEqual(len(readv_data), 0) def test_allocate(self): ss = self.create("test_allocate") - shares = self.allocate(ss, "si1", "we1", self._secret.next(), + self.allocate(ss, "si1", "we1", self._secret.next(), set([0,1,2]), 100) - self.failUnlessEqual(len(shares), 3) - self.failUnlessEqual(set(shares.keys()), set([0,1,2])) - shares2 = ss.remote_get_mutable_slot("si1") - self.failUnlessEqual(len(shares2), 3) - self.failUnlessEqual(set(shares2.keys()), set([0,1,2])) - - s0 = shares[0] - self.failUnlessEqual(s0.remote_read(0, 10), "") - self.failUnlessEqual(s0.remote_read(100, 10), "") - # try writing to one - WE = self.write_enabler("we1") - data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - answer = s0.remote_testv_and_writev(WE, - [], - [(0, data),], - new_length=None) - self.failUnlessEqual(answer, (True, [])) - self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111") - self.failUnlessEqual(s0.remote_read(95, 10), "99999") - self.failUnlessEqual(s0.remote_get_length(), 100) + read = ss.remote_slot_readv + self.failUnlessEqual(read("si1", [0], [(0, 10)]), + {0: [""]}) + self.failUnlessEqual(read("si1", [], [(0, 10)]), + {0: [""], 1: [""], 2: [""]}) + self.failUnlessEqual(read("si1", [0], [(100, 10)]), + {0: [""]}) + # try writing to one + secrets = ( self.write_enabler("we1"), + self.renew_secret("we1"), + self.cancel_secret("we1") ) + data = "".join([ ("%d" % i) * 10 for i in range(10) ]) + write = ss.remote_slot_testv_and_readv_and_writev + answer = write("si1", secrets, + {0: ([], [(0,data)], None)}, + []) + self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) ) + + self.failUnlessEqual(read("si1", [0], [(0,20)]), + {0: ["00000000001111111111"]}) + self.failUnlessEqual(read("si1", [0], [(95,10)]), + {0: ["99999"]}) + #self.failUnlessEqual(s0.remote_get_length(), 100) + + bad_secrets = ("bad write enabler", secrets[1], secrets[2]) self.failUnlessRaises(BadWriteEnablerError, - s0.remote_testv_and_writev, - "bad write enabler", - [], [], None) + write, "si1", bad_secrets, + {}, []) + # this testv should fail - answer = s0.remote_testv_and_writev(WE, - [(0, 12, "eq", "444444444444"), - (20, 5, "eq", "22222"), - ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["000000000011", - "22222"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) + answer = write("si1", secrets, + {0: ([(0, 12, "eq", "444444444444"), + (20, 5, "eq", "22222"), + ], + [(0, "x"*100)], + None), + }, + [(0,12), (20,5)], + ) + self.failUnlessEqual(answer, (False, + {0: ["000000000011", "22222"], + 1: ["", ""], + 2: ["", ""], + })) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) # as should this one - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "lt", "11111"), - ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) + answer = write("si1", secrets, + {0: ([(10, 5, "lt", "11111"), + ], + [(0, "x"*100)], + None), + }, + [(10,5)], + ) + self.failUnlessEqual(answer, (False, + {0: ["11111"], + 1: [""], + 2: [""]}, + )) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) def test_operators(self): # test operators, the data we're comparing is '11111' in all cases. # test both fail+pass, reset data after each one. ss = self.create("test_operators") - shares = self.allocate(ss, "si1", "we1", self._secret.next(), - set([0,1,2]), 100) - s0 = shares[0] - WE = self.write_enabler("we1") + + secrets = ( self.write_enabler("we1"), + self.renew_secret("we1"), + self.cancel_secret("we1") ) data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - answer = s0.remote_testv_and_writev(WE, - [], - [(0, data),], - new_length=None) - - # nop - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "nop", "11111"), - ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "x"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + write = ss.remote_slot_testv_and_readv_and_writev + read = ss.remote_slot_readv + + def reset(): + write("si1", secrets, + {0: ([], [(0,data)], None)}, + []) + + reset() # lt - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "lt", "11110"), + answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "lt", "11111"), + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "lt", "11112"), + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() # le - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "le", "11110"), + answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "le", "11111"), + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "le", "11112"), + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() # eq - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "eq", "11112"), + answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "eq", "11111"), + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() # ne - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "ne", "11111"), + answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "ne", "11112"), + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() # ge - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "ge", "11110"), + answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "ge", "11111"), + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "ge", "11112"), + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() # gt - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "gt", "11110"), + answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"), ], - [(0, "y"*100)], None) - self.failUnlessEqual(answer, (True, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "gt", "11111"), + [(0, "y"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (True, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) - - answer = s0.remote_testv_and_writev(WE, - [(10, 5, "gt", "11112"), + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() + + answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"), ], - [(0, "x"*100)], None) - self.failUnlessEqual(answer, (False, ["11111"])) - self.failUnlessEqual(s0.remote_read(0, 100), data) - s0.remote_testv_and_writev(WE, [], [(0,data)], None) + [(0, "x"*100)], + None, + )}, [(10,5)]) + self.failUnlessEqual(answer, (False, {0: ["11111"]})) + self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]}) + reset() def test_readv(self): - ss = self.create("test_allocate") - shares = self.allocate(ss, "si1", "we1", self._secret.next(), - set([0,1,2]), 100) - WE = self.write_enabler("we1") + ss = self.create("test_readv") + secrets = ( self.write_enabler("we1"), + self.renew_secret("we1"), + self.cancel_secret("we1") ) + data = "".join([ ("%d" % i) * 10 for i in range(10) ]) + write = ss.remote_slot_testv_and_readv_and_writev + read = ss.remote_slot_readv data = [("%d" % i) * 100 for i in range(3)] - for i in range(3): - rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])], - new_length=None) - self.failUnlessEqual(rc, (True, [])) - answer = ss.remote_readv_slots("si1", [(0, 10)]) + rc = write("si1", secrets, + {0: ([], [(0,data[0])], None), + 1: ([], [(0,data[1])], None), + 2: ([], [(0,data[2])], None), + }, []) + self.failUnlessEqual(rc, (True, {})) + + answer = read("si1", [], [(0, 10)]) self.failUnlessEqual(answer, {0: ["0"*10], 1: ["1"*10], 2: ["2"*10]}) @@ -716,15 +763,15 @@ class MutableServer(unittest.TestCase): def test_leases(self): ss = self.create("test_leases") - secret = 14 - shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100) - s0 = shares[0] - WE = self.write_enabler("we1") + def secrets(n): + return ( self.write_enabler("we1"), + self.renew_secret("we1-%d" % n), + self.cancel_secret("we1-%d" % n) ) data = "".join([ ("%d" % i) * 10 for i in range(10) ]) - answer = s0.remote_testv_and_writev(WE, - [], - [(0, data),], - new_length=None) + write = ss.remote_slot_testv_and_readv_and_writev + read = ss.remote_slot_readv + rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, []) + self.failUnlessEqual(rc, (True, {})) # create a random non-numeric file in the bucket directory, to # exercise the code that's supposed to ignore those. @@ -736,40 +783,41 @@ class MutableServer(unittest.TestCase): # re-allocate the slots and use the same secrets, that should update # the lease - shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100) + write("si1", secrets(0), {0: ([], [(0,data)], None)}, []) # renew it directly - ss.remote_renew_lease("si1", self.renew_secret(secret)) + ss.remote_renew_lease("si1", secrets(0)[1]) # now allocate them with a bunch of different secrets, to trigger the # extended lease code - shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100) - shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100) - shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100) - shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100) - shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100) + write("si1", secrets(1), {0: ([], [(0,data)], None)}, []) + write("si1", secrets(2), {0: ([], [(0,data)], None)}, []) + write("si1", secrets(3), {0: ([], [(0,data)], None)}, []) + write("si1", secrets(4), {0: ([], [(0,data)], None)}, []) + write("si1", secrets(5), {0: ([], [(0,data)], None)}, []) + # cancel one of them - ss.remote_cancel_lease("si1", self.cancel_secret(secret+5)) + ss.remote_cancel_lease("si1", secrets(5)[2]) + s0 = MutableShareFile(os.path.join(bucket_dir, "0")) all_leases = s0.debug_get_leases() self.failUnlessEqual(len(all_leases), 5) # and write enough data to expand the container, forcing the server # to move the leases - answer = s0.remote_testv_and_writev(WE, - [], - [(0, data),], - new_length=200) + write("si1", secrets(0), + {0: ([], [(0,data)], 200), }, + []) # read back the leases, make sure they're still intact. self.compare_leases_without_timestamps(all_leases, s0.debug_get_leases()) - ss.remote_renew_lease("si1", self.renew_secret(secret)) - ss.remote_renew_lease("si1", self.renew_secret(secret+1)) - ss.remote_renew_lease("si1", self.renew_secret(secret+2)) - ss.remote_renew_lease("si1", self.renew_secret(secret+3)) - ss.remote_renew_lease("si1", self.renew_secret(secret+4)) + ss.remote_renew_lease("si1", secrets(0)[1]) + ss.remote_renew_lease("si1", secrets(1)[1]) + ss.remote_renew_lease("si1", secrets(2)[1]) + ss.remote_renew_lease("si1", secrets(3)[1]) + ss.remote_renew_lease("si1", secrets(4)[1]) self.compare_leases_without_timestamps(all_leases, s0.debug_get_leases()) # get a new copy of the leases, with the current timestamps. Reading @@ -782,40 +830,40 @@ class MutableServer(unittest.TestCase): # is present, to provide for share migration self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", - self.renew_secret(secret+20)) + secrets(20)[1]) # same for cancelling self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si1", - self.cancel_secret(secret+20)) + secrets(20)[2]) self.failUnlessEqual(all_leases, s0.debug_get_leases()) - s0.remote_read(0, 200) + + # reading shares should not modify the timestamp + read("si1", [], [(0,200)]) self.failUnlessEqual(all_leases, s0.debug_get_leases()) - answer = s0.remote_testv_and_writev(WE, - [], - [(200, "make me bigger"),], - new_length=None) + write("si1", secrets(0), + {0: ([], [(200, "make me bigger")], None)}, []) self.compare_leases_without_timestamps(all_leases, s0.debug_get_leases()) - answer = s0.remote_testv_and_writev(WE, - [], - [(500, "make me really bigger"),], - new_length=None) + write("si1", secrets(0), + {0: ([], [(500, "make me really bigger")], None)}, []) self.compare_leases_without_timestamps(all_leases, s0.debug_get_leases()) # now cancel them all - ss.remote_cancel_lease("si1", self.cancel_secret(secret)) - ss.remote_cancel_lease("si1", self.cancel_secret(secret+1)) - ss.remote_cancel_lease("si1", self.cancel_secret(secret+2)) - ss.remote_cancel_lease("si1", self.cancel_secret(secret+3)) + ss.remote_cancel_lease("si1", secrets(0)[2]) + ss.remote_cancel_lease("si1", secrets(1)[2]) + ss.remote_cancel_lease("si1", secrets(2)[2]) + ss.remote_cancel_lease("si1", secrets(3)[2]) + # the slot should still be there - shares3 = ss.remote_get_mutable_slot("si1") - self.failUnlessEqual(len(shares3), 3) + remaining_shares = read("si1", [], [(0,10)]) + self.failUnlessEqual(len(remaining_shares), 1) self.failUnlessEqual(len(s0.debug_get_leases()), 1) - ss.remote_cancel_lease("si1", self.cancel_secret(secret+4)) + ss.remote_cancel_lease("si1", secrets(4)[2]) # now the slot should be gone - self.failUnlessEqual(ss.remote_get_mutable_slot("si1"), {}) + no_shares = read("si1", [], [(0,10)]) + self.failUnlessEqual(no_shares, {})