include allmydata/web/*.xhtml allmydata/web/*.html allmydata/web/*.css
+include allmydata/*.sql
libraries with the cygwin package management tool, then get the pyOpenSSL
source code, cd into it, and run "python ./setup.py install".
+ + pysqlite (database library; imported as "pysqlite2")
+
+ the pywin32 package: only required on Windows
http://sourceforge.net/projects/pywin32/
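+
+ To check that the database library is importable after installation (an
+ optional sanity check; "pysqlite2" is the import name used by the
+ storage server):
+
+   python -c "from pysqlite2 import dbapi2; print dbapi2.sqlite_version"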
],
package_dir={ "allmydata": "src/allmydata",},
scripts = ["bin/allmydata-tahoe"],
- package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css'] },
+ package_data={ 'allmydata': ['web/*.xhtml', 'web/*.html', 'web/*.css',
+ 'owner.sql'] },
classifiers=trove_classifiers,
test_suite="allmydata.test",
ext_modules=[
"""
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
+
+ def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
+ """
+ Renew the lease on a given bucket. Some networks will use this, some
+ will not.
+ """
+
+ def cancel_lease(storage_index=StorageIndex,
+ cancel_secret=LeaseCancelSecret):
+ """
+ Cancel the lease on a given bucket. If this was the last lease on the
+ bucket, the bucket will be deleted.
+ """
+
def get_buckets(storage_index=StorageIndex):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
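[For context, the two additions above are foolscap remote methods: a client
holding a RemoteReference to the storage server drives them with callRemote.
A minimal client-side sketch, not part of this patch ("server",
"storage_index", and the secrets are assumed to have been obtained at
allocation time):

    # hypothetical client code: renew a lease, then cancel it
    d = server.callRemote("renew_lease", storage_index, renew_secret)
    d.addCallback(lambda ign:
                  server.callRemote("cancel_lease", storage_index,
                                    cancel_secret))
]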
--- /dev/null
+CREATE TABLE buckets
+(
+ bucket_id integer PRIMARY KEY AUTOINCREMENT,
+ storage_index char(32)
+);
+
+CREATE TABLE owners
+(
+ owner_id integer PRIMARY KEY AUTOINCREMENT
+);
+
+CREATE TABLE leases
+(
+ lease_id integer PRIMARY KEY AUTOINCREMENT,
+ bucket_id integer,
+ owner_id integer,
+ renew_secret char(32),
+ cancel_secret char(32),
+ expire_time timestamp
+);
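[A quick sketch of how this schema is exercised, using the same pysqlite2
module the server imports (an in-memory database and made-up values, for
illustration only; assumes owner.sql is in the current directory):

    from pysqlite2 import dbapi2 as sqlite
    con = sqlite.connect(":memory:")
    cur = con.cursor()
    cur.executescript(open("owner.sql").read())
    # one bucket, one lease on it
    cur.execute("INSERT INTO buckets (storage_index) values(?)", ("a"*32,))
    bucket_id = cur.lastrowid
    cur.execute("INSERT INTO leases (bucket_id, renew_secret, cancel_secret,"
                " expire_time) values(?,?,?,?)",
                (bucket_id, "r"*32, "c"*32, 1234567890))
    con.commit()
    # the join that renew_lease/cancel_lease rely on
    cur.execute("SELECT leases.lease_id FROM buckets, leases"
                " WHERE buckets.storage_index = ?"
                " AND buckets.bucket_id = leases.bucket_id", ("a"*32,))
    print cur.fetchone()
]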
-import os, re, weakref, stat, struct
+import os, re, weakref, stat, struct, time
from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
+from twisted.python import util
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition
+from pysqlite2 import dbapi2 as sqlite
+
# store/
-# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
-# store/$STORAGEINDEX
-# store/$STORAGEINDEX/$SHARENUM
-# store/$STORAGEINDEX/$SHARENUM/blocksize
-# store/$STORAGEINDEX/$SHARENUM/data
-# store/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/$STORAGEINDEX/$SHARENUM/sharehashtree
+# store/owners.db
+# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
+# store/shares/$STORAGEINDEX
+# store/shares/$STORAGEINDEX/$SHARENUM
+# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
+# store/shares/$STORAGEINDEX/$SHARENUM/data
+# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
# $SHARENUM matches this regex:
NUM_RE=re.compile("^[0-9]+$")
def __init__(self, storedir, sizelimit=None, no_storage=False):
service.MultiService.__init__(self)
- fileutil.make_dirs(storedir)
self.storedir = storedir
+ sharedir = os.path.join(storedir, "shares")
+ fileutil.make_dirs(sharedir)
+ self.sharedir = sharedir
self.sizelimit = sizelimit
self.no_storage = no_storage
- self.incomingdir = os.path.join(storedir, 'incoming')
+ self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
+ self.init_db()
+
self.measure_size()
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
+ def init_db(self):
+ # Files in storedir whose names contain non-zbase32 characters (like
+ # ".") are safe, in that they cannot be accessed or overwritten by
+ # clients (whose binary storage_index values are always converted into
+ # a filename with idlib.b2a).
+ db_file = os.path.join(self.storedir, "owners.db")
+ need_to_init_db = not os.path.exists(db_file)
+ self._owner_db_con = sqlite.connect(db_file)
+ self._owner_db_cur = self._owner_db_con.cursor()
+ if need_to_init_db:
+ setup_file = util.sibpath(__file__, "owner.sql")
+ f = open(setup_file, "r")
+ setup = f.read()
+ f.close()
+ self._owner_db_cur.executescript(setup)
+
def measure_size(self):
- self.consumed = fileutil.du(self.storedir)
+ self.consumed = fileutil.du(self.sharedir)
def allocated_size(self):
space = self.consumed
remaining_space = self.sizelimit - self.allocated_size()
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
- finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
+ finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
elif no_limits or remaining_space >= space_per_bucket:
pass
if bucketwriters:
- fileutil.make_dirs(os.path.join(self.storedir, si_s))
+ fileutil.make_dirs(os.path.join(self.sharedir, si_s))
+
+ # Now store the secrets somewhere. This requires a variable-length
+ # list of (renew, cancel) secret tuples per bucket. Note that the
+ # secrets do not need to be kept inside the share itself, if packing
+ # efficiency is a concern. For this implementation, we use a sqlite
+ # database, which puts everything in a single file.
+ self.add_lease(storage_index, renew_secret, cancel_secret)
return alreadygot, bucketwriters
+ def add_lease(self, storage_index, renew_secret, cancel_secret):
+ # is the bucket already in our database?
+ cur = self._owner_db_cur
+ cur.execute("SELECT bucket_id FROM buckets"
+ " WHERE storage_index = ?",
+ (storage_index,))
+ res = cur.fetchone()
+ if res:
+ bucket_id = res[0]
+ else:
+ cur.execute("INSERT INTO buckets (storage_index)"
+ " values(?)", (storage_index,))
+ cur.execute("SELECT bucket_id FROM buckets"
+ " WHERE storage_index = ?",
+ (storage_index,))
+ res = cur.fetchone()
+ bucket_id = res[0]
+
+ # what time will this lease expire? One month from now.
+ expire_time = time.time() + 31*24*60*60
+
+ # now, is this lease already in our database? Since we don't have
+ # owners yet, look for a match by renew_secret/cancel_secret
+ cur.execute("SELECT lease_id FROM leases"
+ " WHERE renew_secret = ? AND cancel_secret = ?",
+ (renew_secret, cancel_secret))
+ res = cur.fetchone()
+ if res:
+ # yes, so just update the timestamp
+ lease_id = res[0]
+ cur.execute("UPDATE leases"
+ " SET expire_time = ?"
+ " WHERE lease_id = ?",
+ (expire_time, lease_id))
+ else:
+ # no, we need to add the lease
+ cur.execute("INSERT INTO leases "
+ "(bucket_id, renew_secret, cancel_secret, expire_time)"
+ " values(?,?,?,?)",
+ (bucket_id, renew_secret, cancel_secret, expire_time))
+ self._owner_db_con.commit()
+
+ def remote_renew_lease(self, storage_index, renew_secret):
+ # find the lease
+ cur = self._owner_db_cur
+ cur.execute("SELECT leases.lease_id FROM buckets, leases"
+ " WHERE buckets.storage_index = ?"
+ " AND buckets.bucket_id = leases.bucket_id"
+ " AND leases.renew_secret = ?",
+ (storage_index, renew_secret))
+ res = cur.fetchone()
+ if res:
+ # found it, now update it. The new lease will expire one month
+ # from now.
+ expire_time = time.time() + 31*24*60*60
+ lease_id = res[0]
+ cur.execute("UPDATE leases"
+ " SET expire_time = ?"
+ " WHERE lease_id = ?",
+ (expire_time, lease_id))
+ else:
+ # no such lease
+ raise IndexError("No such lease")
+ self._owner_db_con.commit()
+
+ def remote_cancel_lease(self, storage_index, cancel_secret):
+ # find the lease
+ cur = self._owner_db_cur
+ cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
+ " FROM buckets b, leases l"
+ " WHERE b.storage_index = ?"
+ " AND b.bucket_id = l.bucket_id"
+ " AND l.cancel_secret = ?",
+ (storage_index, cancel_secret))
+ res = cur.fetchone()
+ if res:
+ # found it
+ lease_id, storage_index, bucket_id = res
+ cur.execute("DELETE FROM leases WHERE lease_id = ?",
+ (lease_id,))
+ # was that the last one?
+ cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
+ (bucket_id,))
+ res = cur.fetchone()
+ remaining_leases = res[0]
+ if not remaining_leases:
+ # delete the share
+ cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
+ (bucket_id,))
+ self.delete_bucket(storage_index)
+ else:
+ # no such lease
+ raise IndexError("No such lease")
+ self._owner_db_con.commit()
+
+ def delete_bucket(self, storage_index):
+ storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
+ # measure the usage of this directory, to remove it from our current
+ # total
+ consumed = fileutil.du(storagedir)
+ fileutil.rm_dir(storagedir)
+ self.consumed -= consumed
+
def bucket_writer_closed(self, bw, consumed_size):
self.consumed += consumed_size
del self._active_writers[bw]
def remote_get_buckets(self, storage_index):
bucketreaders = {} # k: sharenum, v: BucketReader
- storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
+ storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
try:
for f in os.listdir(storagedir):
if NUM_RE.match(f):
from twisted.internet import defer
from foolscap import Referenceable
import os.path
+import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer
-RS = hashutil.tagged_hash("blah", "foo")
-CS = RS
-
-
class Bucket(unittest.TestCase):
def make_workdir(self, name):
basedir = os.path.join("storage", "Bucket", name)
def setUp(self):
self.sparent = service.MultiService()
+ self._secret = itertools.count()
def tearDown(self):
return self.sparent.stopService()
def test_create(self):
ss = self.create("test_create")
+ def allocate(self, ss, storage_index, sharenums, size):
+ renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+ cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
+ return ss.remote_allocate_buckets(storage_index,
+ renew_secret, cancel_secret,
+ sharenums, size, Referenceable())
+
def test_allocate(self):
ss = self.create("test_allocate")
self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
canary = Referenceable()
- already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2],
- 75, canary)
+ already,writers = self.allocate(ss, "vid", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# now if we ask about writing again, the server should offer those three
# buckets as already present
- already,writers = ss.remote_allocate_buckets("vid", RS, CS, [0,1,2,3,4],
- 75, canary)
+ already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
self.failUnlessEqual(already, set([0,1,2]))
self.failUnlessEqual(set(writers.keys()), set([3,4]))
# tell new uploaders that they already exist (so that we don't try to
# upload into them a second time)
- already,writers = ss.remote_allocate_buckets("vid", RS, CS, [2,3,4,5],
- 75, canary)
+ already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
self.failUnlessEqual(already, set([2,3,4]))
self.failUnlessEqual(set(writers.keys()), set([5]))
ss = self.create("test_sizelimits", 100)
canary = Referenceable()
- already,writers = ss.remote_allocate_buckets("vid1", RS, CS, [0,1,2],
- 25, canary)
+ already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 75 bytes provisionally allocated,
# allowing only 25 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
- already2,writers2 = ss.remote_allocate_buckets("vid2", RS, CS, [0,1,2],
- 25, canary)
+ already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
self.failUnlessEqual(len(ss._active_writers), 0)
# now there should be 25 bytes allocated, and 75 free
- already3,writers3 = ss.remote_allocate_buckets("vid3", RS, CS,
- [0,1,2,3],
- 25, canary)
+ already3,writers3 = self.allocate(ss, "vid3", [0,1,2,3], 25)
self.failUnlessEqual(len(writers3), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
# during runtime, so if we were creating any metadata, the allocation
# would be more than 25 bytes and this test would need to be changed.
ss = self.create("test_sizelimits", 100)
- already4,writers4 = ss.remote_allocate_buckets("vid4",
- RS, CS, [0,1,2,3],
- 25, canary)
+ already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
self.failUnlessEqual(len(writers4), 3)
self.failUnlessEqual(len(ss._active_writers), 3)
+
+ def test_leases(self):
+ ss = self.create("test_leases")
+ canary = Referenceable()
+ sharenums = range(5)
+ size = 100
+
+ rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
+ hashutil.tagged_hash("blah", "%d" % self._secret.next()))
+ already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
+ sharenums, size, canary)
+ self.failUnlessEqual(len(already), 0)
+ self.failUnlessEqual(len(writers), 5)
+ for wb in writers.values():
+ wb.remote_close()
+
+ rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
+ hashutil.tagged_hash("blah", "%d" % self._secret.next()))
+ already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
+ sharenums, size, canary)
+ for wb in writers.values():
+ wb.remote_close()
+
+ # take out a second lease on si1
+ rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
+ hashutil.tagged_hash("blah", "%d" % self._secret.next()))
+ already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
+ sharenums, size, canary)
+ self.failUnlessEqual(len(already), 5)
+ self.failUnlessEqual(len(writers), 0)
+
+ # check that si0 is readable
+ readers = ss.remote_get_buckets("si0")
+ self.failUnlessEqual(len(readers), 5)
+
+ # renew the first lease. Only the proper renew_secret should work
+ ss.remote_renew_lease("si0", rs0)
+ self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
+ self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
+
+ # check that si0 is still readable
+ readers = ss.remote_get_buckets("si0")
+ self.failUnlessEqual(len(readers), 5)
+
+ # now cancel it
+ self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
+ self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
+ ss.remote_cancel_lease("si0", cs0)
+
+ # si0 should now be gone
+ readers = ss.remote_get_buckets("si0")
+ self.failUnlessEqual(len(readers), 0)
+ # and the renew should no longer work
+ self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
+
+ # cancel the first lease on si1, leaving the second in place
+ ss.remote_cancel_lease("si1", cs1)
+ readers = ss.remote_get_buckets("si1")
+ self.failUnlessEqual(len(readers), 5)
+ # the corresponding renew should no longer work
+ self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
+
+ ss.remote_renew_lease("si1", rs2)
+ # cancelling the second should make it go away
+ ss.remote_cancel_lease("si1", cs2)
+ readers = ss.remote_get_buckets("si1")
+ self.failUnlessEqual(len(readers), 0)
+ self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
+ self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
+
if not filenames:
continue
pieces = dirpath.split(os.sep)
- if pieces[-2] == "storage":
- # we're sitting in .../storage/$SINDEX , and there are
+ if pieces[-3] == "storage" and pieces[-2] == "shares":
+ # we're sitting in .../storage/shares/$SINDEX , and there are
# sharefiles here
filename = os.path.join(dirpath, filenames[0])
break
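[With the schema, server methods, and tests in place, the new lease
behaviour can be exercised with Twisted's test runner (assuming a built
working tree and that the tests above live in allmydata.test.test_storage):

    trial allmydata.test.test_storage
]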