was not fully healthy."""
class IRepairable(Interface):
- def repair():
+ def repair(checker_results):
"""Attempt to repair the given object. Returns a Deferred that fires
with an IRepairResults object.
+
+ I must be called with an object that implements ICheckerResults, as
+ proof that you have actually discovered a problem with this file. I
+ will use the data in the checker results to guide the repair process,
+ such as which servers provided bad data and should therefore be
+ avoided.
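+
+ A typical call pattern might look like this (a hypothetical sketch,
+ assuming check() fires its Deferred with the ICheckerResults object):
+
+ d = node.check(verify=True)
+ d.addCallback(node.repair)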
"""
+class IRepairResults(Interface):
+ """I contain the results of a repair operation."""
+
class IClient(Interface):
def upload(uploadable):
-import struct
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from common import MODE_CHECK, CorruptShareError
from servermap import ServerMap, ServermapUpdater
-from layout import unpack_share, SIGNED_PREFIX
+from layout import unpack_share, SIGNED_PREFIX_LENGTH
class MutableChecker:
self.healthy = True
self.problems = []
self._storage_index = self._node.get_storage_index()
+ self.results = Results(self._storage_index)
def check(self, verify=False, repair=False):
servermap = ServerMap()
+ self.results.servermap = servermap
self.do_verify = verify
self.do_repair = repair
u = ServermapUpdater(self._node, servermap, MODE_CHECK)
except CorruptShareError:
f = failure.Failure()
self.add_problem(shnum, peerid, f)
+ prefix = data[:SIGNED_PREFIX_LENGTH]
+ self.results.servermap.mark_bad_share(peerid, shnum, prefix)
def check_prefix(self, peerid, shnum, data):
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.best_version
- got_prefix = data[:struct.calcsize(SIGNED_PREFIX)]
+ got_prefix = data[:SIGNED_PREFIX_LENGTH]
if got_prefix != prefix:
raise CorruptShareError(peerid, shnum,
"prefix mismatch: share changed while we were reading it")
return
if not self.do_repair:
return
- pass
+ self.results.repair_attempted = True
+ d = self._node.repair(self.results)
+ def _repair_finished(repair_results):
+ self.results.repair_succeeded = True
+ self.results.repair_results = repair_results
+ def _repair_error(f):
+ # I'm not sure if I want to pass through a failure or not.
+ self.results.repair_succeeded = False
+ self.results.repair_failure = f
+ return f
+ d.addCallbacks(_repair_finished, _repair_error)
+ return d
def _return_results(self, res):
- r = Results(self._storage_index)
- r.healthy = self.healthy
- r.problems = self.problems
- return r
+ self.results.healthy = self.healthy
+ self.results.problems = self.problems
+ return self.results
def add_problem(self, shnum, peerid, what):
self.healthy = False
self.problems.append( (peerid, self._storage_index, shnum, what) )
+
class Results:
implements(ICheckerResults)
def __init__(self, storage_index):
self.storage_index = storage_index
self.storage_index_s = base32.b2a(storage_index)[:6]
+ self.repair_attempted = False
def is_healthy(self):
return self.healthy
PREFIX = ">BQ32s16s" # each version has a different prefix
SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
+SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX)
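+# struct.calcsize(SIGNED_PREFIX) is 75 bytes: the checkstring fields
+# (B+Q+32s+16s = 1+8+32+16) plus k, N, segsize, and datalength
+# (B+B+Q+Q = 1+1+8+8)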
HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
HEADER_LENGTH = struct.calcsize(HEADER)
def unpack_prefix_and_signature(data):
assert len(data) >= HEADER_LENGTH, len(data)
- prefix = data[:struct.calcsize(SIGNED_PREFIX)]
+ prefix = data[:SIGNED_PREFIX_LENGTH]
(version,
seqnum,
from twisted.internet import defer, reactor
from twisted.python import log
from foolscap.eventual import eventually
-from allmydata.interfaces import IMutableFileNode, IMutableFileURI, ICheckable
+from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
+ ICheckable, ICheckerResults
from allmydata.util import hashutil
from allmydata.util.assertutil import precondition
from allmydata.uri import WriteableSSKFileURI
from servermap import ServerMap, ServermapUpdater
from retrieve import Retrieve
from checker import MutableChecker
+from repair import Repairer
class BackoffAgent:
def get_total_shares(self):
return self._total_shares
+ ####################################
+ # IFilesystemNode
def get_uri(self):
return self._uri.to_string()
return d
#################################
+ # ICheckable
def check(self, verify=False, repair=False):
checker = self.checker_class(self)
d.addCallback(_done)
return d
+ #################################
+ # IRepairable
+
+ def repair(self, checker_results):
+ assert ICheckerResults(checker_results)
+ r = Repairer(self, checker_results)
+ d = r.start()
+ return d
+
+
+ #################################
+ # IMutableFileNode
+
# allow the use of IDownloadTarget
def download(self, target):
# fake it. TODO: make this cleaner.
# use later.
self.connections = {}
+ self.bad_share_checkstrings = {}
+
# we use the servermap to populate the initial goal: this way we will
# try to update each existing share in place.
for (peerid, shnum) in self._servermap.servermap:
self.goal.add( (peerid, shnum) )
self.connections[peerid] = self._servermap.connections[peerid]
+ # then we add in all the shares that were bad (corrupted, bad
+ # signatures, etc). We want to replace these.
+ for (peerid, shnum, old_checkstring) in self._servermap.bad_shares:
+ self.goal.add( (peerid, shnum) )
+ self.bad_share_checkstrings[ (peerid, shnum) ] = old_checkstring
+ self.connections[peerid] = self._servermap.connections[peerid]
# create the shares. We'll discard these as they are delivered. SMDF:
# we're allowed to hold everything in memory.
old_salt)
testv = (0, len(old_checkstring), "eq", old_checkstring)
+ elif key in self.bad_share_checkstrings:
+ old_checkstring = self.bad_share_checkstrings[key]
+ testv = (0, len(old_checkstring), "eq", old_checkstring)
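+ # the test-and-set write will only happen if the share still holds
+ # the bad checkstring we recorded, so we won't clobber a share that
+ # some other writer has already repaired or replaced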
+
else:
# add a testv that requires the share not exist
#testv = (0, 1, 'eq', "")
--- /dev/null
+
+from zope.interface import implements
+from allmydata.interfaces import IRepairResults
+
+class RepairResults:
+ implements(IRepairResults)
+
+class MustForceRepairError(Exception):
+ pass
+
+class Repairer:
+ def __init__(self, node, checker_results):
+ self.node = node
+ self.checker_results = checker_results
+ assert checker_results.storage_index == self.node.get_storage_index()
+
+ def start(self, force=False):
+ # download, then re-publish. If a server had a bad share, try to
+ # replace it with a good one of the same shnum.
+
+ # The normal repair operation should not be used to replace
+ # application-specific merging of alternate versions: i.e. if there
+ # are multiple highest seqnums with different roothashes. In this
+ # case, the application must use node.upload() (referencing the
+ # servermap that indicates the multiple-heads condition), or
+ # node.overwrite(). The repair() operation will refuse to run in
+ # these conditions unless a force=True argument is provided. If
+ # force=True is used, then the highest root hash will be reinforced.
+
+ # Likewise, the presence of an unrecoverable latest version is an
+ # unusual event, and should ideally be handled by retrying a couple
+ # times (spaced out over hours or days) and hoping that new shares
+ # will become available. If repair(force=True) is called, data will
+ # be lost: a new seqnum will be generated with the same contents as
+ # the most recent recoverable version, skipping over the lost
+ # version. repair(force=False) will refuse to run in a situation like
+ # this.
+
+ # Repair is designed to fix the following injuries:
+ # missing shares: add new ones to get at least N distinct ones
+ # old shares: replace old shares with the latest version
+ # bogus shares (bad sigs): replace the bad one with a good one
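+
+ # A hypothetical caller that wants the forced behavior could catch
+ # the synchronous MustForceRepairError and retry (a sketch, not part
+ # of this change):
+ #
+ # try:
+ # d = Repairer(node, checker_results).start()
+ # except MustForceRepairError:
+ # d = Repairer(node, checker_results).start(force=True)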
+
+ smap = self.checker_results.servermap
+
+ if smap.unrecoverable_newer_versions():
+ if not force:
+ raise MustForceRepairError("There were unrecoverable newer "
+ "versions, so force=True must be "
+ "passed to the repair() operation")
+ # continuing on means that node.upload() will pick a seqnum that
+ # is higher than everything visible in the servermap, effectively
+ # discarding the unrecoverable versions.
+ if smap.needs_merge():
+ if not force:
+ raise MustForceRepairError("There were multiple recoverable "
+ "versions with identical seqnums, "
+ "so force=True must be passed to "
+ "the repair() operation")
+ # continuing on means that smap.best_recoverable_version() will
+ # pick the one with the highest roothash, and then node.upload()
+ # will replace all shares with its contents
+
+ # missing shares are handled during upload, which tries to find a
+ # home for every share
+
+ # old shares are handled during upload, which will replace any share
+ # that was present in the servermap
+
+ # bogus shares need to be managed here. We might notice a bogus share
+ # during mapupdate (whether done for a filecheck or just before a
+ # download) by virtue of it having an invalid signature. We might
+ # also notice a bad hash in the share during verify or download. In
+ # either case, the problem will be noted in the servermap, and the
+ # bad share (along with its checkstring) will be recorded in
+ # servermap.bad_shares. Publish knows that it should try to replace
+ # these.
+
+ best_version = smap.best_recoverable_version()
+ d = self.node.download_version(smap, best_version)
+ d.addCallback(self.node.upload, smap)
+ d.addCallback(self.get_results)
+ return d
+
+ def get_results(self, res):
+ return RepairResults()
f = failure.Failure()
self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
self.remove_peer(peerid)
- self.servermap.mark_bad_share(peerid, shnum)
+ self.servermap.mark_bad_share(peerid, shnum, prefix)
self._bad_shares.add( (peerid, shnum) )
self._status.problems[peerid] = f
self._last_failure = f
from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
DictOfSets, CorruptShareError, NeedMoreDataError
-from layout import unpack_prefix_and_signature, unpack_header, unpack_share
+from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
+ SIGNED_PREFIX_LENGTH
class UpdateStatus:
implements(IServermapUpdaterStatus)
self.connections = {}
self.unreachable_peers = set() # peerids that didn't respond to queries
self.problems = [] # mostly for debugging
- self.bad_shares = set()
+ self.bad_shares = {} # maps (peerid,shnum) to old checkstring
self.last_update_mode = None
self.last_update_time = 0
- def mark_bad_share(self, peerid, shnum):
- """This share was found to be bad, not in the checkstring or
- signature, but deeper in the share, detected at retrieve time. Remove
- it from our list of useful shares, and remember that it is bad so we
- don't add it back again later.
+ def mark_bad_share(self, peerid, shnum, checkstring):
+ """This share was found to be bad, either in the checkstring or
+ signature (detected during mapupdate), or deeper in the share
+ (detected at retrieve time). Remove it from our list of useful
+ shares, and remember that it is bad so we don't add it back again
+ later. We record the share's old checkstring (which might be
+ corrupted or badly signed) so that a repair operation can do the
+ test-and-set using it as a reference.
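+ For example, publish can use a test vector of
+ (0, len(checkstring), "eq", checkstring) so that its replacement
+ write only happens if the share still holds the bad data we saw.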
"""
- key = (peerid, shnum)
- self.bad_shares.add(key)
+ key = (peerid, shnum)
+ self.bad_shares[key] = checkstring # remembered for repair's test-and-set
self.servermap.pop(key, None)
def add_new_share(self, peerid, shnum, verinfo, timestamp):
"""We've written a new share out, replacing any that was there
before."""
key = (peerid, shnum)
- self.bad_shares.discard(key)
+ self.bad_shares.pop(key, None)
self.servermap[key] = (verinfo, timestamp)
def dump(self, out=sys.stdout):
parent=lp, level=log.WEIRD)
self._bad_peers.add(peerid)
self._last_failure = f
+ checkstring = data[:SIGNED_PREFIX_LENGTH]
+ self._servermap.mark_bad_share(peerid, shnum, checkstring)
self._servermap.problems.append(f)
pass
for (shnum, peerid, timestamp) in shares:
if shnum < 5:
self._corrupted.add( (peerid, shnum) )
- sm.mark_bad_share(peerid, shnum)
+ sm.mark_bad_share(peerid, shnum, "")
return self.update_servermap(sm, MODE_WRITE)
d.addCallback(_made_map)
def _check_map(sm):