import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata import storage
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when


class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
    """
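
    # Illustrative sketch (not from the original code; field values are
    # hypothetical): an entry for share 0 of a seqnum-3 version held by
    # peer 'p1' would look like
    #
    #   verinfo = (3, root_hash, IV, segsize, datalength, 3, 10,
    #              signed_prefix, offsets_tuple)
    #   smap.servermap[('p1', 0)] = (verinfo, timestamp)
    #
    # Test-and-set flow: a retrieve hands this object to the caller, and a
    # later publish compares these recorded versions against what is on the
    # grid before overwriting anything.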

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)
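
    # Hedged usage sketch (the peerid/shnum values are made up): a retrieve
    # that hits a corrupt share would do something like
    #
    #   smap.mark_bad_share(peerid, 2, data[:SIGNED_PREFIX_LENGTH])
    #
    # and then re-run the ServermapUpdater before retrying, so the bad share
    # is excluded but its old checkstring remains available for repair.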

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        print >>out, "%d PROBLEMS" % len(self.problems)
        for f in self.problems:
            print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to tuples of
        (num_distinct_shares, k, N) tuples."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions
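
    # Worked example (hypothetical numbers, not from the original code): with
    # k=3, a version seen as shares sh0, sh1, and sh4 has len(shnums) == 3
    # >= k and is recoverable; recoverability counts distinct share numbers,
    # so it holds even if two of those shares sit on the same peer. A
    # version seen only as sh0 and sh1 (2 < k) lands in
    # unrecoverable_versions() below instead.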

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None
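
    # Sketch of the ordering (values are made up): because each verinfo
    # tuple starts with (seqnum, root_hash, ...), a plain tuple sort
    # realizes "seqnum first, then root hash":
    #
    #   sorted([(3, 'rh-b'), (4, 'rh-a'), (3, 'rh-a')])[-1]  # -> (4, 'rh-a')
    #
    # Ties on seqnum fall back to comparing root_hash, so the choice is
    # deterministic across clients that see the same shares.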

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions
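
    # Hypothetical example: if seq4 is recoverable and we also saw 2 shares
    # of a k=3 seq5 version, this returns {seq5_verinfo: (2, 3)}, warning
    # the caller that publishing over seq4 would orphan whatever data went
    # into seq5.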

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        return bool(len(self.recoverable_versions()) > 1)
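
    # Sketch (not from the original code): two uncoordinated writers can
    # both publish a seq4 with different root hashes. If both end up
    # recoverable, this returns True and the caller should merge the two
    # contents before publishing seq5.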


class ServermapUpdater:
    def __init__(self, filenode, servermap, mode=MODE_READ):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        """
        self._node = filenode
        self._servermap = servermap
        self.mode = mode
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SDMF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 2000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 2000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True

        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON
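
        # Worked arithmetic (using the guessed defaults above, not original
        # prose): with k=3 and EPSILON=k=3, the initial target is 3+3=6
        # peers; MODE_WRITE below raises it to N+EPSILON = 10+3 = 13 peers.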

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query
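
    # Illustrative sketch (hypothetical peers): if the old servermap knew
    # about peers A and B and num_peers_to_query is 6, the querylist starts
    # as {A: ssA, B: ssB} with must_query == set([A, B]), then C, D, E, F
    # are popped off extra_peers to bring the dict up to 6 entries.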

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
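
    # State-machine sketch (a reading of the control flow above, not
    # original prose): every query response or failure funnels through
    # _check_for_done(), which either calls _send_more_queries() (emitting
    # new inputs), calls _done() (firing self._done_deferred), or returns
    # to keep waiting on outstanding queries.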

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._cache.add(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD)
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)
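
    # Worked example of the privkey readv above (offsets are hypothetical):
    # if offsets_tuple gives o['enc_privkey'] == 923 and o['EOF'] == 2139,
    # the request is readv=[(923, 1216)], i.e. "read 1216 bytes starting at
    # offset 923", which covers the encrypted private key through the end
    # of the share.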

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY,
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).
            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD)
            return

        # it's valid: remember it so the node can sign future versions
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _query_failed(self, f, peerid):
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f, level=log.WEIRD)
        if not self._running:
            return
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked for it",
                     level=log.WEIRD)
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=log.WEIRD)
        if not self._running:
            return
        self._queries_outstanding.discard(peerid)
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()
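
        # Hypothetical MODE_READ trace (not from the original code): with
        # k=3 and EPSILON=3, after 6 completed queries showing 4 shares of a
        # seq3 version (recoverable) plus one stray share of seq2
        # (unrecoverable but older), the loop above finds no higher-seqnum
        # evidence and we fall through to _done().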

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)
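
        # Boundary sketch (made-up states string, not original prose): with
        # EPSILON=3, states == "1x11000" reads left-to-right as a share, a
        # failed query, two more shares, then three empty peers in a row:
        # the boundary is found. A '?' anywhere to the left instead forces
        # _send_more_queries(num_not_responded) to wait for those answers.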

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back
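
    # Illustrative trace (hypothetical counts): called with
    # num_outstanding=5 while 3 queries are already in flight, the loop pops
    # at most 2 peers off extra_peers, logs them, and queries each; their
    # responses re-enter _check_for_done(), which may call this method
    # again.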

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD)
        self._done_deferred.errback(f)