From b24c2925e8e99656d42bcd3c7e7466391bd6afa7 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 30 Oct 2007 19:47:36 -0700 Subject: [PATCH] checkpointing mutable-file work. Storage layer is 80% in place. --- src/allmydata/dirnode.py | 6 +- src/allmydata/interfaces.py | 88 ++++++++ src/allmydata/storage.py | 350 ++++++++++++++++++++++++++++- src/allmydata/test/test_storage.py | 63 ++++++ 4 files changed, 502 insertions(+), 5 deletions(-) diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index 26305def..6696b6a0 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -6,15 +6,13 @@ from twisted.internet import defer from foolscap import Referenceable from allmydata import uri from allmydata.interfaces import RIVirtualDriveServer, \ - IDirectoryNode, IFileNode, IFileURI, IDirnodeURI, IURI + IDirectoryNode, IFileNode, IFileURI, IDirnodeURI, IURI, \ + BadWriteEnablerError from allmydata.util import bencode, idlib, hashutil, fileutil from allmydata.Crypto.Cipher import AES # VirtualDriveServer is the side that hosts directory nodes -class BadWriteEnablerError(Exception): - pass - class NoPublicRootError(Exception): pass diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 38285b3d..9125350a 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -82,6 +82,49 @@ class RIBucketReader(RemoteInterface): def read(offset=int, length=int): return ShareData +TestVector = ListOf(TupleOf(int, int, str, str)) +# elements are (offset, length, operator, specimen) +# operator is one of "lt, le, eq, ne, ge, gt, nop" +# nop always passes and is used to fetch data while writing. +# you should use length==len(specimen) for everything except nop +DataVector = ListOf(TupleOf(int, ShareData)) +# (offset, data). This limits us to 30 writes of 1MiB each per call +TestResults = ListOf(str) +# returns data[offset:offset+length] for each element of TestVector + +class RIMutableSlot(RemoteInterface): + def testv_and_writev(write_enabler=Hash, + testv=TestVector, + datav=DataVector, + new_length=ChoiceOf(None, int)): + """General-purpose test-and-set operation for mutable slots. Perform + the given comparisons. If they all pass, then apply the write vector. + + If new_length is not None, use it to set the size of the container. + This can be used to pre-allocate space for a series of upcoming + writes, or truncate existing data. If the container is growing, + new_length will be applied before datav. If the container is + shrinking, it will be applied afterwards. + + Return the old data that was used for the comparisons. + + The boolean return value is True if the write vector was applied, + false if not. + + If the write_enabler is wrong, this will raise BadWriterEnablerError. + To enable share migration, the exception will have the nodeid used + for the old write enabler embedded in it, in the following string:: + + The write enabler was recorded by nodeid '%s'. + + """ + return TupleOf(bool, TestResults) + + def read(offset=int, length=int): + return ShareData + + def get_length(): + return int class RIStorageServer(RemoteInterface): def allocate_buckets(storage_index=StorageIndex, @@ -126,6 +169,47 @@ class RIStorageServer(RemoteInterface): return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) + def allocate_mutable_slot(storage_index=StorageIndex, + write_enabler=Hash, + renew_secret=LeaseRenewSecret, + cancel_secret=LeaseCancelSecret, + sharenums=SetOf(int, maxLength=MAX_BUCKETS), + allocated_size=int): + """ + @param storage_index: the index of the bucket to be created or + increfed. + @param write_enabler: a secret that is stored along with the slot. + Writes are accepted from any caller who can + present the matching secret. A different secret + should be used for each slot*server pair. + @param renew_secret: This is the secret used to protect bucket refresh + This secret is generated by the client and + stored for later comparison by the server. Each + server is given a different secret. + @param cancel_secret: Like renew_secret, but protects bucket decref. + @param sharenums: these are the share numbers (probably between 0 and + 99) that the sender is proposing to store on this + server. + @param allocated_size: all shares will pre-allocate this many bytes. + Use this to a) confirm that you can claim as + much space as you want before you actually + send the data, and b) reduce the disk-IO cost + of doing incremental writes. + + @return: dict mapping sharenum to slot. The return value may include + more sharenums than asked, if some shares already existed. + New leases are added for all + shares. + + """ + return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS) + + def get_mutable_slot(storage_index=StorageIndex): + """This returns an empty dictionary if the server has no shares + of the slot mentioned.""" + return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS) + + class IStorageBucketWriter(Interface): def put_block(segmentnum=int, data=ShareData): """@param data: For most segments, this data will be 'blocksize' @@ -1117,6 +1201,10 @@ class IVirtualDrive(Interface): class NotCapableError(Exception): """You have tried to write to a read-only node.""" +class BadWriteEnablerError(Exception): + pass + + class RIControlClient(RemoteInterface): def wait_for_client_connections(num_clients=int): diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 388f8e1b..6a15edbb 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -7,10 +7,14 @@ from twisted.internet import defer from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ - RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE + RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \ + BadWriterEnablerError, RIMutableSlot from allmydata.util import fileutil, idlib, mathutil from allmydata.util.assertutil import precondition, _assert +class DataTooLargeError(Exception): + pass + # storage/ # storage/shares/incoming # incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be @@ -213,6 +217,347 @@ class BucketReader(Referenceable): return self._share_file.read_share_data(offset, length) +# the MutableShareFile is like the ShareFile, but used for mutable data. It +# has a different layout. See docs/mutable.txt for more details. + +# # offset size name +# 1 0 32 magic verstr "tahoe mutable container v1" plus binary +# 2 32 32 write enabler's nodeid +# 3 64 32 write enabler +# 4 72 8 data size (actual share data present) (a) +# 5 80 8 offset of (8) count of extra leases (after data) +# 6 88 416 four leases, 104 bytes each +# 0 4 ownerid (0 means "no lease here") +# 4 4 expiration timestamp +# 8 32 renewal token +# 40 32 cancel token +# 72 32 nodeid which accepted the tokens +# 7 504 (a) data +# 8 ?? 4 count of extra leases +# 9 ?? n*104 extra leases + + +assert struct.calcsize("L"), 4 +assert struct.calcsize("Q"), 8 + +class MutableShareFile(Referenceable): + # note: at any given time, there should only be a single instance of this + # class per filename. More than one is likely to corrupt the container, + # because of state we cache in instance variables. This requires the + # StorageServer to use a WeakValueDictionary, indexed by filename. This + # could be improved by cacheing less and doing more IO. + implements(RIMutableSlot) + + DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s") + HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases + LEASE_SIZE = struct.calcsize(">LL32s32s32s") + DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE + # our sharefiles share with a recognizable string, plus some random + # binary data to reduce the chance that a regular text file will look + # like a sharefile. + MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e" + assert len(MAGIC) == 32 + MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary + # TODO: decide upon a policy for max share size + + def __init__(self, filename): + self.home = filename + self._base_lease_offset = self.HEADER_SIZE + if os.path.exists(self.home): + f = open(self.home, 'rb') + data = f.read(88) + (magic, + self._write_enabler_nodeid, self._write_enabler, + self._data_length, offset) = struct.unpack(">32s32s32sQQ", data) + assert magic == self.MAGIC + self._extra_lease_offset = offset # points at (8) + f.seek(self._extra_lease_offset) + data = f.read(4) + self._num_extra_leases = struct.unpack(">L", data) + f.close() + + + def create(self, my_nodeid, write_enabler): + assert not os.path.exists(self.home) + self._write_enabler = write_enabler + self._data_length = 0 + self._extra_lease_offset = (self.HEADER_SIZE + + 4 * self.LEASE_SIZE + + self._data_length) + assert self._extra_lease_offset == self.DATA_OFFSET # true at creation + self._num_extra_leases = 0 + f = open(self.home, 'wb') + header = struct.pack(">32s32s32sQQ", + self.MAGIC, my_nodeid, write_enabler, + self._data_length, self._extra_lease_offset, + ) + f.write(header) + # data goes here, empty after creation + f.write(struct.pack(">L", self._num_extra_leases)) + # extra leases go here, none at creation + f.close() + + + def read_share_data(self, offset, length): + precondition(offset >= 0) + if offset+length > self._data_length: + # reads beyond the end of the data are truncated. Reads that + # start beyond the end of the data return an empty string. + length = max(0, self._data_length-offset) + if length == 0: + return "" + precondition(offset+length <= self._data_length) + f = open(self.home, 'rb') + f.seek(self.DATA_OFFSET+offset) + return f.read(length) + + def change_container_size(self, new_container_size): + if new_container_size > self.MAX_SIZE: + raise DataTooLargeError() + new_extra_lease_offset = self.DATA_OFFSET + new_container_size + if new_extra_lease_offset < self._extra_lease_offset: + # TODO: allow containers to shrink + return + f = open(self.home, 'rb+') + f.seek(self._extra_lease_offset) + extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE) + f.seek(new_extra_lease_offset) + f.write(extra_lease_data) + self._extra_lease_offset = new_extra_lease_offset + # an interrupt here will corrupt the leases, iff the move caused the + # extra leases to overlap. + f.seek(self.DATA_LENGTH_OFFSET+8) + f.write(struct.pack(">Q", new_extra_lease_offset)) + f.close() + + def write_share_data(self, offset, data): + length = len(data) + precondition(offset >= 0) + if offset+length < self._data_length: + # they are not expanding their data size + f = open(self.home, 'rb+') + f.seek(self.DATA_OFFSET+offset) + f.write(data) + f.close() + return + if self.DATA_OFFSET+offset+length <= self._extra_lease_offset: + # they are expanding their data size, but not the container size + f = open(self.home, 'rb+') + self._data_length = offset+length + f.seek(self.DATA_LENGTH_OFFSET) + f.write(struct.pack(">Q", self._data_length)) + # an interrupt here will result in a corrupted share + f.seek(self.DATA_OFFSET+offset) + f.write(data) + f.close() + return + + # they are expanding the container, so we have to move the leases. + # With luck, they're expanding it more than the size of the extra + # lease block, which will minimize the corrupt-the-share window + + self.change_container_size(offset+length) + + # an interrupt here is ok.. the container has been enlarged but the + # data remains untouched + + self._data_length = offset+length + + f = open(self.home, 'rb+') + f.seek(self.DATA_OFFSET+offset) + f.write(data) + # an interrupt here will result in a corrupted share + f.seek(self.DATA_LENGTH_OFFSET) + f.write(struct.pack(">Q", self._data_length)) + f.close() + return + + def _write_lease_record(self, f, lease_number, lease_info): + (ownerid, expiration_time, + renew_secret, cancel_secret, nodeid) = lease_info + if lease_number < 4: + offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE + elif (lease_number-4) < self._num_extra_leases: + offset = (self._extra_lease_offset + + 4 + + (lease_number-4)*self.LEASE_NUMBER) + else: + f.seek(self._extra_lease_offset) + f.write(struct.pack(">L", self._num_extra_leases+1)) + self._num_extra_leases += 1 + offset = (self._extra_lease_offset + + 4 + + (lease_number-4)*self.LEASE_NUMBER) + f.seek(offset) + assert f.tell() == offset + f.write(struct.pack(">LL32s32s32s", + ownerid, int(expiration_time), + renew_secret, cancel_secret, nodeid)) + + def _read_lease_record(self, f, lease_number): + # returns a 5-tuple of lease info, or None + if lease_number < 4: + offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE + elif (lease_number-4) < self._num_extra_leases: + offset = (self._extra_lease_offset + + 4 + + (lease_number-4)*self.LEASE_NUMBER) + else: + raise IndexError("No such lease number %d" % lease_number) + f.seek(offset) + assert f.tell() == offset + data = f.read(self.LEASE_SIZE) + lease_info = struct.unpack(">LL32s32s32s", data) + (ownerid, expiration_time, + renew_secret, cancel_secret, nodeid) = lease_info + if ownerid == 0: + return None + return lease_info + + def _read_num_leases(self, f): + f.seek(self.HEADER_SIZE) + leasedata = f.read(4*self.LEASE_SIZE) + num_leases = 0 + for i in range(4): + base = i*self.LEASE_SIZE + (ownerid,) = struct.unpack(">L", leasedata[base:base+4]) + if ownerid != 0: + num_leases += 1 + return num_leases + self._num_extra_leases + + def pack_leases(self): + pass + + def _truncate_leases(self, f, num_leases): + f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE) + + def enumerate_leases(self, f): + """Yields (leasenum, (ownerid, expiration_time, renew_secret, + cancel_secret, accepting_nodeid)) for all leases.""" + for i in range(self._read_num_leases(f)): + try: + data = self._read_lease_record(f, i) + if data is not None: + yield (i,data) + except IndexError: + return + + def add_lease(self, lease_info): + f = open(self.home, 'rb+') + num_leases = self._read_num_leases(f) + self._write_lease_record(f, num_leases, lease_info) + self._write_num_leases(f, num_leases+1) + f.close() + + def renew_lease(self, renew_secret, new_expire_time): + accepting_nodeids = set() + f = open(self.home, 'rb+') + for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f): + if rs == renew_secret: + # yup. See if we need to update the owner time. + if new_expire_time > et: + # yes + new_lease = (oid,new_expire_time,rs,cs,anid) + self._write_lease_record(f, leasenum, new_lease) + f.close() + return + accepting_nodeids.add(anid) + f.close() + # TODO: return the accepting_nodeids set, to give the client a chance + # to update the leases on a share which has been migrated from its + # original server to a new one. + raise IndexError("unable to renew non-existent lease") + + def add_or_renew_lease(self, lease_info): + ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info + try: + self.renew_lease(renew_secret, expire_time) + except IndexError: + self.add_lease(lease_info) + + def cancel_lease(self, cancel_secret): + """Remove any leases with the given cancel_secret. Return + (num_remaining_leases, space_freed). Raise IndexError if there was no + lease with the given cancel_secret.""" + + modified = 0 + remaining = 0 + blank = "\x00"*32 + blank_lease = (0, 0, blank, blank, blank) + f = open(self.home, 'rb+') + for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f): + if cs == cancel_secret: + self._write_lease_record(f, leasenum, blank_lease) + modified += 1 + else: + remaining += 1 + if modified: + freed_space = self._pack_leases(f) + f.close() + return (freed_space, remaining) + + def _pack_leases(self, f): + # TODO: reclaim space from cancelled leases + return 0 + + def remote_read(self, offset, length): + return self.read_share_data(offset, length) + def remote_get_length(self): + return self._data_length + def remote_testv_and_writev(self, write_enabler, testv, datav, new_length): + if write_enabler != self._write_enabler: + # accomodate share migration by reporting the nodeid used for the + # old write enabler. + msg = "The write enabler was recorded by nodeid '%s'." % \ + (idlib.b2a(self._write_enabler_nodeid),) + raise BadWriterEnablerError(msg) + # check testv + test_results_v = [] + test_failed = False + for (offset, length, operator, specimen) in testv: + data = self.read_share_data(offset, length) + test_results_v.append(data) + if not self.compare(data, operator, specimen): + test_failed = False + if test_failed: + return (False, test_results_v) + # now apply the write vector + for (offset, data) in datav: + self.write_share_data(offset, data) + if new_length is not None: + self.change_container_size(new_length) + self._data_length = new_length + f = open(self.home, "rb+") + f.seek(self.DATA_LENGTH_OFFSET) + f.write(struct.pack(">Q", self._data_length)) + f.close() + return (True, test_results_v) + + def compare(self, a, op, b): + assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt") + if op == "nop": + return True + if op == "lt": + return a <= b + if op == "le": + return a < b + if op == "eq": + return a == b + if op == "ne": + return a != b + if op == "ge": + return a >= b + if op == "gt": + return a > b + # never reached + +def create_mutable_sharefile(filename, my_nodeid, write_enabler): + ms = MutableShareFile(filename) + ms.create(my_nodeid, write_enabler) + del ms + return MutableShareFile(filename) + + class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) name = 'storageserver' @@ -396,6 +741,9 @@ class StorageServer(service.MultiService, Referenceable): return iter([]) +# the code before here runs on the storage server, not the client +# the code beyond here runs on the client, not the storage server + """ Share data is written into a single file. At the start of the file, there is a series of four-byte big-endian offset values, which indicate where each diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 39e2c18a..e8d81dba 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -438,3 +438,66 @@ class Server(unittest.TestCase): leases = list(ss.get_leases("si3")) self.failUnlessEqual(len(leases), 2) + + +class MutableServer(unittest.TestCase): + + def setUp(self): + self.sparent = service.MultiService() + self._secret = itertools.count() + def tearDown(self): + return self.sparent.stopService() + + def workdir(self, name): + basedir = os.path.join("storage", "MutableServer", name) + return basedir + + def create(self, name, sizelimit=None): + workdir = self.workdir(name) + ss = StorageServer(workdir, sizelimit) + ss.setServiceParent(self.sparent) + return ss + + def test_create(self): + ss = self.create("test_create") + + def write_enabler(self, we_tag): + return hashutil.tagged_hash("we_blah", we_tag) + + def allocate(self, ss, storage_index, we_tag, sharenums, size): + write_enabler = self.write_enabler(we_tag) + renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next()) + cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next()) + return ss.remote_allocate_mutable_slot(storage_index, + write_enabler, + renew_secret, cancel_secret, + sharenums, size) + + def test_allocate(self): + ss = self.create("test_allocate") + shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100) + self.failUnlessEqual(len(shares), 3) + self.failUnlessEqual(set(shares.keys()), set([0,1,2])) + shares2 = ss.get_mutable_slot("si1") + self.failUnlessEqual(len(shares2), 3) + self.failUnlessEqual(set(shares2.keys()), set([0,1,2])) + # the actual RIMutableSlot objects are required to be singtons (one + # per SI+shnum), so each get_mutable_slot() call should return the + # same RemoteReferences + self.failUnlessEqual(set(shares.values()), set(shares2.values())) + + s0 = shares[0] + self.failUnlessEqual(s0.remote_read(0, 10), "") + self.failUnlessEqual(s0.remote_read(100, 10), "") + # try writing to one + data = "".join([ ("%d" % i) * 10 for i in range(10) ]) + answer = s0.remote_testv_and_writev(self.write_enabler("we1"), + [], + [(0, data),], + new_length=None) + self.failUnlessEqual(answer, []) + + self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111") + self.failUnlessEqual(s0.remote_read(95, 10), "99999") + self.failUnlessEqual(s0.remote_get_length(), 100) + -- 2.45.2