From ca14b9939710d9e0e7a5bffa02b82401f3c11a6a Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 15 Apr 2008 15:58:02 -0700 Subject: [PATCH] mutable WIP: if corrupt shares cause a retrieve to fail, restart it once, ignoring those shares and using different ones --- src/allmydata/encode.py | 2 ++ src/allmydata/mutable/node.py | 22 +++++++++++++-- src/allmydata/mutable/retrieve.py | 34 ++++++++++++++-------- src/allmydata/mutable/servermap.py | 45 ++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 15 deletions(-) diff --git a/src/allmydata/encode.py b/src/allmydata/encode.py index e8d1a1bf..3480ce5f 100644 --- a/src/allmydata/encode.py +++ b/src/allmydata/encode.py @@ -61,6 +61,8 @@ hash tree is put into the URI. """ class NotEnoughPeersError(Exception): + worth_retrying = False + servermap = None pass class UploadAborted(Exception): diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index 87295f2c..3ad40470 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -7,6 +7,7 @@ from twisted.internet import defer from allmydata.interfaces import IMutableFileNode, IMutableFileURI from allmydata.util import hashutil from allmydata.uri import WriteableSSKFileURI +from allmydata.encode import NotEnoughPeersError from pycryptopp.publickey import rsa from pycryptopp.cipher.aes import AES @@ -279,15 +280,30 @@ class MutableFileNode: d.addCallback(_done) return d - def download_to_data(self): - d = self.obtain_lock() - d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH)) + def _update_and_retrieve_best(self, old_map=None): + d = self.update_servermap(old_map=old_map, mode=MODE_ENOUGH) def _updated(smap): goal = smap.best_recoverable_version() if not goal: raise UnrecoverableFileError("no recoverable versions") return self.download_version(smap, goal) d.addCallback(_updated) + return d + + def download_to_data(self): + d = self.obtain_lock() + d.addCallback(lambda res: self._update_and_retrieve_best()) + def _maybe_retry(f): + f.trap(NotEnoughPeersError) + e = f.value + if not e.worth_retrying: + return f + # the download is worth retrying once. Make sure to use the old + # servermap, since it is what remembers the bad shares. TODO: + # consider allowing this to retry multiple times.. this approach + # will let us tolerate about 8 bad shares, I think. + return self._update_and_retrieve_best(e.servermap) + d.addErrback(_maybe_retry) d.addBoth(self.release_lock) return d diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 4942d96b..9abc5430 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -93,6 +93,7 @@ class Retrieve: self._outstanding_queries = {} # maps (peerid,shnum) to start_time self._running = True self._decoding = False + self._bad_shares = set() self.servermap = servermap assert self._node._pubkey @@ -238,6 +239,8 @@ class Retrieve: f = failure.Failure() self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD) self.remove_peer(peerid) + self.servermap.mark_bad_share(peerid, shnum) + self._bad_shares.add( (peerid, shnum) ) self._last_failure = f pass # all done! @@ -374,19 +377,26 @@ class Retrieve: format = ("ran out of peers: " "have %(have)d shares (k=%(k)d), " "%(outstanding)d queries in flight, " - "need %(need)d more") + "need %(need)d more, " + "found %(bad)d bad shares") + args = {"have": len(self.shares), + "k": k, + "outstanding": len(self._outstanding_queries), + "need": needed, + "bad": len(self._bad_shares), + } self.log(format=format, - have=len(self.shares), k=k, - outstanding=len(self._outstanding_queries), - need=needed, - level=log.WEIRD) - msg2 = format % {"have": len(self.shares), - "k": k, - "outstanding": len(self._outstanding_queries), - "need": needed, - } - raise NotEnoughPeersError("%s, last failure: %s" % - (msg2, self._last_failure)) + level=log.WEIRD, **args) + err = NotEnoughPeersError("%s, last failure: %s" % + (format % args, self._last_failure)) + if self._bad_shares: + self.log("We found some bad shares this pass. You should " + "update the servermap and try again to check " + "more peers", + level=log.WEIRD) + err.worth_retrying = True + err.servermap = self.servermap + raise err return diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 8863ea6e..4f8fca2b 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -25,6 +25,14 @@ class ServerMap: operations, which means 'publish this new version, but only if nothing has changed since I last retrieved this data'. This reduces the chances of clobbering a simultaneous (uncoordinated) write. + + @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing + shares that I should ignore (because a previous user of + the servermap determined that they were invalid). The + updater only locates a certain number of shares: if + some of these turn out to have integrity problems and + are unusable, the caller will need to mark those shares + as bad, then re-update the servermap, then try again. """ def __init__(self): @@ -35,9 +43,36 @@ class ServerMap: self.connections = {} # maps peerid to a RemoteReference self.unreachable_peers = set() # peerids that didn't respond to queries self.problems = [] # mostly for debugging + self.bad_shares = set() self.last_update_mode = None self.last_update_time = 0 + def mark_bad_share(self, peerid, shnum): + """This share was found to be bad, not in the checkstring or + signature, but deeper in the share, detected at retrieve time. Remove + it from our list of useful shares, and remember that it is bad so we + don't add it back again later. + """ + self.bad_shares.add( (peerid, shnum) ) + self._remove_share(peerid, shnum) + + def _remove_share(self, peerid, shnum): + #(s_shnum, s_verinfo, s_timestamp) = share + to_remove = [share + for share in self.servermap[peerid] + if share[0] == shnum] + for share in to_remove: + self.servermap[peerid].discard(share) + if not self.servermap[peerid]: + del self.servermap[peerid] + + def add_new_share(self, peerid, shnum, verinfo, timestamp): + """We've written a new share out, replacing any that was there + before.""" + self.bad_shares.discard( (peerid, shnum) ) + self._remove_share(peerid, shnum) + self.servermap.add(peerid, (shnum, verinfo, timestamp) ) + def dump(self, out=sys.stdout): print >>out, "servermap:" for (peerid, shares) in self.servermap.items(): @@ -148,6 +183,11 @@ class ServerMap: class ServermapUpdater: def __init__(self, filenode, servermap, mode=MODE_ENOUGH): + """I update a servermap, locating a sufficient number of useful + shares and remembering where they are located. + + """ + self._node = filenode self._servermap = servermap self.mode = mode @@ -415,6 +455,11 @@ class ServermapUpdater: self._valid_versions.add(verinfo) # We now know that this is a valid candidate verinfo. + if (peerid, shnum, verinfo) in self._servermap.bad_shares: + # we've been told that the rest of the data in this share is + # unusable, so don't add it to the servermap. + return verinfo + # Add the info to our servermap. timestamp = time.time() self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp)) -- 2.45.2