storage: replace sqlite with in-share lease records
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index 1d1a60a08e02d8893917a14f71549c6926926249..123090ab8c34f7b0798e634cc06b91df0700325c 100644
@@ -3,7 +3,6 @@ import os, re, weakref, stat, struct, time
 from foolscap import Referenceable
 from twisted.application import service
 from twisted.internet import defer
-from twisted.python import util
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
@@ -11,60 +10,180 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
 from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition
 
-try:
-    # python2.5 ships with sqlite builtin
-    import sqlite3.dbapi2
-    sqlite = sqlite3.dbapi2
-except ImportError:
-    # for python2.4, it's installed under a different name
-    import pysqlite2.dbapi2
-    sqlite = pysqlite2.dbapi2
-
-# store/
-# store/owners.db
-# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
-# store/shares/$STORAGEINDEX
-# store/shares/$STORAGEINDEX/$SHARENUM
-# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
-# store/shares/$STORAGEINDEX/$SHARENUM/data
-# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
+# storage/
+# storage/shares/incoming
+#   incoming/ holds temp dirs named $STORAGEINDEX/$SHARENUM which will be
+#   moved to storage/shares/$STORAGEINDEX/$SHARENUM upon success
+# storage/shares/$STORAGEINDEX
+# storage/shares/$STORAGEINDEX/$SHARENUM
 
 # $SHARENUM matches this regex:
 NUM_RE=re.compile("[0-9]*")
 
+# Each share file (in storage/shares/$SI/$SHNUM) contains lease information
+# and share data. The share data is accessed by RIBucketWriter.write and
+# RIBucketReader.read. The lease information is not accessible through
+# these interfaces.
+
+# The share file has the following layout:
+#  0x00: share file version number, four bytes, current version is 1
+#  0x04: share data length, four bytes big-endian = A
+#  0x08: number of leases, four bytes big-endian
+#  0x0c: beginning of share data (described below, at WriteBucketProxy)
+#  A+0x0c = B: first lease. Lease format is:
+#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
+#   B+0x04: renew secret, 32 bytes (SHA256)
+#   B+0x24: cancel secret, 32 bytes (SHA256)
+#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
+#   B+0x48: next lease, or end of record
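
As a minimal standalone sketch (not part of the patch; dump_share is a hypothetical helper), the header and lease records described above can be decoded with the same struct formats the code below uses:

    import struct

    def dump_share(path):
        # hypothetical inspector for the v1 share-file layout sketched above
        f = open(path, 'rb')
        (version, data_size, num_leases) = struct.unpack(">LLL", f.read(0xc))
        assert version == 1
        f.seek(0xc + data_size)  # leases start right after the share data
        leases = []
        for i in range(num_leases):
            # 4 + 32 + 32 + 4 = 72 bytes per lease record
            leases.append(struct.unpack(">L32s32sL", f.read(72)))
        f.close()
        return (data_size, leases)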
+
+class ShareFile:
+    LEASE_SIZE = struct.calcsize(">L32s32sL")
+
+    def __init__(self, filename):
+        self.home = filename
+        f = open(self.home, 'rb')
+        (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
+        assert version == 1
+        self._size = size
+        self._num_leases = num_leases
+        self._data_offset = 0xc
+        self._lease_offset = 0xc + self._size
+
+    def read_share_data(self, offset, length):
+        precondition(offset >= 0)
+        precondition(offset+length <= self._size)
+        f = open(self.home, 'rb')
+        f.seek(self._data_offset+offset)
+        data = f.read(length)
+        f.close()
+        return data
+
+    def write_share_data(self, offset, data):
+        length = len(data)
+        precondition(offset >= 0)
+        precondition(offset+length <= self._size)
+        f = open(self.home, 'rb+')
+        real_offset = self._data_offset+offset
+        f.seek(real_offset)
+        assert f.tell() == real_offset
+        f.write(data)
+        f.close()
+
+    def _write_lease_record(self, f, lease_number, lease_info):
+        (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
+        offset = self._lease_offset + lease_number * self.LEASE_SIZE
+        f.seek(offset)
+        assert f.tell() == offset
+        f.write(struct.pack(">L32s32sL",
+                            owner_num, renew_secret, cancel_secret,
+                            expiration_time))
+
+    def _read_num_leases(self, f):
+        f.seek(0x08)
+        (num_leases,) = struct.unpack(">L", f.read(4))
+        return num_leases
+
+    def _write_num_leases(self, f, num_leases):
+        f.seek(0x08)
+        f.write(struct.pack(">L", num_leases))
+
+    def _truncate_leases(self, f, num_leases):
+        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
+
+    def iter_leases(self):
+        """Yields (ownernum, renew_secret, cancel_secret, expiration_time)
+        for all leases."""
+        f = open(self.home, 'rb')
+        (version, size, num_leases) = struct.unpack(">LLL", f.read(0xc))
+        f.seek(self._lease_offset)
+        for i in range(num_leases):
+            data = f.read(self.LEASE_SIZE)
+            if data:
+                yield struct.unpack(">L32s32sL", data)
+
+    def add_lease(self, lease_info):
+        f = open(self.home, 'rb+')
+        num_leases = self._read_num_leases(f)
+        self._write_lease_record(f, num_leases, lease_info)
+        self._write_num_leases(f, num_leases+1)
+        f.close()
+
+    def renew_lease(self, renew_secret, new_expire_time):
+        for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
+            if rs == renew_secret:
+                # yup. See if we need to update the expiration time.
+                if new_expire_time > et:
+                    # yes
+                    new_lease = (on,rs,cs,new_expire_time)
+                    f = open(self.home, 'rb+')
+                    self._write_lease_record(f, i, new_lease)
+                    f.close()
+                return
+        raise IndexError("unable to renew non-existent lease")
+
+    def cancel_lease(self, cancel_secret):
+        """Remove a lease with the given cancel_secret. Return
+        (num_remaining_leases, space_freed). Raise IndexError if there was no
+        lease with the given cancel_secret."""
+
+        leases = list(self.iter_leases())
+        num_leases_removed = 0
+        for i,lease_info in enumerate(leases):
+            (on,rs,cs,et) = lease_info
+            if cs == cancel_secret:
+                leases[i] = None
+                num_leases_removed += 1
+        if not num_leases_removed:
+            raise IndexError("unable to find matching lease to cancel")
+        # pack and write out the remaining leases. We write these out in
+        # the same order as they were added, so that if we crash while
+        # doing this, we won't lose any non-cancelled leases.
+        leases = [l for l in leases if l] # remove the cancelled leases
+        f = open(self.home, 'rb+')
+        for i,lease in enumerate(leases):
+            self._write_lease_record(f, i, lease)
+        self._write_num_leases(f, len(leases))
+        self._truncate_leases(f, len(leases))
+        f.close()
+        return len(leases), self.LEASE_SIZE * num_leases_removed
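
A hedged usage sketch of the ShareFile lease API above (the path and the 32-byte secrets are illustrative stand-ins, and the share file is assumed to already exist with a valid v1 header):

    import time

    sf = ShareFile("storage/shares/aaaa/0")
    lease = (0, "R"*32, "C"*32, int(time.time()) + 31*24*60*60)
    sf.add_lease(lease)
    for (owner, rs, cs, expire_time) in sf.iter_leases():
        print owner, expire_time
    sf.renew_lease("R"*32, int(time.time()) + 60*24*60*60)
    (remaining, freed) = sf.cancel_lease("C"*32)  # freed == LEASE_SIZE == 72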
+
+
 class BucketWriter(Referenceable):
     implements(RIBucketWriter)
 
-    def __init__(self, ss, incominghome, finalhome, size):
+    def __init__(self, ss, incominghome, finalhome, size, lease_info):
         self.ss = ss
         self.incominghome = incominghome
         self.finalhome = finalhome
         self._size = size
+        self._lease_info = lease_info
         self.closed = False
         self.throw_out_all_data = False
-        # touch the file, so later callers will see that we're working on it
-        f = open(self.incominghome, 'ab')
+        # touch the file, so later callers will see that we're working on it.
+        # Also construct the metadata.
+        assert not os.path.exists(self.incominghome)
+        f = open(self.incominghome, 'wb')
+        f.write(struct.pack(">LLL", 1, size, 0))
         f.close()
+        self._sharefile = ShareFile(self.incominghome)
 
     def allocated_size(self):
         return self._size
 
     def remote_write(self, offset, data):
         precondition(not self.closed)
-        precondition(offset >= 0)
-        precondition(offset+len(data) <= self._size)
         if self.throw_out_all_data:
             return
-        f = open(self.incominghome, 'ab')
-        f.seek(offset)
-        f.write(data)
-        f.close()
+        self._sharefile.write_share_data(offset, data)
 
     def remote_close(self):
         precondition(not self.closed)
+        self._sharefile.add_lease(self._lease_info)
         fileutil.rename(self.incominghome, self.finalhome)
+        self._sharefile = None
         self.closed = True
+
         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
         self.ss.bucket_writer_closed(self, filelen)
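
Sketch of the server-side write path (hedged: ss stands in for the StorageServer instance, and the paths/secrets are made up; in the real flow, remote_write and remote_close arrive as foolscap calls):

    lease_info = (0, "R"*32, "C"*32, time.time() + 31*24*60*60)
    bw = BucketWriter(ss, "storage/shares/incoming/aaaa/0",
                      "storage/shares/aaaa/0", 1000, lease_info)
    bw.remote_write(0, "x" * 1000)  # data lands 12 bytes in, after the header
    bw.remote_close()  # appends the lease, renames incoming -> final,
                       # and notifies ss.bucket_writer_closed()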
 
@@ -73,12 +192,10 @@ class BucketReader(Referenceable):
     implements(RIBucketReader)
 
     def __init__(self, home):
-        self.home = home
+        self._share_file = ShareFile(home)
 
     def remote_read(self, offset, length):
-        f = open(self.home, 'rb')
-        f.seek(offset)
-        return f.read(length)
+        return self._share_file.read_share_data(offset, length)
 
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
@@ -97,27 +214,11 @@ class StorageServer(service.MultiService, Referenceable):
         fileutil.make_dirs(self.incomingdir)
         self._active_writers = weakref.WeakKeyDictionary()
 
-        self.init_db()
-
         self.measure_size()
 
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
-    def init_db(self):
-        # files in storedir with non-zbase32 characters in it (like ".") are
-        # safe, in that they cannot be accessed or overwritten by clients
-        # (whose binary storage_index values are always converted into a
-        # filename with idlib.b2a)
-        db_file = os.path.join(self.storedir, "owners.db")
-        need_to_init_db = not os.path.exists(db_file)
-        self._owner_db_con = sqlite.connect(db_file)
-        self._owner_db_cur = self._owner_db_con.cursor()
-        if need_to_init_db:
-            setup_file = util.sibpath(__file__, "owner.sql")
-            setup = open(setup_file, "r").read()
-            self._owner_db_cur.executescript(setup)
-
     def measure_size(self):
         self.consumed = fileutil.du(self.sharedir)
 
@@ -130,10 +231,21 @@ class StorageServer(service.MultiService, Referenceable):
     def remote_allocate_buckets(self, storage_index,
                                 renew_secret, cancel_secret,
                                 sharenums, allocated_size,
-                                canary):
+                                canary, owner_num=0):
+        # owner_num is not for clients to set, but rather it should be
+        # curried into the PersonalStorageServer instance that is dedicated
+        # to a particular owner.
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         si_s = idlib.b2a(storage_index)
+
+        # In this implementation, the lease information (including secrets)
+        # goes into the share files themselves. It could also be put into a
+        # separate database. Note that the lease should not be added until
+        # the BucketWriter has been closed.
+        expire_time = time.time() + 31*24*60*60
+        lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
+
         space_per_bucket = allocated_size
         no_limits = self.sizelimit is None
         yes_limits = not no_limits
@@ -144,10 +256,17 @@ class StorageServer(service.MultiService, Referenceable):
             finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
+                # add a lease
+                if os.path.exists(incominghome):
+                    # TODO: add a lease to the still-in-construction share
+                    pass
+                else:
+                    sf = ShareFile(finalhome)
+                    sf.add_lease(lease_info)
             elif no_limits or remaining_space >= space_per_bucket:
                 fileutil.make_dirs(os.path.join(self.incomingdir, si_s))
                 bw = BucketWriter(self, incominghome, finalhome,
-                                  space_per_bucket)
+                                  space_per_bucket, lease_info)
                 if self.no_storage:
                     bw.throw_out_all_data = True
                 bucketwriters[shnum] = bw
@@ -161,109 +280,56 @@ class StorageServer(service.MultiService, Referenceable):
         if bucketwriters:
             fileutil.make_dirs(os.path.join(self.sharedir, si_s))
 
-        # now store the secrets somewhere. This requires a
-        # variable-length-list of (renew,cancel) secret tuples per bucket.
-        # Note that this does not need to be kept inside the share itself, if
-        # packing efficiency is a concern. For this implementation, we use a
-        # sqlite database, which puts everything in a single file.
-        self.add_lease(storage_index, renew_secret, cancel_secret)
-
         return alreadygot, bucketwriters
 
-    def add_lease(self, storage_index, renew_secret, cancel_secret):
-        # is the bucket already in our database?
-        cur = self._owner_db_cur
-        cur.execute("SELECT bucket_id FROM buckets"
-                    " WHERE storage_index = ?",
-                    (storage_index,))
-        res = cur.fetchone()
-        if res:
-            bucket_id = res[0]
-        else:
-            cur.execute("INSERT INTO buckets (storage_index)"
-                        " values(?)", (storage_index,))
-            cur.execute("SELECT bucket_id FROM buckets"
-                        " WHERE storage_index = ?",
-                        (storage_index,))
-            res = cur.fetchone()
-            bucket_id = res[0]
-
-        # what time will this lease expire? One month from now.
-        expire_time = time.time() + 31*24*60*60
-
-        # now, is this lease already in our database? Since we don't have
-        # owners yet, look for a match by renew_secret/cancel_secret
-        cur.execute("SELECT lease_id FROM leases"
-                    " WHERE renew_secret = ? AND cancel_secret = ?",
-                    (renew_secret, cancel_secret))
-        res = cur.fetchone()
-        if res:
-            # yes, so just update the timestamp
-            lease_id = res[0]
-            cur.execute("UPDATE leases"
-                        " SET expire_time = ?"
-                        " WHERE lease_id = ?",
-                        (expire_time, lease_id))
-        else:
-            # no, we need to add the lease
-            cur.execute("INSERT INTO leases "
-                        "(bucket_id, renew_secret, cancel_secret, expire_time)"
-                        " values(?,?,?,?)",
-                        (bucket_id, renew_secret, cancel_secret, expire_time))
-        self._owner_db_con.commit()
+    def get_or_add_owner(self, owner):
+        # this will be more fully implemented when we get the Introduction
+        # protocol built. At that point, it should take the 'owner' argument
+        # (either a FURL or a Sealer/public-key) and look it up in a
+        # persistent table, returning a short integer. If the owner doesn't
+        # yet exist in the table, create a new entry for it and return the
+        # new index.
+        return 0
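
One speculative shape for that future owner table (not in this patch, and ignoring persistence; purely illustrative):

    class OwnerTable:
        # maps an owner identity (FURL or public key) to a short integer;
        # 0 stays reserved for "no owner", matching the lease format above
        def __init__(self):
            self._owners = {}
        def get_or_add(self, owner):
            if owner not in self._owners:
                self._owners[owner] = len(self._owners) + 1
            return self._owners[owner]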
 
     def remote_renew_lease(self, storage_index, renew_secret):
-        # find the lease
-        cur = self._owner_db_cur
-        cur.execute("SELECT leases.lease_id FROM buckets, leases"
-                    " WHERE buckets.storage_index = ?"
-                    "  AND buckets.bucket_id = leases.bucket_id"
-                    "  AND leases.renew_secret = ?",
-                    (storage_index, renew_secret))
-        res = cur.fetchone()
-        if res:
-            # found it, now update it. The new leases will expire one month
-            # from now.
-            expire_time = time.time() + 31*24*60*60
-            lease_id = res[0]
-            cur.execute("UPDATE leases"
-                        " SET expire_time = ?"
-                        " WHERE lease_id = ?",
-                        (expire_time, lease_id))
-        else:
-            # no such lease
-            raise IndexError("No such lease")
-        self._owner_db_con.commit()
+        new_expire_time = time.time() + 31*24*60*60
+        found_buckets = False
+        for shnum, filename in self._get_bucket_shares(storage_index):
+            found_buckets = True
+            sf = ShareFile(filename)
+            sf.renew_lease(renew_secret, new_expire_time)
+        if not found_buckets:
+            raise IndexError("no such lease to renew")
 
     def remote_cancel_lease(self, storage_index, cancel_secret):
-        # find the lease
-        cur = self._owner_db_cur
-        cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
-                    " FROM buckets b, leases l"
-                    " WHERE b.storage_index = ?"
-                    "  AND b.bucket_id = l.bucket_id"
-                    "  AND l.cancel_secret = ?",
-                    (storage_index, cancel_secret))
-        res = cur.fetchone()
-        if res:
-            # found it
-            lease_id, storage_index, bucket_id = res
-            cur.execute("DELETE FROM leases WHERE lease_id = ?",
-                        (lease_id,))
-            # was that the last one?
-            cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
-                        (bucket_id,))
-            res = cur.fetchone()
-            remaining_leases = res[0]
-            if not remaining_leases:
-                # delete the share
-                cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
-                            (bucket_id,))
-                self.delete_bucket(storage_index)
-        else:
-            # no such lease
-            raise IndexError("No such lease")
-        self._owner_db_con.commit()
+        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
+
+        remaining_files = 0
+        total_space_freed = 0
+        found_buckets = False
+        for shnum, filename in self._get_bucket_shares(storage_index):
+            # note: if we can't find a lease on one share, we won't bother
+            # looking in the others. Unless something broke internally
+            # (perhaps we ran out of disk space while adding a lease), the
+            # leases on all shares will be identical.
+            found_buckets = True
+            sf = ShareFile(filename)
+            # this raises IndexError if the lease wasn't present
+            remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
+            total_space_freed += space_freed
+            if remaining_leases:
+                remaining_files += 1
+            else:
+                # now remove the sharefile. We'll almost certainly be
+                # removing the entire directory soon.
+                filelen = os.stat(filename)[stat.ST_SIZE]
+                os.unlink(filename)
+                total_space_freed += filelen
+        if not found_buckets:
+            raise IndexError("no such lease to cancel")
+        if not remaining_files:
+            os.rmdir(storagedir)
+        self.consumed -= total_space_freed
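
A worked example of the accounting, with assumed numbers: take three shares, each holding 1000 bytes of share data and the same single lease. cancel_lease() removes the 72-byte lease record from each file and reports 72 bytes freed; no leases remain, so the file (now truncated to 12 + 1000 = 1012 bytes) is unlinked, adding another 1012 bytes. total_space_freed thus grows by 1084 bytes per share, 3252 bytes in all, consumed drops by the same amount, and the empty bucket directory is removed.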
 
     def delete_bucket(self, storage_index):
         storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
@@ -277,20 +343,44 @@ class StorageServer(service.MultiService, Referenceable):
         self.consumed += consumed_size
         del self._active_writers[bw]
 
-    def remote_get_buckets(self, storage_index):
-        bucketreaders = {} # k: sharenum, v: BucketReader
+    def _get_bucket_shares(self, storage_index):
+        """Return a list of (shnum, pathname) tuples for files that hold
+        shares for this storage_index. In each tuple, 'shnum' will always be
+        the integer form of the last component of 'pathname'."""
         storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
         try:
             for f in os.listdir(storagedir):
                 if NUM_RE.match(f):
-                    br = BucketReader(os.path.join(storagedir, f))
-                    bucketreaders[int(f)] = br
+                    filename = os.path.join(storagedir, f)
+                    yield (int(f), filename)
         except OSError:
             # Commonly caused by there being no buckets at all.
             pass
 
+    def remote_get_buckets(self, storage_index):
+        bucketreaders = {} # k: sharenum, v: BucketReader
+        for shnum, filename in self._get_bucket_shares(storage_index):
+            bucketreaders[shnum] = BucketReader(filename)
         return bucketreaders
 
+    def get_leases(self, storage_index):
+        """Provide an iterator that yields all of the leases attached to this
+        bucket. Each lease is returned as a tuple of (owner_num,
+        renew_secret, cancel_secret, expiration_time).
+
+        This method is not for client use.
+        """
+
+        # since all shares get the same lease data, we just grab the leases
+        # from the first share
+        try:
+            shnum, filename = self._get_bucket_shares(storage_index).next()
+            sf = ShareFile(filename)
+            return sf.iter_leases()
+        except StopIteration:
+            return iter([])
+
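
Hedged server-side usage (ss and storage_index are stand-ins; get_leases is deliberately not exposed to clients):

    for (owner, rs, cs, expire_time) in ss.get_leases(storage_index):
        print "owner %d, lease expires at %d" % (owner, expire_time)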
+
 """
 Share data is written into a single file. At the start of the file, there is
 a series of four-byte big-endian offset values, which indicate where each