--- /dev/null
+
+"""
+This file contains the client-facing interface for manipulating shares. It
+implements RIStorageServer, and contains an embedded owner id which is used
+for all operations that touch leases. Initially, clients will receive a
+special 'anonymous' instance of this class with ownerid=0. Later, when the
+FURLification dance is established, each client will get a different instance
+(with a dedicated ownerid).
+"""
+
+import time
+
+from foolscap.api import Referenceable
+
+from zope.interface import implements
+from allmydata.interfaces import RIStorageServer
+
+from allmydata.storage.leasedb import int_or_none
+from allmydata.storage.common import si_b2a
+
+
+class Account(Referenceable):
+ implements(RIStorageServer)
+
+ def __init__(self, owner_num, pubkey_vs, server, leasedb):
+ self.owner_num = owner_num
+ self.server = server
+ self._leasedb = leasedb
+ # for static accounts ("starter", "anonymous"), pubkey_vs is None,
+ # and the "connected" attributes are unused
+ self.pubkey_vs = pubkey_vs
+ self.connected = False
+ self.connected_since = None
+ self.connection = None
+ self.debug = False
+
+ def is_static(self):
+ return self.owner_num in (0,1)
+
+ # these methods are called by StorageServer
+
+ def get_owner_num(self):
+ return self.owner_num
+
+ def get_renewal_and_expiration_times(self):
+ renewal_time = time.time()
+ return (renewal_time, renewal_time + 31*24*60*60)
+
+ # immutable.BucketWriter.close() does:
+ # add_share(), add_or_renew_lease(), mark_share_as_stable()
+
+ # mutable writev() does:
+ # deleted shares: mark_share_as_going(), remove_share_and_leases()
+ # new shares: add_share(), add_or_renew_lease(), mark_share_as_stable()
+ # changed shares: change_share_space(), add_or_renew_lease()
+
+ def add_share(self, storage_index, shnum, used_space, sharetype):
+ if self.debug: print "ADD_SHARE", si_b2a(storage_index), shnum, used_space, sharetype
+ self._leasedb.add_new_share(storage_index, shnum, used_space, sharetype)
+
+ def add_or_renew_default_lease(self, storage_index, shnum):
+ renewal_time, expiration_time = self.get_renewal_and_expiration_times()
+ return self.add_or_renew_lease(storage_index, shnum, renewal_time, expiration_time)
+
+ def add_or_renew_lease(self, storage_index, shnum, renewal_time, expiration_time):
+ if self.debug: print "ADD_OR_RENEW_LEASE", si_b2a(storage_index), shnum
+ self._leasedb.add_or_renew_leases(storage_index, shnum, self.owner_num,
+ renewal_time, expiration_time)
+
+ def change_share_space(self, storage_index, shnum, used_space):
+ if self.debug: print "CHANGE_SHARE_SPACE", si_b2a(storage_index), shnum, used_space
+ self._leasedb.change_share_space(storage_index, shnum, used_space)
+
+ def mark_share_as_stable(self, storage_index, shnum, used_space):
+ if self.debug: print "MARK_SHARE_AS_STABLE", si_b2a(storage_index), shnum, used_space
+ self._leasedb.mark_share_as_stable(storage_index, shnum, used_space)
+
+ def mark_share_as_going(self, storage_index, shnum):
+ if self.debug: print "MARK_SHARE_AS_GOING", si_b2a(storage_index), shnum
+ self._leasedb.mark_share_as_going(storage_index, shnum)
+
+ def remove_share_and_leases(self, storage_index, shnum):
+ if self.debug: print "REMOVE_SHARE_AND_LEASES", si_b2a(storage_index), shnum
+ self._leasedb.remove_deleted_share(storage_index, shnum)
+
+ # remote_add_lease() and remote_renew_lease() do this
+ def add_lease_for_bucket(self, storage_index):
+ if self.debug: print "ADD_LEASE_FOR_BUCKET", si_b2a(storage_index)
+ renewal_time, expiration_time = self.get_renewal_and_expiration_times()
+ self._leasedb.add_or_renew_leases(storage_index, None,
+ self.owner_num, renewal_time, expiration_time)
+
+ # The following RIStorageServer methods are called by remote clients
+
+ def remote_get_version(self):
+ return self.server.client_get_version(self)
+
+ # all other RIStorageServer methods should pass through to self.server
+ # but (except for remote_advise_corrupt_share) add the account as a final
+ # argument.
+
+ def remote_allocate_buckets(self, storage_index, renew_secret, cancel_secret,
+ sharenums, allocated_size, canary):
+ if self.debug: print "REMOTE_ALLOCATE_BUCKETS", si_b2a(storage_index)
+ return self.server.client_allocate_buckets(storage_index,
+ sharenums, allocated_size,
+ canary, self)
+
+ def remote_add_lease(self, storage_index, renew_secret, cancel_secret):
+ if self.debug: print "REMOTE_ADD_LEASE", si_b2a(storage_index)
+ self.add_lease_for_bucket(storage_index)
+ return None
+
+ def remote_renew_lease(self, storage_index, renew_secret):
+ self.add_lease_for_bucket(storage_index)
+ return None
+
+ def remote_get_buckets(self, storage_index):
+ return self.server.client_get_buckets(storage_index)
+
+ def remote_slot_testv_and_readv_and_writev(self, storage_index, secrets,
+ test_and_write_vectors, read_vector):
+ write_enabler = secrets[0]
+ return self.server.client_slot_testv_and_readv_and_writev(
+ storage_index, write_enabler, test_and_write_vectors, read_vector, self)
+
+ def remote_slot_readv(self, storage_index, shares, readv):
+ return self.server.client_slot_readv(storage_index, shares, readv, self)
+
+ def remote_advise_corrupt_share(self, share_type, storage_index, shnum, reason):
+ # this doesn't use the account.
+ return self.server.client_advise_corrupt_share(
+ share_type, storage_index, shnum, reason)
+
+ def get_account_creation_time(self):
+ return self._leasedb.get_account_creation_time(self.owner_num)
+
+ def get_id(self):
+ return self.pubkey_vs
+
+ def get_leases(self, storage_index):
+ return self._leasedb.get_leases(storage_index, self.owner_num)
+
+ def connection_from(self, rx):
+ self.connected = True
+ self.connected_since = time.time()
+ self.connection = rx
+ #rhost = rx.getPeer()
+ #from twisted.internet import address
+ #if isinstance(rhost, address.IPv4Address):
+ # rhost_s = "%s:%d" % (rhost.host, rhost.port)
+ #elif "LoopbackAddress" in str(rhost):
+ # rhost_s = "loopback"
+ #else:
+ # rhost_s = str(rhost)
+ #self.set_account_attribute("last_connected_from", rhost_s)
+ rx.notifyOnDisconnect(self._disconnected)
+
+ def _disconnected(self):
+ self.connected = False
+ self.connected_since = None
+ self.connection = None
+ #self.set_account_attribute("last_seen", int(time.time()))
+ self.disconnected_since = None
+
+ def get_connection_status(self):
+ # starts as: connected=False, connected_since=None,
+ # last_connected_from=None, last_seen=None
+ # while connected: connected=True, connected_since=START,
+ # last_connected_from=HOST, last_seen=IGNOREME
+ # after disconnect: connected=False, connected_since=None,
+ # last_connected_from=HOST, last_seen=STOP
+
+ #last_seen = int_or_none(self.get_account_attribute("last_seen"))
+ #last_connected_from = self.get_account_attribute("last_connected_from")
+ created = int_or_none(self.get_account_creation_time())
+
+ return {"connected": self.connected,
+ "connected_since": self.connected_since,
+ #"last_connected_from": last_connected_from,
+ #"last_seen": last_seen,
+ "created": created,
+ }
+
+ def get_stats(self):
+ return self.server.get_stats()
+
+ def get_accounting_crawler(self):
+ return self.server.get_accounting_crawler()
+
+ def get_expiration_policy(self):
+ return self.server.get_expiration_policy()
+
+ def get_bucket_counter(self):
+ return self.server.get_bucket_counter()
+
+ def get_nodeid(self):
+ return self.server.get_nodeid()
--- /dev/null
+
+"""
+This file contains the cross-account management code. It creates per-client
+Account objects for the FURLification dance, as well as the 'anonymous
+account for use until the server admin decides to make accounting mandatory.
+It also provides usage statistics and reports for the status UI. This will
+also implement the backend of the control UI (once we figure out how to
+express that: maybe a CLI command, or tahoe.cfg settings, or a web frontend),
+for things like enabling/disabling accounts and setting quotas.
+
+The name 'accountant.py' could be better, preferably something that doesn't
+share a prefix with 'account.py' so my tab-autocomplete will work nicely.
+"""
+
+import weakref
+
+from twisted.application import service
+
+from allmydata.storage.leasedb import LeaseDB
+from allmydata.storage.accounting_crawler import AccountingCrawler
+from allmydata.storage.account import Account
+
+
+class Accountant(service.MultiService):
+ def __init__(self, storage_server, dbfile, statefile):
+ service.MultiService.__init__(self)
+ self.storage_server = storage_server
+ self._leasedb = LeaseDB(dbfile)
+ self._active_accounts = weakref.WeakValueDictionary()
+ self._anonymous_account = Account(LeaseDB.ANONYMOUS_ACCOUNTID, None,
+ self.storage_server, self._leasedb)
+ self._starter_account = Account(LeaseDB.STARTER_LEASE_ACCOUNTID, None,
+ self.storage_server, self._leasedb)
+
+ crawler = AccountingCrawler(storage_server, statefile, self._leasedb)
+ self._accounting_crawler = crawler
+ crawler.setServiceParent(self)
+
+ def get_leasedb(self):
+ return self._leasedb
+
+ def set_expiration_policy(self, policy):
+ self._accounting_crawler.set_expiration_policy(policy)
+
+ def get_anonymous_account(self):
+ return self._anonymous_account
+
+ def get_starter_account(self):
+ return self._starter_account
+
+ def get_accounting_crawler(self):
+ return self._accounting_crawler
+
+ # methods used by admin interfaces
+ def get_all_accounts(self):
+ for ownerid, pubkey_vs in self._leasedb.get_all_accounts():
+ if pubkey_vs in self._active_accounts:
+ yield self._active_accounts[pubkey_vs]
+ else:
+ yield Account(ownerid, pubkey_vs,
+ self.storage_server, self._leasedb)
--- /dev/null
+
+import os, time
+
+from twisted.internet import defer
+
+from allmydata.util.deferredutil import for_items
+from allmydata.util.fileutil import get_used_space
+from allmydata.util import log
+from allmydata.storage.crawler import ShareCrawler
+from allmydata.storage.common import si_a2b
+from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN
+
+
+class AccountingCrawler(ShareCrawler):
+ """
+ I perform the following functions:
+ - Remove leases that are past their expiration time.
+ - Delete objects containing unleased shares.
+ - Discover shares that have been manually added to storage.
+ - Discover shares that are present when a storage server is upgraded from
+ a pre-leasedb version, and give them "starter leases".
+ - Recover from a situation where the leasedb is lost or detectably
+ corrupted. This is handled in the same way as upgrading.
+ - Detect shares that have unexpectedly disappeared from storage.
+ """
+
+ slow_start = 600 # don't start crawling for 10 minutes after startup
+ minimum_cycle_time = 12*60*60 # not more than twice per day
+
+ def __init__(self, server, statefile, leasedb):
+ ShareCrawler.__init__(self, server, statefile)
+ self._leasedb = leasedb
+
+ def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
+ # assume that we can list every prefixdir in this prefix quickly.
+ # Otherwise we have to retain more state between timeslices.
+
+ # we define "shareid" as (SI string, shnum)
+ disk_shares = set() # shareid
+ for si_s in buckets:
+ bucketdir = os.path.join(prefixdir, si_s)
+ for sharefile in os.listdir(bucketdir):
+ try:
+ shnum = int(sharefile)
+ except ValueError:
+ continue # non-numeric means not a sharefile
+ shareid = (si_s, shnum)
+ disk_shares.add(shareid)
+
+ # now check the database for everything in this prefix
+ db_sharemap = self._leasedb.get_shares_for_prefix(prefix)
+ db_shares = set(db_sharemap)
+
+ rec = self.state["cycle-to-date"]["space-recovered"]
+ examined_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+ # The lease crawler used to calculate the lease age histogram while
+ # crawling shares, and tests currently rely on that, but it would be
+ # more efficient to maintain the histogram as leases are added,
+ # updated, and removed.
+ for key, value in db_sharemap.iteritems():
+ (si_s, shnum) = key
+ (used_space, sharetype) = value
+
+ examined_sharesets[sharetype].add(si_s)
+
+ for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice):
+ self.add_lease_age_to_histogram(age)
+
+ self.increment(rec, "examined-shares", 1)
+ self.increment(rec, "examined-sharebytes", used_space)
+ self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1)
+ self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space)
+
+ self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets]))
+ for st in SHARETYPES:
+ self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st]))
+
+ # add new shares to the DB
+ new_shares = disk_shares - db_shares
+ for (si_s, shnum) in new_shares:
+ sharefile = os.path.join(prefixdir, si_s, str(shnum))
+ used_space = get_used_space(sharefile)
+ # FIXME
+ sharetype = SHARETYPE_UNKNOWN
+ self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
+ self._leasedb.add_starter_lease(si_s, shnum)
+
+ # remove disappeared shares from DB
+ disappeared_shares = db_shares - disk_shares
+ for (si_s, shnum) in disappeared_shares:
+ log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
+ si_s=si_s, shnum=shnum, level=log.WEIRD)
+ self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
+
+ recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+ def _delete_share(ign, key, value):
+ (si_s, shnum) = key
+ (used_space, sharetype) = value
+ storage_index = si_a2b(si_s)
+ d2 = defer.succeed(None)
+ def _mark_and_delete(ign):
+ self._leasedb.mark_share_as_going(storage_index, shnum)
+ return self.server.delete_share(storage_index, shnum)
+ d2.addCallback(_mark_and_delete)
+ def _deleted(ign):
+ self._leasedb.remove_deleted_share(storage_index, shnum)
+
+ recovered_sharesets[sharetype].add(si_s)
+
+ self.increment(rec, "actual-shares", 1)
+ self.increment(rec, "actual-sharebytes", used_space)
+ self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1)
+ self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space)
+ def _not_deleted(f):
+ log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s",
+ si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD)
+ try:
+ self._leasedb.mark_share_as_stable(storage_index, shnum)
+ except Exception, e:
+ log.err(e)
+ # discard the failure
+ d2.addCallbacks(_deleted, _not_deleted)
+ return d2
+
+ unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
+ d = for_items(_delete_share, unleased_sharemap)
+
+ def _inc_recovered_sharesets(ign):
+ self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets]))
+ for st in SHARETYPES:
+ self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st]))
+ d.addCallback(_inc_recovered_sharesets)
+ return d
+
+ # these methods are for outside callers to use
+
+ def set_expiration_policy(self, policy):
+ self._expiration_policy = policy
+
+ def get_expiration_policy(self):
+ return self._expiration_policy
+
+ def is_expiration_enabled(self):
+ return self._expiration_policy.is_enabled()
+
+ def db_is_incomplete(self):
+ # don't bother looking at the sqlite database: it's certainly not
+ # complete.
+ return self.state["last-cycle-finished"] is None
+
+ def increment(self, d, k, delta=1):
+ if k not in d:
+ d[k] = 0
+ d[k] += delta
+
+ def add_lease_age_to_histogram(self, age):
+ bin_interval = 24*60*60
+ bin_number = int(age/bin_interval)
+ bin_start = bin_number * bin_interval
+ bin_end = bin_start + bin_interval
+ k = (bin_start, bin_end)
+ self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
+
+ def convert_lease_age_histogram(self, lah):
+ # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
+ # since the former is not JSON-safe (JSON dictionaries must have
+ # string keys).
+ json_safe_lah = []
+ for k in sorted(lah):
+ (minage,maxage) = k
+ json_safe_lah.append( (minage, maxage, lah[k]) )
+ return json_safe_lah
+
+ def add_initial_state(self):
+ # we fill ["cycle-to-date"] here (even though they will be reset in
+ # self.started_cycle) just in case someone grabs our state before we
+ # get started: unit tests do this
+ so_far = self.create_empty_cycle_dict()
+ self.state.setdefault("cycle-to-date", so_far)
+ # in case we upgrade the code while a cycle is in progress, update
+ # the keys individually
+ for k in so_far:
+ self.state["cycle-to-date"].setdefault(k, so_far[k])
+
+ def create_empty_cycle_dict(self):
+ recovered = self.create_empty_recovered_dict()
+ so_far = {"corrupt-shares": [],
+ "space-recovered": recovered,
+ "lease-age-histogram": {}, # (minage,maxage)->count
+ }
+ return so_far
+
+ def create_empty_recovered_dict(self):
+ recovered = {}
+ for a in ("actual", "examined"):
+ for b in ("buckets", "shares", "diskbytes"):
+ recovered["%s-%s" % (a, b)] = 0
+ for st in SHARETYPES:
+ recovered["%s-%s-%s" % (a, b, SHARETYPES[st])] = 0
+ return recovered
+
+ def started_cycle(self, cycle):
+ self.state["cycle-to-date"] = self.create_empty_cycle_dict()
+
+ current_time = time.time()
+ self._expiration_policy.remove_expired_leases(self._leasedb, current_time)
+
+ def finished_cycle(self, cycle):
+ # add to our history state, prune old history
+ h = {}
+
+ start = self.state["current-cycle-start-time"]
+ now = time.time()
+ h["cycle-start-finish-times"] = (start, now)
+ ep = self.get_expiration_policy()
+ h["expiration-enabled"] = ep.is_enabled()
+ h["configured-expiration-mode"] = ep.get_parameters()
+
+ s = self.state["cycle-to-date"]
+
+ # state["lease-age-histogram"] is a dictionary (mapping
+ # (minage,maxage) tuple to a sharecount), but we report
+ # self.get_state()["lease-age-histogram"] as a list of
+ # (min,max,sharecount) tuples, because JSON can handle that better.
+ # We record the list-of-tuples form into the history for the same
+ # reason.
+ lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
+ h["lease-age-histogram"] = lah
+ h["corrupt-shares"] = s["corrupt-shares"][:]
+ # note: if ["shares-recovered"] ever acquires an internal dict, this
+ # copy() needs to become a deepcopy
+ h["space-recovered"] = s["space-recovered"].copy()
+
+ self._leasedb.add_history_entry(cycle, h)
+
+ def get_state(self):
+ """In addition to the crawler state described in
+ ShareCrawler.get_state(), I return the following keys which are
+ specific to the lease-checker/expirer. Note that the non-history keys
+ (with 'cycle' in their names) are only present if a cycle is currently
+ running. If the crawler is between cycles, it is appropriate to show
+ the latest item in the 'history' key instead. Also note that each
+ history item has all the data in the 'cycle-to-date' value, plus
+ cycle-start-finish-times.
+
+ cycle-to-date:
+ expiration-enabled
+ configured-expiration-mode
+ lease-age-histogram (list of (minage,maxage,sharecount) tuples)
+ corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
+ space-recovered
+
+ estimated-remaining-cycle:
+ # Values may be None if not enough data has been gathered to
+ # produce an estimate.
+ space-recovered
+
+ estimated-current-cycle:
+ # cycle-to-date plus estimated-remaining. Values may be None if
+ # not enough data has been gathered to produce an estimate.
+ space-recovered
+
+ history: maps cyclenum to a dict with the following keys:
+ cycle-start-finish-times
+ expiration-enabled
+ configured-expiration-mode
+ lease-age-histogram
+ corrupt-shares
+ space-recovered
+
+ The 'space-recovered' structure is a dictionary with the following
+ keys:
+ # 'examined' is what was looked at
+ examined-buckets, examined-buckets-$SHARETYPE
+ examined-shares, examined-shares-$SHARETYPE
+ examined-diskbytes, examined-diskbytes-$SHARETYPE
+
+ # 'actual' is what was deleted
+ actual-buckets, actual-buckets-$SHARETYPE
+ actual-shares, actual-shares-$SHARETYPE
+ actual-diskbytes, actual-diskbytes-$SHARETYPE
+
+ Note that the preferred terminology has changed since these keys
+ were defined; "buckets" refers to what are now called sharesets,
+ and "diskbytes" refers to bytes of used space on the storage backend,
+ which is not necessarily the disk backend.
+
+ The 'original-*' and 'configured-*' keys that were populated in
+ pre-leasedb versions are no longer supported.
+ The 'leases-per-share-histogram' is also no longer supported.
+ """
+ progress = self.get_progress()
+
+ state = ShareCrawler.get_state(self) # does a shallow copy
+ state["history"] = self._leasedb.get_history()
+
+ if not progress["cycle-in-progress"]:
+ del state["cycle-to-date"]
+ return state
+
+ so_far = state["cycle-to-date"].copy()
+ state["cycle-to-date"] = so_far
+
+ lah = so_far["lease-age-histogram"]
+ so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
+ so_far["expiration-enabled"] = self._expiration_policy.is_enabled()
+ so_far["configured-expiration-mode"] = self._expiration_policy.get_parameters()
+
+ so_far_sr = so_far["space-recovered"]
+ remaining_sr = {}
+ remaining = {"space-recovered": remaining_sr}
+ cycle_sr = {}
+ cycle = {"space-recovered": cycle_sr}
+
+ if progress["cycle-complete-percentage"] > 0.0:
+ pc = progress["cycle-complete-percentage"] / 100.0
+ m = (1-pc)/pc
+ for a in ("actual", "examined"):
+ for b in ("buckets", "shares", "diskbytes"):
+ k = "%s-%s" % (a, b)
+ remaining_sr[k] = m * so_far_sr[k]
+ cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
+ for st in SHARETYPES:
+ k = "%s-%s-%s" % (a, b, SHARETYPES[st])
+ remaining_sr[k] = m * so_far_sr[k]
+ cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
+ else:
+ for a in ("actual", "examined"):
+ for b in ("buckets", "shares", "diskbytes"):
+ k = "%s-%s" % (a, b)
+ remaining_sr[k] = None
+ cycle_sr[k] = None
+ for st in SHARETYPES:
+ k = "%s-%s-%s" % (a, b, SHARETYPES[st])
+ remaining_sr[k] = None
+ cycle_sr[k] = None
+
+ state["estimated-remaining-cycle"] = remaining
+ state["estimated-current-cycle"] = cycle
+ return state
--- /dev/null
+
+import time
+from types import NoneType
+
+from allmydata.util.assertutil import precondition
+from allmydata.util import time_format
+from allmydata.web.common import abbreviate_time
+
+
+class ExpirationPolicy(object):
+ def __init__(self, enabled=False, mode="age", override_lease_duration=None,
+ cutoff_date=None):
+ precondition(isinstance(enabled, bool), enabled=enabled)
+ precondition(mode in ("age", "cutoff-date"),
+ "GC mode %r must be 'age' or 'cutoff-date'" % (mode,))
+ precondition(isinstance(override_lease_duration, (int, NoneType)),
+ override_lease_duration=override_lease_duration)
+ precondition(isinstance(cutoff_date, int) or (mode != "cutoff-date" and cutoff_date is None),
+ cutoff_date=cutoff_date)
+
+ self._enabled = enabled
+ self._mode = mode
+ self._override_lease_duration = override_lease_duration
+ self._cutoff_date = cutoff_date
+
+ def remove_expired_leases(self, leasedb, current_time):
+ if not self._enabled:
+ return
+
+ if self._mode == "age":
+ if self._override_lease_duration is not None:
+ leasedb.remove_leases_by_renewal_time(current_time - self._override_lease_duration)
+ else:
+ leasedb.remove_leases_by_expiration_time(current_time)
+ else:
+ # self._mode == "cutoff-date"
+ leasedb.remove_leases_by_renewal_time(self._cutoff_date)
+
+ def get_parameters(self):
+ """
+ Return the parameters as represented in the "configured-expiration-mode" field
+ of a history entry.
+ """
+ return (self._mode,
+ self._override_lease_duration,
+ self._cutoff_date,
+ self._enabled and ("mutable", "immutable") or ())
+
+ def is_enabled(self):
+ return self._enabled
+
+ def describe_enabled(self):
+ if self.is_enabled():
+ return "Enabled: expired leases will be removed"
+ else:
+ return "Disabled: scan-only mode, no leases will be removed"
+
+ def describe_expiration(self):
+ if self._mode == "age":
+ if self._override_lease_duration is None:
+ return ("Leases will expire naturally, probably 31 days after "
+ "creation or renewal.")
+ else:
+ return ("Leases created or last renewed more than %s ago "
+ "will be considered expired."
+ % abbreviate_time(self._override_lease_duration))
+ else:
+ localizedutcdate = time.strftime("%d-%b-%Y", time.gmtime(self._cutoff_date))
+ isoutcdate = time_format.iso_utc_date(self._cutoff_date)
+ return ("Leases created or last renewed before %s (%s) UTC "
+ "will be considered expired." % (isoutcdate, localizedutcdate))
--- /dev/null
+
+import time, simplejson
+
+from allmydata.util.assertutil import _assert
+from allmydata.util import dbutil
+from allmydata.storage.common import si_b2a
+
+
+class NonExistentShareError(Exception):
+ def __init__(self, si_s, shnum):
+ Exception.__init__(self, si_s, shnum)
+ self.si_s = si_s
+ self.shnum = shnum
+
+ def __str__(self):
+ return "can't find SI=%r shnum=%r in `shares` table" % (self.si_s, self.shnum)
+
+
+class LeaseInfo(object):
+ def __init__(self, storage_index, shnum, owner_num, renewal_time, expiration_time):
+ self.storage_index = storage_index
+ self.shnum = shnum
+ self.owner_num = owner_num
+ self.renewal_time = renewal_time
+ self.expiration_time = expiration_time
+
+
+def int_or_none(s):
+ if s is None:
+ return s
+ return int(s)
+
+
+SHARETYPE_IMMUTABLE = 0
+SHARETYPE_MUTABLE = 1
+SHARETYPE_CORRUPTED = 2
+SHARETYPE_UNKNOWN = 3
+
+SHARETYPES = { SHARETYPE_IMMUTABLE: 'immutable',
+ SHARETYPE_MUTABLE: 'mutable',
+ SHARETYPE_CORRUPTED: 'corrupted',
+ SHARETYPE_UNKNOWN: 'unknown' }
+
+STATE_COMING = 0
+STATE_STABLE = 1
+STATE_GOING = 2
+
+
+LEASE_SCHEMA_V1 = """
+CREATE TABLE `version`
+(
+ version INTEGER -- contains one row, set to 1
+);
+
+CREATE TABLE `shares`
+(
+ `storage_index` VARCHAR(26) not null,
+ `shnum` INTEGER not null,
+ `prefix` VARCHAR(2) not null,
+ `backend_key` VARCHAR, -- not used by current backends; NULL means '$prefix/$storage_index/$shnum'
+ `used_space` INTEGER not null,
+ `sharetype` INTEGER not null, -- SHARETYPE_*
+ `state` INTEGER not null, -- STATE_*
+ PRIMARY KEY (`storage_index`, `shnum`)
+);
+
+CREATE INDEX `prefix` ON `shares` (`prefix`);
+-- CREATE UNIQUE INDEX `share_id` ON `shares` (`storage_index`,`shnum`);
+
+CREATE TABLE `leases`
+(
+ `id` INTEGER PRIMARY KEY AUTOINCREMENT,
+ `storage_index` VARCHAR(26) not null,
+ `shnum` INTEGER not null,
+ `account_id` INTEGER not null,
+ `renewal_time` INTEGER not null, -- duration is implicit: expiration-renewal
+ `expiration_time` INTEGER, -- seconds since epoch; NULL means the end of time
+ FOREIGN KEY (`storage_index`, `shnum`) REFERENCES `shares` (`storage_index`, `shnum`),
+ FOREIGN KEY (`account_id`) REFERENCES `accounts` (`id`)
+);
+
+CREATE INDEX `account_id` ON `leases` (`account_id`);
+CREATE INDEX `expiration_time` ON `leases` (`expiration_time`);
+
+CREATE TABLE accounts
+(
+ `id` INTEGER PRIMARY KEY AUTOINCREMENT,
+ `pubkey_vs` VARCHAR(52),
+ `creation_time` INTEGER
+);
+CREATE UNIQUE INDEX `pubkey_vs` ON `accounts` (`pubkey_vs`);
+
+CREATE TABLE account_attributes
+(
+ `id` INTEGER PRIMARY KEY AUTOINCREMENT,
+ `account_id` INTEGER,
+ `name` VARCHAR(20),
+ `value` VARCHAR(20) -- actually anything: usually string, unicode, integer
+);
+CREATE UNIQUE INDEX `account_attr` ON `account_attributes` (`account_id`, `name`);
+
+INSERT INTO `accounts` VALUES (0, "anonymous", 0);
+INSERT INTO `accounts` VALUES (1, "starter", 0);
+
+CREATE TABLE crawler_history
+(
+ `cycle` INTEGER,
+ `json` TEXT
+);
+CREATE UNIQUE INDEX `cycle` ON `crawler_history` (`cycle`);
+"""
+
+DAY = 24*60*60
+MONTH = 30*DAY
+
+class LeaseDB:
+ ANONYMOUS_ACCOUNTID = 0
+ STARTER_LEASE_ACCOUNTID = 1
+ STARTER_LEASE_DURATION = 2*MONTH
+
+ def __init__(self, dbfile):
+ (self._sqlite,
+ self._db) = dbutil.get_db(dbfile, create_version=(LEASE_SCHEMA_V1, 1))
+ self._cursor = self._db.cursor()
+ self.debug = False
+ self.retained_history_entries = 10
+
+ # share management
+
+ def get_shares_for_prefix(self, prefix):
+ """
+ Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype) pairs.
+ """
+ self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`"
+ " FROM `shares`"
+ " WHERE `prefix` == ?",
+ (prefix,))
+ db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
+ for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
+ return db_sharemap
+
+ def add_new_share(self, storage_index, shnum, used_space, sharetype):
+ si_s = si_b2a(storage_index)
+ prefix = si_s[:2]
+ if self.debug: print "ADD_NEW_SHARE", prefix, si_s, shnum, used_space, sharetype
+ backend_key = None
+ # This needs to be an INSERT OR REPLACE because it is possible for add_new_share
+ # to be called when this share is already in the database (but not on disk).
+ self._cursor.execute("INSERT OR REPLACE INTO `shares`"
+ " VALUES (?,?,?,?,?,?,?)",
+ (si_s, shnum, prefix, backend_key, used_space, sharetype, STATE_COMING))
+
+ def add_starter_lease(self, storage_index, shnum):
+ si_s = si_b2a(storage_index)
+ if self.debug: print "ADD_STARTER_LEASE", si_s, shnum
+ self._dirty = True
+ renewal_time = time.time()
+ self._cursor.execute("INSERT INTO `leases`"
+ " VALUES (?,?,?,?,?,?)",
+ (None, si_s, shnum, self.STARTER_LEASE_ACCOUNTID,
+ int(renewal_time), int(renewal_time + self.STARTER_LEASE_DURATION)))
+ self._db.commit()
+
+ def mark_share_as_stable(self, storage_index, shnum, used_space=None, backend_key=None):
+ """
+ Call this method after adding a share to backend storage.
+ """
+ si_s = si_b2a(storage_index)
+ if self.debug: print "MARK_SHARE_AS_STABLE", si_s, shnum, used_space
+ self._dirty = True
+ if used_space is not None:
+ self._cursor.execute("UPDATE `shares` SET `state`=?, `used_space`=?, `backend_key`=?"
+ " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?",
+ (STATE_STABLE, used_space, backend_key, si_s, shnum, STATE_GOING))
+ else:
+ _assert(backend_key is None, backend_key=backend_key)
+ self._cursor.execute("UPDATE `shares` SET `state`=?"
+ " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?",
+ (STATE_STABLE, si_s, shnum, STATE_GOING))
+ self._db.commit()
+ if self._cursor.rowcount < 1:
+ raise NonExistentShareError(si_s, shnum)
+
+ def mark_share_as_going(self, storage_index, shnum):
+ """
+ Call this method and commit before deleting a share from backend storage,
+ then call remove_deleted_share.
+ """
+ si_s = si_b2a(storage_index)
+ if self.debug: print "MARK_SHARE_AS_GOING", si_s, shnum
+ self._cursor.execute("UPDATE `shares` SET `state`=?"
+ " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?",
+ (STATE_GOING, si_s, shnum, STATE_COMING))
+ self._db.commit()
+ if self._cursor.rowcount < 1:
+ raise NonExistentShareError(si_s, shnum)
+
+ def remove_deleted_share(self, storage_index, shnum):
+ si_s = si_b2a(storage_index)
+ if self.debug: print "REMOVE_DELETED_SHARE", si_s, shnum
+ # delete leases first to maintain integrity constraint
+ self._cursor.execute("DELETE FROM `leases`"
+ " WHERE `storage_index`=? AND `shnum`=?",
+ (si_s, shnum))
+ try:
+ self._cursor.execute("DELETE FROM `shares`"
+ " WHERE `storage_index`=? AND `shnum`=?",
+ (si_s, shnum))
+ except Exception:
+ self._db.rollback() # roll back the lease deletion
+ raise
+ else:
+ self._db.commit()
+
+ def change_share_space(self, storage_index, shnum, used_space):
+ si_s = si_b2a(storage_index)
+ if self.debug: print "CHANGE_SHARE_SPACE", si_s, shnum, used_space
+ self._cursor.execute("UPDATE `shares` SET `used_space`=?"
+ " WHERE `storage_index`=? AND `shnum`=?",
+ (used_space, si_s, shnum))
+ self._db.commit()
+ if self._cursor.rowcount < 1:
+ raise NonExistentShareError(si_s, shnum)
+
+ # lease management
+
+ def add_or_renew_leases(self, storage_index, shnum, ownerid,
+ renewal_time, expiration_time):
+ """
+ shnum=None means renew leases on all shares; do nothing if there are no shares for this storage_index in the `shares` table.
+
+ Raises NonExistentShareError if a specific shnum is given and that share does not exist in the `shares` table.
+ """
+ si_s = si_b2a(storage_index)
+ if self.debug: print "ADD_OR_RENEW_LEASES", si_s, shnum, ownerid, renewal_time, expiration_time
+ if shnum is None:
+ self._cursor.execute("SELECT `storage_index`, `shnum` FROM `shares`"
+ " WHERE `storage_index`=?",
+ (si_s,))
+ rows = self._cursor.fetchall()
+ else:
+ self._cursor.execute("SELECT `storage_index`, `shnum` FROM `shares`"
+ " WHERE `storage_index`=? AND `shnum`=?",
+ (si_s, shnum))
+ rows = self._cursor.fetchall()
+ if not rows:
+ raise NonExistentShareError(si_s, shnum)
+
+ for (found_si_s, found_shnum) in rows:
+ _assert(si_s == found_si_s, si_s=si_s, found_si_s=found_si_s)
+ # XXX can we simplify this by using INSERT OR REPLACE?
+ self._cursor.execute("SELECT `id` FROM `leases`"
+ " WHERE `storage_index`=? AND `shnum`=? AND `account_id`=?",
+ (si_s, found_shnum, ownerid))
+ row = self._cursor.fetchone()
+ if row:
+ # Note that unlike the pre-LeaseDB code, this allows leases to be backdated.
+ # There is currently no way for a client to specify lease duration, and so
+ # backdating can only happen in normal operation if there is a timequake on
+ # the server and time goes backward by more than 31 days. This needs to be
+ # revisited for ticket #1816, which would allow the client to request a lease
+ # duration.
+ leaseid = row[0]
+ self._cursor.execute("UPDATE `leases` SET `renewal_time`=?, `expiration_time`=?"
+ " WHERE `id`=?",
+ (renewal_time, expiration_time, leaseid))
+ else:
+ self._cursor.execute("INSERT INTO `leases` VALUES (?,?,?,?,?,?)",
+ (None, si_s, found_shnum, ownerid, renewal_time, expiration_time))
+ self._db.commit()
+
+ def get_leases(self, storage_index, ownerid):
+ si_s = si_b2a(storage_index)
+ self._cursor.execute("SELECT `shnum`, `account_id`, `renewal_time`, `expiration_time` FROM `leases`"
+ " WHERE `storage_index`=? AND `account_id`=?",
+ (si_s, ownerid))
+ rows = self._cursor.fetchall()
+ def _to_LeaseInfo(row):
+ (shnum, account_id, renewal_time, expiration_time) = tuple(row)
+ return LeaseInfo(storage_index, int(shnum), int(account_id), float(renewal_time), float(expiration_time))
+ return map(_to_LeaseInfo, rows)
+
+ def get_lease_ages(self, storage_index, shnum, now):
+ si_s = si_b2a(storage_index)
+ self._cursor.execute("SELECT `renewal_time` FROM `leases`"
+ " WHERE `storage_index`=? AND `shnum`=?",
+ (si_s, shnum))
+ rows = self._cursor.fetchall()
+ def _to_age(row):
+ return now - float(row[0])
+ return map(_to_age, rows)
+
+ def get_unleased_shares_for_prefix(self, prefix):
+ if self.debug: print "GET_UNLEASED_SHARES_FOR_PREFIX", prefix
+ # This would be simpler, but it doesn't work because 'NOT IN' doesn't support multiple columns.
+ #query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype` FROM `shares`"
+ # " WHERE (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)")
+
+ # This "negative join" should be equivalent.
+ self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype FROM `shares` s LEFT JOIN `leases` l"
+ " ON (s.storage_index = l.storage_index AND s.shnum = l.shnum)"
+ " WHERE s.prefix = ? AND l.storage_index IS NULL",
+ (prefix,))
+ db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
+ for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
+ return db_sharemap
+
+ def remove_leases_by_renewal_time(self, renewal_cutoff_time):
+ if self.debug: print "REMOVE_LEASES_BY_RENEWAL_TIME", renewal_cutoff_time
+ self._cursor.execute("DELETE FROM `leases` WHERE `renewal_time` < ?",
+ (renewal_cutoff_time,))
+ self._db.commit()
+
+ def remove_leases_by_expiration_time(self, expiration_cutoff_time):
+ if self.debug: print "REMOVE_LEASES_BY_EXPIRATION_TIME", expiration_cutoff_time
+ self._cursor.execute("DELETE FROM `leases` WHERE `expiration_time` IS NOT NULL AND `expiration_time` < ?",
+ (expiration_cutoff_time,))
+ self._db.commit()
+
+ # history
+
+ def add_history_entry(self, cycle, entry):
+ if self.debug: print "ADD_HISTORY_ENTRY", cycle, entry
+ json = simplejson.dumps(entry)
+ self._cursor.execute("SELECT `cycle` FROM `crawler_history`")
+ rows = self._cursor.fetchall()
+ if len(rows) >= self.retained_history_entries:
+ first_cycle_to_retain = list(sorted(rows))[-(self.retained_history_entries - 1)][0]
+ self._cursor.execute("DELETE FROM `crawler_history` WHERE `cycle` < ?",
+ (first_cycle_to_retain,))
+ self._db.commit()
+
+ try:
+ self._cursor.execute("INSERT OR REPLACE INTO `crawler_history` VALUES (?,?)",
+ (cycle, json))
+ except Exception:
+ self._db.rollback() # roll back the deletion of unretained entries
+ raise
+ else:
+ self._db.commit()
+
+ def get_history(self):
+ self._cursor.execute("SELECT `cycle`,`json` FROM `crawler_history`")
+ rows = self._cursor.fetchall()
+ decoded = [(row[0], simplejson.loads(row[1])) for row in rows]
+ return dict(decoded)
+
+ def get_account_creation_time(self, owner_num):
+ self._cursor.execute("SELECT `creation_time` from `accounts`"
+ " WHERE `id`=?",
+ (owner_num,))
+ row = self._cursor.fetchone()
+ if row:
+ return row[0]
+ return None
+
+ def get_all_accounts(self):
+ self._cursor.execute("SELECT `id`,`pubkey_vs`"
+ " FROM `accounts` ORDER BY `id` ASC")
+ return self._cursor.fetchall()