-import os, re, weakref, stat, struct
+import os, re, weakref, stat, struct, time
from foolscap import Referenceable
from twisted.application import service
from twisted.internet import defer
+from twisted.python import util
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition
+from pysqlite2 import dbapi2 as sqlite
+
# store/
-# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
-# store/$STORAGEINDEX
-# store/$STORAGEINDEX/$SHARENUM
-# store/$STORAGEINDEX/$SHARENUM/blocksize
-# store/$STORAGEINDEX/$SHARENUM/data
-# store/$STORAGEINDEX/$SHARENUM/blockhashes
-# store/$STORAGEINDEX/$SHARENUM/sharehashtree
+# store/owners.db
+# store/shares/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/shares/$STORAGEINDEX/$SHARENUM on success
+# store/shares/$STORAGEINDEX
+# store/shares/$STORAGEINDEX/$SHARENUM
+# store/shares/$STORAGEINDEX/$SHARENUM/blocksize
+# store/shares/$STORAGEINDEX/$SHARENUM/data
+# store/shares/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/shares/$STORAGEINDEX/$SHARENUM/sharehashtree
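+#
+# owners.db (above) is created from owner.sql at first startup. The
+# queries in this file assume at least the following shape; this is a
+# sketch inferred from those queries, and owner.sql itself is the
+# authoritative schema:
+#   buckets: bucket_id (primary key), storage_index
+#   leases:  lease_id (primary key), bucket_id, renew_secret,
+#            cancel_secret, expire_time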
# $SHARENUM matches this regex:
-NUM_RE=re.compile("[0-9]*")
+NUM_RE=re.compile("^[0-9]+$")
def __init__(self, storedir, sizelimit=None, no_storage=False):
service.MultiService.__init__(self)
- fileutil.make_dirs(storedir)
self.storedir = storedir
+ sharedir = os.path.join(storedir, "shares")
+ fileutil.make_dirs(sharedir)
+ self.sharedir = sharedir
self.sizelimit = sizelimit
self.no_storage = no_storage
- self.incomingdir = os.path.join(storedir, 'incoming')
+ self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
+ self.init_db()
+
self.measure_size()
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
+ def init_db(self):
+ # files in storedir whose names contain non-zbase32 characters (like
+ # ".") are safe, in that they cannot be accessed or overwritten by
+ # clients (whose binary storage_index values are always converted
+ # into filenames with idlib.b2a)
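+ # (for example, "owners.db" contains a ".", which never appears in
+ # zbase32 output, so no client-supplied storage_index can collide
+ # with it)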
+ db_file = os.path.join(self.storedir, "owners.db")
+ need_to_init_db = not os.path.exists(db_file)
+ self._owner_db_con = sqlite.connect(db_file)
+ self._owner_db_cur = self._owner_db_con.cursor()
+ if need_to_init_db:
+ setup_file = util.sibpath(__file__, "owner.sql")
+ setup = open(setup_file, "r").read()
+ self._owner_db_cur.executescript(setup)
+
def measure_size(self):
- self.consumed = fileutil.du(self.storedir)
+ self.consumed = fileutil.du(self.sharedir)
def allocated_size(self):
space = self.consumed
remaining_space = self.sizelimit - self.allocated_size()
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_s, "%d" % shnum)
- finalhome = os.path.join(self.storedir, si_s, "%d" % shnum)
+ finalhome = os.path.join(self.sharedir, si_s, "%d" % shnum)
if os.path.exists(incominghome) or os.path.exists(finalhome):
alreadygot.add(shnum)
elif no_limits or remaining_space >= space_per_bucket:
pass
if bucketwriters:
- fileutil.make_dirs(os.path.join(self.storedir, si_s))
+ fileutil.make_dirs(os.path.join(self.sharedir, si_s))
+
+ # now store the secrets somewhere. This requires a variable-length
+ # list of (renew,cancel) secret tuples per bucket. Note that this
+ # list does not need to be kept inside the share itself, if packing
+ # efficiency is a concern. For this implementation, we use a sqlite
+ # database, which puts everything in a single file.
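+ # (see add_lease() below; renewal and cancellation are handled by
+ # remote_renew_lease() and remote_cancel_lease())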
+ self.add_lease(storage_index, renew_secret, cancel_secret)
return alreadygot, bucketwriters
+ def add_lease(self, storage_index, renew_secret, cancel_secret):
+ # is the bucket already in our database?
+ cur = self._owner_db_cur
+ cur.execute("SELECT bucket_id FROM buckets"
+ " WHERE storage_index = ?",
+ (storage_index,))
+ res = cur.fetchone()
+ if res:
+ bucket_id = res[0]
+ else:
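+ # no: insert a new bucket row, then re-read it to learn the
+ # bucket_id that sqlite assigned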
+ cur.execute("INSERT INTO buckets (storage_index)"
+ " values(?)", (storage_index,))
+ cur.execute("SELECT bucket_id FROM buckets"
+ " WHERE storage_index = ?",
+ (storage_index,))
+ res = cur.fetchone()
+ bucket_id = res[0]
+
+ # what time will this lease expire? One month from now.
+ expire_time = time.time() + 31*24*60*60
+
+ # now, is this lease already in our database? Since we don't have
+ # owners yet, look for a match by renew_secret/cancel_secret
+ cur.execute("SELECT lease_id FROM leases"
+ " WHERE renew_secret = ? AND cancel_secret = ?",
+ (renew_secret, cancel_secret))
+ res = cur.fetchone()
+ if res:
+ # yes, so just update the timestamp
+ lease_id = res[0]
+ cur.execute("UPDATE leases"
+ " SET expire_time = ?"
+ " WHERE lease_id = ?",
+ (expire_time, lease_id))
+ else:
+ # no, we need to add the lease
+ cur.execute("INSERT INTO leases "
+ "(bucket_id, renew_secret, cancel_secret, expire_time)"
+ " values(?,?,?,?)",
+ (bucket_id, renew_secret, cancel_secret, expire_time))
+ self._owner_db_con.commit()
+
+ def remote_renew_lease(self, storage_index, renew_secret):
+ # find the lease
+ cur = self._owner_db_cur
+ cur.execute("SELECT leases.lease_id FROM buckets, leases"
+ " WHERE buckets.storage_index = ?"
+ " AND buckets.bucket_id = leases.bucket_id"
+ " AND leases.renew_secret = ?",
+ (storage_index, renew_secret))
+ res = cur.fetchone()
+ if res:
+ # found it, now update it. The renewed lease will expire one
+ # month from now.
+ expire_time = time.time() + 31*24*60*60
+ lease_id = res[0]
+ cur.execute("UPDATE leases"
+ " SET expire_time = ?"
+ " WHERE lease_id = ?",
+ (expire_time, lease_id))
+ else:
+ # no such lease
+ raise IndexError("No such lease")
+ self._owner_db_con.commit()
+
+ def remote_cancel_lease(self, storage_index, cancel_secret):
+ # find the lease
+ cur = self._owner_db_cur
+ cur.execute("SELECT l.lease_id, b.storage_index, b.bucket_id"
+ " FROM buckets b, leases l"
+ " WHERE b.storage_index = ?"
+ " AND b.bucket_id = l.bucket_id"
+ " AND l.cancel_secret = ?",
+ (storage_index, cancel_secret))
+ res = cur.fetchone()
+ if res:
+ # found it
+ lease_id, storage_index, bucket_id = res
+ cur.execute("DELETE FROM leases WHERE lease_id = ?",
+ (lease_id,))
+ # was that the last one?
+ cur.execute("SELECT COUNT(*) FROM leases WHERE bucket_id = ?",
+ (bucket_id,))
+ res = cur.fetchone()
+ remaining_leases = res[0]
+ if not remaining_leases:
+ # delete the whole bucket (and all of its shares)
+ cur.execute("DELETE FROM buckets WHERE bucket_id = ?",
+ (bucket_id,))
+ self.delete_bucket(storage_index)
+ else:
+ # no such lease
+ raise IndexError("No such lease")
+ self._owner_db_con.commit()
+
+ def delete_bucket(self, storage_index):
+ storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
+ # measure the usage of this directory, to remove it from our current
+ # total
+ consumed = fileutil.du(storagedir)
+ fileutil.rm_dir(storagedir)
+ self.consumed -= consumed
+
def bucket_writer_closed(self, bw, consumed_size):
self.consumed += consumed_size
del self._active_writers[bw]
def remote_get_buckets(self, storage_index):
bucketreaders = {} # k: sharenum, v: BucketReader
- storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
+ storagedir = os.path.join(self.sharedir, idlib.b2a(storage_index))
try:
for f in os.listdir(storagedir):
if NUM_RE.match(f):