From: David-Sarah Hopwood <david-sarah@jacaranda.org>
Date: Wed, 12 Dec 2012 05:17:11 +0000 (+0000)
Subject: Add new files for leasedb.
X-Git-Url: https://git.rkrishnan.org/%5B/frontends/%22news.html/%22doc.html/nxhtml.html?a=commitdiff_plain;h=54685c532238fd8f48c02672c8e3fd98001f9d17;p=tahoe-lafs%2Ftahoe-lafs.git

Add new files for leasedb.

Authors: Brian Warner <warner@lothar.com> and David-Sarah Hopwood
Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
---

diff --git a/src/allmydata/storage/account.py b/src/allmydata/storage/account.py
new file mode 100644
index 00000000..bf82e274
--- /dev/null
+++ b/src/allmydata/storage/account.py
@@ -0,0 +1,198 @@
+
+"""
+This file contains the client-facing interface for manipulating shares. It
+implements RIStorageServer, and contains an embedded owner id which is used
+for all operations that touch leases. Initially, clients will receive a
+special 'anonymous' instance of this class with ownerid=0. Later, when the
+FURLification dance is established, each client will get a different instance
+(with a dedicated ownerid).
+"""
+
+import time
+
+from foolscap.api import Referenceable
+
+from zope.interface import implements
+from allmydata.interfaces import RIStorageServer
+
+from allmydata.storage.leasedb import int_or_none
+from allmydata.storage.common import si_b2a
+
+
+class Account(Referenceable):
+    implements(RIStorageServer)
+
+    def __init__(self, owner_num, pubkey_vs, server, leasedb):
+        self.owner_num = owner_num
+        self.server = server
+        self._leasedb = leasedb
+        # for static accounts ("starter", "anonymous"), pubkey_vs is None,
+        # and the "connected" attributes are unused
+        self.pubkey_vs = pubkey_vs
+        self.connected = False
+        self.connected_since = None
+        self.connection = None
+        self.debug = False
+
+    def is_static(self):
+        return self.owner_num in (0,1)
+
+    # these methods are called by StorageServer
+
+    def get_owner_num(self):
+        return self.owner_num
+
+    def get_renewal_and_expiration_times(self):
+        renewal_time = time.time()
+        return (renewal_time, renewal_time + 31*24*60*60)
+
+    # immutable.BucketWriter.close() does:
+    #  add_share(), add_or_renew_lease(), mark_share_as_stable()
+
+    # mutable writev() does:
+    #  deleted shares: mark_share_as_going(), remove_share_and_leases()
+    #  new shares: add_share(), add_or_renew_lease(), mark_share_as_stable()
+    #  changed shares: change_share_space(), add_or_renew_lease()
+
+    def add_share(self, storage_index, shnum, used_space, sharetype):
+        if self.debug: print "ADD_SHARE", si_b2a(storage_index), shnum, used_space, sharetype
+        self._leasedb.add_new_share(storage_index, shnum, used_space, sharetype)
+
+    def add_or_renew_default_lease(self, storage_index, shnum):
+        renewal_time, expiration_time = self.get_renewal_and_expiration_times()
+        return self.add_or_renew_lease(storage_index, shnum, renewal_time, expiration_time)
+
+    def add_or_renew_lease(self, storage_index, shnum, renewal_time, expiration_time):
+        if self.debug: print "ADD_OR_RENEW_LEASE", si_b2a(storage_index), shnum
+        self._leasedb.add_or_renew_leases(storage_index, shnum, self.owner_num,
+                                          renewal_time, expiration_time)
+
+    def change_share_space(self, storage_index, shnum, used_space):
+        if self.debug: print "CHANGE_SHARE_SPACE", si_b2a(storage_index), shnum, used_space
+        self._leasedb.change_share_space(storage_index, shnum, used_space)
+
+    def mark_share_as_stable(self, storage_index, shnum, used_space):
+        if self.debug: print "MARK_SHARE_AS_STABLE", si_b2a(storage_index), shnum, used_space
+        self._leasedb.mark_share_as_stable(storage_index, shnum, used_space)
+
+    def mark_share_as_going(self, storage_index, shnum):
+        if self.debug: print "MARK_SHARE_AS_GOING", si_b2a(storage_index), shnum
+        self._leasedb.mark_share_as_going(storage_index, shnum)
+
+    def remove_share_and_leases(self, storage_index, shnum):
+        if self.debug: print "REMOVE_SHARE_AND_LEASES", si_b2a(storage_index), shnum
+        self._leasedb.remove_deleted_share(storage_index, shnum)
+
+    # remote_add_lease() and remote_renew_lease() do this
+    def add_lease_for_bucket(self, storage_index):
+        if self.debug: print "ADD_LEASE_FOR_BUCKET", si_b2a(storage_index)
+        renewal_time, expiration_time = self.get_renewal_and_expiration_times()
+        self._leasedb.add_or_renew_leases(storage_index, None,
+                                          self.owner_num, renewal_time, expiration_time)
+
+    # The following RIStorageServer methods are called by remote clients
+
+    def remote_get_version(self):
+        return self.server.client_get_version(self)
+
+    # all other RIStorageServer methods should pass through to self.server
+    # but (except for remote_advise_corrupt_share) add the account as a final
+    # argument.
+
+    def remote_allocate_buckets(self, storage_index, renew_secret, cancel_secret,
+                                sharenums, allocated_size, canary):
+        if self.debug: print "REMOTE_ALLOCATE_BUCKETS", si_b2a(storage_index)
+        return self.server.client_allocate_buckets(storage_index,
+                                                   sharenums, allocated_size,
+                                                   canary, self)
+
+    def remote_add_lease(self, storage_index, renew_secret, cancel_secret):
+        if self.debug: print "REMOTE_ADD_LEASE", si_b2a(storage_index)
+        self.add_lease_for_bucket(storage_index)
+        return None
+
+    def remote_renew_lease(self, storage_index, renew_secret):
+        self.add_lease_for_bucket(storage_index)
+        return None
+
+    def remote_get_buckets(self, storage_index):
+        return self.server.client_get_buckets(storage_index)
+
+    def remote_slot_testv_and_readv_and_writev(self, storage_index, secrets,
+                                               test_and_write_vectors, read_vector):
+        write_enabler = secrets[0]
+        return self.server.client_slot_testv_and_readv_and_writev(
+            storage_index, write_enabler, test_and_write_vectors, read_vector, self)
+
+    def remote_slot_readv(self, storage_index, shares, readv):
+        return self.server.client_slot_readv(storage_index, shares, readv, self)
+
+    def remote_advise_corrupt_share(self, share_type, storage_index, shnum, reason):
+        # this doesn't use the account.
+        return self.server.client_advise_corrupt_share(
+            share_type, storage_index, shnum, reason)
+
+    def get_account_creation_time(self):
+        return self._leasedb.get_account_creation_time(self.owner_num)
+
+    def get_id(self):
+        return self.pubkey_vs
+
+    def get_leases(self, storage_index):
+        return self._leasedb.get_leases(storage_index, self.owner_num)
+
+    def connection_from(self, rx):
+        self.connected = True
+        self.connected_since = time.time()
+        self.connection = rx
+        #rhost = rx.getPeer()
+        #from twisted.internet import address
+        #if isinstance(rhost, address.IPv4Address):
+        #    rhost_s = "%s:%d" % (rhost.host, rhost.port)
+        #elif "LoopbackAddress" in str(rhost):
+        #    rhost_s = "loopback"
+        #else:
+        #    rhost_s = str(rhost)
+        #self.set_account_attribute("last_connected_from", rhost_s)
+        rx.notifyOnDisconnect(self._disconnected)
+
+    def _disconnected(self):
+        self.connected = False
+        self.connected_since = None
+        self.connection = None
+        #self.set_account_attribute("last_seen", int(time.time()))
+        self.disconnected_since = None
+
+    def get_connection_status(self):
+        # starts as: connected=False, connected_since=None,
+        #            last_connected_from=None, last_seen=None
+        # while connected: connected=True, connected_since=START,
+        #                  last_connected_from=HOST, last_seen=IGNOREME
+        # after disconnect: connected=False, connected_since=None,
+        #                   last_connected_from=HOST, last_seen=STOP
+
+        #last_seen = int_or_none(self.get_account_attribute("last_seen"))
+        #last_connected_from = self.get_account_attribute("last_connected_from")
+        created = int_or_none(self.get_account_creation_time())
+
+        return {"connected": self.connected,
+                "connected_since": self.connected_since,
+                #"last_connected_from": last_connected_from,
+                #"last_seen": last_seen,
+                "created": created,
+                }
+
+    def get_stats(self):
+        return self.server.get_stats()
+
+    def get_accounting_crawler(self):
+        return self.server.get_accounting_crawler()
+
+    def get_expiration_policy(self):
+        return self.server.get_expiration_policy()
+
+    def get_bucket_counter(self):
+        return self.server.get_bucket_counter()
+
+    def get_nodeid(self):
+        return self.server.get_nodeid()
diff --git a/src/allmydata/storage/accountant.py b/src/allmydata/storage/accountant.py
new file mode 100644
index 00000000..6521ee4d
--- /dev/null
+++ b/src/allmydata/storage/accountant.py
@@ -0,0 +1,61 @@
+
+"""
+This file contains the cross-account management code. It creates per-client
+Account objects for the FURLification dance, as well as the 'anonymous
+account for use until the server admin decides to make accounting mandatory.
+It also provides usage statistics and reports for the status UI. This will
+also implement the backend of the control UI (once we figure out how to
+express that: maybe a CLI command, or tahoe.cfg settings, or a web frontend),
+for things like enabling/disabling accounts and setting quotas.
+
+The name 'accountant.py' could be better, preferably something that doesn't
+share a prefix with 'account.py' so my tab-autocomplete will work nicely.
+"""
+
+import weakref
+
+from twisted.application import service
+
+from allmydata.storage.leasedb import LeaseDB
+from allmydata.storage.accounting_crawler import AccountingCrawler
+from allmydata.storage.account import Account
+
+
+class Accountant(service.MultiService):
+    def __init__(self, storage_server, dbfile, statefile):
+        service.MultiService.__init__(self)
+        self.storage_server = storage_server
+        self._leasedb = LeaseDB(dbfile)
+        self._active_accounts = weakref.WeakValueDictionary()
+        self._anonymous_account = Account(LeaseDB.ANONYMOUS_ACCOUNTID, None,
+                                          self.storage_server, self._leasedb)
+        self._starter_account = Account(LeaseDB.STARTER_LEASE_ACCOUNTID, None,
+                                        self.storage_server, self._leasedb)
+
+        crawler = AccountingCrawler(storage_server, statefile, self._leasedb)
+        self._accounting_crawler = crawler
+        crawler.setServiceParent(self)
+
+    def get_leasedb(self):
+        return self._leasedb
+
+    def set_expiration_policy(self, policy):
+        self._accounting_crawler.set_expiration_policy(policy)
+
+    def get_anonymous_account(self):
+        return self._anonymous_account
+
+    def get_starter_account(self):
+        return self._starter_account
+
+    def get_accounting_crawler(self):
+        return self._accounting_crawler
+
+    # methods used by admin interfaces
+    def get_all_accounts(self):
+        for ownerid, pubkey_vs in self._leasedb.get_all_accounts():
+            if pubkey_vs in self._active_accounts:
+                yield self._active_accounts[pubkey_vs]
+            else:
+                yield Account(ownerid, pubkey_vs,
+                              self.storage_server, self._leasedb)
diff --git a/src/allmydata/storage/accounting_crawler.py b/src/allmydata/storage/accounting_crawler.py
new file mode 100644
index 00000000..493dc54b
--- /dev/null
+++ b/src/allmydata/storage/accounting_crawler.py
@@ -0,0 +1,342 @@
+
+import os, time
+
+from twisted.internet import defer
+
+from allmydata.util.deferredutil import for_items
+from allmydata.util.fileutil import get_used_space
+from allmydata.util import log
+from allmydata.storage.crawler import ShareCrawler
+from allmydata.storage.common import si_a2b
+from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN
+
+
+class AccountingCrawler(ShareCrawler):
+    """
+    I perform the following functions:
+    - Remove leases that are past their expiration time.
+    - Delete objects containing unleased shares.
+    - Discover shares that have been manually added to storage.
+    - Discover shares that are present when a storage server is upgraded from
+      a pre-leasedb version, and give them "starter leases".
+    - Recover from a situation where the leasedb is lost or detectably
+      corrupted. This is handled in the same way as upgrading.
+    - Detect shares that have unexpectedly disappeared from storage.
+    """
+
+    slow_start = 600 # don't start crawling for 10 minutes after startup
+    minimum_cycle_time = 12*60*60 # not more than twice per day
+
+    def __init__(self, server, statefile, leasedb):
+        ShareCrawler.__init__(self, server, statefile)
+        self._leasedb = leasedb
+
+    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
+        # assume that we can list every prefixdir in this prefix quickly.
+        # Otherwise we have to retain more state between timeslices.
+
+        # we define "shareid" as (SI string, shnum)
+        disk_shares = set() # shareid
+        for si_s in buckets:
+            bucketdir = os.path.join(prefixdir, si_s)
+            for sharefile in os.listdir(bucketdir):
+                try:
+                    shnum = int(sharefile)
+                except ValueError:
+                    continue # non-numeric means not a sharefile
+                shareid = (si_s, shnum)
+                disk_shares.add(shareid)
+
+        # now check the database for everything in this prefix
+        db_sharemap = self._leasedb.get_shares_for_prefix(prefix)
+        db_shares = set(db_sharemap)
+
+        rec = self.state["cycle-to-date"]["space-recovered"]
+        examined_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+        # The lease crawler used to calculate the lease age histogram while
+        # crawling shares, and tests currently rely on that, but it would be
+        # more efficient to maintain the histogram as leases are added,
+        # updated, and removed.
+        for key, value in db_sharemap.iteritems():
+            (si_s, shnum) = key
+            (used_space, sharetype) = value
+
+            examined_sharesets[sharetype].add(si_s)
+
+            for age in self._leasedb.get_lease_ages(si_a2b(si_s), shnum, start_slice):
+                self.add_lease_age_to_histogram(age)
+
+            self.increment(rec, "examined-shares", 1)
+            self.increment(rec, "examined-sharebytes", used_space)
+            self.increment(rec, "examined-shares-" + SHARETYPES[sharetype], 1)
+            self.increment(rec, "examined-sharebytes-" + SHARETYPES[sharetype], used_space)
+
+        self.increment(rec, "examined-buckets", sum([len(s) for s in examined_sharesets]))
+        for st in SHARETYPES:
+            self.increment(rec, "examined-buckets-" + SHARETYPES[st], len(examined_sharesets[st]))
+
+        # add new shares to the DB
+        new_shares = disk_shares - db_shares
+        for (si_s, shnum) in new_shares:
+            sharefile = os.path.join(prefixdir, si_s, str(shnum))
+            used_space = get_used_space(sharefile)
+            # FIXME
+            sharetype = SHARETYPE_UNKNOWN
+            self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
+            self._leasedb.add_starter_lease(si_s, shnum)
+
+        # remove disappeared shares from DB
+        disappeared_shares = db_shares - disk_shares
+        for (si_s, shnum) in disappeared_shares:
+            log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
+                    si_s=si_s, shnum=shnum, level=log.WEIRD)
+            self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
+
+        recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
+
+        def _delete_share(ign, key, value):
+            (si_s, shnum) = key
+            (used_space, sharetype) = value
+            storage_index = si_a2b(si_s)
+            d2 = defer.succeed(None)
+            def _mark_and_delete(ign):
+                self._leasedb.mark_share_as_going(storage_index, shnum)
+                return self.server.delete_share(storage_index, shnum)
+            d2.addCallback(_mark_and_delete)
+            def _deleted(ign):
+                self._leasedb.remove_deleted_share(storage_index, shnum)
+
+                recovered_sharesets[sharetype].add(si_s)
+
+                self.increment(rec, "actual-shares", 1)
+                self.increment(rec, "actual-sharebytes", used_space)
+                self.increment(rec, "actual-shares-" + SHARETYPES[sharetype], 1)
+                self.increment(rec, "actual-sharebytes-" + SHARETYPES[sharetype], used_space)
+            def _not_deleted(f):
+                log.err(format="accounting crawler could not delete share SI=%(si_s)s shnum=%(shnum)s",
+                        si_s=si_s, shnum=shnum, failure=f, level=log.WEIRD)
+                try:
+                    self._leasedb.mark_share_as_stable(storage_index, shnum)
+                except Exception, e:
+                    log.err(e)
+                # discard the failure
+            d2.addCallbacks(_deleted, _not_deleted)
+            return d2
+
+        unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
+        d = for_items(_delete_share, unleased_sharemap)
+
+        def _inc_recovered_sharesets(ign):
+            self.increment(rec, "actual-buckets", sum([len(s) for s in recovered_sharesets]))
+            for st in SHARETYPES:
+                self.increment(rec, "actual-buckets-" + SHARETYPES[st], len(recovered_sharesets[st]))
+        d.addCallback(_inc_recovered_sharesets)
+        return d
+
+    # these methods are for outside callers to use
+
+    def set_expiration_policy(self, policy):
+        self._expiration_policy = policy
+
+    def get_expiration_policy(self):
+        return self._expiration_policy
+
+    def is_expiration_enabled(self):
+        return self._expiration_policy.is_enabled()
+
+    def db_is_incomplete(self):
+        # don't bother looking at the sqlite database: it's certainly not
+        # complete.
+        return self.state["last-cycle-finished"] is None
+
+    def increment(self, d, k, delta=1):
+        if k not in d:
+            d[k] = 0
+        d[k] += delta
+
+    def add_lease_age_to_histogram(self, age):
+        bin_interval = 24*60*60
+        bin_number = int(age/bin_interval)
+        bin_start = bin_number * bin_interval
+        bin_end = bin_start + bin_interval
+        k = (bin_start, bin_end)
+        self.increment(self.state["cycle-to-date"]["lease-age-histogram"], k, 1)
+
+    def convert_lease_age_histogram(self, lah):
+        # convert { (minage,maxage) : count } into [ (minage,maxage,count) ]
+        # since the former is not JSON-safe (JSON dictionaries must have
+        # string keys).
+        json_safe_lah = []
+        for k in sorted(lah):
+            (minage,maxage) = k
+            json_safe_lah.append( (minage, maxage, lah[k]) )
+        return json_safe_lah
+
+    def add_initial_state(self):
+        # we fill ["cycle-to-date"] here (even though they will be reset in
+        # self.started_cycle) just in case someone grabs our state before we
+        # get started: unit tests do this
+        so_far = self.create_empty_cycle_dict()
+        self.state.setdefault("cycle-to-date", so_far)
+        # in case we upgrade the code while a cycle is in progress, update
+        # the keys individually
+        for k in so_far:
+            self.state["cycle-to-date"].setdefault(k, so_far[k])
+
+    def create_empty_cycle_dict(self):
+        recovered = self.create_empty_recovered_dict()
+        so_far = {"corrupt-shares": [],
+                  "space-recovered": recovered,
+                  "lease-age-histogram": {}, # (minage,maxage)->count
+                  }
+        return so_far
+
+    def create_empty_recovered_dict(self):
+        recovered = {}
+        for a in ("actual", "examined"):
+            for b in ("buckets", "shares", "diskbytes"):
+                recovered["%s-%s" % (a, b)] = 0
+                for st in SHARETYPES:
+                    recovered["%s-%s-%s" % (a, b, SHARETYPES[st])] = 0
+        return recovered
+
+    def started_cycle(self, cycle):
+        self.state["cycle-to-date"] = self.create_empty_cycle_dict()
+
+        current_time = time.time()
+        self._expiration_policy.remove_expired_leases(self._leasedb, current_time)
+
+    def finished_cycle(self, cycle):
+        # add to our history state, prune old history
+        h = {}
+
+        start = self.state["current-cycle-start-time"]
+        now = time.time()
+        h["cycle-start-finish-times"] = (start, now)
+        ep = self.get_expiration_policy()
+        h["expiration-enabled"] = ep.is_enabled()
+        h["configured-expiration-mode"] = ep.get_parameters()
+
+        s = self.state["cycle-to-date"]
+
+        # state["lease-age-histogram"] is a dictionary (mapping
+        # (minage,maxage) tuple to a sharecount), but we report
+        # self.get_state()["lease-age-histogram"] as a list of
+        # (min,max,sharecount) tuples, because JSON can handle that better.
+        # We record the list-of-tuples form into the history for the same
+        # reason.
+        lah = self.convert_lease_age_histogram(s["lease-age-histogram"])
+        h["lease-age-histogram"] = lah
+        h["corrupt-shares"] = s["corrupt-shares"][:]
+        # note: if ["shares-recovered"] ever acquires an internal dict, this
+        # copy() needs to become a deepcopy
+        h["space-recovered"] = s["space-recovered"].copy()
+
+        self._leasedb.add_history_entry(cycle, h)
+
+    def get_state(self):
+        """In addition to the crawler state described in
+        ShareCrawler.get_state(), I return the following keys which are
+        specific to the lease-checker/expirer. Note that the non-history keys
+        (with 'cycle' in their names) are only present if a cycle is currently
+        running. If the crawler is between cycles, it is appropriate to show
+        the latest item in the 'history' key instead. Also note that each
+        history item has all the data in the 'cycle-to-date' value, plus
+        cycle-start-finish-times.
+
+         cycle-to-date:
+          expiration-enabled
+          configured-expiration-mode
+          lease-age-histogram (list of (minage,maxage,sharecount) tuples)
+          corrupt-shares (list of (si_b32,shnum) tuples, minimal verification)
+          space-recovered
+
+         estimated-remaining-cycle:
+          # Values may be None if not enough data has been gathered to
+          # produce an estimate.
+          space-recovered
+
+         estimated-current-cycle:
+          # cycle-to-date plus estimated-remaining. Values may be None if
+          # not enough data has been gathered to produce an estimate.
+          space-recovered
+
+         history: maps cyclenum to a dict with the following keys:
+          cycle-start-finish-times
+          expiration-enabled
+          configured-expiration-mode
+          lease-age-histogram
+          corrupt-shares
+          space-recovered
+
+         The 'space-recovered' structure is a dictionary with the following
+         keys:
+          # 'examined' is what was looked at
+          examined-buckets,     examined-buckets-$SHARETYPE
+          examined-shares,      examined-shares-$SHARETYPE
+          examined-diskbytes,   examined-diskbytes-$SHARETYPE
+
+          # 'actual' is what was deleted
+          actual-buckets,       actual-buckets-$SHARETYPE
+          actual-shares,        actual-shares-$SHARETYPE
+          actual-diskbytes,     actual-diskbytes-$SHARETYPE
+
+        Note that the preferred terminology has changed since these keys
+        were defined; "buckets" refers to what are now called sharesets,
+        and "diskbytes" refers to bytes of used space on the storage backend,
+        which is not necessarily the disk backend.
+
+        The 'original-*' and 'configured-*' keys that were populated in
+        pre-leasedb versions are no longer supported.
+        The 'leases-per-share-histogram' is also no longer supported.
+        """
+        progress = self.get_progress()
+
+        state = ShareCrawler.get_state(self) # does a shallow copy
+        state["history"] = self._leasedb.get_history()
+
+        if not progress["cycle-in-progress"]:
+            del state["cycle-to-date"]
+            return state
+
+        so_far = state["cycle-to-date"].copy()
+        state["cycle-to-date"] = so_far
+
+        lah = so_far["lease-age-histogram"]
+        so_far["lease-age-histogram"] = self.convert_lease_age_histogram(lah)
+        so_far["expiration-enabled"] = self._expiration_policy.is_enabled()
+        so_far["configured-expiration-mode"] = self._expiration_policy.get_parameters()
+
+        so_far_sr = so_far["space-recovered"]
+        remaining_sr = {}
+        remaining = {"space-recovered": remaining_sr}
+        cycle_sr = {}
+        cycle = {"space-recovered": cycle_sr}
+
+        if progress["cycle-complete-percentage"] > 0.0:
+            pc = progress["cycle-complete-percentage"] / 100.0
+            m = (1-pc)/pc
+            for a in ("actual", "examined"):
+                for b in ("buckets", "shares", "diskbytes"):
+                    k = "%s-%s" % (a, b)
+                    remaining_sr[k] = m * so_far_sr[k]
+                    cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
+                    for st in SHARETYPES:
+                        k = "%s-%s-%s" % (a, b, SHARETYPES[st])
+                        remaining_sr[k] = m * so_far_sr[k]
+                        cycle_sr[k] = so_far_sr[k] + remaining_sr[k]
+        else:
+            for a in ("actual", "examined"):
+                for b in ("buckets", "shares", "diskbytes"):
+                    k = "%s-%s" % (a, b)
+                    remaining_sr[k] = None
+                    cycle_sr[k] = None
+                    for st in SHARETYPES:
+                        k = "%s-%s-%s" % (a, b, SHARETYPES[st])
+                        remaining_sr[k] = None
+                        cycle_sr[k] = None
+
+        state["estimated-remaining-cycle"] = remaining
+        state["estimated-current-cycle"] = cycle
+        return state
diff --git a/src/allmydata/storage/expiration.py b/src/allmydata/storage/expiration.py
new file mode 100644
index 00000000..6f7cd04c
--- /dev/null
+++ b/src/allmydata/storage/expiration.py
@@ -0,0 +1,71 @@
+
+import time
+from types import NoneType
+
+from allmydata.util.assertutil import precondition
+from allmydata.util import time_format
+from allmydata.web.common import abbreviate_time
+
+
+class ExpirationPolicy(object):
+    def __init__(self, enabled=False, mode="age", override_lease_duration=None,
+                 cutoff_date=None):
+        precondition(isinstance(enabled, bool), enabled=enabled)
+        precondition(mode in ("age", "cutoff-date"),
+                     "GC mode %r must be 'age' or 'cutoff-date'" % (mode,))
+        precondition(isinstance(override_lease_duration, (int, NoneType)),
+                     override_lease_duration=override_lease_duration)
+        precondition(isinstance(cutoff_date, int) or (mode != "cutoff-date" and cutoff_date is None),
+                     cutoff_date=cutoff_date)
+
+        self._enabled = enabled
+        self._mode = mode
+        self._override_lease_duration = override_lease_duration
+        self._cutoff_date = cutoff_date
+
+    def remove_expired_leases(self, leasedb, current_time):
+        if not self._enabled:
+            return
+
+        if self._mode == "age":
+            if self._override_lease_duration is not None:
+                leasedb.remove_leases_by_renewal_time(current_time - self._override_lease_duration)
+            else:
+                leasedb.remove_leases_by_expiration_time(current_time)
+        else:
+            # self._mode == "cutoff-date"
+            leasedb.remove_leases_by_renewal_time(self._cutoff_date)
+
+    def get_parameters(self):
+        """
+        Return the parameters as represented in the "configured-expiration-mode" field
+        of a history entry.
+        """
+        return (self._mode,
+                self._override_lease_duration,
+                self._cutoff_date,
+                self._enabled and ("mutable", "immutable") or ())
+
+    def is_enabled(self):
+        return self._enabled
+
+    def describe_enabled(self):
+        if self.is_enabled():
+            return "Enabled: expired leases will be removed"
+        else:
+            return "Disabled: scan-only mode, no leases will be removed"
+
+    def describe_expiration(self):
+        if self._mode == "age":
+            if self._override_lease_duration is None:
+                return ("Leases will expire naturally, probably 31 days after "
+                        "creation or renewal.")
+            else:
+                return ("Leases created or last renewed more than %s ago "
+                        "will be considered expired."
+                        % abbreviate_time(self._override_lease_duration))
+        else:
+            localizedutcdate = time.strftime("%d-%b-%Y", time.gmtime(self._cutoff_date))
+            isoutcdate = time_format.iso_utc_date(self._cutoff_date)
+            return ("Leases created or last renewed before %s (%s) UTC "
+                    "will be considered expired." % (isoutcdate, localizedutcdate))
diff --git a/src/allmydata/storage/leasedb.py b/src/allmydata/storage/leasedb.py
new file mode 100644
index 00000000..bdd45ff5
--- /dev/null
+++ b/src/allmydata/storage/leasedb.py
@@ -0,0 +1,360 @@
+
+import time, simplejson
+
+from allmydata.util.assertutil import _assert
+from allmydata.util import dbutil
+from allmydata.storage.common import si_b2a
+
+
+class NonExistentShareError(Exception):
+    def __init__(self, si_s, shnum):
+        Exception.__init__(self, si_s, shnum)
+        self.si_s = si_s
+        self.shnum = shnum
+
+    def __str__(self):
+        return "can't find SI=%r shnum=%r in `shares` table" % (self.si_s, self.shnum)
+
+
+class LeaseInfo(object):
+    def __init__(self, storage_index, shnum, owner_num, renewal_time, expiration_time):
+        self.storage_index = storage_index
+        self.shnum = shnum
+        self.owner_num = owner_num
+        self.renewal_time = renewal_time
+        self.expiration_time = expiration_time
+
+
+def int_or_none(s):
+    if s is None:
+        return s
+    return int(s)
+
+
+SHARETYPE_IMMUTABLE  = 0
+SHARETYPE_MUTABLE    = 1
+SHARETYPE_CORRUPTED  = 2
+SHARETYPE_UNKNOWN    = 3
+
+SHARETYPES = { SHARETYPE_IMMUTABLE: 'immutable',
+               SHARETYPE_MUTABLE:   'mutable',
+               SHARETYPE_CORRUPTED: 'corrupted',
+               SHARETYPE_UNKNOWN:   'unknown' }
+
+STATE_COMING = 0
+STATE_STABLE = 1
+STATE_GOING  = 2
+
+
+LEASE_SCHEMA_V1 = """
+CREATE TABLE `version`
+(
+ version INTEGER -- contains one row, set to 1
+);
+
+CREATE TABLE `shares`
+(
+ `storage_index` VARCHAR(26) not null,
+ `shnum` INTEGER not null,
+ `prefix` VARCHAR(2) not null,
+ `backend_key` VARCHAR,         -- not used by current backends; NULL means '$prefix/$storage_index/$shnum'
+ `used_space` INTEGER not null,
+ `sharetype` INTEGER not null,  -- SHARETYPE_*
+ `state` INTEGER not null,      -- STATE_*
+ PRIMARY KEY (`storage_index`, `shnum`)
+);
+
+CREATE INDEX `prefix` ON `shares` (`prefix`);
+-- CREATE UNIQUE INDEX `share_id` ON `shares` (`storage_index`,`shnum`);
+
+CREATE TABLE `leases`
+(
+ `id` INTEGER PRIMARY KEY AUTOINCREMENT,
+ `storage_index` VARCHAR(26) not null,
+ `shnum` INTEGER not null,
+ `account_id` INTEGER not null,
+ `renewal_time` INTEGER not null, -- duration is implicit: expiration-renewal
+ `expiration_time` INTEGER,       -- seconds since epoch; NULL means the end of time
+ FOREIGN KEY (`storage_index`, `shnum`) REFERENCES `shares` (`storage_index`, `shnum`),
+ FOREIGN KEY (`account_id`) REFERENCES `accounts` (`id`)
+);
+
+CREATE INDEX `account_id` ON `leases` (`account_id`);
+CREATE INDEX `expiration_time` ON `leases` (`expiration_time`);
+
+CREATE TABLE accounts
+(
+ `id` INTEGER PRIMARY KEY AUTOINCREMENT,
+ `pubkey_vs` VARCHAR(52),
+ `creation_time` INTEGER
+);
+CREATE UNIQUE INDEX `pubkey_vs` ON `accounts` (`pubkey_vs`);
+
+CREATE TABLE account_attributes
+(
+ `id` INTEGER PRIMARY KEY AUTOINCREMENT,
+ `account_id` INTEGER,
+ `name` VARCHAR(20),
+ `value` VARCHAR(20) -- actually anything: usually string, unicode, integer
+);
+CREATE UNIQUE INDEX `account_attr` ON `account_attributes` (`account_id`, `name`);
+
+INSERT INTO `accounts` VALUES (0, "anonymous", 0);
+INSERT INTO `accounts` VALUES (1, "starter", 0);
+
+CREATE TABLE crawler_history
+(
+ `cycle` INTEGER,
+ `json` TEXT
+);
+CREATE UNIQUE INDEX `cycle` ON `crawler_history` (`cycle`);
+"""
+
+DAY = 24*60*60
+MONTH = 30*DAY
+
+class LeaseDB:
+    ANONYMOUS_ACCOUNTID = 0
+    STARTER_LEASE_ACCOUNTID = 1
+    STARTER_LEASE_DURATION = 2*MONTH
+
+    def __init__(self, dbfile):
+        (self._sqlite,
+         self._db) = dbutil.get_db(dbfile, create_version=(LEASE_SCHEMA_V1, 1))
+        self._cursor = self._db.cursor()
+        self.debug = False
+        self.retained_history_entries = 10
+
+    # share management
+
+    def get_shares_for_prefix(self, prefix):
+        """
+        Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype) pairs.
+        """
+        self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`"
+                             " FROM `shares`"
+                             " WHERE `prefix` == ?",
+                             (prefix,))
+        db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
+                           for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
+        return db_sharemap
+
+    def add_new_share(self, storage_index, shnum, used_space, sharetype):
+        si_s = si_b2a(storage_index)
+        prefix = si_s[:2]
+        if self.debug: print "ADD_NEW_SHARE", prefix, si_s, shnum, used_space, sharetype
+        backend_key = None
+        # This needs to be an INSERT OR REPLACE because it is possible for add_new_share
+        # to be called when this share is already in the database (but not on disk).
+        self._cursor.execute("INSERT OR REPLACE INTO `shares`"
+                             " VALUES (?,?,?,?,?,?,?)",
+                             (si_s, shnum, prefix, backend_key, used_space, sharetype, STATE_COMING))
+
+    def add_starter_lease(self, storage_index, shnum):
+        si_s = si_b2a(storage_index)
+        if self.debug: print "ADD_STARTER_LEASE", si_s, shnum
+        self._dirty = True
+        renewal_time = time.time()
+        self._cursor.execute("INSERT INTO `leases`"
+                             " VALUES (?,?,?,?,?,?)",
+                             (None, si_s, shnum, self.STARTER_LEASE_ACCOUNTID,
+                              int(renewal_time), int(renewal_time + self.STARTER_LEASE_DURATION)))
+        self._db.commit()
+
+    def mark_share_as_stable(self, storage_index, shnum, used_space=None, backend_key=None):
+        """
+        Call this method after adding a share to backend storage.
+        """
+        si_s = si_b2a(storage_index)
+        if self.debug: print "MARK_SHARE_AS_STABLE", si_s, shnum, used_space
+        self._dirty = True
+        if used_space is not None:
+            self._cursor.execute("UPDATE `shares` SET `state`=?, `used_space`=?, `backend_key`=?"
+                                 " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?",
+                                 (STATE_STABLE, used_space, backend_key, si_s, shnum, STATE_GOING))
+        else:
+            _assert(backend_key is None, backend_key=backend_key)
+            self._cursor.execute("UPDATE `shares` SET `state`=?"
+                                 " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?",
+                                 (STATE_STABLE, si_s, shnum, STATE_GOING))
+        self._db.commit()
+        if self._cursor.rowcount < 1:
+            raise NonExistentShareError(si_s, shnum)
+
+    def mark_share_as_going(self, storage_index, shnum):
+        """
+        Call this method and commit before deleting a share from backend storage,
+        then call remove_deleted_share.
+        """
+        si_s = si_b2a(storage_index)
+        if self.debug: print "MARK_SHARE_AS_GOING", si_s, shnum
+        self._cursor.execute("UPDATE `shares` SET `state`=?"
+                             " WHERE `storage_index`=? AND `shnum`=? AND `state`!=?",
+                             (STATE_GOING, si_s, shnum, STATE_COMING))
+        self._db.commit()
+        if self._cursor.rowcount < 1:
+            raise NonExistentShareError(si_s, shnum)
+
+    def remove_deleted_share(self, storage_index, shnum):
+        si_s = si_b2a(storage_index)
+        if self.debug: print "REMOVE_DELETED_SHARE", si_s, shnum
+        # delete leases first to maintain integrity constraint
+        self._cursor.execute("DELETE FROM `leases`"
+                             " WHERE `storage_index`=? AND `shnum`=?",
+                             (si_s, shnum))
+        try:
+            self._cursor.execute("DELETE FROM `shares`"
+                                 " WHERE `storage_index`=? AND `shnum`=?",
+                                 (si_s, shnum))
+        except Exception:
+            self._db.rollback()  # roll back the lease deletion
+            raise
+        else:
+            self._db.commit()
+
+    def change_share_space(self, storage_index, shnum, used_space):
+        si_s = si_b2a(storage_index)
+        if self.debug: print "CHANGE_SHARE_SPACE", si_s, shnum, used_space
+        self._cursor.execute("UPDATE `shares` SET `used_space`=?"
+                             " WHERE `storage_index`=? AND `shnum`=?",
+                             (used_space, si_s, shnum))
+        self._db.commit()
+        if self._cursor.rowcount < 1:
+            raise NonExistentShareError(si_s, shnum)
+
+    # lease management
+
+    def add_or_renew_leases(self, storage_index, shnum, ownerid,
+                            renewal_time, expiration_time):
+        """
+        shnum=None means renew leases on all shares; do nothing if there are no shares for this storage_index in the `shares` table.
+
+        Raises NonExistentShareError if a specific shnum is given and that share does not exist in the `shares` table.
+        """
+        si_s = si_b2a(storage_index)
+        if self.debug: print "ADD_OR_RENEW_LEASES", si_s, shnum, ownerid, renewal_time, expiration_time
+        if shnum is None:
+            self._cursor.execute("SELECT `storage_index`, `shnum` FROM `shares`"
+                                 " WHERE `storage_index`=?",
+                                 (si_s,))
+            rows = self._cursor.fetchall()
+        else:
+            self._cursor.execute("SELECT `storage_index`, `shnum` FROM `shares`"
+                                 " WHERE `storage_index`=? AND `shnum`=?",
+                                 (si_s, shnum))
+            rows = self._cursor.fetchall()
+            if not rows:
+                raise NonExistentShareError(si_s, shnum)
+
+        for (found_si_s, found_shnum) in rows:
+            _assert(si_s == found_si_s, si_s=si_s, found_si_s=found_si_s)
+            # XXX can we simplify this by using INSERT OR REPLACE?
+            self._cursor.execute("SELECT `id` FROM `leases`"
+                                 " WHERE `storage_index`=? AND `shnum`=? AND `account_id`=?",
+                                 (si_s, found_shnum, ownerid))
+            row = self._cursor.fetchone()
+            if row:
+                # Note that unlike the pre-LeaseDB code, this allows leases to be backdated.
+                # There is currently no way for a client to specify lease duration, and so
+                # backdating can only happen in normal operation if there is a timequake on
+                # the server and time goes backward by more than 31 days. This needs to be
+                # revisited for ticket #1816, which would allow the client to request a lease
+                # duration.
+                leaseid = row[0]
+                self._cursor.execute("UPDATE `leases` SET `renewal_time`=?, `expiration_time`=?"
+                                     " WHERE `id`=?",
+                                     (renewal_time, expiration_time, leaseid))
+            else:
+                self._cursor.execute("INSERT INTO `leases` VALUES (?,?,?,?,?,?)",
+                                     (None, si_s, found_shnum, ownerid, renewal_time, expiration_time))
+            self._db.commit()
+
+    def get_leases(self, storage_index, ownerid):
+        si_s = si_b2a(storage_index)
+        self._cursor.execute("SELECT `shnum`, `account_id`, `renewal_time`, `expiration_time` FROM `leases`"
+                             " WHERE `storage_index`=? AND `account_id`=?",
+                             (si_s, ownerid))
+        rows = self._cursor.fetchall()
+        def _to_LeaseInfo(row):
+            (shnum, account_id, renewal_time, expiration_time) = tuple(row)
+            return LeaseInfo(storage_index, int(shnum), int(account_id), float(renewal_time), float(expiration_time))
+        return map(_to_LeaseInfo, rows)
+
+    def get_lease_ages(self, storage_index, shnum, now):
+        si_s = si_b2a(storage_index)
+        self._cursor.execute("SELECT `renewal_time` FROM `leases`"
+                             " WHERE `storage_index`=? AND `shnum`=?",
+                             (si_s, shnum))
+        rows = self._cursor.fetchall()
+        def _to_age(row):
+            return now - float(row[0])
+        return map(_to_age, rows)
+
+    def get_unleased_shares_for_prefix(self, prefix):
+        if self.debug: print "GET_UNLEASED_SHARES_FOR_PREFIX", prefix
+        # This would be simpler, but it doesn't work because 'NOT IN' doesn't support multiple columns.
+        #query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype` FROM `shares`"
+        #         " WHERE (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)")
+
+        # This "negative join" should be equivalent.
+        self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype FROM `shares` s LEFT JOIN `leases` l"
+                             " ON (s.storage_index = l.storage_index AND s.shnum = l.shnum)"
+                             " WHERE s.prefix = ? AND l.storage_index IS NULL",
+                             (prefix,))
+        db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
+                           for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
+        return db_sharemap
+
+    def remove_leases_by_renewal_time(self, renewal_cutoff_time):
+        if self.debug: print "REMOVE_LEASES_BY_RENEWAL_TIME", renewal_cutoff_time
+        self._cursor.execute("DELETE FROM `leases` WHERE `renewal_time` < ?",
+                             (renewal_cutoff_time,))
+        self._db.commit()
+
+    def remove_leases_by_expiration_time(self, expiration_cutoff_time):
+        if self.debug: print "REMOVE_LEASES_BY_EXPIRATION_TIME", expiration_cutoff_time
+        self._cursor.execute("DELETE FROM `leases` WHERE `expiration_time` IS NOT NULL AND `expiration_time` < ?",
+                             (expiration_cutoff_time,))
+        self._db.commit()
+
+    # history
+
+    def add_history_entry(self, cycle, entry):
+        if self.debug: print "ADD_HISTORY_ENTRY", cycle, entry
+        json = simplejson.dumps(entry)
+        self._cursor.execute("SELECT `cycle` FROM `crawler_history`")
+        rows = self._cursor.fetchall()
+        if len(rows) >= self.retained_history_entries:
+            first_cycle_to_retain = list(sorted(rows))[-(self.retained_history_entries - 1)][0]
+            self._cursor.execute("DELETE FROM `crawler_history` WHERE `cycle` < ?",
+                                 (first_cycle_to_retain,))
+            self._db.commit()
+
+        try:
+            self._cursor.execute("INSERT OR REPLACE INTO `crawler_history` VALUES (?,?)",
+                                 (cycle, json))
+        except Exception:
+            self._db.rollback()  # roll back the deletion of unretained entries
+            raise
+        else:
+            self._db.commit()
+
+    def get_history(self):
+        self._cursor.execute("SELECT `cycle`,`json` FROM `crawler_history`")
+        rows = self._cursor.fetchall()
+        decoded = [(row[0], simplejson.loads(row[1])) for row in rows]
+        return dict(decoded)
+
+    def get_account_creation_time(self, owner_num):
+        self._cursor.execute("SELECT `creation_time` from `accounts`"
+                             " WHERE `id`=?",
+                             (owner_num,))
+        row = self._cursor.fetchone()
+        if row:
+            return row[0]
+        return None
+
+    def get_all_accounts(self):
+        self._cursor.execute("SELECT `id`,`pubkey_vs`"
+                             " FROM `accounts` ORDER BY `id` ASC")
+        return self._cursor.fetchall()