from allmydata.interfaces import IMutableFileNode, IMutableFileURI
from allmydata.util import hashutil
from allmydata.uri import WriteableSSKFileURI
+from allmydata.encode import NotEnoughPeersError
from pycryptopp.publickey import rsa
from pycryptopp.cipher.aes import AES
d.addCallback(_done)
return d
- def download_to_data(self):
- d = self.obtain_lock()
- d.addCallback(lambda res: self.update_servermap(mode=MODE_ENOUGH))
+ def _update_and_retrieve_best(self, old_map=None):
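+        # Refresh the servermap (reusing old_map, if given, so that shares
+        # already marked as bad stay excluded), then download the best
+        # recoverable version it finds.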
+ d = self.update_servermap(old_map=old_map, mode=MODE_ENOUGH)
def _updated(smap):
goal = smap.best_recoverable_version()
if not goal:
raise UnrecoverableFileError("no recoverable versions")
return self.download_version(smap, goal)
d.addCallback(_updated)
+ return d
+
+ def download_to_data(self):
+ d = self.obtain_lock()
+ d.addCallback(lambda res: self._update_and_retrieve_best())
+ def _maybe_retry(f):
+ f.trap(NotEnoughPeersError)
+ e = f.value
+            # (worth_retrying may be absent if no bad shares were seen)
+            if not getattr(e, "worth_retrying", False):
+ return f
+            # The download is worth retrying once. Make sure to use the old
+            # servermap, since it is what remembers the bad shares. TODO:
+            # consider allowing this to retry multiple times; this approach
+            # will let us tolerate about 8 bad shares, I think.
+ return self._update_and_retrieve_best(e.servermap)
+ d.addErrback(_maybe_retry)
d.addBoth(self.release_lock)
return d
self._outstanding_queries = {} # maps (peerid,shnum) to start_time
self._running = True
self._decoding = False
+        self._bad_shares = set() # (peerid,shnum) tuples for shares found to be corrupt
self.servermap = servermap
assert self._node._pubkey
f = failure.Failure()
self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
self.remove_peer(peerid)
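+        # Record the corruption in the servermap (so a later update will
+        # skip this share and look for others) and locally (so the error
+        # message and the retry decision can account for it).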
+ self.servermap.mark_bad_share(peerid, shnum)
+ self._bad_shares.add( (peerid, shnum) )
self._last_failure = f
pass
# all done!
format = ("ran out of peers: "
"have %(have)d shares (k=%(k)d), "
"%(outstanding)d queries in flight, "
- "need %(need)d more")
+ "need %(need)d more, "
+ "found %(bad)d bad shares")
+ args = {"have": len(self.shares),
+ "k": k,
+ "outstanding": len(self._outstanding_queries),
+ "need": needed,
+ "bad": len(self._bad_shares),
+ }
self.log(format=format,
- have=len(self.shares), k=k,
- outstanding=len(self._outstanding_queries),
- need=needed,
- level=log.WEIRD)
- msg2 = format % {"have": len(self.shares),
- "k": k,
- "outstanding": len(self._outstanding_queries),
- "need": needed,
- }
- raise NotEnoughPeersError("%s, last failure: %s" %
- (msg2, self._last_failure))
+ level=log.WEIRD, **args)
+ err = NotEnoughPeersError("%s, last failure: %s" %
+ (format % args, self._last_failure))
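+            # If we saw corrupt shares, a retry with an updated servermap
+            # might find usable replacements, so mark the error as worth
+            # retrying and attach the servermap that remembers those shares.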
+ if self._bad_shares:
+ self.log("We found some bad shares this pass. You should "
+ "update the servermap and try again to check "
+ "more peers",
+ level=log.WEIRD)
+ err.worth_retrying = True
+ err.servermap = self.servermap
+ raise err
return
operations, which means 'publish this new version, but only if nothing
has changed since I last retrieved this data'. This reduces the chances
of clobbering a simultaneous (uncoordinated) write.
+
+    @ivar bad_shares: a set of (peerid, shnum) tuples, describing
+ shares that I should ignore (because a previous user of
+ the servermap determined that they were invalid). The
+ updater only locates a certain number of shares: if
+ some of these turn out to have integrity problems and
+ are unusable, the caller will need to mark those shares
+ as bad, then re-update the servermap, then try again.
"""
def __init__(self):
self.connections = {} # maps peerid to a RemoteReference
self.unreachable_peers = set() # peerids that didn't respond to queries
self.problems = [] # mostly for debugging
+        self.bad_shares = set() # (peerid,shnum) tuples of shares to ignore
self.last_update_mode = None
self.last_update_time = 0
+ def mark_bad_share(self, peerid, shnum):
+ """This share was found to be bad, not in the checkstring or
+ signature, but deeper in the share, detected at retrieve time. Remove
+ it from our list of useful shares, and remember that it is bad so we
+ don't add it back again later.
+ """
+ self.bad_shares.add( (peerid, shnum) )
+ self._remove_share(peerid, shnum)
+
+ def _remove_share(self, peerid, shnum):
+        # each share is a (shnum, verinfo, timestamp) tuple
+ to_remove = [share
+ for share in self.servermap[peerid]
+ if share[0] == shnum]
+ for share in to_remove:
+ self.servermap[peerid].discard(share)
+ if not self.servermap[peerid]:
+ del self.servermap[peerid]
+
+ def add_new_share(self, peerid, shnum, verinfo, timestamp):
+ """We've written a new share out, replacing any that was there
+ before."""
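+        # A freshly-written share supersedes any record that the previous
+        # share in this slot was bad.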
+ self.bad_shares.discard( (peerid, shnum) )
+ self._remove_share(peerid, shnum)
+ self.servermap.add(peerid, (shnum, verinfo, timestamp) )
+
def dump(self, out=sys.stdout):
print >>out, "servermap:"
for (peerid, shares) in self.servermap.items():
class ServermapUpdater:
def __init__(self, filenode, servermap, mode=MODE_ENOUGH):
+ """I update a servermap, locating a sufficient number of useful
+ shares and remembering where they are located.
+
+ """
+
self._node = filenode
self._servermap = servermap
self.mode = mode
self._valid_versions.add(verinfo)
# We now know that this is a valid candidate verinfo.
+        if (peerid, shnum) in self._servermap.bad_shares:
+ # we've been told that the rest of the data in this share is
+ # unusable, so don't add it to the servermap.
+ return verinfo
+
# Add the info to our servermap.
timestamp = time.time()
self._servermap.servermap.add(peerid, (shnum, verinfo, timestamp))