]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
checkpointing mutable-file work. Storage layer is 80% in place.
authorBrian Warner <warner@allmydata.com>
Wed, 31 Oct 2007 02:47:36 +0000 (19:47 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 31 Oct 2007 02:47:36 +0000 (19:47 -0700)
src/allmydata/dirnode.py
src/allmydata/interfaces.py
src/allmydata/storage.py
src/allmydata/test/test_storage.py

index 26305def2a651b98cd9992313860baebf4b2d582..6696b6a0cb01070e362c5efc76c469fa393403e9 100644 (file)
@@ -6,15 +6,13 @@ from twisted.internet import defer
 from foolscap import Referenceable
 from allmydata import uri
 from allmydata.interfaces import RIVirtualDriveServer, \
-     IDirectoryNode, IFileNode, IFileURI, IDirnodeURI, IURI
+     IDirectoryNode, IFileNode, IFileURI, IDirnodeURI, IURI, \
+     BadWriteEnablerError
 from allmydata.util import bencode, idlib, hashutil, fileutil
 from allmydata.Crypto.Cipher import AES
 
 # VirtualDriveServer is the side that hosts directory nodes
 
-class BadWriteEnablerError(Exception):
-    pass
-
 class NoPublicRootError(Exception):
     pass
 
index 38285b3df814458eb0e4b297b656f57f5fdf74fa..9125350a5a3df6af105f0ec8caaba379fcb58446 100644 (file)
@@ -82,6 +82,49 @@ class RIBucketReader(RemoteInterface):
     def read(offset=int, length=int):
         return ShareData
 
+TestVector = ListOf(TupleOf(int, int, str, str))
+# elements are (offset, length, operator, specimen)
+# operator is one of "lt, le, eq, ne, ge, gt, nop"
+# nop always passes and is used to fetch data while writing.
+# you should use length==len(specimen) for everything except nop
+DataVector = ListOf(TupleOf(int, ShareData))
+# (offset, data). This limits us to 30 writes of 1MiB each per call
+TestResults = ListOf(str)
+# returns data[offset:offset+length] for each element of TestVector
+
+class RIMutableSlot(RemoteInterface):
+    def testv_and_writev(write_enabler=Hash,
+                         testv=TestVector,
+                         datav=DataVector,
+                         new_length=ChoiceOf(None, int)):
+        """General-purpose test-and-set operation for mutable slots. Perform
+        the given comparisons. If they all pass, then apply the write vector.
+
+        If new_length is not None, use it to set the size of the container.
+        This can be used to pre-allocate space for a series of upcoming
+        writes, or truncate existing data. If the container is growing,
+        new_length will be applied before datav. If the container is
+        shrinking, it will be applied afterwards.
+
+        Return the old data that was used for the comparisons.
+
+        The boolean return value is True if the write vector was applied,
+        false if not.
+
+        If the write_enabler is wrong, this will raise BadWriterEnablerError.
+        To enable share migration, the exception will have the nodeid used
+        for the old write enabler embedded in it, in the following string::
+
+         The write enabler was recorded by nodeid '%s'.
+
+        """
+        return TupleOf(bool, TestResults)
+
+    def read(offset=int, length=int):
+        return ShareData
+
+    def get_length():
+        return int
 
 class RIStorageServer(RemoteInterface):
     def allocate_buckets(storage_index=StorageIndex,
@@ -126,6 +169,47 @@ class RIStorageServer(RemoteInterface):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
 
 
+    def allocate_mutable_slot(storage_index=StorageIndex,
+                              write_enabler=Hash,
+                              renew_secret=LeaseRenewSecret,
+                              cancel_secret=LeaseCancelSecret,
+                              sharenums=SetOf(int, maxLength=MAX_BUCKETS),
+                              allocated_size=int):
+        """
+        @param storage_index: the index of the bucket to be created or
+                              increfed.
+        @param write_enabler: a secret that is stored along with the slot.
+                              Writes are accepted from any caller who can
+                              present the matching secret. A different secret
+                              should be used for each slot*server pair.
+        @param renew_secret: This is the secret used to protect bucket refresh
+                             This secret is generated by the client and
+                             stored for later comparison by the server. Each
+                             server is given a different secret.
+        @param cancel_secret: Like renew_secret, but protects bucket decref.
+        @param sharenums: these are the share numbers (probably between 0 and
+                          99) that the sender is proposing to store on this
+                          server.
+        @param allocated_size: all shares will pre-allocate this many bytes.
+                               Use this to a) confirm that you can claim as
+                               much space as you want before you actually
+                               send the data, and b) reduce the disk-IO cost
+                               of doing incremental writes.
+
+        @return: dict mapping sharenum to slot. The return value may include
+                 more sharenums than asked, if some shares already existed.
+                 New leases are added for all
+                 shares.
+
+        """
+        return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+
+    def get_mutable_slot(storage_index=StorageIndex):
+        """This returns an empty dictionary if the server has no shares
+        of the slot mentioned."""
+        return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
+
+
 class IStorageBucketWriter(Interface):
     def put_block(segmentnum=int, data=ShareData):
         """@param data: For most segments, this data will be 'blocksize'
@@ -1117,6 +1201,10 @@ class IVirtualDrive(Interface):
 class NotCapableError(Exception):
     """You have tried to write to a read-only node."""
 
+class BadWriteEnablerError(Exception):
+    pass
+
+
 class RIControlClient(RemoteInterface):
 
     def wait_for_client_connections(num_clients=int):
index 388f8e1bac8ee6e24655366313c8963f2c2ecc7c..6a15edbbcbe209b5964feb35765084f159386df6 100644 (file)
@@ -7,10 +7,14 @@ from twisted.internet import defer
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
-     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE
+     RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
+     BadWriterEnablerError, RIMutableSlot
 from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition, _assert
 
+class DataTooLargeError(Exception):
+    pass
+
 # storage/
 # storage/shares/incoming
 #   incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
@@ -213,6 +217,347 @@ class BucketReader(Referenceable):
         return self._share_file.read_share_data(offset, length)
 
 
+# the MutableShareFile is like the ShareFile, but used for mutable data. It
+# has a different layout. See docs/mutable.txt for more details.
+
+# #   offset    size    name
+# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
+# 2   32        32      write enabler's nodeid
+# 3   64        32      write enabler
+# 4   72        8       data size (actual share data present) (a)
+# 5   80        8       offset of (8) count of extra leases (after data)
+# 6   88        416     four leases, 104 bytes each
+#                        0    4   ownerid (0 means "no lease here")
+#                        4    4   expiration timestamp
+#                        8   32   renewal token
+#                        40  32   cancel token
+#                        72  32   nodeid which accepted the tokens
+# 7   504       (a)     data
+# 8   ??        4       count of extra leases
+# 9   ??        n*104    extra leases
+
+
+assert struct.calcsize("L"), 4
+assert struct.calcsize("Q"), 8
+
+class MutableShareFile(Referenceable):
+    # note: at any given time, there should only be a single instance of this
+    # class per filename. More than one is likely to corrupt the container,
+    # because of state we cache in instance variables. This requires the
+    # StorageServer to use a WeakValueDictionary, indexed by filename. This
+    # could be improved by cacheing less and doing more IO.
+    implements(RIMutableSlot)
+
+    DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
+    HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
+    LEASE_SIZE = struct.calcsize(">LL32s32s32s")
+    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
+    # our sharefiles share with a recognizable string, plus some random
+    # binary data to reduce the chance that a regular text file will look
+    # like a sharefile.
+    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
+    assert len(MAGIC) == 32
+    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
+    # TODO: decide upon a policy for max share size
+
+    def __init__(self, filename):
+        self.home = filename
+        self._base_lease_offset = self.HEADER_SIZE
+        if os.path.exists(self.home):
+            f = open(self.home, 'rb')
+            data = f.read(88)
+            (magic,
+             self._write_enabler_nodeid, self._write_enabler,
+             self._data_length, offset) = struct.unpack(">32s32s32sQQ", data)
+            assert magic == self.MAGIC
+            self._extra_lease_offset = offset # points at (8)
+            f.seek(self._extra_lease_offset)
+            data = f.read(4)
+            self._num_extra_leases = struct.unpack(">L", data)
+            f.close()
+
+
+    def create(self, my_nodeid, write_enabler):
+        assert not os.path.exists(self.home)
+        self._write_enabler = write_enabler
+        self._data_length = 0
+        self._extra_lease_offset = (self.HEADER_SIZE
+                                    + 4 * self.LEASE_SIZE
+                                    + self._data_length)
+        assert self._extra_lease_offset == self.DATA_OFFSET # true at creation
+        self._num_extra_leases = 0
+        f = open(self.home, 'wb')
+        header = struct.pack(">32s32s32sQQ",
+                             self.MAGIC, my_nodeid, write_enabler,
+                             self._data_length, self._extra_lease_offset,
+                             )
+        f.write(header)
+        # data goes here, empty after creation
+        f.write(struct.pack(">L", self._num_extra_leases))
+        # extra leases go here, none at creation
+        f.close()
+
+
+    def read_share_data(self, offset, length):
+        precondition(offset >= 0)
+        if offset+length > self._data_length:
+            # reads beyond the end of the data are truncated. Reads that
+            # start beyond the end of the data return an empty string.
+            length = max(0, self._data_length-offset)
+        if length == 0:
+            return ""
+        precondition(offset+length <= self._data_length)
+        f = open(self.home, 'rb')
+        f.seek(self.DATA_OFFSET+offset)
+        return f.read(length)
+
+    def change_container_size(self, new_container_size):
+        if new_container_size > self.MAX_SIZE:
+            raise DataTooLargeError()
+        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
+        if new_extra_lease_offset < self._extra_lease_offset:
+            # TODO: allow containers to shrink
+            return
+        f = open(self.home, 'rb+')
+        f.seek(self._extra_lease_offset)
+        extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE)
+        f.seek(new_extra_lease_offset)
+        f.write(extra_lease_data)
+        self._extra_lease_offset = new_extra_lease_offset
+        # an interrupt here will corrupt the leases, iff the move caused the
+        # extra leases to overlap.
+        f.seek(self.DATA_LENGTH_OFFSET+8)
+        f.write(struct.pack(">Q", new_extra_lease_offset))
+        f.close()
+
+    def write_share_data(self, offset, data):
+        length = len(data)
+        precondition(offset >= 0)
+        if offset+length < self._data_length:
+            # they are not expanding their data size
+            f = open(self.home, 'rb+')
+            f.seek(self.DATA_OFFSET+offset)
+            f.write(data)
+            f.close()
+            return
+        if self.DATA_OFFSET+offset+length <= self._extra_lease_offset:
+            # they are expanding their data size, but not the container size
+            f = open(self.home, 'rb+')
+            self._data_length = offset+length
+            f.seek(self.DATA_LENGTH_OFFSET)
+            f.write(struct.pack(">Q", self._data_length))
+            # an interrupt here will result in a corrupted share
+            f.seek(self.DATA_OFFSET+offset)
+            f.write(data)
+            f.close()
+            return
+
+        # they are expanding the container, so we have to move the leases.
+        # With luck, they're expanding it more than the size of the extra
+        # lease block, which will minimize the corrupt-the-share window
+
+        self.change_container_size(offset+length)
+
+        # an interrupt here is ok.. the container has been enlarged but the
+        # data remains untouched
+
+        self._data_length = offset+length
+
+        f = open(self.home, 'rb+')
+        f.seek(self.DATA_OFFSET+offset)
+        f.write(data)
+        # an interrupt here will result in a corrupted share
+        f.seek(self.DATA_LENGTH_OFFSET)
+        f.write(struct.pack(">Q", self._data_length))
+        f.close()
+        return
+
+    def _write_lease_record(self, f, lease_number, lease_info):
+        (ownerid, expiration_time,
+         renew_secret, cancel_secret, nodeid) = lease_info
+        if lease_number < 4:
+            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
+        elif (lease_number-4) < self._num_extra_leases:
+            offset = (self._extra_lease_offset
+                      + 4
+                      + (lease_number-4)*self.LEASE_NUMBER)
+        else:
+            f.seek(self._extra_lease_offset)
+            f.write(struct.pack(">L", self._num_extra_leases+1))
+            self._num_extra_leases += 1
+            offset = (self._extra_lease_offset
+                      + 4
+                      + (lease_number-4)*self.LEASE_NUMBER)
+        f.seek(offset)
+        assert f.tell() == offset
+        f.write(struct.pack(">LL32s32s32s",
+                            ownerid, int(expiration_time),
+                            renew_secret, cancel_secret, nodeid))
+
+    def _read_lease_record(self, f, lease_number):
+        # returns a 5-tuple of lease info, or None
+        if lease_number < 4:
+            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
+        elif (lease_number-4) < self._num_extra_leases:
+            offset = (self._extra_lease_offset
+                      + 4
+                      + (lease_number-4)*self.LEASE_NUMBER)
+        else:
+            raise IndexError("No such lease number %d" % lease_number)
+        f.seek(offset)
+        assert f.tell() == offset
+        data = f.read(self.LEASE_SIZE)
+        lease_info = struct.unpack(">LL32s32s32s", data)
+        (ownerid, expiration_time,
+         renew_secret, cancel_secret, nodeid) = lease_info
+        if ownerid == 0:
+            return None
+        return lease_info
+
+    def _read_num_leases(self, f):
+        f.seek(self.HEADER_SIZE)
+        leasedata = f.read(4*self.LEASE_SIZE)
+        num_leases = 0
+        for i in range(4):
+            base = i*self.LEASE_SIZE
+            (ownerid,) = struct.unpack(">L", leasedata[base:base+4])
+            if ownerid != 0:
+                num_leases += 1
+        return num_leases + self._num_extra_leases
+
+    def pack_leases(self):
+        pass
+
+    def _truncate_leases(self, f, num_leases):
+        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
+
+    def enumerate_leases(self, f):
+        """Yields (leasenum, (ownerid, expiration_time, renew_secret,
+        cancel_secret, accepting_nodeid)) for all leases."""
+        for i in range(self._read_num_leases(f)):
+            try:
+                data = self._read_lease_record(f, i)
+                if data is not None:
+                    yield (i,data)
+            except IndexError:
+                return
+
+    def add_lease(self, lease_info):
+        f = open(self.home, 'rb+')
+        num_leases = self._read_num_leases(f)
+        self._write_lease_record(f, num_leases, lease_info)
+        self._write_num_leases(f, num_leases+1)
+        f.close()
+
+    def renew_lease(self, renew_secret, new_expire_time):
+        accepting_nodeids = set()
+        f = open(self.home, 'rb+')
+        for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
+            if rs == renew_secret:
+                # yup. See if we need to update the owner time.
+                if new_expire_time > et:
+                    # yes
+                    new_lease = (oid,new_expire_time,rs,cs,anid)
+                    self._write_lease_record(f, leasenum, new_lease)
+                f.close()
+                return
+            accepting_nodeids.add(anid)
+        f.close()
+        # TODO: return the accepting_nodeids set, to give the client a chance
+        # to update the leases on a share which has been migrated from its
+        # original server to a new one.
+        raise IndexError("unable to renew non-existent lease")
+
+    def add_or_renew_lease(self, lease_info):
+        ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
+        try:
+            self.renew_lease(renew_secret, expire_time)
+        except IndexError:
+            self.add_lease(lease_info)
+
+    def cancel_lease(self, cancel_secret):
+        """Remove any leases with the given cancel_secret. Return
+        (num_remaining_leases, space_freed). Raise IndexError if there was no
+        lease with the given cancel_secret."""
+
+        modified = 0
+        remaining = 0
+        blank = "\x00"*32
+        blank_lease = (0, 0, blank, blank, blank)
+        f = open(self.home, 'rb+')
+        for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
+            if cs == cancel_secret:
+                self._write_lease_record(f, leasenum, blank_lease)
+                modified += 1
+            else:
+                remaining += 1
+        if modified:
+            freed_space = self._pack_leases(f)
+        f.close()
+        return (freed_space, remaining)
+
+    def _pack_leases(self, f):
+        # TODO: reclaim space from cancelled leases
+        return 0
+
+    def remote_read(self, offset, length):
+        return self.read_share_data(offset, length)
+    def remote_get_length(self):
+        return self._data_length
+    def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
+        if write_enabler != self._write_enabler:
+            # accomodate share migration by reporting the nodeid used for the
+            # old write enabler.
+            msg = "The write enabler was recorded by nodeid '%s'." % \
+                  (idlib.b2a(self._write_enabler_nodeid),)
+            raise BadWriterEnablerError(msg)
+        # check testv
+        test_results_v = []
+        test_failed = False
+        for (offset, length, operator, specimen) in testv:
+            data = self.read_share_data(offset, length)
+            test_results_v.append(data)
+            if not self.compare(data, operator, specimen):
+                test_failed = False
+        if test_failed:
+            return (False, test_results_v)
+        # now apply the write vector
+        for (offset, data) in datav:
+            self.write_share_data(offset, data)
+        if new_length is not None:
+            self.change_container_size(new_length)
+            self._data_length = new_length
+            f = open(self.home, "rb+")
+            f.seek(self.DATA_LENGTH_OFFSET)
+            f.write(struct.pack(">Q", self._data_length))
+            f.close()
+        return (True, test_results_v)
+
+    def compare(self, a, op, b):
+        assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt")
+        if op == "nop":
+            return True
+        if op == "lt":
+            return a <= b
+        if op == "le":
+            return a < b
+        if op == "eq":
+            return a == b
+        if op == "ne":
+            return a != b
+        if op == "ge":
+            return a >= b
+        if op == "gt":
+            return a > b
+        # never reached
+
+def create_mutable_sharefile(filename, my_nodeid, write_enabler):
+    ms = MutableShareFile(filename)
+    ms.create(my_nodeid, write_enabler)
+    del ms
+    return MutableShareFile(filename)
+
+
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
@@ -396,6 +741,9 @@ class StorageServer(service.MultiService, Referenceable):
             return iter([])
 
 
+# the code before here runs on the storage server, not the client
+# the code beyond here runs on the client, not the storage server
+
 """
 Share data is written into a single file. At the start of the file, there is
 a series of four-byte big-endian offset values, which indicate where each
index 39e2c18a5fe79316f8767630b052f466b71fdcce..e8d81dbabf2ae44c06608939b3720083fa01e029 100644 (file)
@@ -438,3 +438,66 @@ class Server(unittest.TestCase):
         leases = list(ss.get_leases("si3"))
         self.failUnlessEqual(len(leases), 2)
 
+
+
+class MutableServer(unittest.TestCase):
+
+    def setUp(self):
+        self.sparent = service.MultiService()
+        self._secret = itertools.count()
+    def tearDown(self):
+        return self.sparent.stopService()
+
+    def workdir(self, name):
+        basedir = os.path.join("storage", "MutableServer", name)
+        return basedir
+
+    def create(self, name, sizelimit=None):
+        workdir = self.workdir(name)
+        ss = StorageServer(workdir, sizelimit)
+        ss.setServiceParent(self.sparent)
+        return ss
+
+    def test_create(self):
+        ss = self.create("test_create")
+
+    def write_enabler(self, we_tag):
+        return hashutil.tagged_hash("we_blah", we_tag)
+
+    def allocate(self, ss, storage_index, we_tag, sharenums, size):
+        write_enabler = self.write_enabler(we_tag)
+        renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+        return ss.remote_allocate_mutable_slot(storage_index,
+                                               write_enabler,
+                                               renew_secret, cancel_secret,
+                                               sharenums, size)
+
+    def test_allocate(self):
+        ss = self.create("test_allocate")
+        shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
+        self.failUnlessEqual(len(shares), 3)
+        self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
+        shares2 = ss.get_mutable_slot("si1")
+        self.failUnlessEqual(len(shares2), 3)
+        self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
+        # the actual RIMutableSlot objects are required to be singtons (one
+        # per SI+shnum), so each get_mutable_slot() call should return the
+        # same RemoteReferences
+        self.failUnlessEqual(set(shares.values()), set(shares2.values()))
+
+        s0 = shares[0]
+        self.failUnlessEqual(s0.remote_read(0, 10), "")
+        self.failUnlessEqual(s0.remote_read(100, 10), "")
+        # try writing to one
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        answer = s0.remote_testv_and_writev(self.write_enabler("we1"),
+                                            [],
+                                            [(0, data),],
+                                            new_length=None)
+        self.failUnlessEqual(answer, [])
+
+        self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
+        self.failUnlessEqual(s0.remote_read(95, 10), "99999")
+        self.failUnlessEqual(s0.remote_get_length(), 100)
+