From: Brian Warner Date: Tue, 28 Aug 2007 06:41:40 +0000 (-0700) Subject: deletion phase3: add a sqlite database to track renew/cancel-lease secrets, implement... X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/%22doc.html?a=commitdiff_plain;h=2a63fe8b018e34a3f40cf4ba6e76429c6a530fed;p=tahoe-lafs%2Ftahoe-lafs.git deletion phase3: add a sqlite database to track renew/cancel-lease secrets, implement renew/cancel_lease (but nobody calls them yet). Also, move the shares from BASEDIR/storage/* down to BASEDIR/storage/shares/* --- diff --git a/MANIFEST.in b/MANIFEST.in index 3282c071..67695049 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ include allmydata/web/*.xhtml allmydata/web/*.html allmydata/web/*.css +include allmydata/*.sql diff --git a/README b/README index 4e495450..a0edc0f3 100644 --- a/README +++ b/README @@ -109,6 +109,8 @@ gcc make python-dev python-twisted python-nevow python-pyopenssl". libraries with the cygwin package management tool, then get the pyOpenSSL source code, cd into it, and run "python ./setup.py install". + + pysqlite3 (database library) + + the pywin32 package: only required on Windows http://sourceforge.net/projects/pywin32/ diff --git a/setup.py b/setup.py index 9d18432d..1b2ed117 100644 --- a/setup.py +++ b/setup.py @@ -90,7 +90,8 @@ setup(name='allmydata-tahoe', ], package_dir={ "allmydata": "src/allmydata",}, scripts = ["bin/allmydata-tahoe"], - package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css'] }, + package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css', + 'owner.sql'] }, classifiers=trove_classifiers, test_suite="allmydata.test", ext_modules=[ diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 8311bc15..2fb60e9a 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -105,6 +105,20 @@ class RIStorageServer(RemoteInterface): """ return TupleOf(SetOf(int, maxLength=MAX_BUCKETS), DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS)) + + def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret): + """ + Renew the lease on a given bucket. Some networks will use this, some + will not. + """ + + def cancel_lease(storage_index=StorageIndex, + cancel_secret=LeaseCancelSecret): + """ + Cancel the lease on a given bucket. If this was the last lease on the + bucket, the bucket will be deleted. + """ + def get_buckets(storage_index=StorageIndex): return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) diff --git a/src/allmydata/owner.sql b/src/allmydata/owner.sql new file mode 100644 index 00000000..66ecfad9 --- /dev/null +++ b/src/allmydata/owner.sql @@ -0,0 +1,20 @@ +CREATE TABLE buckets +( + bucket_id integer PRIMARY KEY AUTOINCREMENT, + storage_index char(32) +); + +CREATE TABLE owners +( + owner_id integer PRIMARY KEY AUTOINCREMENT +); + +CREATE TABLE leases +( + lease_id integer PRIMARY KEY AUTOINCREMENT, + bucket_id integer, + owner_id integer, + renew_secret char(32), + cancel_secret char(32), + expire_time timestamp +); diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 4998e20f..e2af4fd9 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -1,8 +1,9 @@ -import os, re, weakref, stat, struct +import os, re, weakref, stat, struct, time from foolscap import Referenceable from twisted.application import service from twisted.internet import defer +from twisted.python import util from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ @@ -10,14 +11,17 @@ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ from allmydata.util import fileutil, idlib, mathutil from allmydata.util.assertutil import precondition +from pysqlite2 import dbapi2 as sqlite + # store/ -# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success -# store/$STORAGEINDEX -# store/$STORAGEINDEX/$SHARENUM -# store/$STORAGEINDEX/$SHARENUM/blocksize -# store/$STORAGEINDEX/$SHARENUM/data -# store/$STORAGEINDEX/$SHARENUM/blockhashes -# store/$STORAGEINDEX/$SHARENUM/sharehashtree +# store/owners.db +# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success +# store/shares/$STORAGEINDEX +# store/shares/$STORAGEINDEX/$SHARENUM +# store/shares/$STORAGEINDEX/$SHARENUM/blocksize +# store/shares/$STORAGEINDEX/$SHARENUM/data +# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes +# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree # $SHARENUM matches this regex: NUM_RE=re.compile("[0-9]*") @@ -75,22 +79,40 @@ class StorageServer(service.MultiService, Referenceable): def __init__(self, storedir, sizelimit=None, no_storage=False): service.MultiService.__init__(self) - fileutil.make_dirs(storedir) self.storedir = storedir + sharedir = os.path.join(storedir, "shares") + fileutil.make_dirs(sharedir) + self.sharedir = sharedir self.sizelimit = sizelimit self.no_storage = no_storage - self.incomingdir = os.path.join(storedir, 'incoming') + self.incomingdir = os.path.join(sharedir, 'incoming') self._clean_incomplete() fileutil.make_dirs(self.incomingdir) self._active_writers = weakref.WeakKeyDictionary() + self.init_db() + self.measure_size() def _clean_incomplete(self): fileutil.rm_dir(self.incomingdir) + def init_db(self): + # files in storedir with non-zbase32 characters in it (like ".") are + # safe, in that they cannot be accessed or overwritten by clients + # (whose binary storage_index values are always converted into a + # filename with idlib.b2a) + db_file = os.path.join(self.storedir, "owners.db") + need_to_init_db = not os.path.exists(db_file) + self._owner_db_con = sqlite.connect(db_file) + self._owner_db_cur = self._owner_db_con.cursor() + if need_to_init_db: + setup_file = util.sibpath(__file__, "owner.sql") + setup = open(setup_file, "r").read() + self._owner_db_cur.executescript(setup) + def measure_size(self): - self.consumed = fileutil.du(self.storedir) + self.consumed = fileutil.du(self.sharedir) def allocated_size(self): space = self.consumed @@ -112,7 +134,7 @@ class StorageServer(service.MultiService, Referenceable): remaining_space = self.sizelimit - self.allocated_size() for shnum in sharenums: incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum) - finalhome = os.path.join(self.storedir, si_s, "%d" % shnum) + finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum) if os.path.exists(incominghome) or os.path.exists(finalhome): alreadygot.add(shnum) elif no_limits or remaining_space >= space_per_bucket: @@ -130,17 +152,127 @@ class StorageServer(service.MultiService, Referenceable): pass if bucketwriters: - fileutil.make_dirs(os.path.join(self.storedir, si_s)) + fileutil.make_dirs(os.path.join(self.sharedir, si_s)) + + # now store the secrets somewhere. This requires a + # variable-length-list of (renew,cancel) secret tuples per bucket. + # Note that this does not need to be kept inside the share itself, if + # packing efficiency is a concern. For this implementation, we use a + # sqlite database, which puts everything in a single file. + self.add_lease(storage_index, renew_secret, cancel_secret) return alreadygot, bucketwriters + def add_lease(self, storage_index, renew_secret, cancel_secret): + # is the bucket already in our database? + cur = self._owner_db_cur + cur.execute("SELECT bucket_id FROM buckets" + " WHERE storage_index = ?", + (storage_index,)) + res = cur.fetchone() + if res: + bucket_id = res[0] + else: + cur.execute("INSERT INTO buckets (storage_index)" + " values(?)", (storage_index,)) + cur.execute("SELECT bucket_id FROM buckets" + " WHERE storage_index = ?", + (storage_index,)) + res = cur.fetchone() + bucket_id = res[0] + + # what time will this lease expire? One month from now. + expire_time = time.time() + 31*24*60*60 + + # now, is this lease already in our database? Since we don't have + # owners yet, look for a match by renew_secret/cancel_secret + cur.execute("SELECT lease_id FROM leases" + " WHERE renew_secret = ? AND cancel_secret = ?", + (renew_secret, cancel_secret)) + res = cur.fetchone() + if res: + # yes, so just update the timestamp + lease_id = res[0] + cur.execute("UPDATE leases" + " SET expire_time = ?" + " WHERE lease_id = ?", + (expire_time, lease_id)) + else: + # no, we need to add the lease + cur.execute("INSERT INTO leases " + "(bucket_id, renew_secret, cancel_secret, expire_time)" + " values(?,?,?,?)", + (bucket_id, renew_secret, cancel_secret, expire_time)) + self._owner_db_con.commit() + + def remote_renew_lease(self, storage_index, renew_secret): + # find the lease + cur = self._owner_db_cur + cur.execute("SELECT leases.lease_id FROM buckets, leases" + " WHERE buckets.storage_index = ?" + " AND buckets.bucket_id = leases.bucket_id" + " AND leases.renew_secret = ?", + (storage_index, renew_secret)) + res = cur.fetchone() + if res: + # found it, now update it. The new leases will expire one month + # from now. + expire_time = time.time() + 31*24*60*60 + lease_id = res[0] + cur.execute("UPDATE leases" + " SET expire_time = ?" + " WHERE lease_id = ?", + (expire_time, lease_id)) + else: + # no such lease + raise IndexError("No such lease") + self._owner_db_con.commit() + + def remote_cancel_lease(self, storage_index, cancel_secret): + # find the lease + cur = self._owner_db_cur + cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id" + " FROM buckets b, leases l" + " WHERE b.storage_index = ?" + " AND b.bucket_id = l.bucket_id" + " AND l.cancel_secret = ?", + (storage_index, cancel_secret)) + res = cur.fetchone() + if res: + # found it + lease_id, storage_index, bucket_id = res + cur.execute("DELETE FROM leases WHERE lease_id = ?", + (lease_id,)) + # was that the last one? + cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?", + (bucket_id,)) + res = cur.fetchone() + remaining_leases = res[0] + if not remaining_leases: + # delete the share + cur.execute("DELETE FROM buckets WHERE bucket_id = ?", + (bucket_id,)) + self.delete_bucket(storage_index) + else: + # no such lease + raise IndexError("No such lease") + self._owner_db_con.commit() + + def delete_bucket(self, storage_index): + storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) + # measure the usage of this directory, to remove it from our current + # total + consumed = fileutil.du(storagedir) + fileutil.rm_dir(storagedir) + self.consumed -= consumed + def bucket_writer_closed(self, bw, consumed_size): self.consumed += consumed_size del self._active_writers[bw] def remote_get_buckets(self, storage_index): bucketreaders = {} # k: sharenum, v: BucketReader - storagedir = os.path.join(self.storedir, idlib.b2a(storage_index)) + storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index)) try: for f in os.listdir(storagedir): if NUM_RE.match(f): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index accd66db..e25f7333 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -5,15 +5,12 @@ from twisted.application import service from twisted.internet import defer from foolscap import Referenceable import os.path +import itertools from allmydata import interfaces from allmydata.util import fileutil, hashutil from allmydata.storage import BucketWriter, BucketReader, \ WriteBucketProxy, ReadBucketProxy, StorageServer -RS = hashutil.tagged_hash("blah", "foo") -CS = RS - - class Bucket(unittest.TestCase): def make_workdir(self, name): basedir = os.path.join("storage", "Bucket", name) @@ -167,6 +164,7 @@ class Server(unittest.TestCase): def setUp(self): self.sparent = service.MultiService() + self._secret = itertools.count() def tearDown(self): return self.sparent.stopService() @@ -183,14 +181,20 @@ class Server(unittest.TestCase): def test_create(self): ss = self.create("test_create") + def allocate(self, ss, storage_index, sharenums, size): + renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next()) + cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next()) + return ss.remote_allocate_buckets(storage_index, + renew_secret, cancel_secret, + sharenums, size, Referenceable()) + def test_allocate(self): ss = self.create("test_allocate") self.failUnlessEqual(ss.remote_get_buckets("vid"), {}) canary = Referenceable() - already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2], - 75, canary) + already,writers = self.allocate(ss, "vid", [0,1,2], 75) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) @@ -208,8 +212,7 @@ class Server(unittest.TestCase): # now if we about writing again, the server should offer those three # buckets as already present - already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2,3,4], - 75, canary) + already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75) self.failUnlessEqual(already, set([0,1,2])) self.failUnlessEqual(set(writers.keys()), set([3,4])) @@ -217,8 +220,7 @@ class Server(unittest.TestCase): # tell new uploaders that they already exist (so that we don't try to # upload into them a second time) - already,writers = ss.remote_allocate_buckets("vid", RS, CS, [2,3,4,5], - 75, canary) + already,writers = self.allocate(ss, "vid", [2,3,4,5], 75) self.failUnlessEqual(already, set([2,3,4])) self.failUnlessEqual(set(writers.keys()), set([5])) @@ -226,15 +228,13 @@ class Server(unittest.TestCase): ss = self.create("test_sizelimits", 100) canary = Referenceable() - already,writers = ss.remote_allocate_buckets("vid1", RS, CS, [0,1,2], - 25, canary) + already,writers = self.allocate(ss, "vid1", [0,1,2], 25) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 75 bytes provisionally allocated, # allowing only 25 more to be claimed self.failUnlessEqual(len(ss._active_writers), 3) - already2,writers2 = ss.remote_allocate_buckets("vid2", RS, CS, [0,1,2], - 25, canary) + already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25) self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(ss._active_writers), 4) @@ -255,9 +255,7 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(ss._active_writers), 0) # now there should be 25 bytes allocated, and 75 free - already3,writers3 = ss.remote_allocate_buckets("vid3", RS, CS, - [0,1,2,3], - 25, canary) + already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25) self.failUnlessEqual(len(writers3), 3) self.failUnlessEqual(len(ss._active_writers), 3) @@ -272,8 +270,77 @@ class Server(unittest.TestCase): # during runtime, so if we were creating any metadata, the allocation # would be more than 25 bytes and this test would need to be changed. ss = self.create("test_sizelimits", 100) - already4,writers4 = ss.remote_allocate_buckets("vid4", - RS, CS, [0,1,2,3], - 25, canary) + already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25) self.failUnlessEqual(len(writers4), 3) self.failUnlessEqual(len(ss._active_writers), 3) + + def test_leases(self): + ss = self.create("test_leases") + canary = Referenceable() + sharenums = range(5) + size = 100 + + rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()), + hashutil.tagged_hash("blah", "%d" % self._secret.next())) + already,writers = ss.remote_allocate_buckets("si0", rs0, cs0, + sharenums, size, canary) + self.failUnlessEqual(len(already), 0) + self.failUnlessEqual(len(writers), 5) + for wb in writers.values(): + wb.remote_close() + + rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()), + hashutil.tagged_hash("blah", "%d" % self._secret.next())) + already,writers = ss.remote_allocate_buckets("si1", rs1, cs1, + sharenums, size, canary) + for wb in writers.values(): + wb.remote_close() + + # take out a second lease on si1 + rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()), + hashutil.tagged_hash("blah", "%d" % self._secret.next())) + already,writers = ss.remote_allocate_buckets("si1", rs2, cs2, + sharenums, size, canary) + self.failUnlessEqual(len(already), 5) + self.failUnlessEqual(len(writers), 0) + + # check that si0 is readable + readers = ss.remote_get_buckets("si0") + self.failUnlessEqual(len(readers), 5) + + # renew the first lease. Only the proper renew_secret should work + ss.remote_renew_lease("si0", rs0) + self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0) + self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1) + + # check that si0 is still readable + readers = ss.remote_get_buckets("si0") + self.failUnlessEqual(len(readers), 5) + + # now cancel it + self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0) + self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1) + ss.remote_cancel_lease("si0", cs0) + + # si0 should now be gone + readers = ss.remote_get_buckets("si0") + self.failUnlessEqual(len(readers), 0) + # and the renew should no longer work + self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0) + + + # cancel the first lease on si1, leaving the second in place + ss.remote_cancel_lease("si1", cs1) + readers = ss.remote_get_buckets("si1") + self.failUnlessEqual(len(readers), 5) + # the corresponding renew should no longer work + self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1) + + ss.remote_renew_lease("si1", rs2) + # cancelling the second should make it go away + ss.remote_cancel_lease("si1", cs2) + readers = ss.remote_get_buckets("si1") + self.failUnlessEqual(len(readers), 0) + self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1) + self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2) + diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 606839cd..95e20cbc 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -616,8 +616,8 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): if not filenames: continue pieces = dirpath.split(os.sep) - if pieces[-2] == "storage": - # we're sitting in .../storage/$SINDEX , and there are + if pieces[-3] == "storage" and pieces[-2] == "shares": + # we're sitting in .../storage/shares/$SINDEX , and there are # sharefiles here filename = os.path.join(dirpath, filenames[0]) break