]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/storage.py
deletion phase3: add a sqlite database to track renew/cancel-lease secrets, implement...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage.py
index 4998e20fa889e816cb8a283d3b97ad14c5d3351b..e2af4fd90f2c2ffdd8c231610e86deda23c0e0dd 100644 (file)
@@ -1,8 +1,9 @@
-import os, re, weakref, stat, struct
+import os, re, weakref, stat, struct, time
 
 from foolscap import Referenceable
 from twisted.application import service
 from twisted.internet import defer
+from twisted.python import util
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
@@ -10,14 +11,17 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
 from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition
 
+from pysqlite2 import dbapi2 as sqlite
+
 # store/
-# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
-# store/$STORAGEINDEX
-# store/$STORAGEINDEX/$SHARENUM
-# store/$STORAGEINDEX/$SHARENUM/blocksize
-# store/$STORAGEINDEX/$SHARENUM/data
-# store/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/$STORAGEINDEX/$SHARENUM/sharehashtree
+# store/owners.db
+# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
+# store/shares/$STORAGEINDEX
+# store/shares/$STORAGEINDEX/$SHARENUM
+# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
+# store/shares/$STORAGEINDEX/$SHARENUM/data
+# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
 
 # $SHARENUM matches this regex:
 NUM_RE=re.compile("[0-9]*")
@@ -75,22 +79,40 @@ class StorageServer(service.MultiService, Referenceable):
 
     def __init__(self, storedir, sizelimit=None, no_storage=False):
         service.MultiService.__init__(self)
-        fileutil.make_dirs(storedir)
         self.storedir = storedir
+        sharedir = os.path.join(storedir, "shares")
+        fileutil.make_dirs(sharedir)
+        self.sharedir = sharedir
         self.sizelimit = sizelimit
         self.no_storage = no_storage
-        self.incomingdir = os.path.join(storedir, 'incoming')
+        self.incomingdir = os.path.join(sharedir, 'incoming')
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
         self._active_writers = weakref.WeakKeyDictionary()
 
+        self.init_db()
+
         self.measure_size()
 
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
+    def init_db(self):
+        # files in storedir with non-zbase32 characters in it (like ".") are
+        # safe, in that they cannot be accessed or overwritten by clients
+        # (whose binary storage_index values are always converted into a
+        # filename with idlib.b2a)
+        db_file = os.path.join(self.storedir, "owners.db")
+        need_to_init_db = not os.path.exists(db_file)
+        self._owner_db_con = sqlite.connect(db_file)
+        self._owner_db_cur = self._owner_db_con.cursor()
+        if need_to_init_db:
+            setup_file = util.sibpath(__file__, "owner.sql")
+            setup = open(setup_file, "r").read()
+            self._owner_db_cur.executescript(setup)
+
     def measure_size(self):
-        self.consumed = fileutil.du(self.storedir)
+        self.consumed = fileutil.du(self.sharedir)
 
     def allocated_size(self):
         space = self.consumed
@@ -112,7 +134,7 @@ class StorageServer(service.MultiService, Referenceable):
             remaining_space = self.sizelimit - self.allocated_size()
         for shnum in sharenums:
             incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
-            finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
+            finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
             elif no_limits or remaining_space >= space_per_bucket:
@@ -130,17 +152,127 @@ class StorageServer(service.MultiService, Referenceable):
                 pass
 
         if bucketwriters:
-            fileutil.make_dirs(os.path.join(self.storedir, si_s))
+            fileutil.make_dirs(os.path.join(self.sharedir, si_s))
+
+        # now store the secrets somewhere. This requires a
+        # variable-length-list of (renew,cancel) secret tuples per bucket.
+        # Note that this does not need to be kept inside the share itself, if
+        # packing efficiency is a concern. For this implementation, we use a
+        # sqlite database, which puts everything in a single file.
+        self.add_lease(storage_index, renew_secret, cancel_secret)
 
         return alreadygot, bucketwriters
 
