From 592af37ee677c76118e0d05bf13749cd4fe26279 Mon Sep 17 00:00:00 2001 From: David-Sarah Hopwood Date: Wed, 12 Dec 2012 05:17:11 +0000 Subject: [PATCH] Add new files for leasedb. Authors: Brian Warner and David-Sarah Hopwood Signed-off-by: David-Sarah Hopwood --- src/allmydata/storage/account.py | 198 +++++++++++ src/allmydata/storage/accountant.py | 61 ++++ src/allmydata/storage/accounting_crawler.py | 342 +++++++++++++++++++ src/allmydata/storage/expiration.py | 71 ++++ src/allmydata/storage/leasedb.py | 360 ++++++++++++++++++++ 5 files changed, 1032 insertions(+) create mode 100644 src/allmydata/storage/account.py create mode 100644 src/allmydata/storage/accountant.py create mode 100644 src/allmydata/storage/accounting_crawler.py create mode 100644 src/allmydata/storage/expiration.py create mode 100644 src/allmydata/storage/leasedb.py diff --git a/src/allmydata/storage/account.py b/src/allmydata/storage/account.py new file mode 100644 index 00000000..bf82e274 --- /dev/null +++ b/src/allmydata/storage/account.py @@ -0,0 +1,198 @@ + +""" +This file contains the client-facing interface for manipulating shares. It +implements RIStorageServer, and contains an embedded owner id which is used +for all operations that touch leases. Initially, clients will receive a +special 'anonymous' instance of this class with ownerid=0. Later, when the +FURLification dance is established, each client will get a different instance +(with a dedicated ownerid). +""" + +import time + +from foolscap.api import Referenceable + +from zope.interface import implements +from allmydata.interfaces import RIStorageServer + +from allmydata.storage.leasedb import int_or_none +from allmydata.storage.common import si_b2a + + +class Account(Referenceable): + implements(RIStorageServer) + + def __init__(self, owner_num, pubkey_vs, server, leasedb): + self.owner_num = owner_num + self.server = server + self._leasedb = leasedb + # for static accounts ("starter", "anonymous"), pubkey_vs is None, + # and the "connected" attributes are unused + self.pubkey_vs = pubkey_vs + self.connected = False + self.connected_since = None + self.connection = None + self.debug = False + + def is_static(self): + return self.owner_num in (0,1) + + # these methods are called by StorageServer + + def get_owner_num(self): + return self.owner_num + + def get_renewal_and_expiration_times(self): + renewal_time = time.time() + return (renewal_time, renewal_time + 31*24*60*60) + + # immutable.BucketWriter.close() does: + # add_share(), add_or_renew_lease(), mark_share_as_stable() + + # mutable writev() does: + # deleted shares: mark_share_as_going(), remove_share_and_leases() + # new shares: add_share(), add_or_renew_lease(), mark_share_as_stable() + # changed shares: change_share_space(), add_or_renew_lease() + + def add_share(self, storage_index, shnum, used_space, sharetype): + if self.debug: print "ADD_SHARE", si_b2a(storage_index), shnum, used_space, sharetype + self._leasedb.add_new_share(storage_index, shnum, used_space, sharetype) + + def add_or_renew_default_lease(self, storage_index, shnum): + renewal_time, expiration_time = self.get_renewal_and_expiration_times() + return self.add_or_renew_lease(storage_index, shnum, renewal_time, expiration_time) + + def add_or_renew_lease(self, storage_index, shnum, renewal_time, expiration_time): + if self.debug: print "ADD_OR_RENEW_LEASE", si_b2a(storage_index), shnum + self._leasedb.add_or_renew_leases(storage_index, shnum, self.owner_num, + renewal_time, expiration_time) + + def change_share_space(self, storage_index, shnum, used_space): + if self.debug: print "CHANGE_SHARE_SPACE", si_b2a(storage_index), shnum, used_space + self._leasedb.change_share_space(storage_index, shnum, used_space) + + def mark_share_as_stable(self, storage_index, shnum, used_space): + if self.debug: print "MARK_SHARE_AS_STABLE", si_b2a(storage_index), shnum, used_space + self._leasedb.mark_share_as_stable(storage_index, shnum, used_space) + + def mark_share_as_going(self, storage_index, shnum): + if self.debug: print "MARK_SHARE_AS_GOING", si_b2a(storage_index), shnum + self._leasedb.mark_share_as_going(storage_index, shnum) + + def remove_share_and_leases(self, storage_index, shnum): + if self.debug: print "REMOVE_SHARE_AND_LEASES", si_b2a(storage_index), shnum + self._leasedb.remove_deleted_share(storage_index, shnum) + + # remote_add_lease() and remote_renew_lease() do this + def add_lease_for_bucket(self, storage_index): + if self.debug: print "ADD_LEASE_FOR_BUCKET", si_b2a(storage_index) + renewal_time, expiration_time = self.get_renewal_and_expiration_times() + self._leasedb.add_or_renew_leases(storage_index, None, + self.owner_num, renewal_time, expiration_time) + + # The following RIStorageServer methods are called by remote clients + + def remote_get_version(self): + return self.server.client_get_version(self) + + # all other RIStorageServer methods should pass through to self.server + # but (except for remote_advise_corrupt_share) add the account as a final + # argument. + + def remote_allocate_buckets(self, storage_index, renew_secret, cancel_secret, + sharenums, allocated_size, canary): + if self.debug: print "REMOTE_ALLOCATE_BUCKETS", si_b2a(storage_index) + return self.server.client_allocate_buckets(storage_index, + sharenums, allocated_size, + canary, self) + + def remote_add_lease(self, storage_index, renew_secret, cancel_secret): + if self.debug: print "REMOTE_ADD_LEASE", si_b2a(storage_index) + self.add_lease_for_bucket(storage_index) + return None + + def remote_renew_lease(self, storage_index, renew_secret): + self.add_lease_for_bucket(storage_index) + return None + + def remote_get_buckets(self, storage_index): + return self.server.client_get_buckets(storage_index) + + def remote_slot_testv_and_readv_and_writev(self, storage_index, secrets, + test_and_write_vectors, read_vector): + write_enabler = secrets[0] + return self.server.client_slot_testv_and_readv_and_writev( + storage_index, write_enabler, test_and_write_vectors, read_vector, self) + + def remote_slot_readv(self, storage_index, shares, readv): + return self.server.client_slot_readv(storage_index, shares, readv, self) + + def remote_advise_corrupt_share(self, share_type, storage_index, shnum, reason): + # this doesn't use the account. + return self.server.client_advise_corrupt_share( + share_type, storage_index, shnum, reason) + + def get_account_creation_time(self): + return self._leasedb.get_account_creation_time(self.owner_num) + + def get_id(self): + return self.pubkey_vs + + def get_leases(self, storage_index): + return self._leasedb.get_leases(storage_index, self.owner_num) + + def connection_from(self, rx): + self.connected = True + self.connected_since = time.time() + self.connection = rx + #rhost = rx.getPeer() + #from twisted.internet import address + #if isinstance(rhost, address.IPv4Address): + # rhost_s = "%s:%d" % (rhost.host, rhost.port) + #elif "LoopbackAddress" in str(rhost): + # rhost_s = "loopback" + #else: + # rhost_s = str(rhost) + #self.set_account_attribute("last_connected_from", rhost_s) + rx.notifyOnDisconnect(self._disconnected) + + def _disconnected(self): + self.connected = False + self.connected_since = None + self.connection = None + #self.set_account_attribute("last_seen", int(time.time())) + self.disconnected_since = None + + def get_connection_status(self): + # starts as: connected=False, connected_since=None, + # last_connected_from=None, last_seen=None + # while connected: connected=True, connected_since=START, + # last_connected_from=HOST, last_seen=IGNOREME + # after disconnect: connected=False, connected_since=None, + # last_connected_from=HOST, last_seen=STOP + + #last_seen = int_or_none(self.get_account_attribute("last_seen")) + #last_connected_from = self.get_account_attribute("last_connected_from") + created = int_or_none(self.get_account_creation_time()) + + return {"connected": self.connected, + "connected_since": self.connected_since, + #"last_connected_from": last_connected_from, + #"last_seen": last_seen, + "created": created, + } + + def get_stats(self): + return self.server.get_stats() + + def get_accounting_crawler(self): + return self.server.get_accounting_crawler() + + def get_expiration_policy(self): + return self.server.get_expiration_policy() + + def get_bucket_counter(self): + return self.server.get_bucket_counter() + + def get_nodeid(self): + return self.server.get_nodeid() diff --git a/src/allmydata/storage/accountant.py b/src/allmydata/storage/accountant.py new file mode 100644 index 00000000..6521ee4d --- /dev/null +++ b/src/allmydata/storage/accountant.py @@ -0,0 +1,61 @@ + +""" +This file contains the cross-account management code. It creates per-client +Account objects for the FURLification dance, as well as the 'anonymous +account for use until the server admin decides to make accounting mandatory. +It also provides usage statistics and reports for the status UI. This will +also implement the backend of the control UI (once we figure out how to +express that: maybe a CLI command, or tahoe.cfg settings, or a web frontend), +for things like enabling/disabling accounts and setting quotas. + +The name 'accountant.py' could be better, preferably something that doesn't +share a prefix with 'account.py' so my tab-autocomplete will work nicely. +""" + +import weakref + +from twisted.application import service + +from allmydata.storage.leasedb import LeaseDB +from allmydata.storage.accounting_crawler import AccountingCrawler +from allmydata.storage.account import Account + + +class Accountant(service.MultiService): + def __init__(self, storage_server, dbfile, statefile): + service.MultiService.__init__(self) + self.storage_server = storage_server + self._leasedb = LeaseDB(dbfile) + self._active_accounts = weakref.WeakValueDictionary() + self._anonymous_account = Account(LeaseDB.ANONYMOUS_ACCOUNTID, None, + self.storage_server, self._leasedb) + self._starter_account = Account(LeaseDB.STARTER_LEASE_ACCOUNTID, None, + self.storage_server, self._leasedb) + + crawler = AccountingCrawler(storage_server, statefile, self._leasedb) + self._accounting_crawler = crawler + crawler.setServiceParent(self) + + def get_leasedb(self): + return self._leasedb + + def set_expiration_policy(self, policy): + self._accounting_crawler.set_expiration_policy(policy) + + def get_anonymous_account(self): + return self._anonymous_account + + def get_starter_account(self): + return self._starter_account + + def get_accounting_crawler(self): + return self._accounting_crawler + + # methods used by admin interfaces + def get_all_accounts(self): + for ownerid, pubkey_vs in self._leasedb.get_all_accounts(): + if pubkey_vs in self._active_accounts: + yield self._active_accounts[pubkey_vs] + else: + yield Account(ownerid, pubkey_vs, + self.storage_server, self._leasedb) diff --git a/src/allmydata/storage/accounting_crawler.py b/src/allmydata/storage/accounting_crawler.py new file mode 100644 index 00000000..493dc54b --- /dev/null +++ b/src/allmydata/storage/accounting_crawler.py @@ -0,0 +1,342 @@ + +import os, time + +from twisted.internet import defer + +from allmydata.util.deferredutil import for_items +from allmydata.util.fileutil import get_used_space +from allmydata.util import log +from allmydata.storage.crawler import ShareCrawler +from allmydata.storage.common import si_a2b +from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN + + +class AccountingCrawler(ShareCrawler): + """ + I perform the following functions: + - Remove leases that are past their expiration time. + - Delete objects containing unleased shares. + - Discover shares that have been manually added to storage. + - Discover shares that are present when a storage server is upgraded from + a pre-leasedb version, and give them "starter leases". + - Recover from a situation where the leasedb is lost or detectably + corrupted. This is handled in the same way as upgrading. + - Detect shares that have unexpectedly disappeared from storage. + """ + + slow_start = 600 # don't start crawling for 10 minutes after startup + minimum_cycle_time = 12*60*60 # not more than twice per day + + def __init__(self, server, statefile, leasedb): + ShareCrawler.__init__(self, server, statefile) + self._leasedb = leasedb + + def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice): + # assume that we can list every prefixdir in this prefix quickly. + # Otherwise we have to retain more state between timeslices. + + # we define "shareid" as (SI string, shnum) + disk_shares = set() # shareid + for si_s in buckets: + bucketdir = os.path.join(prefixdir, si_s) + for sharefile in os.listdir(bucketdir): + try: + shnum = int(sharefile) + except ValueError: + continue # non-numeric means not a sharefile + shareid = (si_s, shnum) + disk_shares.add(shareid) + + # now check the database for everything in this prefix + db_sharemap = self._leasedb.get_shares_for_prefix(prefix) + db_shares = set(db_sharemap) + + rec = self.state["cycle-to-date"]["space-recovered"] + examined_sharesets = [set() for st in xrange(len(SHARETYPES))] + + # The lease crawler used to calculate the lease age histogram while + # crawling shares, and tests currently rely on that, but it would be + # more efficient to maintain the histogram as leases are added, + # updated, and removed. + for key, value in db_sharemap.iteritems(): + (si_s, shnum) = key + (used_space, sharetype) = value + + examined_sharesets[sharetype].add(si_s) + + for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice): + self.add_lease_age_to_histogram(age) + + self.increment(rec, "examined-shares", 1) + self.increment(rec, "examined-sharebytes", used_space) + self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1) + self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space) + + self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets])) + for st in SHARETYPES: + self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st])) + + # add new shares to the DB + new_shares = disk_shares - db_shares + for (si_s, shnum) in new_shares: + sharefile = os.path.join(prefixdir, si_s, str(shnum)) + used_space = get_used_space(sharefile) + # FIXME + sharetype = SHARETYPE_UNKNOWN + self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype) + self._leasedb.add_starter_lease(si_s, shnum) + + # remove disappeared shares from DB + disappeared_shares = db_shares - disk_shares + for (si_s, shnum) in disappeared_shares: + log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared", + si_s=si_s, shnum=shnum, level=log.WEIRD) + self._leasedb.remove_deleted_share(si_a2b(si_s), shnum) + + recovered_sharesets = [set() for st in xrange(len(SHARETYPES))] + + def _delete_share(ign, key, value): + (si_s, shnum) = key + (used_space, sharetype) = value + storage_index = si_a2b(si_s) + d2 = defer.succeed(None) + def _mark_and_delete(ign): + self._leasedb.mark_share_as_going(storage_index, shnum) + return self.server.delete_share(storage_index, shnum) + d2.addCallback(_mark_and_delete) + def _deleted(ign): + self._leasedb.remove_deleted_share(storage_index, shnum) + + recovered_sharesets[sharetype].add(si_s) + + self.increment(rec, "actual-shares", 1) + self.increment(rec, "actual-sharebytes", used_space) + self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1) + self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space) + def _not_deleted(f): + log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s", + si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD) + try: + self._leasedb.mark_share_as_stable(storage_index, shnum) + except Exception, e: + log.err(e) + # discard the failure + d2.addCallbacks(_deleted, _not_deleted) + return d2 + + unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix) + d = for_items(_delete_share, unleased_sharemap) + + def _inc_recovered_sharesets(ign): + self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets])) + for st in SHARETYPES: + self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st])) + d.addCallback(_inc_recovered_sharesets) + return d + + # these methods are for outside callers to use + + def set_expiration_policy(self, policy): + self._expiration_policy = policy + + def get_expiration_policy(self): + return self._expiration_policy + + def is_expiration_enabled(self): + return self._expiration_policy.is_enabled() + + def db_is_incomplete(self): + # don't bother looking at the sqlite database: it's certainly not + # complete. + return self.state["last-cycle-finished"] is None + + def increment(self, d, k, delta=1): + if k not in d: + d[k] = 0 + d[k] += delta + + def add_lease_age_to_histogram(self, age): + bin_interval = 24*60*60 + bin_number = int(age/bin_interval) + bin_start = bin_number * bin_interval + bin_end = bin_start + bin_interval + k = (bin_start, bin_end) + self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1) + + def convert_lease_age_histogram(self, lah): + # convert { (minage,maxage) : count } into [ (minage,maxage,count) ] + # since the former is not JSON-safe (JSON dictionaries must have + # string keys). + json_safe_lah = [] + for k in sorted(lah): + (minage,maxage) = k + json_safe_lah.append( (minage, maxage, lah[k]) ) + return json_safe_lah + + def add_initial_state(self): + # we fill ["cycle-to-date"] here (even though they will be reset in + # self.started_cycle) just in case someone grabs our state before we + # get started: unit tests do this + so_far = self.create_empty_cycle_dict() + self.state.setdefault("cycle-to-date", so_far) + # in case we upgrade the code while a cycle is in progress, update + # the keys individually + for k in so_far: + self.state["cycle-to-date"].setdefault(k, so_far[k]) + + def create_empty_cycle_dict(self): + recovered = self.create_empty_recovered_dict() + so_far = {"corrupt-shares": [], + "space-recovered": recovered, + "lease-age-histogram": {}, # (minage,maxage)->count + } + return so_far + + def create_empty_recovered_dict(self): + recovered = {} + for a in ("actual", "examined"): + for b in ("buckets", "shares", "diskbytes"): + recovered["%s-%s" % (a, b)] = 0 + for st in SHARETYPES: + recovered["%s-%s-%s" % (a, b, SHARETYPES[st])] = 0 + return recovered + + def started_cycle(self, cycle): + self.state["cycle-to-date"] = self.create_empty_cycle_dict() + + current_time = time.time() + self._expiration_policy.remove_expired_leases(self._leasedb, current_time) + + def finished_cycle(self, cycle): + # add to our history state, prune old history + h = {} + + start = self.state["current-cycle-start-time"] + now = time.time() + h["cycle-start-finish-times"] = (start, now) + ep = self.get_expiration_policy() + h["expiration-enabled"] = ep.is_enabled() + h["configured-expiration-mode"] = ep.get_parameters() + + s = self.state["cycle-to-date"] + + # state["lease-age-histogram"] is a dictionary (mapping + # (minage,maxage) tuple to a sharecount), but we report + # self.get_state()["lease-age-histogram"] as a list of + # (min,max,sharecount) tuples, because JSON can handle that better. + # We record the list-of-tuples form into the history for the same + # reason. + lah = self.convert_lease_age_histogram(s["lease-age-histogram"]) + h["lease-age-histogram"] = lah + h["corrupt-shares"] = s["corrupt-shares"][:] + # note: if ["shares-recovered"] ever acquires an internal dict, this + # copy() needs to become a deepcopy + h["space-recovered"] = s["space-recovered"].copy() + + self._leasedb.add_history_entry(cycle, h) + + def get_state(self): + """In addition to the crawler state described in + ShareCrawler.get_state(), I return the following keys which are + specific to the lease-checker/expirer. Note that the non-history keys + (with 'cycle' in their names) are only present if a cycle is currently + running. If the crawler is between cycles, it is appropriate to show + the latest item in the 'history' key instead. Also note that each + history item has all the data in the 'cycle-to-date' value, plus + cycle-start-finish-times. + + cycle-to-date: + expiration-enabled + configured-expiration-mode + lease-age-histogram (list of (minage,maxage,sharecount) tuples) + corrupt-shares (list of (si_b32,shnum) tuples, minimal verification) + space-recovered + + estimated-remaining-cycle: + # Values may be None if not enough data has been gathered to + # produce an estimate. + space-recovered + + estimated-current-cycle: + # cycle-to-date plus estimated-remaining. Values may be None if + # not enough data has been gathered to produce an estimate. + space-recovered + + history: maps cyclenum to a dict with the following keys: + cycle-start-finish-times + expiration-enabled + configured-expiration-mode + lease-age-histogram + corrupt-shares + space-recovered + + The 'space-recovered' structure is a dictionary with the following + keys: + # 'examined' is what was looked at + examined-buckets, examined-buckets-$SHARETYPE + examined-shares, examined-shares-$SHARETYPE + examined-diskbytes, examined-diskbytes-$SHARETYPE + + # 'actual' is what was deleted + actual-buckets, actual-buckets-$SHARETYPE + actual-shares, actual-shares-$SHARETYPE + actual-diskbytes, actual-diskbytes-$SHARETYPE + + Note that the preferred terminology has changed since these keys + were defined; "buckets" refers to what are now called sharesets, + and "diskbytes" refers to bytes of used space on the storage backend, + which is not necessarily the disk backend. + + The 'original-*' and 'configured-*' keys that were populated in + pre-leasedb versions are no longer supported. + The 'leases-per-share-histogram' is also no longer supported. + """ + progress = self.get_progress() + + state = ShareCrawler.get_state(self) # does a shallow copy + state["history"] = self._leasedb.get_history() + + if not progress["cycle-in-progress"]: + del state["cycle-to-date"] + return state + + so_far = state["cycle-to-date"].copy() + state["cycle-to-date"] = so_far + + lah = so_far["lease-age-histogram"] + so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah) + so_far["expiration-enabled"] = self._expiration_policy.is_enabled() + so_far["configured-expiration-mode"] = self._expiration_policy.get_parameters() + + so_far_sr = so_far["space-recovered"] + remaining_sr = {} + remaining = {"space-recovered": remaining_sr} + cycle_sr = {} + cycle = {"space-recovered": cycle_sr} + + if progress["cycle-complete-percentage"] > 0.0: + pc = progress["cycle-complete-percentage"] / 100.0 + m = (1-pc)/pc + for a in ("actual", "examined"): + for b in ("buckets", "shares", "diskbytes"): + k = "%s-%s" % (a, b) + remaining_sr[k] = m * so_far_sr[k] + cycle_sr[k] = so_far_sr[k] + remaining_sr[k] + for st in SHARETYPES: + k = "%s-%s-%s" % (a, b, SHARETYPES[st]) + remaining_sr[k] = m * so_far_sr[k] + cycle_sr[k] = so_far_sr[k] + remaining_sr[k] + else: + for a in ("actual", "examined"): + for b in ("buckets", "shares", "diskbytes"): + k = "%s-%s" % (a, b) + remaining_sr[k] = None + cycle_sr[k] = None + for st in SHARETYPES: + k = "%s-%s-%s" % (a, b, SHARETYPES[st]) + remaining_sr[k] = None + cycle_sr[k] = None + + state["estimated-remaining-cycle"] = remaining + state["estimated-current-cycle"] = cycle + return state diff --git a/src/allmydata/storage/expiration.py b/src/allmydata/storage/expiration.py new file mode 100644 index 00000000..6f7cd04c --- /dev/null +++ b/src/allmydata/storage/expiration.py @@ -0,0 +1,71 @@ + +import time +from types import NoneType + +from allmydata.util.assertutil import precondition +from allmydata.util import time_format +from allmydata.web.common import abbreviate_time + + +class ExpirationPolicy(object): + def __init__(self, enabled=False, mode="age", override_lease_duration=None, + cutoff_date=None): + precondition(isinstance(enabled, bool), enabled=enabled) + precondition(mode in ("age", "cutoff-date"), + "GC mode %r must be 'age' or 'cutoff-date'" % (mode,)) + precondition(isinstance(override_lease_duration, (int, NoneType)), + override_lease_duration=override_lease_duration) + precondition(isinstance(cutoff_date, int) or (mode != "cutoff-date" and cutoff_date is None), + cutoff_date=cutoff_date) + + self._enabled = enabled + self._mode = mode + self._override_lease_duration = override_lease_duration + self._cutoff_date = cutoff_date + + def remove_expired_leases(self, leasedb, current_time): + if not self._enabled: + return + + if self._mode == "age": + if self._override_lease_duration is not None: + leasedb.remove_leases_by_renewal_time(current_time - self._override_lease_duration) + else: + leasedb.remove_leases_by_expiration_time(current_time) + else: + # self._mode == "cutoff-date" + leasedb.remove_leases_by_renewal_time(self._cutoff_date) + + def get_parameters(self): + """ + Return the parameters as represented in the "configured-expiration-mode" field + of a history entry. + """ + return (self._mode, + self._override_lease_duration, + self._cutoff_date, + self._enabled and ("mutable", "immutable") or ()) + + def is_enabled(self): + return self._enabled + + def describe_enabled(self): + if self.is_enabled(): + return "Enabled: expired leases will be removed" + else: + return "Disabled: scan-only mode, no leases will be removed" + + def describe_expiration(self): + if self._mode == "age": + if self._override_lease_duration is None: + return ("Leases will expire naturally, probably 31 days after " + "creation or renewal.") + else: + return ("Leases created or last renewed more than %s ago " + "will be considered expired." + % abbreviate_time(self._override_lease_duration)) + else: + localizedutcdate = time.strftime("%d-%b-%Y", time.gmtime(self._cutoff_date)) + isoutcdate = time_format.iso_utc_date(self._cutoff_date) + return ("Leases created or last renewed before %s (%s) UTC " + "will be considered expired." % (isoutcdate, localizedutcdate)) diff --git a/src/allmydata/storage/leasedb.py b/src/allmydata/storage/leasedb.py new file mode 100644 index 00000000..bdd45ff5 --- /dev/null +++ b/src/allmydata/storage/leasedb.py @@ -0,0 +1,360 @@ + +import time, simplejson + +from allmydata.util.assertutil import _assert +from allmydata.util import dbutil +from allmydata.storage.common import si_b2a + + +class NonExistentShareError(Exception): + def __init__(self, si_s, shnum): + Exception.__init__(self, si_s, shnum) + self.si_s = si_s + self.shnum = shnum + + def __str__(self): + return "can't find SI=%r shnum=%r in `shares` table" % (self.si_s, self.shnum) + + +class LeaseInfo(object): + def __init__(self, storage_index, shnum, owner_num, renewal_time, expiration_time): + self.storage_index = storage_index + self.shnum = shnum + self.owner_num = owner_num + self.renewal_time = renewal_time + self.expiration_time = expiration_time + + +def int_or_none(s): + if s is None: + return s + return int(s) + + +SHARETYPE_IMMUTABLE = 0 +SHARETYPE_MUTABLE = 1 +SHARETYPE_CORRUPTED = 2 +SHARETYPE_UNKNOWN = 3 + +SHARETYPES = { SHARETYPE_IMMUTABLE: 'immutable', + SHARETYPE_MUTABLE: 'mutable', + SHARETYPE_CORRUPTED: 'corrupted', + SHARETYPE_UNKNOWN: 'unknown' } + +STATE_COMING = 0 +STATE_STABLE = 1 +STATE_GOING = 2 + + +LEASE_SCHEMA_V1 = """ +CREATE TABLE `version` +( + version INTEGER -- contains one row, set to 1 +); + +CREATE TABLE `shares` +( + `storage_index` VARCHAR(26) not null, + `shnum` INTEGER not null, + `prefix` VARCHAR(2) not null, + `backend_key` VARCHAR, -- not used by current backends; NULL means '$prefix/$storage_index/$shnum' + `used_space` INTEGER not null, + `sharetype` INTEGER not null, -- SHARETYPE_* + `state` INTEGER not null, -- STATE_* + PRIMARY KEY (`storage_index`, `shnum`) +); + +CREATE INDEX `prefix` ON `shares` (`prefix`); +-- CREATE UNIQUE INDEX `share_id` ON `shares` (`storage_index`,`shnum`); + +CREATE TABLE `leases` +( + `id` INTEGER PRIMARY KEY AUTOINCREMENT, + `storage_index` VARCHAR(26) not null, + `shnum` INTEGER not null, + `account_id` INTEGER not null, + `renewal_time` INTEGER not null, -- duration is implicit: expiration-renewal + `expiration_time` INTEGER, -- seconds since epoch; NULL means the end of time + FOREIGN KEY (`storage_index`, `shnum`) REFERENCES `shares` (`storage_index`, `shnum`), + FOREIGN KEY (`account_id`) REFERENCES `accounts` (`id`) +); + +CREATE INDEX `account_id` ON `leases` (`account_id`); +CREATE INDEX `expiration_time` ON `leases` (`expiration_time`); + +CREATE TABLE accounts +( + `id` INTEGER PRIMARY KEY AUTOINCREMENT, + `pubkey_vs` VARCHAR(52), + `creation_time` INTEGER +); +CREATE UNIQUE INDEX `pubkey_vs` ON `accounts` (`pubkey_vs`); + +CREATE TABLE account_attributes +( + `id` INTEGER PRIMARY KEY AUTOINCREMENT, + `account_id` INTEGER, + `name` VARCHAR(20), + `value` VARCHAR(20) -- actually anything: usually string, unicode, integer +); +CREATE UNIQUE INDEX `account_attr` ON `account_attributes` (`account_id`, `name`); + +INSERT INTO `accounts` VALUES (0, "anonymous", 0); +INSERT INTO `accounts` VALUES (1, "starter", 0); + +CREATE TABLE crawler_history +( + `cycle` INTEGER, + `json` TEXT +); +CREATE UNIQUE INDEX `cycle` ON `crawler_history` (`cycle`); +""" + +DAY = 24*60*60 +MONTH = 30*DAY + +class LeaseDB: + ANONYMOUS_ACCOUNTID = 0 + STARTER_LEASE_ACCOUNTID = 1 + STARTER_LEASE_DURATION = 2*MONTH + + def __init__(self, dbfile): + (self._sqlite, + self._db) = dbutil.get_db(dbfile, create_version=(LEASE_SCHEMA_V1, 1)) + self._cursor = self._db.cursor() + self.debug = False + self.retained_history_entries = 10 + + # share management + + def get_shares_for_prefix(self, prefix): + """ + Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype) pairs. + """ + self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`" + " FROM `shares`" + " WHERE `prefix` == ?", + (prefix,)) + db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype))) + for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()]) + return db_sharemap + + def add_new_share(self, storage_index, shnum, used_space, sharetype): + si_s = si_b2a(storage_index) + prefix = si_s[:2] + if self.debug: print "ADD_NEW_SHARE", prefix, si_s, shnum, used_space, sharetype + backend_key = None + # This needs to be an INSERT OR REPLACE because it is possible for add_new_share + # to be called when this share is already in the database (but not on disk). + self._cursor.execute("INSERT OR REPLACE INTO `shares`" + " VALUES (?,?,?,?,?,?,?)", + (si_s, shnum, prefix, backend_key, used_space, sharetype, STATE_COMING)) + + def add_starter_lease(self, storage_index, shnum): + si_s = si_b2a(storage_index) + if self.debug: print "ADD_STARTER_LEASE", si_s, shnum + self._dirty = True + renewal_time = time.time() + self._cursor.execute("INSERT INTO `leases`" + " VALUES (?,?,?,?,?,?)", + (None, si_s, shnum, self.STARTER_LEASE_ACCOUNTID, + int(renewal_time), int(renewal_time + self.STARTER_LEASE_DURATION))) + self._db.commit() + + def mark_share_as_stable(self, storage_index, shnum, used_space=None, backend_key=None): + """ + Call this method after adding a share to backend storage. + """ + si_s = si_b2a(storage_index) + if self.debug: print "MARK_SHARE_AS_STABLE", si_s, shnum, used_space + self._dirty = True + if used_space is not None: + self._cursor.execute("UPDATE `shares` SET `state`=?, `used_space`=?, `backend_key`=?" + " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?", + (STATE_STABLE, used_space, backend_key, si_s, shnum, STATE_GOING)) + else: + _assert(backend_key is None, backend_key=backend_key) + self._cursor.execute("UPDATE `shares` SET `state`=?" + " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?", + (STATE_STABLE, si_s, shnum, STATE_GOING)) + self._db.commit() + if self._cursor.rowcount < 1: + raise NonExistentShareError(si_s, shnum) + + def mark_share_as_going(self, storage_index, shnum): + """ + Call this method and commit before deleting a share from backend storage, + then call remove_deleted_share. + """ + si_s = si_b2a(storage_index) + if self.debug: print "MARK_SHARE_AS_GOING", si_s, shnum + self._cursor.execute("UPDATE `shares` SET `state`=?" + " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?", + (STATE_GOING, si_s, shnum, STATE_COMING)) + self._db.commit() + if self._cursor.rowcount < 1: + raise NonExistentShareError(si_s, shnum) + + def remove_deleted_share(self, storage_index, shnum): + si_s = si_b2a(storage_index) + if self.debug: print "REMOVE_DELETED_SHARE", si_s, shnum + # delete leases first to maintain integrity constraint + self._cursor.execute("DELETE FROM `leases`" + " WHERE `storage_index`=? AND `shnum`=?", + (si_s, shnum)) + try: + self._cursor.execute("DELETE FROM `shares`" + " WHERE `storage_index`=? AND `shnum`=?", + (si_s, shnum)) + except Exception: + self._db.rollback() # roll back the lease deletion + raise + else: + self._db.commit() + + def change_share_space(self, storage_index, shnum, used_space): + si_s = si_b2a(storage_index) + if self.debug: print "CHANGE_SHARE_SPACE", si_s, shnum, used_space + self._cursor.execute("UPDATE `shares` SET `used_space`=?" + " WHERE `storage_index`=? AND `shnum`=?", + (used_space, si_s, shnum)) + self._db.commit() + if self._cursor.rowcount < 1: + raise NonExistentShareError(si_s, shnum) + + # lease management + + def add_or_renew_leases(self, storage_index, shnum, ownerid, + renewal_time, expiration_time): + """ + shnum=None means renew leases on all shares; do nothing if there are no shares for this storage_index in the `shares` table. + + Raises NonExistentShareError if a specific shnum is given and that share does not exist in the `shares` table. + """ + si_s = si_b2a(storage_index) + if self.debug: print "ADD_OR_RENEW_LEASES", si_s, shnum, ownerid, renewal_time, expiration_time + if shnum is None: + self._cursor.execute("SELECT `storage_index`, `shnum` FROM `shares`" + " WHERE `storage_index`=?", + (si_s,)) + rows = self._cursor.fetchall() + else: + self._cursor.execute("SELECT `storage_index`, `shnum` FROM `shares`" + " WHERE `storage_index`=? AND `shnum`=?", + (si_s, shnum)) + rows = self._cursor.fetchall() + if not rows: + raise NonExistentShareError(si_s, shnum) + + for (found_si_s, found_shnum) in rows: + _assert(si_s == found_si_s, si_s=si_s, found_si_s=found_si_s) + # XXX can we simplify this by using INSERT OR REPLACE? + self._cursor.execute("SELECT `id` FROM `leases`" + " WHERE `storage_index`=? AND `shnum`=? AND `account_id`=?", + (si_s, found_shnum, ownerid)) + row = self._cursor.fetchone() + if row: + # Note that unlike the pre-LeaseDB code, this allows leases to be backdated. + # There is currently no way for a client to specify lease duration, and so + # backdating can only happen in normal operation if there is a timequake on + # the server and time goes backward by more than 31 days. This needs to be + # revisited for ticket #1816, which would allow the client to request a lease + # duration. + leaseid = row[0] + self._cursor.execute("UPDATE `leases` SET `renewal_time`=?, `expiration_time`=?" + " WHERE `id`=?", + (renewal_time, expiration_time, leaseid)) + else: + self._cursor.execute("INSERT INTO `leases` VALUES (?,?,?,?,?,?)", + (None, si_s, found_shnum, ownerid, renewal_time, expiration_time)) + self._db.commit() + + def get_leases(self, storage_index, ownerid): + si_s = si_b2a(storage_index) + self._cursor.execute("SELECT `shnum`, `account_id`, `renewal_time`, `expiration_time` FROM `leases`" + " WHERE `storage_index`=? AND `account_id`=?", + (si_s, ownerid)) + rows = self._cursor.fetchall() + def _to_LeaseInfo(row): + (shnum, account_id, renewal_time, expiration_time) = tuple(row) + return LeaseInfo(storage_index, int(shnum), int(account_id), float(renewal_time), float(expiration_time)) + return map(_to_LeaseInfo, rows) + + def get_lease_ages(self, storage_index, shnum, now): + si_s = si_b2a(storage_index) + self._cursor.execute("SELECT `renewal_time` FROM `leases`" + " WHERE `storage_index`=? AND `shnum`=?", + (si_s, shnum)) + rows = self._cursor.fetchall() + def _to_age(row): + return now - float(row[0]) + return map(_to_age, rows) + + def get_unleased_shares_for_prefix(self, prefix): + if self.debug: print "GET_UNLEASED_SHARES_FOR_PREFIX", prefix + # This would be simpler, but it doesn't work because 'NOT IN' doesn't support multiple columns. + #query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype` FROM `shares`" + # " WHERE (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)") + + # This "negative join" should be equivalent. + self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype FROM `shares` s LEFT JOIN `leases` l" + " ON (s.storage_index = l.storage_index AND s.shnum = l.shnum)" + " WHERE s.prefix = ? AND l.storage_index IS NULL", + (prefix,)) + db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype))) + for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()]) + return db_sharemap + + def remove_leases_by_renewal_time(self, renewal_cutoff_time): + if self.debug: print "REMOVE_LEASES_BY_RENEWAL_TIME", renewal_cutoff_time + self._cursor.execute("DELETE FROM `leases` WHERE `renewal_time` < ?", + (renewal_cutoff_time,)) + self._db.commit() + + def remove_leases_by_expiration_time(self, expiration_cutoff_time): + if self.debug: print "REMOVE_LEASES_BY_EXPIRATION_TIME", expiration_cutoff_time + self._cursor.execute("DELETE FROM `leases` WHERE `expiration_time` IS NOT NULL AND `expiration_time` < ?", + (expiration_cutoff_time,)) + self._db.commit() + + # history + + def add_history_entry(self, cycle, entry): + if self.debug: print "ADD_HISTORY_ENTRY", cycle, entry + json = simplejson.dumps(entry) + self._cursor.execute("SELECT `cycle` FROM `crawler_history`") + rows = self._cursor.fetchall() + if len(rows) >= self.retained_history_entries: + first_cycle_to_retain = list(sorted(rows))[-(self.retained_history_entries - 1)][0] + self._cursor.execute("DELETE FROM `crawler_history` WHERE `cycle` < ?", + (first_cycle_to_retain,)) + self._db.commit() + + try: + self._cursor.execute("INSERT OR REPLACE INTO `crawler_history` VALUES (?,?)", + (cycle, json)) + except Exception: + self._db.rollback() # roll back the deletion of unretained entries + raise + else: + self._db.commit() + + def get_history(self): + self._cursor.execute("SELECT `cycle`,`json` FROM `crawler_history`") + rows = self._cursor.fetchall() + decoded = [(row[0], simplejson.loads(row[1])) for row in rows] + return dict(decoded) + + def get_account_creation_time(self, owner_num): + self._cursor.execute("SELECT `creation_time` from `accounts`" + " WHERE `id`=?", + (owner_num,)) + row = self._cursor.fetchone() + if row: + return row[0] + return None + + def get_all_accounts(self): + self._cursor.execute("SELECT `id`,`pubkey_vs`" + " FROM `accounts` ORDER BY `id` ASC") + return self._cursor.fetchall() -- 2.45.2