from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
fireEventually
-from allmydata.util import base32, hashutil, idlib, log, deferredutil
+from allmydata.util import base32, hashutil, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa
-from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
- CorruptShareError
+from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
+ MODE_READ, MODE_REPAIR, CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
class UpdateStatus:
self.started = time.time()
self.finished = None
- def add_per_server_time(self, peerid, op, sent, elapsed):
+ def add_per_server_time(self, server, op, sent, elapsed):
assert op in ("query", "late", "privkey")
- if peerid not in self.timings["per_server"]:
- self.timings["per_server"][peerid] = []
- self.timings["per_server"][peerid].append((op,sent,elapsed))
+ if server not in self.timings["per_server"]:
+ self.timings["per_server"][server] = []
+ self.timings["per_server"][server].append((op,sent,elapsed))
def get_started(self):
return self.started
self.storage_index = si
def set_mode(self, mode):
self.mode = mode
- def set_privkey_from(self, peerid):
- self.privkey_from = peerid
+ def set_privkey_from(self, server):
+ self.privkey_from = server
def set_status(self, status):
self.status = status
def set_progress(self, value):
has changed since I last retrieved this data'. This reduces the chances
of clobbering a simultaneous (uncoordinated) write.
- @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
- (versionid, timestamp) tuple. Each 'versionid' is a
- tuple of (seqnum, root_hash, IV, segsize, datalength,
- k, N, signed_prefix, offsets)
-
- @ivar connections: maps peerid to a RemoteReference
-
- @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
- shares that I should ignore (because a previous user of
- the servermap determined that they were invalid). The
- updater only locates a certain number of shares: if
- some of these turn out to have integrity problems and
- are unusable, the caller will need to mark those shares
- as bad, then re-update the servermap, then try again.
- The dict maps (peerid, shnum) tuple to old checkstring.
+ @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
+ (versionid, timestamp) tuple. Each 'versionid' is a
+ tuple of (seqnum, root_hash, IV, segsize, datalength,
+ k, N, signed_prefix, offsets)
+
+ @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
+ shares that I should ignore (because a previous user
+ of the servermap determined that they were invalid).
+ The updater only locates a certain number of shares:
+ if some of these turn out to have integrity problems
+ and are unusable, the caller will need to mark those
+ shares as bad, then re-update the servermap, then try
+ again. The dict maps (server, shnum) tuple to old
+ checkstring.
"""
def __init__(self):
- self.servermap = {}
- self.connections = {}
- self.unreachable_peers = set() # peerids that didn't respond to queries
- self.reachable_peers = set() # peerids that did respond to queries
- self.problems = [] # mostly for debugging
- self.bad_shares = {} # maps (peerid,shnum) to old checkstring
- self.last_update_mode = None
- self.last_update_time = 0
- self.update_data = {} # (verinfo,shnum) => data
+ self._known_shares = {}
+ self.unreachable_servers = set() # servers that didn't respond to queries
+ self.reachable_servers = set() # servers that did respond to queries
+ self._problems = [] # mostly for debugging
+ self._bad_shares = {} # maps (server,shnum) to old checkstring
+ self._last_update_mode = None
+ self._last_update_time = 0
+ self.proxies = {}
+ self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
+ # where blockhashes is a list of bytestrings (the result of
+ # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
+ # (block,salt) tuple-of-bytestrings from get_block_and_salt()
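(Illustrative sketch of the update_data shape, with placeholder strings where the real code stores bytestrings obtained from MDMFSlotReadProxy:)

update_data = {}   # shnum -> [(verinfo, (blockhashes, start, end)), ...]
verinfo = "verinfo-placeholder"
blockhashes = ["blockhash-0", "blockhash-1"]
start = ("block-0", "salt-0")
end = ("block-9", "salt-9")
update_data.setdefault(5, []).append((verinfo, (blockhashes, start, end)))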
def copy(self):
s = ServerMap()
- s.servermap = self.servermap.copy() # tuple->tuple
- s.connections = self.connections.copy() # str->RemoteReference
- s.unreachable_peers = set(self.unreachable_peers)
- s.reachable_peers = set(self.reachable_peers)
- s.problems = self.problems[:]
- s.bad_shares = self.bad_shares.copy() # tuple->str
- s.last_update_mode = self.last_update_mode
- s.last_update_time = self.last_update_time
+ s._known_shares = self._known_shares.copy() # tuple->tuple
+ s.unreachable_servers = set(self.unreachable_servers)
+ s.reachable_servers = set(self.reachable_servers)
+ s._problems = self._problems[:]
+ s._bad_shares = self._bad_shares.copy() # tuple->str
+ s._last_update_mode = self._last_update_mode
+ s._last_update_time = self._last_update_time
return s
- def mark_bad_share(self, peerid, shnum, checkstring):
+ def get_reachable_servers(self):
+ return self.reachable_servers
+
+ def mark_server_reachable(self, server):
+ self.reachable_servers.add(server)
+
+ def mark_server_unreachable(self, server):
+ self.unreachable_servers.add(server)
+
+ def mark_bad_share(self, server, shnum, checkstring):
"""This share was found to be bad, either in the checkstring or
signature (detected during mapupdate), or deeper in the share
(detected at retrieve time). Remove it from our list of useful
corrupted or badly signed) so that a repair operation can do the
test-and-set using it as a reference.
"""
- key = (peerid, shnum) # record checkstring
- self.bad_shares[key] = checkstring
- self.servermap.pop(key, None)
+ key = (server, shnum) # record checkstring
+ self._bad_shares[key] = checkstring
+ self._known_shares.pop(key, None)
- def add_new_share(self, peerid, shnum, verinfo, timestamp):
+ def get_bad_shares(self):
+ # key=(server,shnum) -> checkstring
+ return self._bad_shares
+
+ def add_new_share(self, server, shnum, verinfo, timestamp):
"""We've written a new share out, replacing any that was there
before."""
- key = (peerid, shnum)
- self.bad_shares.pop(key, None)
- self.servermap[key] = (verinfo, timestamp)
+ key = (server, shnum)
+ self._bad_shares.pop(key, None)
+ self._known_shares[key] = (verinfo, timestamp)
+
+ def add_problem(self, f):
+ self._problems.append(f)
+ def get_problems(self):
+ return self._problems
+
+ def set_last_update(self, mode, when):
+ self._last_update_mode = mode
+ self._last_update_time = when
+ def get_last_update(self):
+ return (self._last_update_mode, self._last_update_time)
def dump(self, out=sys.stdout):
print >>out, "servermap:"
- for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
+ for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
- (idlib.shortnodeid_b2a(peerid), shnum,
+ (server.get_name(), shnum,
seqnum, base32.b2a(root_hash)[:4], k, N,
datalength))
- if self.problems:
- print >>out, "%d PROBLEMS" % len(self.problems)
- for f in self.problems:
+ if self._problems:
+ print >>out, "%d PROBLEMS" % len(self._problems)
+ for f in self._problems:
print >>out, str(f)
return out
- def all_peers(self):
- return set([peerid
- for (peerid, shnum)
- in self.servermap])
+ def all_servers(self):
+ return set([server for (server, shnum) in self._known_shares])
- def all_peers_for_version(self, verinfo):
- """Return a set of peerids that hold shares for the given version."""
- return set([peerid
- for ( (peerid, shnum), (verinfo2, timestamp) )
- in self.servermap.items()
+ def all_servers_for_version(self, verinfo):
+ """Return a set of servers that hold shares for the given version."""
+ return set([server
+ for ( (server, shnum), (verinfo2, timestamp) )
+ in self._known_shares.items()
if verinfo == verinfo2])
+ def get_known_shares(self):
+ # maps (server,shnum) to (versionid,timestamp)
+ return self._known_shares
+
def make_sharemap(self):
- """Return a dict that maps shnum to a set of peerds that hold it."""
+ """Return a dict that maps shnum to a set of servers that hold it."""
sharemap = DictOfSets()
- for (peerid, shnum) in self.servermap:
- sharemap.add(shnum, peerid)
+ for (server, shnum) in self._known_shares:
+ sharemap.add(shnum, server)
return sharemap
def make_versionmap(self):
- """Return a dict that maps versionid to sets of (shnum, peerid,
+ """Return a dict that maps versionid to sets of (shnum, server,
timestamp) tuples."""
versionmap = DictOfSets()
- for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
- versionmap.add(verinfo, (shnum, peerid, timestamp))
+ for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
+ versionmap.add(verinfo, (shnum, server, timestamp))
return versionmap
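(Standalone sketch of how such a versionmap feeds the recoverability test used further down, with a plain dict-of-sets instead of the DictOfSets helper and placeholder values: a version is recoverable once at least k distinct share numbers are known for it.)

k = 3
versionmap = {}   # verinfo -> set of (shnum, server, timestamp)
for (server, shnum, verinfo) in [("A", 0, "seq3"), ("B", 1, "seq3"),
                                 ("C", 2, "seq3"), ("D", 0, "seq2")]:
    versionmap.setdefault(verinfo, set()).add((shnum, server, 0.0))

recoverable = set()
for verinfo, shares in versionmap.items():
    shnums = set([shnum for (shnum, server, timestamp) in shares])
    if len(shnums) >= k:
        recoverable.add(verinfo)

assert recoverable == set(["seq3"])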
- def shares_on_peer(self, peerid):
- return set([shnum
- for (s_peerid, shnum)
- in self.servermap
- if s_peerid == peerid])
+ def debug_shares_on_server(self, server): # used by tests
+ return set([shnum for (s, shnum) in self._known_shares if s == server])
- def version_on_peer(self, peerid, shnum):
- key = (peerid, shnum)
- if key in self.servermap:
- (verinfo, timestamp) = self.servermap[key]
+ def version_on_server(self, server, shnum):
+ key = (server, shnum)
+ if key in self._known_shares:
+ (verinfo, timestamp) = self._known_shares[key]
return verinfo
return None
all_shares = {}
for verinfo, shares in versionmap.items():
s = set()
- for (shnum, peerid, timestamp) in shares:
+ for (shnum, server, timestamp) in shares:
s.add(shnum)
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
bits = []
for (verinfo, shares) in versionmap.items():
vstr = self.summarize_version(verinfo)
- shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ shnums = set([shnum for (shnum, server, timestamp) in shares])
bits.append("%d*%s" % (len(shnums), vstr))
return "/".join(bits)
for (verinfo, shares) in versionmap.items():
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
- shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ shnums = set([shnum for (shnum, server, timestamp) in shares])
if len(shnums) >= k:
# this one is recoverable
recoverable_versions.add(verinfo)
for (verinfo, shares) in versionmap.items():
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
- shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ shnums = set([shnum for (shnum, server, timestamp) in shares])
if len(shnums) < k:
unrecoverable_versions.add(verinfo)
for (verinfo, shares) in versionmap.items():
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
- shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+ shnums = set([shnum for (shnum, server, timestamp) in shares])
healths[verinfo] = (len(shnums),k)
if len(shnums) < k:
unrecoverable.add(verinfo)
self._read_size = 1000
self._need_privkey = False
- if mode == MODE_WRITE and not self._node.get_privkey():
+ if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
self._need_privkey = True
# check+repair: repair requires the privkey, so if we didn't happen
# to ask for it during the check, we'll have problems doing the
# avoid re-checking the signatures for each share.
self._valid_versions = set()
- # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
- # timestamp) tuples. This is used to figure out which versions might
- # be retrievable, and to make the eventual data download faster.
- self.versionmap = DictOfSets()
-
self._done_deferred = defer.Deferred()
- # first, which peers should be talk to? Any that were in our old
+ # first, which servers should we talk to? Any that were in our old
# servermap, plus "enough" others.
self._queries_completed = 0
sb = self._storage_broker
- # All of the peers, permuted by the storage index, as usual.
- full_peerlist = [(s.get_serverid(), s.get_rref())
- for s in sb.get_servers_for_psi(self._storage_index)]
- self.full_peerlist = full_peerlist # for use later, immutable
- self.extra_peers = full_peerlist[:] # peers are removed as we use them
- self._good_peers = set() # peers who had some shares
- self._empty_peers = set() # peers who don't have any shares
- self._bad_peers = set() # peers to whom our queries failed
- self._readers = {} # peerid -> dict(sharewriters), filled in
- # after responses come in.
+ # All of the servers, permuted by the storage index, as usual.
+ full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
+ self.full_serverlist = full_serverlist # for use later, immutable
+ self.extra_servers = full_serverlist[:] # servers are removed as we use them
+ self._good_servers = set() # servers who had some shares
+ self._empty_servers = set() # servers who don't have any shares
+ self._bad_servers = set() # servers to whom our queries failed
k = self._node.get_required_shares()
# For what cases can these conditions work?
if N is None:
N = 10
self.EPSILON = k
- # we want to send queries to at least this many peers (although we
+ # we want to send queries to at least this many servers (although we
# might not wait for all of their answers to come back)
- self.num_peers_to_query = k + self.EPSILON
+ self.num_servers_to_query = k + self.EPSILON
- if self.mode == MODE_CHECK:
- # We want to query all of the peers.
- initial_peers_to_query = dict(full_peerlist)
- must_query = set(initial_peers_to_query.keys())
- self.extra_peers = []
+ if self.mode in (MODE_CHECK, MODE_REPAIR):
+ # We want to query all of the servers.
+ initial_servers_to_query = list(full_serverlist)
+ must_query = set(initial_servers_to_query)
+ self.extra_servers = []
elif self.mode == MODE_WRITE:
# we're planning to replace all the shares, so we want a good
# chance of finding them all. We will keep searching until we've
# seen epsilon that don't have a share.
- # We don't query all of the peers because that could take a while.
- self.num_peers_to_query = N + self.EPSILON
- initial_peers_to_query, must_query = self._build_initial_querylist()
- self.required_num_empty_peers = self.EPSILON
+ # We don't query all of the servers because that could take a while.
+ self.num_servers_to_query = N + self.EPSILON
+ initial_servers_to_query, must_query = self._build_initial_querylist()
+ self.required_num_empty_servers = self.EPSILON
# TODO: arrange to read lots of data from k-ish servers, to avoid
# the extra round trip required to read large directories. This
# private key.
else: # MODE_READ, MODE_ANYTHING
- # 2k peers is good enough.
- initial_peers_to_query, must_query = self._build_initial_querylist()
-
- # this is a set of peers that we are required to get responses from:
- # they are peers who used to have a share, so we need to know where
- # they currently stand, even if that means we have to wait for a
- # silently-lost TCP connection to time out. We remove peers from this
- # set as we get responses.
- self._must_query = must_query
-
- # now initial_peers_to_query contains the peers that we should ask,
- # self.must_query contains the peers that we must have heard from
- # before we can consider ourselves finished, and self.extra_peers
- # contains the overflow (peers that we should tap if we don't get
- # enough responses)
+ # 2*k servers is good enough.
+ initial_servers_to_query, must_query = self._build_initial_querylist()
+
+ # this is a set of servers that we are required to get responses
+ # from: they are servers who used to have a share, so we need to know
+ # where they currently stand, even if that means we have to wait for
+ # a silently-lost TCP connection to time out. We remove servers from
+ # this set as we get responses.
+ self._must_query = set(must_query)
+
+ # now initial_servers_to_query contains the servers that we should
+ # ask, self.must_query contains the servers that we must have heard
+ # from before we can consider ourselves finished, and
+ # self.extra_servers contains the overflow (servers that we should
+ # tap if we don't get enough responses)
# I guess that self._must_query is a subset of
- # initial_peers_to_query?
- assert set(must_query).issubset(set(initial_peers_to_query))
+ # initial_servers_to_query?
+ assert must_query.issubset(initial_servers_to_query)
- self._send_initial_requests(initial_peers_to_query)
+ self._send_initial_requests(initial_servers_to_query)
self._status.timings["initial_queries"] = time.time() - self._started
return self._done_deferred
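(For concreteness, a sketch of the per-mode query budget chosen above, assuming a typical 3-of-10 encoding; the real k and N come from the node's encoding parameters, and EPSILON is set to k.)

k, N = 3, 10
EPSILON = k
num_servers_to_query = {
    "MODE_READ / MODE_ANYTHING": k + EPSILON,    # 6: "2*k servers is good enough"
    "MODE_WRITE": N + EPSILON,                   # 13: try hard to find every share
    "MODE_CHECK / MODE_REPAIR": "all servers",   # query everyone, keep no extras
}
assert num_servers_to_query["MODE_WRITE"] == 13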
def _build_initial_querylist(self):
- initial_peers_to_query = {}
- must_query = set()
- for peerid in self._servermap.all_peers():
- ss = self._servermap.connections[peerid]
- # we send queries to everyone who was already in the sharemap
- initial_peers_to_query[peerid] = ss
- # and we must wait for responses from them
- must_query.add(peerid)
-
- while ((self.num_peers_to_query > len(initial_peers_to_query))
- and self.extra_peers):
- (peerid, ss) = self.extra_peers.pop(0)
- initial_peers_to_query[peerid] = ss
-
- return initial_peers_to_query, must_query
-
- def _send_initial_requests(self, peerlist):
- self._status.set_status("Sending %d initial queries" % len(peerlist))
+ # we send queries to everyone who was already in the sharemap
+ initial_servers_to_query = set(self._servermap.all_servers())
+ # and we must wait for responses from them
+ must_query = set(initial_servers_to_query)
+
+ while ((self.num_servers_to_query > len(initial_servers_to_query))
+ and self.extra_servers):
+ initial_servers_to_query.add(self.extra_servers.pop(0))
+
+ return initial_servers_to_query, must_query
+
+ def _send_initial_requests(self, serverlist):
+ self._status.set_status("Sending %d initial queries" % len(serverlist))
self._queries_outstanding = set()
- self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
- for (peerid, ss) in peerlist.items():
- self._queries_outstanding.add(peerid)
- self._do_query(ss, peerid, self._storage_index, self._read_size)
+ for server in serverlist:
+ self._queries_outstanding.add(server)
+ self._do_query(server, self._storage_index, self._read_size)
- if not peerlist:
+ if not serverlist:
# there is nobody to ask, so we need to short-circuit the state
# machine.
d = defer.maybeDeferred(self._check_for_done, None)
# might produce a result.
return None
- def _do_query(self, ss, peerid, storage_index, readsize):
- self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
- peerid=idlib.shortnodeid_b2a(peerid),
+ def _do_query(self, server, storage_index, readsize):
+ self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
+ name=server.get_name(),
readsize=readsize,
level=log.NOISY)
- self._servermap.connections[peerid] = ss
started = time.time()
- self._queries_outstanding.add(peerid)
- d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
- d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
+ self._queries_outstanding.add(server)
+ d = self._do_read(server, storage_index, [], [(0, readsize)])
+ d.addCallback(self._got_results, server, readsize, storage_index,
started)
- d.addErrback(self._query_failed, peerid)
+ d.addErrback(self._query_failed, server)
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
d.addErrback(log.err)
d.addCallback(self._check_for_done)
return d
- def _do_read(self, ss, peerid, storage_index, shnums, readv):
+ def _do_read(self, server, storage_index, shnums, readv):
+ ss = server.get_rref()
if self._add_lease:
# send an add-lease message in parallel. The results are handled
# separately. This is sent before the slot_readv() so that we can
# be sure the add_lease is retired by the time slot_readv comes
# back (this relies upon our knowledge that the server code for
# add_lease is synchronous).
- renew_secret = self._node.get_renewal_secret(peerid)
- cancel_secret = self._node.get_cancel_secret(peerid)
+ renew_secret = self._node.get_renewal_secret(server)
+ cancel_secret = self._node.get_cancel_secret(server)
d2 = ss.callRemote("add_lease", storage_index,
renew_secret, cancel_secret)
# we ignore success
- d2.addErrback(self._add_lease_failed, peerid, storage_index)
+ d2.addErrback(self._add_lease_failed, server, storage_index)
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
return d
- def _got_corrupt_share(self, e, shnum, peerid, data, lp):
+ def _got_corrupt_share(self, e, shnum, server, data, lp):
"""
I am called when a remote server returns a corrupt share in
response to one of our queries. By corrupt, I mean a share
self.log(format="bad share: %(f_value)s", f_value=str(f),
failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
# Notify the server that its share is corrupt.
- self.notify_server_corruption(peerid, shnum, str(e))
- # By flagging this as a bad peer, we won't count any of
- # the other shares on that peer as valid, though if we
+ self.notify_server_corruption(server, shnum, str(e))
+ # By flagging this as a bad server, we won't count any of
+ # the other shares on that server as valid, though if we
# happen to find a valid version string amongst those
# shares, we'll keep track of it so that we don't need
# to validate the signature on those again.
- self._bad_peers.add(peerid)
+ self._bad_servers.add(server)
self._last_failure = f
# XXX: Use the reader for this?
checkstring = data[:SIGNED_PREFIX_LENGTH]
- self._servermap.mark_bad_share(peerid, shnum, checkstring)
- self._servermap.problems.append(f)
-
-
- def _cache_good_sharedata(self, verinfo, shnum, now, data):
- """
- If one of my queries returns successfully (which means that we
- were able to and successfully did validate the signature), I
- cache the data that we initially fetched from the storage
- server. This will help reduce the number of roundtrips that need
- to occur when the file is downloaded, or when the file is
- updated.
- """
- if verinfo:
- self._node._add_to_cache(verinfo, shnum, 0, data)
+ self._servermap.mark_bad_share(server, shnum, checkstring)
+ self._servermap.add_problem(f)
- def _got_results(self, datavs, peerid, readsize, stuff, started):
- lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
- peerid=idlib.shortnodeid_b2a(peerid),
+ def _got_results(self, datavs, server, readsize, storage_index, started):
+ lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
+ name=server.get_name(),
numshares=len(datavs))
+ ss = server.get_rref()
now = time.time()
elapsed = now - started
def _done_processing(ignored=None):
- self._queries_outstanding.discard(peerid)
- self._servermap.reachable_peers.add(peerid)
- self._must_query.discard(peerid)
+ self._queries_outstanding.discard(server)
+ self._servermap.mark_server_reachable(server)
+ self._must_query.discard(server)
self._queries_completed += 1
if not self._running:
self.log("but we're not running, so we'll ignore it", parent=lp)
_done_processing()
- self._status.add_per_server_time(peerid, "late", started, elapsed)
+ self._status.add_per_server_time(server, "late", started, elapsed)
return
- self._status.add_per_server_time(peerid, "query", started, elapsed)
+ self._status.add_per_server_time(server, "query", started, elapsed)
if datavs:
- self._good_peers.add(peerid)
+ self._good_servers.add(server)
else:
- self._empty_peers.add(peerid)
+ self._empty_servers.add(server)
- ss, storage_index = stuff
ds = []
for shnum,datav in datavs.items():
reader = MDMFSlotReadProxy(ss,
storage_index,
shnum,
- data)
- self._readers.setdefault(peerid, dict())[shnum] = reader
+ data,
+ data_is_everything=(len(data) < readsize))
+
# our goal, with each response, is to validate the version
# information and share data as best we can at this point --
# we do this by validating the signature. To do this, we
if not self._node.get_pubkey():
# fetch and set the public key.
d = reader.get_verification_key()
- d.addCallback(lambda results, shnum=shnum, peerid=peerid:
- self._try_to_set_pubkey(results, peerid, shnum, lp))
+ d.addCallback(lambda results, shnum=shnum:
+ self._try_to_set_pubkey(results, server, shnum, lp))
# XXX: Make self._pubkey_query_failed?
- d.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
- self._got_corrupt_share(error, shnum, peerid, data, lp))
+ d.addErrback(lambda error, shnum=shnum, data=data:
+ self._got_corrupt_share(error, shnum, server, data, lp))
else:
# we already have the public key.
d = defer.succeed(None)
# bytes of the share on the storage server, so we
# shouldn't need to fetch anything at this step.
d2 = reader.get_verinfo()
- d2.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
- self._got_corrupt_share(error, shnum, peerid, data, lp))
+ d2.addErrback(lambda error, shnum=shnum, data=data:
+ self._got_corrupt_share(error, shnum, server, data, lp))
# - Next, we need the signature. For an SDMF share, it is
# likely that we fetched this when doing our initial fetch
# to get the version information. In MDMF, this lives at
# the end of the share, so unless the file is quite small,
# we'll need to do a remote fetch to get it.
d3 = reader.get_signature()
- d3.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
- self._got_corrupt_share(error, shnum, peerid, data, lp))
+ d3.addErrback(lambda error, shnum=shnum, data=data:
+ self._got_corrupt_share(error, shnum, server, data, lp))
# Once we have all three of these responses, we can move on
# to validating the signature
# fetch it here.
if self._need_privkey:
d4 = reader.get_encprivkey()
- d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
- self._try_to_validate_privkey(results, peerid, shnum, lp))
- d4.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
- self._privkey_query_failed(error, shnum, data, lp))
+ d4.addCallback(lambda results, shnum=shnum:
+ self._try_to_validate_privkey(results, server, shnum, lp))
+ d4.addErrback(lambda error, shnum=shnum:
+ self._privkey_query_failed(error, server, shnum, lp))
else:
d4 = defer.succeed(None)
d5 = defer.succeed(None)
dl = defer.DeferredList([d, d2, d3, d4, d5])
+ def _append_proxy(passthrough, shnum=shnum, reader=reader):
+ # Store the proxy (with its cache) keyed by verinfo, serverid,
+ # storage index, and share number.
+ _, (_,verinfo), _, _, _ = passthrough
+ verinfo = self._make_verinfo_hashable(verinfo)
+ self._servermap.proxies[(verinfo,
+ server.get_serverid(),
+ storage_index, shnum)] = reader
+ return passthrough
+ dl.addCallback(_append_proxy)
dl.addBoth(self._turn_barrier)
- dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
- self._got_signature_one_share(results, shnum, peerid, lp))
+ dl.addCallback(lambda results, shnum=shnum:
+ self._got_signature_one_share(results, shnum, server, lp))
dl.addErrback(lambda error, shnum=shnum, data=data:
- self._got_corrupt_share(error, shnum, peerid, data, lp))
- dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
- self._cache_good_sharedata(verinfo, shnum, now, data))
+ self._got_corrupt_share(error, shnum, server, data, lp))
ds.append(dl)
# dl is a deferred list that will fire when all of the shares
- # that we found on this peer are done processing. When dl fires,
+ # that we found on this server are done processing. When dl fires,
# we know that processing is done, so we can decrement the
# semaphore-like thing that we incremented earlier.
dl = defer.DeferredList(ds, fireOnOneErrback=True)
return fireEventually(result)
- def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
+ def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
if self._node.get_pubkey():
return # don't go through this again if we don't have to
fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
assert len(fingerprint) == 32
if fingerprint != self._node.get_fingerprint():
- raise CorruptShareError(peerid, shnum,
- "pubkey doesn't match fingerprint")
+ raise CorruptShareError(server, shnum,
+ "pubkey doesn't match fingerprint")
self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
assert self._node.get_pubkey()
- def notify_server_corruption(self, peerid, shnum, reason):
- ss = self._servermap.connections[peerid]
- ss.callRemoteOnly("advise_corrupt_share",
- "mutable", self._storage_index, shnum, reason)
+ def notify_server_corruption(self, server, shnum, reason):
+ rref = server.get_rref()
+ rref.callRemoteOnly("advise_corrupt_share",
+ "mutable", self._storage_index, shnum, reason)
- def _got_signature_one_share(self, results, shnum, peerid, lp):
+ def _got_signature_one_share(self, results, shnum, server, lp):
# It is our job to give versioninfo to our caller. We need to
# raise CorruptShareError if the share is corrupt for any
# reason, something that our caller will handle.
- self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
+ self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
shnum=shnum,
- peerid=idlib.shortnodeid_b2a(peerid),
+ name=server.get_name(),
level=log.NOISY,
parent=lp)
if not self._running:
return None
_, verinfo, signature, __, ___ = results
+ verinfo = self._make_verinfo_hashable(verinfo[1])
+
+ # This tuple uniquely identifies a share on the grid; we use it
+ # to keep track of the ones that we've already seen.
(seqnum,
root_hash,
saltish,
k,
n,
prefix,
- offsets) = verinfo[1]
- offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
+ offsets_tuple) = verinfo
- # XXX: This should be done for us in the method, so
- # presumably you can go in there and fix it.
- verinfo = (seqnum,
- root_hash,
- saltish,
- segsize,
- datalen,
- k,
- n,
- prefix,
- offsets_tuple)
- # This tuple uniquely identifies a share on the grid; we use it
- # to keep track of the ones that we've already seen.
if verinfo not in self._valid_versions:
# This is a new version tuple, and we need to validate it
assert self._node.get_pubkey()
valid = self._node.get_pubkey().verify(prefix, signature[1])
if not valid:
- raise CorruptShareError(peerid, shnum,
+ raise CorruptShareError(server, shnum,
"signature is invalid")
# ok, it's a valid verinfo. Add it to the list of validated
# versions.
self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
% (seqnum, base32.b2a(root_hash)[:4],
- idlib.shortnodeid_b2a(peerid), shnum,
+ server.get_name(), shnum,
k, n, segsize, datalen),
parent=lp)
self._valid_versions.add(verinfo)
# version info again, that its signature checks out and that
# we're okay to skip the signature-checking step.
- # (peerid, shnum) are bound in the method invocation.
- if (peerid, shnum) in self._servermap.bad_shares:
+ # (server, shnum) are bound in the method invocation.
+ if (server, shnum) in self._servermap.get_bad_shares():
# we've been told that the rest of the data in this share is
# unusable, so don't add it to the servermap.
self.log("but we've been told this is a bad share",
# Add the info to our servermap.
timestamp = time.time()
- self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
- # and the versionmap
- self.versionmap.add(verinfo, (shnum, peerid, timestamp))
+ self._servermap.add_new_share(server, shnum, verinfo, timestamp)
return verinfo
-
- def _got_update_results_one_share(self, results, share):
- """
- I record the update results in results.
- """
- assert len(results) == 4
- verinfo, blockhashes, start, end = results
+ def _make_verinfo_hashable(self, verinfo):
(seqnum,
root_hash,
saltish,
n,
prefix,
offsets) = verinfo
+
offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
- # XXX: This should be done for us in the method, so
- # presumably you can go in there and fix it.
verinfo = (seqnum,
root_hash,
saltish,
n,
prefix,
offsets_tuple)
+ return verinfo
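(Minimal standalone illustration of why the conversion matters: the verinfo arrives with its offsets as a dict, and a tuple containing a dict cannot be put in a set such as self._valid_versions or used inside a dict key such as the ones in self._servermap.proxies, so the dict is flattened to a tuple of items first. Values below are placeholders.)

offsets = {"signature": 9, "share_data": 123}
unhashable_verinfo = (3, "roothash", "salt", offsets)
# set([unhashable_verinfo])  would raise TypeError: unhashable type: 'dict'
offsets_tuple = tuple([(key, value) for key, value in offsets.items()])
hashable_verinfo = (3, "roothash", "salt", offsets_tuple)
valid_versions = set([hashable_verinfo])   # now usable as a set member / dict key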
+ def _got_update_results_one_share(self, results, share):
+ """
+ I record the update results in results.
+ """
+ assert len(results) == 4
+ verinfo, blockhashes, start, end = results
+ verinfo = self._make_verinfo_hashable(verinfo)
update_data = (blockhashes, start, end)
self._servermap.set_update_data_for_share_and_verinfo(share,
verinfo,
return verifier
- def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
+ def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
"""
Given a writekey from a remote server, I validate it against the
writekey stored in my node. If it is valid, then I set the
alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
if alleged_writekey != self._node.get_writekey():
self.log("invalid privkey from %s shnum %d" %
- (idlib.nodeid_b2a(peerid)[:8], shnum),
+ (server.get_name(), shnum),
parent=lp, level=log.WEIRD, umid="aJVccw")
return
# it's good
- self.log("got valid privkey from shnum %d on peerid %s" %
- (shnum, idlib.shortnodeid_b2a(peerid)),
+ self.log("got valid privkey from shnum %d on serverid %s" %
+ (shnum, server.get_name()),
parent=lp)
privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
self._node._populate_encprivkey(enc_privkey)
self._node._populate_privkey(privkey)
self._need_privkey = False
- self._status.set_privkey_from(peerid)
+ self._status.set_privkey_from(server)
- def _add_lease_failed(self, f, peerid, storage_index):
+ def _add_lease_failed(self, f, server, storage_index):
# Older versions of Tahoe didn't handle the add-lease message very
# well: <=1.1.0 throws a NameError because it doesn't implement
# remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
# this may ignore a bit too much, but that only hurts us
# during debugging
return
- self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
- peerid=idlib.shortnodeid_b2a(peerid),
+ self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
+ name=server.get_name(),
f_value=str(f.value),
failure=f,
level=log.WEIRD, umid="iqg3mw")
return
# local errors are cause for alarm
log.err(f,
- format="local error in add_lease to [%(peerid)s]: %(f_value)s",
- peerid=idlib.shortnodeid_b2a(peerid),
+ format="local error in add_lease to [%(name)s]: %(f_value)s",
+ name=server.get_name(),
f_value=str(f.value),
level=log.WEIRD, umid="ZWh6HA")
- def _query_failed(self, f, peerid):
+ def _query_failed(self, f, server):
if not self._running:
return
level = log.WEIRD
self.log(format="error during query: %(f_value)s",
f_value=str(f.value), failure=f,
level=level, umid="IHXuQg")
- self._must_query.discard(peerid)
- self._queries_outstanding.discard(peerid)
- self._bad_peers.add(peerid)
- self._servermap.problems.append(f)
- # a peerid could be in both ServerMap.reachable_peers and
- # .unreachable_peers if they responded to our query, but then an
+ self._must_query.discard(server)
+ self._queries_outstanding.discard(server)
+ self._bad_servers.add(server)
+ self._servermap.add_problem(f)
+ # a server could be in both ServerMap.reachable_servers and
+ # .unreachable_servers if it responded to our query, but then an
# exception was raised in _got_results.
- self._servermap.unreachable_peers.add(peerid)
+ self._servermap.mark_server_unreachable(server)
self._queries_completed += 1
self._last_failure = f
- def _privkey_query_failed(self, f, peerid, shnum, lp):
- self._queries_outstanding.discard(peerid)
+ def _privkey_query_failed(self, f, server, shnum, lp):
+ self._queries_outstanding.discard(server)
if not self._running:
return
level = log.WEIRD
self.log(format="error during privkey query: %(f_value)s",
f_value=str(f.value), failure=f,
parent=lp, level=level, umid="McoJ5w")
- self._servermap.problems.append(f)
+ self._servermap.add_problem(f)
self._last_failure = f
# return : keep waiting, no new queries
lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
"%(outstanding)d queries outstanding, "
- "%(extra)d extra peers available, "
- "%(must)d 'must query' peers left, "
+ "%(extra)d extra servers available, "
+ "%(must)d 'must query' servers left, "
"need_privkey=%(need_privkey)s"
),
mode=self.mode,
outstanding=len(self._queries_outstanding),
- extra=len(self.extra_peers),
+ extra=len(self.extra_servers),
must=len(self._must_query),
need_privkey=self._need_privkey,
level=log.NOISY,
return
if self._must_query:
- # we are still waiting for responses from peers that used to have
+ # we are still waiting for responses from servers that used to have
# a share, so we must continue to wait. No additional queries are
# required at this time.
- self.log("%d 'must query' peers left" % len(self._must_query),
+ self.log("%d 'must query' servers left" % len(self._must_query),
level=log.NOISY, parent=lp)
return
- if (not self._queries_outstanding and not self.extra_peers):
- # all queries have retired, and we have no peers left to ask. No
+ if (not self._queries_outstanding and not self.extra_servers):
+ # all queries have retired, and we have no servers left to ask. No
# more progress can be made, therefore we are done.
- self.log("all queries are retired, no extra peers: done",
+ self.log("all queries are retired, no extra servers: done",
parent=lp)
return self._done()
parent=lp)
return self._done()
- if self.mode == MODE_CHECK:
+ if self.mode in (MODE_CHECK, MODE_REPAIR):
# we used self._must_query, and we know there aren't any
# responses still waiting, so that means we must be done
self.log("done", parent=lp)
# version, and we haven't seen any unrecoverable higher-seqnum'ed
# versions, then we're done.
- if self._queries_completed < self.num_peers_to_query:
+ if self._queries_completed < self.num_servers_to_query:
self.log(format="%(completed)d completed, %(query)d to query: need more",
completed=self._queries_completed,
- query=self.num_peers_to_query,
+ query=self.num_servers_to_query,
level=log.NOISY, parent=lp)
return self._send_more_queries(MAX_IN_FLIGHT)
if not recoverable_versions:
states = []
found_boundary = False
- for i,(peerid,ss) in enumerate(self.full_peerlist):
- if peerid in self._bad_peers:
+ for i,server in enumerate(self.full_serverlist):
+ if server in self._bad_servers:
# query failed
states.append("x")
- #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
- elif peerid in self._empty_peers:
+ #self.log("loop [%s]: x" % server.get_name()
+ elif server in self._empty_servers:
# no shares
states.append("0")
- #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
+ #self.log("loop [%s]: 0" % server.get_name()
if last_found != -1:
num_not_found += 1
if num_not_found >= self.EPSILON:
found_boundary = True
break
- elif peerid in self._good_peers:
+ elif server in self._good_servers:
# yes shares
states.append("1")
- #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
+ #self.log("loop [%s]: 1" % server.get_name()
last_found = i
num_not_found = 0
else:
# not responded yet
states.append("?")
- #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
+ #self.log("loop [%s]: ?" % server.get_name()
last_not_responded = i
num_not_responded += 1
return self._send_more_queries(num_not_responded)
# if we hit here, we didn't find our boundary, so we're still
- # waiting for peers
+ # waiting for servers
self.log("no boundary yet, %s" % "".join(states), parent=lp,
level=log.NOISY)
return self._send_more_queries(MAX_IN_FLIGHT)
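(The boundary test above can be restated as a small standalone function, a simplified sketch that assumes every server has already answered; the real loop also tracks failed and not-yet-answered servers. Scanning the permuted server list, the search is done once EPSILON consecutive share-less servers appear after the last server that reported a share.)

def found_share_boundary(states, epsilon):
    # states: per-server results in permuted order: "1" = has shares, "0" = none
    num_not_found = 0
    last_found = -1
    for i, state in enumerate(states):
        if state == "1":
            last_found = i
            num_not_found = 0
        elif state == "0" and last_found != -1:
            num_not_found += 1
            if num_not_found >= epsilon:
                return True
    return False

assert found_share_boundary(["1", "1", "0", "0", "0", "0"], epsilon=3)
assert not found_share_boundary(["1", "0", "0", "1", "0", "0"], epsilon=3)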
active_queries = len(self._queries_outstanding) + len(more_queries)
if active_queries >= num_outstanding:
break
- if not self.extra_peers:
+ if not self.extra_servers:
break
- more_queries.append(self.extra_peers.pop(0))
+ more_queries.append(self.extra_servers.pop(0))
self.log(format="sending %(more)d more queries: %(who)s",
more=len(more_queries),
- who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
- for (peerid,ss) in more_queries]),
+ who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
level=log.NOISY)
- for (peerid, ss) in more_queries:
- self._do_query(ss, peerid, self._storage_index, self._read_size)
+ for server in more_queries:
+ self._do_query(server, self._storage_index, self._read_size)
# we'll retrigger when those queries come back
def _done(self):
self._status.set_status("Finished")
self._status.set_active(False)
- self._servermap.last_update_mode = self.mode
- self._servermap.last_update_time = self._started
+ self._servermap.set_last_update(self.mode, self._started)
# the servermap will not be touched after this
self.log("servermap: %s" % self._servermap.summarize_versions())