+    def add_lease(self, storage_index, renew_secret, cancel_secret):
+        # is the bucket already in our database?
+        cur = self._owner_db_cur
+        cur.execute("SELECT bucket_id FROM buckets"
+                    " WHERE storage_index = ?",
+                    (storage_index,))
+        res = cur.fetchone()
+        if res:
+            bucket_id = res[0]
+        else:
+            cur.execute("INSERT INTO buckets (storage_index)"
+                        " values(?)", (storage_index,))
+            cur.execute("SELECT bucket_id FROM buckets"
+                        " WHERE storage_index = ?",
+                        (storage_index,))
+            res = cur.fetchone()
+            bucket_id = res[0]
+
+        # what time will this lease expire? One month from now.
+        expire_time = time.time() + 31*24*60*60
+
+        # now, is this lease already in our database? Since we don't have
+        # owners yet, look for a match by renew_secret/cancel_secret
+        cur.execute("SELECT lease_id FROM leases"
+                    " WHERE renew_secret = ? AND cancel_secret = ?",
+                    (renew_secret, cancel_secret))
+        res = cur.fetchone()
+        if res:
+            # yes, so just update the timestamp
+            lease_id = res[0]
+            cur.execute("UPDATE leases"
+                        " SET expire_time = ?"
+                        " WHERE lease_id = ?",
+                        (expire_time, lease_id))
+        else:
+            # no, we need to add the lease
+            cur.execute("INSERT INTO leases "
+                        "(bucket_id, renew_secret, cancel_secret, expire_time)"
+                        " values(?,?,?,?)",
+                        (bucket_id, renew_secret, cancel_secret, expire_time))
+        self._owner_db_con.commit()
+
+    def remote_renew_lease(self, storage_index, renew_secret):
+        # find the lease
+        cur = self._owner_db_cur
+        cur.execute("SELECT leases.lease_id FROM buckets, leases"
+                    " WHERE buckets.storage_index = ?"
+                    "  AND buckets.bucket_id = leases.bucket_id"
+                    "  AND leases.renew_secret = ?",
+                    (storage_index, renew_secret))
+        res = cur.fetchone()
+        if res:
+            # found it, now update it. The new leases will expire one month
+            # from now.
+            expire_time = time.time() + 31*24*60*60
+            lease_id = res[0]
+            cur.execute("UPDATE leases"
+                        " SET expire_time = ?"
+                        " WHERE lease_id = ?",
+                        (expire_time, lease_id))
+        else:
+            # no such lease
+            raise IndexError("No such lease")
+        self._owner_db_con.commit()
+
+    def remote_cancel_lease(self, storage_index, cancel_secret):
+        # find the lease
+        cur = self._owner_db_cur
+        cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
+                    " FROM buckets b, leases l"
+                    " WHERE b.storage_index = ?"
+                    "  AND b.bucket_id = l.bucket_id"
+                    "  AND l.cancel_secret = ?",
+                    (storage_index, cancel_secret))
+        res = cur.fetchone()
+        if res:
+            # found it
+            lease_id, storage_index, bucket_id = res
+            cur.execute("DELETE FROM leases WHERE lease_id = ?",
+                        (lease_id,))
+            # was that the last one?
+            cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
+                        (bucket_id,))
+            res = cur.fetchone()
+            remaining_leases = res[0]
+            if not remaining_leases:
+                # delete the share
+                cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
+                            (bucket_id,))
+                self.delete_bucket(storage_index)
+        else:
+            # no such lease
+            raise IndexError("No such lease")
+        self._owner_db_con.commit()
+
+    def delete_bucket(self, storage_index):
+        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
+        # measure the usage of this directory, to remove it from our current
+        # total
+        consumed = fileutil.du(storagedir)
+        fileutil.rm_dir(storagedir)
+        self.consumed -= consumed
+
     def bucket_writer_closed(self, bw, consumed_size):
         self.consumed += consumed_size
         del self._active_writers[bw]
 
     def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
-        storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
+        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
         try:
             for f in os.listdir(storagedir):
                 if NUM_RE.match(f):