git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
deletion phase3: add a sqlite database to track renew/cancel-lease secrets, implement...
author     Brian Warner <warner@lothar.com>
           Tue, 28 Aug 2007 06:41:40 +0000 (23:41 -0700)
committer  Brian Warner <warner@lothar.com>
           Tue, 28 Aug 2007 06:41:40 +0000 (23:41 -0700)
MANIFEST.in
README
setup.py
src/allmydata/interfaces.py
src/allmydata/owner.sql [new file with mode: 0644]
src/allmydata/storage.py
src/allmydata/test/test_storage.py
src/allmydata/test/test_system.py

diff --git a/MANIFEST.in b/MANIFEST.in
index 3282c071d94ff263ec2c940993e6280f29bd360d..67695049f2e4675186414ab3641369afc640e2e4 100644 (file)
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,2 +1,3 @@
 
 include allmydata/web/*.xhtml allmydata/web/*.html allmydata/web/*.css
+include allmydata/*.sql
diff --git a/README b/README
index 4e4954503f459e6e238c74267bd22d76a0a14c2e..a0edc0f31fe91334656ae80c102ec88cb519fe06 100644 (file)
--- a/README
+++ b/README
@@ -109,6 +109,8 @@ gcc make python-dev python-twisted python-nevow python-pyopenssl".
    libraries with the cygwin package management tool, then get the pyOpenSSL
    source code, cd into it, and run "python ./setup.py install".
 
+ + pysqlite3 (database library)
+
  + the pywin32 package: only required on Windows
 
    http://sourceforge.net/projects/pywin32/
diff --git a/setup.py b/setup.py
index 9d18432d9f282bc3634144c3d094044b124cc90b..1b2ed117ebb721d39ecd7325b0ee5aed8cfcdcad 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -90,7 +90,8 @@ setup(name='allmydata-tahoe',
                 ],
       package_dir={ "allmydata": "src/allmydata",},
       scripts = ["bin/allmydata-tahoe"],
-      package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css'] },
+      package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css',
+                                   'owner.sql'] },
       classifiers=trove_classifiers,
       test_suite="allmydata.test",
       ext_modules=[
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 8311bc15dd23b7f3102eb5b516fdbb37a97e0b76..2fb60e9a8c9e7106b85682d1cda708d235afd73b 100644 (file)
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -105,6 +105,20 @@ class RIStorageServer(RemoteInterface):
         """
         return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
                        DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
+
+    def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
+        """
+        Renew the lease on a given bucket. Some networks will use this, some
+        will not.
+        """
+
+    def cancel_lease(storage_index=StorageIndex,
+                     cancel_secret=LeaseCancelSecret):
+        """
+        Cancel the lease on a given bucket. If this was the last lease on the
+        bucket, the bucket will be deleted.
+        """
+
     def get_buckets(storage_index=StorageIndex):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
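
For orientation, a minimal client-side sketch of how these two new methods
might be driven over foolscap. This is not part of the patch: the server
argument is assumed to already be a RemoteReference to an RIStorageServer,
and the secrets are assumed to be the same 32-byte values the client handed
to allocate_buckets earlier.

    from twisted.python import log

    def renew_then_cancel(server, storage_index, renew_secret, cancel_secret):
        # push the lease's expire_time another month into the future
        d = server.callRemote("renew_lease", storage_index, renew_secret)
        # later, give the lease up; if it was the last lease on the bucket,
        # the server deletes the shares as well
        d.addCallback(lambda ign:
                      server.callRemote("cancel_lease",
                                        storage_index, cancel_secret))
        # both methods raise IndexError on the server if no lease matches
        d.addErrback(log.err)
        return d
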
 
diff --git a/src/allmydata/owner.sql b/src/allmydata/owner.sql
new file mode 100644 (file)
index 0000000..66ecfad
--- /dev/null
+++ b/src/allmydata/owner.sql
@@ -0,0 +1,20 @@
+CREATE TABLE buckets
+(
+  bucket_id     integer PRIMARY KEY AUTOINCREMENT,
+  storage_index char(32)
+);
+
+CREATE TABLE owners
+(
+  owner_id      integer PRIMARY KEY AUTOINCREMENT
+);
+
+CREATE TABLE leases
+(
+  lease_id      integer PRIMARY KEY AUTOINCREMENT,
+  bucket_id     integer,
+  owner_id      integer,
+  renew_secret  char(32),
+  cancel_secret char(32),
+  expire_time   timestamp
+);
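
To see how the three tables fit together, here is a small standalone sketch.
It uses the standard-library sqlite3 module purely for illustration (the
storage server below uses pysqlite2), assumes owner.sql is in the current
directory, and the 32-character values are made up:

    import sqlite3, time

    con = sqlite3.connect(":memory:")
    con.executescript(open("owner.sql").read())

    # a bucket is one row keyed by its storage index ...
    cur = con.execute("INSERT INTO buckets (storage_index) VALUES (?)",
                      ("a" * 32,))
    bucket_id = cur.lastrowid
    # ... and each lease on it holds the two secrets plus an expiry time
    con.execute("INSERT INTO leases (bucket_id, renew_secret, cancel_secret,"
                " expire_time) VALUES (?,?,?,?)",
                (bucket_id, "r" * 32, "c" * 32, time.time() + 31*24*60*60))
    con.commit()

    # renew and cancel both walk from storage_index to the lease via bucket_id
    print con.execute("SELECT l.lease_id FROM buckets b, leases l"
                      " WHERE b.storage_index = ? AND b.bucket_id = l.bucket_id",
                      ("a" * 32,)).fetchone()
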
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index 4998e20fa889e816cb8a283d3b97ad14c5d3351b..e2af4fd90f2c2ffdd8c231610e86deda23c0e0dd 100644 (file)
--- a/src/allmydata/storage.py
+++ b/src/allmydata/storage.py
@@ -1,8 +1,9 @@
-import os, re, weakref, stat, struct
+import os, re, weakref, stat, struct, time
 
 from foolscap import Referenceable
 from twisted.application import service
 from twisted.internet import defer
+from twisted.python import util
 
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
@@ -10,14 +11,17 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
 from allmydata.util import fileutil, idlib, mathutil
 from allmydata.util.assertutil import precondition
 
+from pysqlite2 import dbapi2 as sqlite
+
 # store/
-# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
-# store/$STORAGEINDEX
-# store/$STORAGEINDEX/$SHARENUM
-# store/$STORAGEINDEX/$SHARENUM/blocksize
-# store/$STORAGEINDEX/$SHARENUM/data
-# store/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/$STORAGEINDEX/$SHARENUM/sharehashtree
+# store/owners.db
+# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
+# store/shares/$STORAGEINDEX
+# store/shares/$STORAGEINDEX/$SHARENUM
+# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
+# store/shares/$STORAGEINDEX/$SHARENUM/data
+# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
 
 # $SHARENUM matches this regex:
 NUM_RE=re.compile("[0-9]*")
@@ -75,22 +79,40 @@ class StorageServer(service.MultiService, Referenceable):
 
     def __init__(self, storedir, sizelimit=None, no_storage=False):
         service.MultiService.__init__(self)
-        fileutil.make_dirs(storedir)
         self.storedir = storedir
+        sharedir = os.path.join(storedir, "shares")
+        fileutil.make_dirs(sharedir)
+        self.sharedir = sharedir
         self.sizelimit = sizelimit
         self.no_storage = no_storage
-        self.incomingdir = os.path.join(storedir, 'incoming')
+        self.incomingdir = os.path.join(sharedir, 'incoming')
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
         self._active_writers = weakref.WeakKeyDictionary()
 
+        self.init_db()
+
         self.measure_size()
 
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
+    def init_db(self):
+        # files in storedir with non-zbase32 characters in it (like ".") are
+        # safe, in that they cannot be accessed or overwritten by clients
+        # (whose binary storage_index values are always converted into a
+        # filename with idlib.b2a)
+        db_file = os.path.join(self.storedir, "owners.db")
+        need_to_init_db = not os.path.exists(db_file)
+        self._owner_db_con = sqlite.connect(db_file)
+        self._owner_db_cur = self._owner_db_con.cursor()
+        if need_to_init_db:
+            setup_file = util.sibpath(__file__, "owner.sql")
+            setup = open(setup_file, "r").read()
+            self._owner_db_cur.executescript(setup)
+
     def measure_size(self):
-        self.consumed = fileutil.du(self.storedir)
+        self.consumed = fileutil.du(self.sharedir)
 
     def allocated_size(self):
         space = self.consumed
@@ -112,7 +134,7 @@ class StorageServer(service.MultiService, Referenceable):
             remaining_space = self.sizelimit - self.allocated_size()
         for shnum in sharenums:
             incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
-            finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
+            finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
             elif no_limits or remaining_space >= space_per_bucket:
@@ -130,17 +152,127 @@ class StorageServer(service.MultiService, Referenceable):
                 pass
 
         if bucketwriters:
-            fileutil.make_dirs(os.path.join(self.storedir, si_s))
+            fileutil.make_dirs(os.path.join(self.sharedir, si_s))
+
+        # now store the secrets somewhere. This requires a
+        # variable-length-list of (renew,cancel) secret tuples per bucket.
+        # Note that this does not need to be kept inside the share itself, if
+        # packing efficiency is a concern. For this implementation, we use a
+        # sqlite database, which puts everything in a single file.
+        self.add_lease(storage_index, renew_secret, cancel_secret)
 
         return alreadygot, bucketwriters
 
+    def add_lease(self, storage_index, renew_secret, cancel_secret):
+        # is the bucket already in our database?
+        cur = self._owner_db_cur
+        cur.execute("SELECT bucket_id FROM buckets"
+                    " WHERE storage_index = ?",
+                    (storage_index,))
+        res = cur.fetchone()
+        if res:
+            bucket_id = res[0]
+        else:
+            cur.execute("INSERT INTO buckets (storage_index)"
+                        " values(?)", (storage_index,))
+            cur.execute("SELECT bucket_id FROM buckets"
+                        " WHERE storage_index = ?",
+                        (storage_index,))
+            res = cur.fetchone()
+            bucket_id = res[0]
+
+        # what time will this lease expire? One month from now.
+        expire_time = time.time() + 31*24*60*60
+
+        # now, is this lease already in our database? Since we don't have
+        # owners yet, look for a match by renew_secret/cancel_secret
+        cur.execute("SELECT lease_id FROM leases"
+                    " WHERE renew_secret = ? AND cancel_secret = ?",
+                    (renew_secret, cancel_secret))
+        res = cur.fetchone()
+        if res:
+            # yes, so just update the timestamp
+            lease_id = res[0]
+            cur.execute("UPDATE leases"
+                        " SET expire_time = ?"
+                        " WHERE lease_id = ?",
+                        (expire_time, lease_id))
+        else:
+            # no, we need to add the lease
+            cur.execute("INSERT INTO leases "
+                        "(bucket_id, renew_secret, cancel_secret, expire_time)"
+                        " values(?,?,?,?)",
+                        (bucket_id, renew_secret, cancel_secret, expire_time))
+        self._owner_db_con.commit()
+
+    def remote_renew_lease(self, storage_index, renew_secret):
+        # find the lease
+        cur = self._owner_db_cur
+        cur.execute("SELECT leases.lease_id FROM buckets, leases"
+                    " WHERE buckets.storage_index = ?"
+                    "  AND buckets.bucket_id = leases.bucket_id"
+                    "  AND leases.renew_secret = ?",
+                    (storage_index, renew_secret))
+        res = cur.fetchone()
+        if res:
+            # found it, now update it. The new leases will expire one month
+            # from now.
+            expire_time = time.time() + 31*24*60*60
+            lease_id = res[0]
+            cur.execute("UPDATE leases"
+                        " SET expire_time = ?"
+                        " WHERE lease_id = ?",
+                        (expire_time, lease_id))
+        else:
+            # no such lease
+            raise IndexError("No such lease")
+        self._owner_db_con.commit()
+
+    def remote_cancel_lease(self, storage_index, cancel_secret):
+        # find the lease
+        cur = self._owner_db_cur
+        cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
+                    " FROM buckets b, leases l"
+                    " WHERE b.storage_index = ?"
+                    "  AND b.bucket_id = l.bucket_id"
+                    "  AND l.cancel_secret = ?",
+                    (storage_index, cancel_secret))
+        res = cur.fetchone()
+        if res:
+            # found it
+            lease_id, storage_index, bucket_id = res
+            cur.execute("DELETE FROM leases WHERE lease_id = ?",
+                        (lease_id,))
+            # was that the last one?
+            cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
+                        (bucket_id,))
+            res = cur.fetchone()
+            remaining_leases = res[0]
+            if not remaining_leases:
+                # delete the share
+                cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
+                            (bucket_id,))
+                self.delete_bucket(storage_index)
+        else:
+            # no such lease
+            raise IndexError("No such lease")
+        self._owner_db_con.commit()
+
+    def delete_bucket(self, storage_index):
+        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
+        # measure the usage of this directory, to remove it from our current
+        # total
+        consumed = fileutil.du(storagedir)
+        fileutil.rm_dir(storagedir)
+        self.consumed -= consumed
+
     def bucket_writer_closed(self, bw, consumed_size):
         self.consumed += consumed_size
         del self._active_writers[bw]
 
     def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
-        storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
+        storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
         try:
             for f in os.listdir(storagedir):
                 if NUM_RE.match(f):
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index accd66db64d74c8d8240a5e67797f7f3a445959f..e25f733379191594729b157a5792d5668443a72d 100644 (file)
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -5,15 +5,12 @@ from twisted.application import service
 from twisted.internet import defer
 from foolscap import Referenceable
 import os.path
+import itertools
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      WriteBucketProxy, ReadBucketProxy, StorageServer
 
-RS = hashutil.tagged_hash("blah", "foo")
-CS = RS
-
-
 class Bucket(unittest.TestCase):
     def make_workdir(self, name):
         basedir = os.path.join("storage", "Bucket", name)
@@ -167,6 +164,7 @@ class Server(unittest.TestCase):
 
     def setUp(self):
         self.sparent = service.MultiService()
+        self._secret = itertools.count()
     def tearDown(self):
         return self.sparent.stopService()
 
@@ -183,14 +181,20 @@ class Server(unittest.TestCase):
     def test_create(self):
         ss = self.create("test_create")
 
+    def allocate(self, ss, storage_index, sharenums, size):
+        renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+        return ss.remote_allocate_buckets(storage_index,
+                                          renew_secret, cancel_secret,
+                                          sharenums, size, Referenceable())
+
     def test_allocate(self):
         ss = self.create("test_allocate")
 
         self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
 
         canary = Referenceable()
-        already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2],
-                                                     75, canary)
+        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
         self.failUnlessEqual(already, set())
         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
 
@@ -208,8 +212,7 @@ class Server(unittest.TestCase):
 
         # now if we ask about writing again, the server should offer those three
         # buckets as already present
-        already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2,3,4],
-                                                     75, canary)
+        already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
         self.failUnlessEqual(already, set([0,1,2]))
         self.failUnlessEqual(set(writers.keys()), set([3,4]))
 
@@ -217,8 +220,7 @@ class Server(unittest.TestCase):
         # tell new uploaders that they already exist (so that we don't try to
         # upload into them a second time)
 
-        already,writers = ss.remote_allocate_buckets("vid", RS, CS, [2,3,4,5],
-                                                     75, canary)
+        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
         self.failUnlessEqual(already, set([2,3,4]))
         self.failUnlessEqual(set(writers.keys()), set([5]))
 
@@ -226,15 +228,13 @@ class Server(unittest.TestCase):
         ss = self.create("test_sizelimits", 100)
         canary = Referenceable()
         
-        already,writers = ss.remote_allocate_buckets("vid1", RS, CS, [0,1,2],
-                                                     25, canary)
+        already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
         self.failUnlessEqual(len(writers), 3)
         # now the StorageServer should have 75 bytes provisionally allocated,
         # allowing only 25 more to be claimed
         self.failUnlessEqual(len(ss._active_writers), 3)
 
-        already2,writers2 = ss.remote_allocate_buckets("vid2", RS, CS, [0,1,2],
-                                                       25, canary)
+        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
         self.failUnlessEqual(len(writers2), 1)
         self.failUnlessEqual(len(ss._active_writers), 4)
 
@@ -255,9 +255,7 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(len(ss._active_writers), 0)
 
         # now there should be 25 bytes allocated, and 75 free
-        already3,writers3 = ss.remote_allocate_buckets("vid3", RS, CS,
-                                                       [0,1,2,3],
-                                                       25, canary)
+        already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25)
         self.failUnlessEqual(len(writers3), 3)
         self.failUnlessEqual(len(ss._active_writers), 3)
 
@@ -272,8 +270,77 @@ class Server(unittest.TestCase):
         # during runtime, so if we were creating any metadata, the allocation
         # would be more than 25 bytes and this test would need to be changed.
         ss = self.create("test_sizelimits", 100)
-        already4,writers4 = ss.remote_allocate_buckets("vid4",
-                                                       RS, CS, [0,1,2,3],
-                                                       25, canary)
+        already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
         self.failUnlessEqual(len(writers4), 3)
         self.failUnlessEqual(len(ss._active_writers), 3)
+
+    def test_leases(self):
+        ss = self.create("test_leases")
+        canary = Referenceable()
+        sharenums = range(5)
+        size = 100
+
+        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
+                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
+        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
+                                                     sharenums, size, canary)
+        self.failUnlessEqual(len(already), 0)
+        self.failUnlessEqual(len(writers), 5)
+        for wb in writers.values():
+            wb.remote_close()
+
+        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
+                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
+        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
+                                                     sharenums, size, canary)
+        for wb in writers.values():
+            wb.remote_close()
+
+        # take out a second lease on si1
+        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
+                   hashutil.tagged_hash("blah", "%d" % self._secret.next()))
+        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
+                                                     sharenums, size, canary)
+        self.failUnlessEqual(len(already), 5)
+        self.failUnlessEqual(len(writers), 0)
+
+        # check that si0 is readable
+        readers = ss.remote_get_buckets("si0")
+        self.failUnlessEqual(len(readers), 5)
+
+        # renew the first lease. Only the proper renew_secret should work
+        ss.remote_renew_lease("si0", rs0)
+        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
+        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
+
+        # check that si0 is still readable
+        readers = ss.remote_get_buckets("si0")
+        self.failUnlessEqual(len(readers), 5)
+
+        # now cancel it
+        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
+        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
+        ss.remote_cancel_lease("si0", cs0)
+
+        # si0 should now be gone
+        readers = ss.remote_get_buckets("si0")
+        self.failUnlessEqual(len(readers), 0)
+        # and the renew should no longer work
+        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
+
+
+        # cancel the first lease on si1, leaving the second in place
+        ss.remote_cancel_lease("si1", cs1)
+        readers = ss.remote_get_buckets("si1")
+        self.failUnlessEqual(len(readers), 5)
+        # the corresponding renew should no longer work
+        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
+
+        ss.remote_renew_lease("si1", rs2)
+        # cancelling the second should make it go away
+        ss.remote_cancel_lease("si1", cs2)
+        readers = ss.remote_get_buckets("si1")
+        self.failUnlessEqual(len(readers), 0)
+        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
+        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
+
diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py
index 606839cda9741edf9568b66227afb34fa7377f5d..95e20cbc2fd00542e5ab85d9921fb739bd4eafda 100644 (file)
--- a/src/allmydata/test/test_system.py
+++ b/src/allmydata/test/test_system.py
@@ -616,8 +616,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             if not filenames:
                 continue
             pieces = dirpath.split(os.sep)
-            if pieces[-2] == "storage":
-                # we're sitting in .../storage/$SINDEX , and there are
+            if pieces[-3] == "storage" and pieces[-2] == "shares":
+                # we're sitting in .../storage/shares/$SINDEX , and there are
                 # sharefiles here
                 filename = os.path.join(dirpath, filenames[0])
                 break