TestVector = ListOf(TupleOf(int, int, str, str))
# elements are (offset, length, operator, specimen)
-# operator is one of "lt, le, eq, ne, ge, gt, nop"
+# operator is one of "lt, le, eq, ne, ge, gt"
# nop always passes and is used to fetch data while writing.
# you should use length==len(specimen) for everything except nop
DataVector = ListOf(TupleOf(int, ShareData))
# (offset, data). This limits us to 30 writes of 1MiB each per call
+TestAndWriteVectorsForShares = DictOf(int,
+ TupleOf(TestVector,
+ DataVector,
+ ChoiceOf(None, int))) # new_length
ReadVector = ListOf(TupleOf(int, int))
-TestResults = ListOf(str)
+ReadData = ListOf(ShareData)
# returns data[offset:offset+length] for each element of TestVector
-class RIMutableSlot(RemoteInterface):
- def testv_and_writev(write_enabler=Hash,
- testv=TestVector,
- datav=DataVector,
- new_length=ChoiceOf(None, int)):
- """General-purpose test-and-set operation for mutable slots. Perform
- the given comparisons. If they all pass, then apply the write vector.
-
- If new_length is not None, use it to set the size of the container.
- This can be used to pre-allocate space for a series of upcoming
- writes, or truncate existing data. If the container is growing,
- new_length will be applied before datav. If the container is
- shrinking, it will be applied afterwards.
-
- Return the old data that was used for the comparisons.
-
- The boolean return value is True if the write vector was applied,
- false if not.
-
- If the write_enabler is wrong, this will raise BadWriteEnablerError.
- To enable share migration, the exception will have the nodeid used
- for the old write enabler embedded in it, in the following string::
-
- The write enabler was recorded by nodeid '%s'.
-
- """
- return TupleOf(bool, TestResults)
-
- def read(offset=int, length=int):
- return ShareData
-
- def get_length():
- return int
-
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
renew_secret=LeaseRenewSecret,
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
- def allocate_mutable_slot(storage_index=StorageIndex,
- write_enabler=Hash,
- renew_secret=LeaseRenewSecret,
- cancel_secret=LeaseCancelSecret,
- sharenums=SetOf(int, maxLength=MAX_BUCKETS),
- allocated_size=int):
- """
+
+ def slot_readv(storage_index=StorageIndex,
+ shares=ListOf(int), readv=ReadVector):
+ """Read a vector from the numbered shares associated with the given
+ storage index. An empty shares list means to return data from all
+ known shares. Returns a dictionary with one key per share."""
+ return DictOf(int, DataVector) # shnum -> results
+
+ def slot_testv_and_readv_and_writev(storage_index=StorageIndex,
+ secrets=TupleOf(Hash, Hash, Hash),
+ tw_vectors=TestAndWriteVectorsForShares,
+ r_vector=ReadVector,
+ ):
+ """General-purpose test-and-set operation for mutable slots. Perform
+ a bunch of comparisons against the existing shares. If they all pass,
+ then apply a bunch of write vectors to those shares. Then use the
+ read vectors to extract data from all the shares and return the data.
+
+ This method is, um, large. The goal is to allow clients to update all
+ the shares associated with a mutable file in a single round trip.
+
@param storage_index: the index of the bucket to be created or
increfed.
@param write_enabler: a secret that is stored along with the slot.
stored for later comparison by the server. Each
server is given a different secret.
@param cancel_secret: Like renew_secret, but protects bucket decref.
- @param sharenums: these are the share numbers (probably between 0 and
- 99) that the sender is proposing to store on this
- server.
- @param allocated_size: all shares will pre-allocate this many bytes.
- Use this to a) confirm that you can claim as
- much space as you want before you actually
- send the data, and b) reduce the disk-IO cost
- of doing incremental writes.
- @return: dict mapping sharenum to slot. The return value may include
- more sharenums than asked, if some shares already existed.
- New leases are added for all
- shares.
+ The 'secrets' argument is a tuple of (write_enabler, renew_secret,
+ cancel_secret). The first is required to perform any write. The
+ latter two are used when allocating new shares. To simply acquire a
+ new lease on existing shares, use an empty testv and an empty writev.
+
+ Each share can have a separate test vector (i.e. a list of
+ comparisons to perform). If all vectors for all shares pass, then all
+ writes for all shares are recorded. Each comparison is a 4-tuple of
+ (offset, length, operator, specimen), which effectively does a
+ read(offset, length) and then compares the result against the
+ specimen using the given equality/inequality operator. Reads from the
+ end of the container are truncated, and missing shares behave like
+ empty ones, so to assert that a share doesn't exist (for use when
+ creating a new share), use (0, 1, 'eq', '').
+
+ The write vector will be applied to the given share, expanding it if
+ necessary. A write vector applied to a share number that did not
+ exist previously will cause that share to be created.
+
+ Each write vector is accompanied by a 'new_length' argument. If
+ new_length is not None, use it to set the size of the container. This
+ can be used to pre-allocate space for a series of upcoming writes, or
+ truncate existing data. If the container is growing, new_length will
+ be applied before datav. If the container is shrinking, it will be
+ applied afterwards.
+
+ The read vector is used to extract data from all known shares,
+ *before* any writes have been applied. The same vector is used for
+ all shares. This captures the state that was tested by the test
+ vector.
+
+ This method returns two values: a boolean and a dict. The boolean is
+ True if the write vectors were applied, False if not. The dict is
+ keyed by share number, and each value contains a list of strings, one
+ for each element of the read vector.
- """
- return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+ If the write_enabler is wrong, this will raise BadWriteEnablerError.
+ To enable share migration, the exception will have the nodeid used
+ for the old write enabler embedded in it, in the following string::
- def get_mutable_slot(storage_index=StorageIndex):
- """This returns an empty dictionary if the server has no shares
- of the slot mentioned."""
- return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+ The write enabler was recorded by nodeid '%s'.
- def readv_slots(storage_index=StorageIndex, readv=ReadVector):
- """Read a vector from all shares associated with the given storage
- index. Returns a dictionary with one key per share."""
- return DictOf(int, DataVector) # shnum -> results
+ """
+ return TupleOf(bool, DictOf(int, ReadData))
class IStorageBucketWriter(Interface):
def put_block(segmentnum=int, data=ShareData):
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
- BadWriteEnablerError, RIMutableSlot
+ BadWriteEnablerError
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert
assert struct.calcsize("L"), 4
assert struct.calcsize("Q"), 8
-class MutableShareFile(Referenceable):
- # note: at any given time, there should only be a single instance of this
- # class per filename. More than one is likely to corrupt the container,
- # because of state we cache in instance variables. This requires the
- # StorageServer to use a WeakValueDictionary, indexed by filename. This
- # could be improved by cacheing less and doing more IO.
- implements(RIMutableSlot)
+class MutableShareFile:
DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
assert magic == self.MAGIC
return (write_enabler, write_enabler_nodeid)
- def remote_read(self, offset, length):
- f = open(self.home, 'rb')
- data = self._read_share_data(f, offset, length)
- f.close()
- return data
-
def readv(self, readv):
datav = []
f = open(self.home, 'rb')
f.close()
return datav
- def remote_get_length(self):
- f = open(self.home, 'rb')
- data_length = self._read_data_length(f)
- f.close()
- return data_length
+# def remote_get_length(self):
+# f = open(self.home, 'rb')
+# data_length = self._read_data_length(f)
+# f.close()
+# return data_length
- def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
+ def check_write_enabler(self, write_enabler):
f = open(self.home, 'rb+')
(real_write_enabler, write_enabler_nodeid) = \
self._read_write_enabler_and_nodeid(f)
+ f.close()
if write_enabler != real_write_enabler:
# accomodate share migration by reporting the nodeid used for the
# old write enabler.
- f.close()
msg = "The write enabler was recorded by nodeid '%s'." % \
(idlib.b2a(write_enabler_nodeid),)
raise BadWriteEnablerError(msg)
- # check testv
- test_results_v = []
- test_failed = False
+ def check_testv(self, testv):
+ test_good = True
+ f = open(self.home, 'rb+')
for (offset, length, operator, specimen) in testv:
data = self._read_share_data(f, offset, length)
- test_results_v.append(data)
if not self.compare(data, operator, specimen):
- test_failed = True
- if test_failed:
- f.close()
- return (False, test_results_v)
- # now apply the write vector
+ test_good = False
+ break
+ f.close()
+ return test_good
+
+ def writev(self, datav, new_length):
+ f = open(self.home, 'rb+')
for (offset, data) in datav:
self._write_share_data(f, offset, data)
if new_length is not None:
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", new_length))
f.close()
- return (True, test_results_v)
def compare(self, a, op, b):
- assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt")
- if op == "nop":
- return True
+ assert op in ("lt", "le", "eq", "ne", "ge", "gt")
+ if op == "lt":
+ return a < b
+ if op == "le":
+ return a <= b
+ if op == "eq":
+ return a == b
+ if op == "ne":
+ return a != b
+ if op == "ge":
+ return a >= b
+ if op == "gt":
+ return a > b
+ # never reached
+
+class EmptyShare:
+
+ def check_testv(self, testv):
+ test_good = True
+ for (offset, length, operator, specimen) in testv:
+ data = ""
+ if not self.compare(data, operator, specimen):
+ test_good = False
+ break
+ return test_good
+
+ def compare(self, a, op, b):
+ assert op in ("lt", "le", "eq", "ne", "ge", "gt")
if op == "lt":
return a < b
if op == "le":
except StopIteration:
return iter([])
- def remote_allocate_mutable_slot(self, storage_index,
- write_enabler,
- renew_secret, cancel_secret,
- sharenums,
- allocated_size):
- my_nodeid = self.my_nodeid
- sharenums = set(sharenums)
- shares = self.remote_get_mutable_slot(storage_index)
- existing_shnums = set(shares.keys())
- si_s = idlib.b2a(storage_index)
- bucketdir = os.path.join(self.sharedir, si_s)
- fileutil.make_dirs(bucketdir)
- for shnum in (sharenums - existing_shnums):
- filename = os.path.join(bucketdir, "%d" % shnum)
- shares[shnum] = create_mutable_sharefile(filename, my_nodeid,
- write_enabler)
-
- # update the lease on everything
- ownerid = 1 # TODO
- expire_time = time.time() + 31*24*60*60 # one month
- anid = my_nodeid
- lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid)
- for share in shares.values():
- share.add_or_renew_lease(lease_info)
- return shares
-
- def remote_get_mutable_slot(self, storage_index):
- """This returns an empty dictionary if the server has no shares
- of the slot mentioned."""
+ def remote_slot_testv_and_readv_and_writev(self, storage_index,
+ secrets,
+ test_and_write_vectors,
+ read_vector):
si_s = idlib.b2a(storage_index)
+ (write_enabler, renew_secret, cancel_secret) = secrets
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_s)
- if not os.path.isdir(bucketdir):
- return {}
- slots = {}
- for sharenum_s in os.listdir(bucketdir):
- try:
- sharenum = int(sharenum_s)
- except ValueError:
- continue
- filename = os.path.join(bucketdir, sharenum_s)
- slots[sharenum] = MutableShareFile(filename)
- return slots
+ shares = {}
+ if os.path.isdir(bucketdir):
+ for sharenum_s in os.listdir(bucketdir):
+ try:
+ sharenum = int(sharenum_s)
+ except ValueError:
+ continue
+ filename = os.path.join(bucketdir, sharenum_s)
+ msf = MutableShareFile(filename)
+ msf.check_write_enabler(write_enabler)
+ shares[sharenum] = msf
+ # write_enabler is good for all existing shares.
+
+ # Now evaluate test vectors.
+ testv_is_good = True
+ for sharenum in test_and_write_vectors:
+ (testv, datav, new_length) = test_and_write_vectors[sharenum]
+ if sharenum in shares:
+ if not shares[sharenum].check_testv(testv):
+ testv_is_good = False
+ break
+ else:
+ # compare the vectors against an empty share, in which all
+ # reads return empty strings.
+ if not EmptyShare().check_testv(testv):
+ testv_is_good = False
+ break
+
+ # now gather the read vectors, before we do any writes
+ read_data = {}
+ for sharenum, share in shares.items():
+ read_data[sharenum] = share.readv(read_vector)
+
+ if testv_is_good:
+ # now apply the write vectors
+ for sharenum in test_and_write_vectors:
+ (testv, datav, new_length) = test_and_write_vectors[sharenum]
+ if sharenum not in shares:
+ # allocate a new share
+ allocated_size = 2000 # arbitrary, really
+ share = self._allocate_slot_share(bucketdir, secrets,
+ sharenum,
+ allocated_size,
+ owner_num=0)
+ shares[sharenum] = share
+ shares[sharenum].writev(datav, new_length)
+ # and update the leases on all shares
+ ownerid = 1 # TODO
+ expire_time = time.time() + 31*24*60*60 # one month
+ my_nodeid = self.my_nodeid
+ anid = my_nodeid
+ lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
+ anid)
+ for share in shares.values():
+ share.add_or_renew_lease(lease_info)
+
+ # all done
+ return (testv_is_good, read_data)
+
+ def _allocate_slot_share(self, bucketdir, secrets, sharenum,
+ allocated_size, owner_num=0):
+ (write_enabler, renew_secret, cancel_secret) = secrets
+ my_nodeid = self.my_nodeid
+ fileutil.make_dirs(bucketdir)
+ filename = os.path.join(bucketdir, "%d" % sharenum)
+ share = create_mutable_sharefile(filename, my_nodeid, write_enabler)
+ return share
- def remote_readv_slots(self, storage_index, readv):
+ def remote_slot_readv(self, storage_index, shares, readv):
si_s = idlib.b2a(storage_index)
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_s)
sharenum = int(sharenum_s)
except ValueError:
continue
- filename = os.path.join(bucketdir, sharenum_s)
- msf = MutableShareFile(filename)
- datavs[sharenum] = msf.readv(readv)
+ if sharenum in shares or not shares:
+ filename = os.path.join(bucketdir, sharenum_s)
+ msf = MutableShareFile(filename)
+ datavs[sharenum] = msf.readv(readv)
return datavs
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, idlib
from allmydata.storage import BucketWriter, BucketReader, \
- WriteBucketProxy, ReadBucketProxy, StorageServer
+ WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile
from allmydata.interfaces import BadWriteEnablerError
class Bucket(unittest.TestCase):
write_enabler = self.write_enabler(we_tag)
renew_secret = self.renew_secret(lease_tag)
cancel_secret = self.cancel_secret(lease_tag)
- return ss.remote_allocate_mutable_slot(storage_index,
- write_enabler,
- renew_secret, cancel_secret,
- sharenums, size)
+ rstaraw = ss.remote_slot_testv_and_readv_and_writev
+ testandwritev = dict( [ (shnum, ([], [], None) )
+ for shnum in sharenums ] )
+ readv = []
+ rc = rstaraw(storage_index,
+ (write_enabler, renew_secret, cancel_secret),
+ testandwritev,
+ readv)
+ (did_write, readv_data) = rc
+ self.failUnless(did_write)
+ self.failUnless(isinstance(readv_data, dict))
+ self.failUnlessEqual(len(readv_data), 0)
def test_allocate(self):
ss = self.create("test_allocate")
- shares = self.allocate(ss, "si1", "we1", self._secret.next(),
+ self.allocate(ss, "si1", "we1", self._secret.next(),
set([0,1,2]), 100)
- self.failUnlessEqual(len(shares), 3)
- self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
- shares2 = ss.remote_get_mutable_slot("si1")
- self.failUnlessEqual(len(shares2), 3)
- self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
-
- s0 = shares[0]
- self.failUnlessEqual(s0.remote_read(0, 10), "")
- self.failUnlessEqual(s0.remote_read(100, 10), "")
- # try writing to one
- WE = self.write_enabler("we1")
- data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- answer = s0.remote_testv_and_writev(WE,
- [],
- [(0, data),],
- new_length=None)
- self.failUnlessEqual(answer, (True, []))
- self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
- self.failUnlessEqual(s0.remote_read(95, 10), "99999")
- self.failUnlessEqual(s0.remote_get_length(), 100)
+ read = ss.remote_slot_readv
+ self.failUnlessEqual(read("si1", [0], [(0, 10)]),
+ {0: [""]})
+ self.failUnlessEqual(read("si1", [], [(0, 10)]),
+ {0: [""], 1: [""], 2: [""]})
+ self.failUnlessEqual(read("si1", [0], [(100, 10)]),
+ {0: [""]})
+ # try writing to one
+ secrets = ( self.write_enabler("we1"),
+ self.renew_secret("we1"),
+ self.cancel_secret("we1") )
+ data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+ write = ss.remote_slot_testv_and_readv_and_writev
+ answer = write("si1", secrets,
+ {0: ([], [(0,data)], None)},
+ [])
+ self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
+ self.failUnlessEqual(read("si1", [0], [(0,20)]),
+ {0: ["00000000001111111111"]})
+ self.failUnlessEqual(read("si1", [0], [(95,10)]),
+ {0: ["99999"]})
+ #self.failUnlessEqual(s0.remote_get_length(), 100)
+
+ bad_secrets = ("bad write enabler", secrets[1], secrets[2])
self.failUnlessRaises(BadWriteEnablerError,
- s0.remote_testv_and_writev,
- "bad write enabler",
- [], [], None)
+ write, "si1", bad_secrets,
+ {}, [])
+
# this testv should fail
- answer = s0.remote_testv_and_writev(WE,
- [(0, 12, "eq", "444444444444"),
- (20, 5, "eq", "22222"),
- ],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["000000000011",
- "22222"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
+ answer = write("si1", secrets,
+ {0: ([(0, 12, "eq", "444444444444"),
+ (20, 5, "eq", "22222"),
+ ],
+ [(0, "x"*100)],
+ None),
+ },
+ [(0,12), (20,5)],
+ )
+ self.failUnlessEqual(answer, (False,
+ {0: ["000000000011", "22222"],
+ 1: ["", ""],
+ 2: ["", ""],
+ }))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# as should this one
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "lt", "11111"),
- ],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
+ answer = write("si1", secrets,
+ {0: ([(10, 5, "lt", "11111"),
+ ],
+ [(0, "x"*100)],
+ None),
+ },
+ [(10,5)],
+ )
+ self.failUnlessEqual(answer, (False,
+ {0: ["11111"],
+ 1: [""],
+ 2: [""]},
+ ))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
def test_operators(self):
# test operators, the data we're comparing is '11111' in all cases.
# test both fail+pass, reset data after each one.
ss = self.create("test_operators")
- shares = self.allocate(ss, "si1", "we1", self._secret.next(),
- set([0,1,2]), 100)
- s0 = shares[0]
- WE = self.write_enabler("we1")
+
+ secrets = ( self.write_enabler("we1"),
+ self.renew_secret("we1"),
+ self.cancel_secret("we1") )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- answer = s0.remote_testv_and_writev(WE,
- [],
- [(0, data),],
- new_length=None)
-
- # nop
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "nop", "11111"),
- ],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ write = ss.remote_slot_testv_and_readv_and_writev
+ read = ss.remote_slot_readv
+
+ def reset():
+ write("si1", secrets,
+ {0: ([], [(0,data)], None)},
+ [])
+
+ reset()
# lt
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "lt", "11110"),
+ answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "lt", "11111"),
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "lt", "11112"),
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
# le
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "le", "11110"),
+ answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "le", "11111"),
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "le", "11112"),
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
# eq
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "eq", "11112"),
+ answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "eq", "11111"),
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
# ne
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "ne", "11111"),
+ answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "ne", "11112"),
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
# ge
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "ge", "11110"),
+ answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "ge", "11111"),
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "ge", "11112"),
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
# gt
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "gt", "11110"),
+ answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
],
- [(0, "y"*100)], None)
- self.failUnlessEqual(answer, (True, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "gt", "11111"),
+ [(0, "y"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (True, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
-
- answer = s0.remote_testv_and_writev(WE,
- [(10, 5, "gt", "11112"),
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
+
+ answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
],
- [(0, "x"*100)], None)
- self.failUnlessEqual(answer, (False, ["11111"]))
- self.failUnlessEqual(s0.remote_read(0, 100), data)
- s0.remote_testv_and_writev(WE, [], [(0,data)], None)
+ [(0, "x"*100)],
+ None,
+ )}, [(10,5)])
+ self.failUnlessEqual(answer, (False, {0: ["11111"]}))
+ self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
+ reset()
def test_readv(self):
- ss = self.create("test_allocate")
- shares = self.allocate(ss, "si1", "we1", self._secret.next(),
- set([0,1,2]), 100)
- WE = self.write_enabler("we1")
+ ss = self.create("test_readv")
+ secrets = ( self.write_enabler("we1"),
+ self.renew_secret("we1"),
+ self.cancel_secret("we1") )
+ data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+ write = ss.remote_slot_testv_and_readv_and_writev
+ read = ss.remote_slot_readv
data = [("%d" % i) * 100 for i in range(3)]
- for i in range(3):
- rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])],
- new_length=None)
- self.failUnlessEqual(rc, (True, []))
- answer = ss.remote_readv_slots("si1", [(0, 10)])
+ rc = write("si1", secrets,
+ {0: ([], [(0,data[0])], None),
+ 1: ([], [(0,data[1])], None),
+ 2: ([], [(0,data[2])], None),
+ }, [])
+ self.failUnlessEqual(rc, (True, {}))
+
+ answer = read("si1", [], [(0, 10)])
self.failUnlessEqual(answer, {0: ["0"*10],
1: ["1"*10],
2: ["2"*10]})
def test_leases(self):
ss = self.create("test_leases")
- secret = 14
- shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
- s0 = shares[0]
- WE = self.write_enabler("we1")
+ def secrets(n):
+ return ( self.write_enabler("we1"),
+ self.renew_secret("we1-%d" % n),
+ self.cancel_secret("we1-%d" % n) )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
- answer = s0.remote_testv_and_writev(WE,
- [],
- [(0, data),],
- new_length=None)
+ write = ss.remote_slot_testv_and_readv_and_writev
+ read = ss.remote_slot_readv
+ rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
+ self.failUnlessEqual(rc, (True, {}))
# create a random non-numeric file in the bucket directory, to
# exercise the code that's supposed to ignore those.
# re-allocate the slots and use the same secrets, that should update
# the lease
- shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100)
+ write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
# renew it directly
- ss.remote_renew_lease("si1", self.renew_secret(secret))
+ ss.remote_renew_lease("si1", secrets(0)[1])
# now allocate them with a bunch of different secrets, to trigger the
# extended lease code
- shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100)
- shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100)
- shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100)
- shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100)
- shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100)
+ write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
+ write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
+ write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
+ write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
+ write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
+
# cancel one of them
- ss.remote_cancel_lease("si1", self.cancel_secret(secret+5))
+ ss.remote_cancel_lease("si1", secrets(5)[2])
+ s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
all_leases = s0.debug_get_leases()
self.failUnlessEqual(len(all_leases), 5)
# and write enough data to expand the container, forcing the server
# to move the leases
- answer = s0.remote_testv_and_writev(WE,
- [],
- [(0, data),],
- new_length=200)
+ write("si1", secrets(0),
+ {0: ([], [(0,data)], 200), },
+ [])
# read back the leases, make sure they're still intact.
self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases())
- ss.remote_renew_lease("si1", self.renew_secret(secret))
- ss.remote_renew_lease("si1", self.renew_secret(secret+1))
- ss.remote_renew_lease("si1", self.renew_secret(secret+2))
- ss.remote_renew_lease("si1", self.renew_secret(secret+3))
- ss.remote_renew_lease("si1", self.renew_secret(secret+4))
+ ss.remote_renew_lease("si1", secrets(0)[1])
+ ss.remote_renew_lease("si1", secrets(1)[1])
+ ss.remote_renew_lease("si1", secrets(2)[1])
+ ss.remote_renew_lease("si1", secrets(3)[1])
+ ss.remote_renew_lease("si1", secrets(4)[1])
self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases())
# get a new copy of the leases, with the current timestamps. Reading
# is present, to provide for share migration
self.failUnlessRaises(IndexError,
ss.remote_renew_lease, "si1",
- self.renew_secret(secret+20))
+ secrets(20)[1])
# same for cancelling
self.failUnlessRaises(IndexError,
ss.remote_cancel_lease, "si1",
- self.cancel_secret(secret+20))
+ secrets(20)[2])
self.failUnlessEqual(all_leases, s0.debug_get_leases())
- s0.remote_read(0, 200)
+
+ # reading shares should not modify the timestamp
+ read("si1", [], [(0,200)])
self.failUnlessEqual(all_leases, s0.debug_get_leases())
- answer = s0.remote_testv_and_writev(WE,
- [],
- [(200, "make me bigger"),],
- new_length=None)
+ write("si1", secrets(0),
+ {0: ([], [(200, "make me bigger")], None)}, [])
self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases())
- answer = s0.remote_testv_and_writev(WE,
- [],
- [(500, "make me really bigger"),],
- new_length=None)
+ write("si1", secrets(0),
+ {0: ([], [(500, "make me really bigger")], None)}, [])
self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases())
# now cancel them all
- ss.remote_cancel_lease("si1", self.cancel_secret(secret))
- ss.remote_cancel_lease("si1", self.cancel_secret(secret+1))
- ss.remote_cancel_lease("si1", self.cancel_secret(secret+2))
- ss.remote_cancel_lease("si1", self.cancel_secret(secret+3))
+ ss.remote_cancel_lease("si1", secrets(0)[2])
+ ss.remote_cancel_lease("si1", secrets(1)[2])
+ ss.remote_cancel_lease("si1", secrets(2)[2])
+ ss.remote_cancel_lease("si1", secrets(3)[2])
+
# the slot should still be there
- shares3 = ss.remote_get_mutable_slot("si1")
- self.failUnlessEqual(len(shares3), 3)
+ remaining_shares = read("si1", [], [(0,10)])
+ self.failUnlessEqual(len(remaining_shares), 1)
self.failUnlessEqual(len(s0.debug_get_leases()), 1)
- ss.remote_cancel_lease("si1", self.cancel_secret(secret+4))
+ ss.remote_cancel_lease("si1", secrets(4)[2])
# now the slot should be gone
- self.failUnlessEqual(ss.remote_get_mutable_slot("si1"), {})
+ no_shares = read("si1", [], [(0,10)])
+ self.failUnlessEqual(no_shares, {})