def read(offset=int, length=int):
return ShareData
+TestVector = ListOf(TupleOf(int, int, str, str))
+# elements are (offset, length, operator, specimen)
+# operator is one of "lt, le, eq, ne, ge, gt, nop"
+# nop always passes and is used to fetch data while writing.
+# you should use length==len(specimen) for everything except nop
+DataVector = ListOf(TupleOf(int, ShareData))
+# (offset, data). This limits us to 30 writes of 1MiB each per call
+TestResults = ListOf(str)
+# returns data[offset:offset+length] for each element of TestVector
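+#
+# For illustration only (these literals are hypothetical, not part of the
+# protocol): a caller who wants to replace the first two bytes iff they
+# currently contain "v1" could send
+#   testv = [(0, 2, "eq", "v1")]
+#   datav = [(0, "v2")]
+# and a caller who only wants to fetch the first 100 bytes during a write
+# could send
+#   testv = [(0, 100, "nop", "")]
+#   datav = []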
+
+class RIMutableSlot(RemoteInterface):
+ def testv_and_writev(write_enabler=Hash,
+ testv=TestVector,
+ datav=DataVector,
+ new_length=ChoiceOf(None, int)):
+ """General-purpose test-and-set operation for mutable slots. Perform
+ the given comparisons. If they all pass, then apply the write vector.
+
+ If new_length is not None, use it to set the size of the container.
+ This can be used to pre-allocate space for a series of upcoming
+ writes, or truncate existing data. If the container is growing,
+ new_length will be applied before datav. If the container is
+ shrinking, it will be applied afterwards.
+
+ The return value is a two-tuple: a boolean that is True if the write
+ vector was applied (False if not), and the old data that was used for
+ the comparisons.
+
+ If the write_enabler is wrong, this will raise BadWriteEnablerError.
+ To enable share migration, the exception will have the nodeid used
+ for the old write enabler embedded in it, in the following string::
+
+ The write enabler was recorded by nodeid '%s'.
+
+ """
+ return TupleOf(bool, TestResults)
+
+ def read(offset=int, length=int):
+ return ShareData
+
+ def get_length():
+ return int
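+
+# A hypothetical client-side sketch (not part of this interface definition)
+# of a compare-and-swap update through an RIMutableSlot RemoteReference:
+#
+#   def replace_if_unchanged(slot, write_enabler, old_data, new_data):
+#       testv = [(0, len(old_data), "eq", old_data)]
+#       datav = [(0, new_data)]
+#       d = slot.callRemote("testv_and_writev", write_enabler,
+#                           testv, datav, len(new_data))
+#       # fires with (wrote, test_results): wrote is False if some other
+#       # writer got there first, and test_results holds the data that was
+#       # actually present for each test
+#       return d
+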
class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex,
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
+ def allocate_mutable_slot(storage_index=StorageIndex,
+ write_enabler=Hash,
+ renew_secret=LeaseRenewSecret,
+ cancel_secret=LeaseCancelSecret,
+ sharenums=SetOf(int, maxLength=MAX_BUCKETS),
+ allocated_size=int):
+ """
+ @param storage_index: the index of the bucket to be created or
+ increfed.
+ @param write_enabler: a secret that is stored along with the slot.
+ Writes are accepted from any caller who can
+ present the matching secret. A different secret
+ should be used for each slot*server pair.
+ @param renew_secret: This is the secret used to protect bucket refresh.
+ This secret is generated by the client and
+ stored for later comparison by the server. Each
+ server is given a different secret.
+ @param cancel_secret: Like renew_secret, but protects bucket decref.
+ @param sharenums: these are the share numbers (probably between 0 and
+ 99) that the sender is proposing to store on this
+ server.
+ @param allocated_size: all shares will pre-allocate this many bytes.
+ Use this to a) confirm that you can claim as
+ much space as you want before you actually
+ send the data, and b) reduce the disk-IO cost
+ of doing incremental writes.
+
+ @return: dict mapping sharenum to slot. The return value may include
+ more sharenums than were requested, if some shares already
+ existed. New leases are added for all shares.
+
+ """
+ return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+
+ def get_mutable_slot(storage_index=StorageIndex):
+ """This returns an empty dictionary if the server has no shares
+ of the slot mentioned."""
+ return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+
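+# A hypothetical sketch (not part of this interface definition) of the
+# allocation flow a client might use: ask the server for slots, then push
+# initial data into each returned RIMutableSlot. Assumes twisted's defer
+# module, as used elsewhere in this codebase.
+#
+#   def create_and_fill(server, storage_index, write_enabler,
+#                       renew_secret, cancel_secret, shares):
+#       # 'shares' maps sharenum -> initial share data
+#       sizes = [len(data) for data in shares.values()]
+#       d = server.callRemote("allocate_mutable_slot", storage_index,
+#                             write_enabler, renew_secret, cancel_secret,
+#                             set(shares.keys()), max(sizes))
+#       def _fill(slots):
+#           return defer.DeferredList(
+#               [slot.callRemote("testv_and_writev", write_enabler,
+#                                [], [(0, shares[num])], None)
+#                for (num, slot) in slots.items()])
+#       d.addCallback(_fill)
+#       return d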
+
class IStorageBucketWriter(Interface):
def put_block(segmentnum=int, data=ShareData):
"""@param data: For most segments, this data will be 'blocksize'
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
+class BadWriteEnablerError(Exception):
+ pass
+
+
class RIControlClient(RemoteInterface):
def wait_for_client_connections(num_clients=int):
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
- RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
+ RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
+ BadWriteEnablerError, RIMutableSlot
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert
+class DataTooLargeError(Exception):
+ pass
+
# storage/
# storage/shares/incoming
# incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
return self._share_file.read_share_data(offset, length)
+# the MutableShareFile is like the ShareFile, but used for mutable data. It
+# has a different layout. See docs/mutable.txt for more details.
+
+# #   offset  size   name
+# 1   0       32     magic verstr "tahoe mutable container v1" plus binary
+# 2   32      32     write enabler's nodeid
+# 3   64      32     write enabler
+# 4   96      8      data size (actual share data present) (a)
+# 5   104     8      offset of (8) count of extra leases (after data)
+# 6   112     416    four leases, 104 bytes each
+#                      0    4  ownerid (0 means "no lease here")
+#                      4    4  expiration timestamp
+#                      8   32  renewal token
+#                     40   32  cancel token
+#                     72   32  nodeid which accepted the tokens
+# 7   528     (a)    data
+# 8   ??      4      count of extra leases
+# 9   ??      n*104  extra leases
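+#
+# As a cross-check against the struct formats used below: the fixed header
+# is 32+32+32+8+8 = 112 bytes, each lease record is 4+4+32+32+32 = 104
+# bytes, and the four built-in lease slots add 4*104 = 416 bytes, so share
+# data starts at offset 112+416 = 528.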
+
+
+assert struct.calcsize(">L") == 4
+assert struct.calcsize(">Q") == 8
+
+class MutableShareFile(Referenceable):
+ # note: at any given time, there should only be a single instance of this
+ # class per filename. More than one is likely to corrupt the container,
+ # because of state we cache in instance variables. This requires the
+ # StorageServer to use a WeakValueDictionary, indexed by filename. This
+ # could be improved by caching less and doing more IO.
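+ # A minimal sketch of such a cache (hypothetical, not part of this patch;
+ # the real StorageServer wiring may differ):
+ #
+ #   import weakref
+ #   _slot_cache = weakref.WeakValueDictionary()  # filename -> MutableShareFile
+ #   def get_slot(filename):
+ #       try:
+ #           return _slot_cache[filename]
+ #       except KeyError:
+ #           slot = _slot_cache[filename] = MutableShareFile(filename)
+ #           return slot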
+ implements(RIMutableSlot)
+
+ DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
+ HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
+ LEASE_SIZE = struct.calcsize(">LL32s32s32s")
+ DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+ # our sharefiles start with a recognizable string, plus some random
+ # binary data to reduce the chance that a regular text file will look
+ # like a sharefile.
+ MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+ assert len(MAGIC) == 32
+ MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
+ # TODO: decide upon a policy for max share size
+
+ def __init__(self, filename):
+ self.home = filename
+ self._base_lease_offset = self.HEADER_SIZE
+ if os.path.exists(self.home):
+ f = open(self.home, 'rb')
+ data = f.read(self.HEADER_SIZE)
+ (magic,
+ self._write_enabler_nodeid, self._write_enabler,
+ self._data_length, offset) = struct.unpack(">32s32s32sQQ", data)
+ assert magic == self.MAGIC
+ self._extra_lease_offset = offset # points at (8)
+ f.seek(self._extra_lease_offset)
+ data = f.read(4)
+ (self._num_extra_leases,) = struct.unpack(">L", data)
+ f.close()
+
+
+ def create(self, my_nodeid, write_enabler):
+ assert not os.path.exists(self.home)
+ self._write_enabler = write_enabler
+ self._data_length = 0
+ self._extra_lease_offset = (self.HEADER_SIZE
+ + 4 * self.LEASE_SIZE
+ + self._data_length)
+ assert self._extra_lease_offset == self.DATA_OFFSET # true at creation
+ self._num_extra_leases = 0
+ f = open(self.home, 'wb')
+ header = struct.pack(">32s32s32sQQ",
+ self.MAGIC, my_nodeid, write_enabler,
+ self._data_length, self._extra_lease_offset,
+ )
+ f.write(header)
+ # data goes here, empty after creation
+ f.write(struct.pack(">L", self._num_extra_leases))
+ # extra leases go here, none at creation
+ f.close()
+
+
+ def read_share_data(self, offset, length):
+ precondition(offset >= 0)
+ if offset+length > self._data_length:
+ # reads beyond the end of the data are truncated. Reads that
+ # start beyond the end of the data return an empty string.
+ length = max(0, self._data_length-offset)
+ if length == 0:
+ return ""
+ precondition(offset+length <= self._data_length)
+ f = open(self.home, 'rb')
+ f.seek(self.DATA_OFFSET+offset)
+ data = f.read(length)
+ f.close()
+ return data
+
+ def change_container_size(self, new_container_size):
+ if new_container_size > self.MAX_SIZE:
+ raise DataTooLargeError()
+ new_extra_lease_offset = self.DATA_OFFSET + new_container_size
+ if new_extra_lease_offset < self._extra_lease_offset:
+ # TODO: allow containers to shrink
+ return
+ f = open(self.home, 'rb+')
+ f.seek(self._extra_lease_offset)
+ extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE)
+ f.seek(new_extra_lease_offset)
+ f.write(extra_lease_data)
+ self._extra_lease_offset = new_extra_lease_offset
+ # an interrupt here will corrupt the leases, iff the move caused the
+ # extra leases to overlap.
+ f.seek(self.DATA_LENGTH_OFFSET+8)
+ f.write(struct.pack(">Q", new_extra_lease_offset))
+ f.close()
+
+ def write_share_data(self, offset, data):
+ length = len(data)
+ precondition(offset >= 0)
+ if offset+length < self._data_length:
+ # they are not expanding their data size
+ f = open(self.home, 'rb+')
+ f.seek(self.DATA_OFFSET+offset)
+ f.write(data)
+ f.close()
+ return
+ if self.DATA_OFFSET+offset+length <= self._extra_lease_offset:
+ # they are expanding their data size, but not the container size
+ f = open(self.home, 'rb+')
+ self._data_length = offset+length
+ f.seek(self.DATA_LENGTH_OFFSET)
+ f.write(struct.pack(">Q", self._data_length))
+ # an interrupt here will result in a corrupted share
+ f.seek(self.DATA_OFFSET+offset)
+ f.write(data)
+ f.close()
+ return
+
+ # they are expanding the container, so we have to move the leases.
+ # With luck, they're expanding it more than the size of the extra
+ # lease block, which will minimize the corrupt-the-share window
+
+ self.change_container_size(offset+length)
+
+ # an interrupt here is ok.. the container has been enlarged but the
+ # data remains untouched
+
+ self._data_length = offset+length
+
+ f = open(self.home, 'rb+')
+ f.seek(self.DATA_OFFSET+offset)
+ f.write(data)
+ # an interrupt here will result in a corrupted share
+ f.seek(self.DATA_LENGTH_OFFSET)
+ f.write(struct.pack(">Q", self._data_length))
+ f.close()
+ return
+
+ def _write_lease_record(self, f, lease_number, lease_info):
+ (ownerid, expiration_time,
+ renew_secret, cancel_secret, nodeid) = lease_info
+ if lease_number < 4:
+ offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
+ elif (lease_number-4) < self._num_extra_leases:
+ offset = (self._extra_lease_offset
+ + 4
+ + (lease_number-4)*self.LEASE_SIZE)
+ else:
+ f.seek(self._extra_lease_offset)
+ f.write(struct.pack(">L", self._num_extra_leases+1))
+ self._num_extra_leases += 1
+ offset = (self._extra_lease_offset
+ + 4
+ + (lease_number-4)*self.LEASE_SIZE)
+ f.seek(offset)
+ assert f.tell() == offset
+ f.write(struct.pack(">LL32s32s32s",
+ ownerid, int(expiration_time),
+ renew_secret, cancel_secret, nodeid))
+
+ def _read_lease_record(self, f, lease_number):
+ # returns a 5-tuple of lease info, or None
+ if lease_number < 4:
+ offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
+ elif (lease_number-4) < self._num_extra_leases:
+ offset = (self._extra_lease_offset
+ + 4
+ + (lease_number-4)*self.LEASE_SIZE)
+ else:
+ raise IndexError("No such lease number %d" % lease_number)
+ f.seek(offset)
+ assert f.tell() == offset
+ data = f.read(self.LEASE_SIZE)
+ lease_info = struct.unpack(">LL32s32s32s", data)
+ (ownerid, expiration_time,
+ renew_secret, cancel_secret, nodeid) = lease_info
+ if ownerid == 0:
+ return None
+ return lease_info
+
+ def _read_num_leases(self, f):
+ f.seek(self.HEADER_SIZE)
+ leasedata = f.read(4*self.LEASE_SIZE)
+ num_leases = 0
+ for i in range(4):
+ base = i*self.LEASE_SIZE
+ (ownerid,) = struct.unpack(">L", leasedata[base:base+4])
+ if ownerid != 0:
+ num_leases += 1
+ return num_leases + self._num_extra_leases
+
+ def pack_leases(self):
+ pass
+
+ def _truncate_leases(self, f, num_leases):
+ f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
+
+ def enumerate_leases(self, f):
+ """Yields (leasenum, (ownerid, expiration_time, renew_secret,
+ cancel_secret, accepting_nodeid)) for all leases."""
+ for i in range(self._read_num_leases(f)):
+ try:
+ data = self._read_lease_record(f, i)
+ if data is not None:
+ yield (i,data)
+ except IndexError:
+ return
+
+ def add_lease(self, lease_info):
+ f = open(self.home, 'rb+')
+ num_leases = self._read_num_leases(f)
+ # note: _write_lease_record() itself updates the on-disk count of
+ # extra leases when the new lease lands beyond the four built-in
+ # slots, so no separate count update is needed here
+ self._write_lease_record(f, num_leases, lease_info)
+ f.close()
+
+ def renew_lease(self, renew_secret, new_expire_time):
+ accepting_nodeids = set()
+ f = open(self.home, 'rb+')
+ for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
+ if rs == renew_secret:
+ # yup. See if we need to update the owner time.
+ if new_expire_time > et:
+ # yes
+ new_lease = (oid,new_expire_time,rs,cs,anid)
+ self._write_lease_record(f, leasenum, new_lease)
+ f.close()
+ return
+ accepting_nodeids.add(anid)
+ f.close()
+ # TODO: return the accepting_nodeids set, to give the client a chance
+ # to update the leases on a share which has been migrated from its
+ # original server to a new one.
+ raise IndexError("unable to renew non-existent lease")
+
+ def add_or_renew_lease(self, lease_info):
+ ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
+ try:
+ self.renew_lease(renew_secret, expire_time)
+ except IndexError:
+ self.add_lease(lease_info)
+
+ def cancel_lease(self, cancel_secret):
+ """Remove any leases with the given cancel_secret. Return
+ (num_remaining_leases, space_freed). Raise IndexError if there was no
+ lease with the given cancel_secret."""
+
+ modified = 0
+ remaining = 0
+ blank = "\x00"*32
+ blank_lease = (0, 0, blank, blank, blank)
+ f = open(self.home, 'rb+')
+ for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
+ if cs == cancel_secret:
+ self._write_lease_record(f, leasenum, blank_lease)
+ modified += 1
+ else:
+ remaining += 1
+ if not modified:
+ f.close()
+ raise IndexError("unable to cancel non-existent lease")
+ freed_space = self._pack_leases(f)
+ f.close()
+ return (remaining, freed_space)
+
+ def _pack_leases(self, f):
+ # TODO: reclaim space from cancelled leases
+ return 0
+
+ def remote_read(self, offset, length):
+ return self.read_share_data(offset, length)
+ def remote_get_length(self):
+ return self._data_length
+ def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
+ if write_enabler != self._write_enabler:
+ # accommodate share migration by reporting the nodeid used for the
+ # old write enabler.
+ msg = "The write enabler was recorded by nodeid '%s'." % \
+ (idlib.b2a(self._write_enabler_nodeid),)
+ raise BadWriteEnablerError(msg)
+ # check testv
+ test_results_v = []
+ test_failed = False
+ for (offset, length, operator, specimen) in testv:
+ data = self.read_share_data(offset, length)
+ test_results_v.append(data)
+ if not self.compare(data, operator, specimen):
+ test_failed = True
+ if test_failed:
+ return (False, test_results_v)
+ # now apply the write vector
+ for (offset, data) in datav:
+ self.write_share_data(offset, data)
+ if new_length is not None:
+ self.change_container_size(new_length)
+ self._data_length = new_length
+ f = open(self.home, "rb+")
+ f.seek(self.DATA_LENGTH_OFFSET)
+ f.write(struct.pack(">Q", self._data_length))
+ f.close()
+ return (True, test_results_v)
+
+ def compare(self, a, op, b):
+ assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt")
+ if op == "nop":
+ return True
+ if op == "lt":
+ return a <= b
+ if op == "le":
+ return a < b
+ if op == "eq":
+ return a == b
+ if op == "ne":
+ return a != b
+ if op == "ge":
+ return a >= b
+ if op == "gt":
+ return a > b
+ # never reached
+
+def create_mutable_sharefile(filename, my_nodeid, write_enabler):
+ ms = MutableShareFile(filename)
+ ms.create(my_nodeid, write_enabler)
+ del ms
+ return MutableShareFile(filename)
+
+
class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer)
name = 'storageserver'
return iter([])
+# the code before here runs on the storage server, not the client
+# the code beyond here runs on the client, not the storage server
+
"""
Share data is written into a single file. At the start of the file, there is
a series of four-byte big-endian offset values, which indicate where each
leases = list(ss.get_leases("si3"))
self.failUnlessEqual(len(leases), 2)
+
+
+class MutableServer(unittest.TestCase):
+
+ def setUp(self):
+ self.sparent = service.MultiService()
+ self._secret = itertools.count()
+ def tearDown(self):
+ return self.sparent.stopService()
+
+ def workdir(self, name):
+ basedir = os.path.join("storage", "MutableServer", name)
+ return basedir
+
+ def create(self, name, sizelimit=None):
+ workdir = self.workdir(name)
+ ss = StorageServer(workdir, sizelimit)
+ ss.setServiceParent(self.sparent)
+ return ss
+
+ def test_create(self):
+ ss = self.create("test_create")
+
+ def write_enabler(self, we_tag):
+ return hashutil.tagged_hash("we_blah", we_tag)
+
+ def allocate(self, ss, storage_index, we_tag, sharenums, size):
+ write_enabler = self.write_enabler(we_tag)
+ renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+ cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+ return ss.remote_allocate_mutable_slot(storage_index,
+ write_enabler,
+ renew_secret, cancel_secret,
+ sharenums, size)
+
+ def test_allocate(self):
+ ss = self.create("test_allocate")
+ shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
+ self.failUnlessEqual(len(shares), 3)
+ self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
+ shares2 = ss.remote_get_mutable_slot("si1")
+ self.failUnlessEqual(len(shares2), 3)
+ self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
+ # the actual RIMutableSlot objects are required to be singletons (one
+ # per SI+shnum), so each get_mutable_slot() call should return the
+ # same RemoteReferences
+ self.failUnlessEqual(set(shares.values()), set(shares2.values()))
+
+ s0 = shares[0]
+ self.failUnlessEqual(s0.remote_read(0, 10), "")
+ self.failUnlessEqual(s0.remote_read(100, 10), "")
+ # try writing to one
+ data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+ answer = s0.remote_testv_and_writev(self.write_enabler("we1"),
+ [],
+ [(0, data),],
+ new_length=None)
+ self.failUnlessEqual(answer, (True, []))
+
+ self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
+ self.failUnlessEqual(s0.remote_read(95, 10), "99999")
+ self.failUnlessEqual(s0.remote_get_length(), 100)
+