corrupted. This is handled in the same way as upgrading from a previous
version.
-- Detect shares that have unexpectedly disappeared from storage. The
- disappearance of a share is logged, and its entry and leases are removed
- from the leasedb.
+- Detect shares with stable entries in the leasedb that have unexpectedly
+ disappeared from storage. The disappearance of a share is logged, and its
+ entry and leases are removed from the leasedb.
Accounts
from twisted.internet import defer
from allmydata.util.deferredutil import for_items
+from allmydata.util.assertutil import _assert
from allmydata.util import log
from allmydata.storage.crawler import ShareCrawler
from allmydata.storage.common import si_a2b
-from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED
+from allmydata.storage.leasedb import SHARETYPES, SHARETYPE_UNKNOWN, SHARETYPE_CORRUPTED, \
+ STATE_STABLE
class AccountingCrawler(ShareCrawler):
# updated, and removed.
for key, value in db_sharemap.iteritems():
(si_s, shnum) = key
- (used_space, sharetype) = value
+ (used_space, sharetype, state) = value
examined_sharesets[sharetype].add(si_s)
stored_shares = set(stored_sharemap)
db_shares = set(db_sharemap)
- # add new shares to the DB
+ # Add new shares to the DB.
new_shares = stored_shares - db_shares
for shareid in new_shares:
(si_s, shnum) = shareid
- (used_space, sharetype) = stored_sharemap[shareid]
+ (used_space, sharetype, state) = stored_sharemap[shareid]
self._leasedb.add_new_share(si_a2b(si_s), shnum, used_space, sharetype)
self._leasedb.add_starter_lease(si_s, shnum)
- # remove disappeared shares from DB
- disappeared_shares = db_shares - stored_shares
- for (si_s, shnum) in disappeared_shares:
- log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
- si_s=si_s, shnum=shnum, level=log.WEIRD)
- # This is temporarily disabled, because it results in failures if we're examining
- # a prefix while a share is created in it (ticket #1921).
- #self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
+ # Remove disappeared shares from the DB. Note that only shares in STATE_STABLE
+ # should be considered "disappeared", since otherwise it would be possible for
+ # this to delete shares that are in the process of being created (see ticket #1921).
+ potentially_disappeared_shares = db_shares - stored_shares
+ for shareid in potentially_disappeared_shares:
+ (used_space, sharetype, state) = db_sharemap[shareid]
+ if state == STATE_STABLE:
+ (si_s, shnum) = shareid
+ log.msg(format="share SI=%(si_s)s shnum=%(shnum)s unexpectedly disappeared",
+ si_s=si_s, shnum=shnum, level=log.WEIRD)
+ self._leasedb.remove_deleted_share(si_a2b(si_s), shnum)
recovered_sharesets = [set() for st in xrange(len(SHARETYPES))]
def _delete_share(ign, key, value):
(si_s, shnum) = key
- (used_space, sharetype) = value
+ (used_space, sharetype, state) = value
+ _assert(state == STATE_STABLE, state=state)
storage_index = si_a2b(si_s)
+
d3 = defer.succeed(None)
def _mark_and_delete(ign):
self._leasedb.mark_share_as_going(storage_index, shnum)
d3.addCallbacks(_deleted, _not_deleted)
return d3
+ # This only includes stable unleased shares (see ticket #1921).
unleased_sharemap = self._leasedb.get_unleased_shares_for_prefix(prefix)
d2 = for_items(_delete_share, unleased_sharemap)
def get_shares_for_prefix(self, prefix):
"""
- Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype) pairs.
+ Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype, state) triples
+ for shares with this prefix.
"""
- self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`"
+ self._cursor.execute("SELECT `storage_index`,`shnum`, `used_space`, `sharetype`, `state`"
" FROM `shares`"
" WHERE `prefix` == ?",
(prefix,))
- db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
- for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
+ db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype), int(state)))
+ for (si_s, shnum, used_space, sharetype, state) in self._cursor.fetchall()])
return db_sharemap
def add_new_share(self, storage_index, shnum, used_space, sharetype):
return map(_to_age, rows)
def get_unleased_shares_for_prefix(self, prefix):
+ """
+ Returns a dict mapping (si_s, shnum) pairs to (used_space, sharetype, state) triples
+ for stable, unleased shares with this prefix.
+ """
if self.debug: print "GET_UNLEASED_SHARES_FOR_PREFIX", prefix
# This would be simpler, but it doesn't work because 'NOT IN' doesn't support multiple columns.
- #query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype` FROM `shares`"
- # " WHERE (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)")
+ #query = ("SELECT `storage_index`, `shnum`, `used_space`, `sharetype`, `state` FROM `shares`"
+ # " WHERE `state` = STATE_STABLE "
+ # " AND (`storage_index`, `shnum`) NOT IN (SELECT DISTINCT `storage_index`, `shnum` FROM `leases`)")
# This "negative join" should be equivalent.
- self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype FROM `shares` s LEFT JOIN `leases` l"
+ self._cursor.execute("SELECT DISTINCT s.storage_index, s.shnum, s.used_space, s.sharetype, s.state"
+ " FROM `shares` s LEFT JOIN `leases` l"
" ON (s.storage_index = l.storage_index AND s.shnum = l.shnum)"
- " WHERE s.prefix = ? AND l.storage_index IS NULL",
- (prefix,))
- db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype)))
- for (si_s, shnum, used_space, sharetype) in self._cursor.fetchall()])
+ " WHERE s.prefix = ? AND s.state = ? AND l.storage_index IS NULL",
+ (prefix, STATE_STABLE))
+ db_sharemap = dict([((str(si_s), int(shnum)), (int(used_space), int(sharetype), int(state)))
+ for (si_s, shnum, used_space, sharetype, state) in self._cursor.fetchall()])
return db_sharemap
def remove_leases_by_renewal_time(self, renewal_cutoff_time):