3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.eventual import eventually
8 from allmydata.util import base32, hashutil, idlib, log
9 from allmydata import storage
10 from allmydata.interfaces import IServermapUpdaterStatus
11 from pycryptopp.publickey import rsa
13 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
14 DictOfSets, CorruptShareError, NeedMoreDataError
15 from layout import unpack_prefix_and_signature, unpack_header, unpack_share
# NOTE(review): interior fragment of a status class -- the class header is
# elided from this view.
# old-style zope.interface class advice (Python 2): declares that instances
# provide IServermapUpdaterStatus
implements(IServermapUpdaterStatus)
# shared iterator handing out a unique id per status instance; consumed via
# .next() (Python 2) in __init__ below
statusid_counter = count(0)
# NOTE(review): body of the status object's __init__ -- the `def
# __init__(self):` line and the initialization of self.timings itself
# appear elided from this view.
self.timings["per_server"] = {}        # peerid -> [(op, sent, elapsed), ...]
self.timings["cumulative_verify"] = 0.0  # total seconds spent verifying shares
self.privkey_from = None               # peerid that supplied the privkey, if any
self.storage_index = None
self.status = "Not started"
self.counter = self.statusid_counter.next()  # unique id for this status object
self.started = time.time()
def add_per_server_time(self, peerid, op, sent, elapsed):
    """Record a per-peer timing sample.

    op classifies the interaction: "query" (initial share query), "late"
    (a response that arrived after we stopped running), or "privkey" (the
    follow-up read for the encrypted private key). sent is when the
    request went out, elapsed is the round-trip time. Samples accumulate
    in self.timings["per_server"][peerid].
    """
    assert op in ("query", "late", "privkey")
    # setdefault collapses the first-sample and subsequent-sample paths
    self.timings["per_server"].setdefault(peerid, []).append((op, sent, elapsed))
# Simple accessors for the status object. Several bodies were elided from
# this view; they are reconstructed here from the uniform get/set naming
# pattern and the attributes visible in __init__. Attribute names are
# inferred from the method names -- confirm against the original file.
def get_started(self):
    return self.started
def get_finished(self):
    return self.finished
def get_storage_index(self):
    return self.storage_index
def get_servermap(self):
    # NOTE(review): attribute name inferred from the method name -- confirm
    return self.servermap
def get_privkey_from(self):
    return self.privkey_from
def using_helper(self):
    # NOTE(review): reconstructed; servermap updates do not appear to
    # involve a helper -- confirm
    return False
def get_progress(self):
    return self.progress
def get_counter(self):
    return self.counter

def set_storage_index(self, si):
    self.storage_index = si
def set_mode(self, mode):
    self.mode = mode
def set_privkey_from(self, peerid):
    self.privkey_from = peerid
def set_status(self, status):
    self.status = status
def set_progress(self, value):
    self.progress = value
def set_active(self, value):
    self.active = value
def set_finished(self, when):
    self.finished = when
"""I record the placement of mutable shares.

This object records which shares (of various versions) are located on
which servers.

One purpose I serve is to inform callers about which versions of the
mutable file are recoverable and 'current'.

A second purpose is to serve as a state marker for test-and-set
operations. I am passed out of retrieval operations and back into publish
operations, which means 'publish this new version, but only if nothing
has changed since I last retrieved this data'. This reduces the chances
of clobbering a simultaneous (uncoordinated) write.

@ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                 (versionid, timestamp) tuple. Each 'versionid' is a
                 tuple of (seqnum, root_hash, IV, segsize, datalength,
                 k, N, signed_prefix, offsets)

@ivar connections: maps peerid to a RemoteReference

@ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
                  shares that I should ignore (because a previous user of
                  the servermap determined that they were invalid). The
                  updater only locates a certain number of shares: if
                  some of these turn out to have integrity problems and
                  are unusable, the caller will need to mark those shares
                  as bad, then re-update the servermap, then try again.
"""
# NOTE(review): body of ServerMap.__init__ -- the `def __init__(self):`
# line (and, presumably, the `self.servermap = {}` initialization that
# the other methods rely on) appears elided from this view.
self.connections = {}
self.unreachable_peers = set() # peerids that didn't respond to queries
self.problems = [] # mostly for debugging
self.bad_shares = set()     # (peerid, shnum) tuples flagged bad by callers
self.last_update_mode = None  # MODE_* used by the most recent update
self.last_update_time = 0     # when the most recent update started
def mark_bad_share(self, peerid, shnum):
    """This share was found to be bad, not in the checkstring or
    signature, but deeper in the share, detected at retrieve time. Remove
    it from our list of useful shares, and remember that it is bad so we
    don't add it back again later.
    """
    key = (peerid, shnum)
    self.bad_shares.add(key)
    # drop it from the share map if present; an absent key is a no-op
    self.servermap.pop(key, None)
def add_new_share(self, peerid, shnum, verinfo, timestamp):
    """We've written a new share out, replacing any that was there
    before.
    """
    key = (peerid, shnum)
    # a freshly-written share supersedes any earlier bad-share marking
    self.bad_shares.discard(key)
    self.servermap[key] = (verinfo, timestamp)
def dump(self, out=sys.stdout):
    # Debugging aid: print a human-readable summary of the map to `out`
    # (Python 2 `print >>out` syntax).
    # NOTE(review): several lines of this method are elided from this view
    # (the tail of the per-share format arguments and the body of the
    # problems loop), so the call below is syntactically incomplete here.
    # NOTE(review): `out=sys.stdout` binds the stream at def time -- the
    # usual pattern, but it will not follow later reassignment of
    # sys.stdout.
    print >>out, "servermap:"
    for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                      (idlib.shortnodeid_b2a(peerid), shnum,
                       seqnum, base32.b2a(root_hash)[:4], k, N,
    print >>out, "%d PROBLEMS" % len(self.problems)
    for f in self.problems:
def make_sharemap(self):
    """Return a dict that maps shnum to a set of peerids that hold it."""
    sharemap = DictOfSets()
    # self.servermap is keyed by (peerid, shnum); invert to shnum -> peers
    for (peerid, shnum) in self.servermap:
        sharemap.add(shnum, peerid)
    # the docstring promises a return value; the visible source dropped it
    return sharemap
def make_versionmap(self):
    """Return a dict that maps versionid to sets of (shnum, peerid,
    timestamp) tuples."""
    versionmap = DictOfSets()
    for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
        versionmap.add(verinfo, (shnum, peerid, timestamp))
    # the docstring promises a return value; the visible source dropped it
    return versionmap
def shares_on_peer(self, peerid):
    """Return the set of share numbers we believe this peer holds."""
    return set([shnum
                for (s_peerid, shnum) in self.servermap
                if s_peerid == peerid])
def version_on_peer(self, peerid, shnum):
    """Return the verinfo recorded for this (peerid, shnum), or None if
    we have no record of that share."""
    key = (peerid, shnum)
    if key in self.servermap:
        (verinfo, timestamp) = self.servermap[key]
        return verinfo
    return None
def shares_available(self):
    """Return a dict that maps verinfo to a (num_distinct_shares, k)
    pair."""
    versionmap = self.make_versionmap()
    all_shares = {}
    for verinfo, shares in versionmap.items():
        # count distinct share numbers; duplicates on multiple peers
        # don't help recoverability
        s = set()
        for (shnum, peerid, timestamp) in shares:
            s.add(shnum)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        all_shares[verinfo] = (len(s), k)
    return all_shares
def highest_seqnum(self):
    """Return the highest sequence number of any known version, or 0 if
    no versions are known at all."""
    available = self.shares_available()
    seqnums = [verinfo[0]
               for verinfo in available.keys()]
    # floor of 0 keeps max() well-defined when nothing is known
    seqnums.append(0)
    return max(seqnums)
def summarize_versions(self):
    """Return a string describing which versions we know about."""
    versionmap = self.make_versionmap()
    bits = []  # one "count*seqN-hash" token per known version
    for (verinfo, shares) in versionmap.items():
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        shnums = set([shnum for (shnum, peerid, timestamp) in shares])
        bits.append("%d*seq%d-%s" %
                    (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
    return "/".join(bits)
def recoverable_versions(self):
    """Return a set of versionids, one for each version that is currently
    recoverable (i.e. we know at least k distinct shares for it).
    """
    versionmap = self.make_versionmap()
    recoverable_versions = set()
    for (verinfo, shares) in versionmap.items():
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        shnums = set([shnum for (shnum, peerid, timestamp) in shares])
        if len(shnums) >= k:
            # this one is recoverable
            recoverable_versions.add(verinfo)

    return recoverable_versions
def unrecoverable_versions(self):
    """Return a set of versionids, one for each version that is currently
    unrecoverable (i.e. we know fewer than k distinct shares for it).
    """
    versionmap = self.make_versionmap()
    unrecoverable_versions = set()
    for (verinfo, shares) in versionmap.items():
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        shnums = set([shnum for (shnum, peerid, timestamp) in shares])
        if len(shnums) < k:
            unrecoverable_versions.add(verinfo)

    return unrecoverable_versions
def best_recoverable_version(self):
    """Return a single versionid, for the so-called 'best' recoverable
    version. Sequence number is the primary sort criteria, followed by
    root hash. Returns None if there are no recoverable versions."""
    recoverable = list(self.recoverable_versions())
    if not recoverable:
        # promised by the docstring; the visible source would have raised
        # IndexError here
        return None
    # verinfo tuples start with (seqnum, root_hash, ...), so plain tuple
    # ordering gives exactly the documented sort criteria
    recoverable.sort()
    return recoverable[-1]
def unrecoverable_newer_versions(self):
    # Return a dict of versionid -> health, for versions that are
    # unrecoverable and have later seqnums than any recoverable versions.
    # These indicate that a write will lose data.
    versionmap = self.make_versionmap()
    healths = {} # maps verinfo to (found,k)
    unrecoverable = set()
    highest_recoverable_seqnum = -1
    for (verinfo, shares) in versionmap.items():
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        shnums = set([shnum for (shnum, peerid, timestamp) in shares])
        healths[verinfo] = (len(shnums), k)
        if len(shnums) < k:
            # not enough distinct shares to recover this version
            unrecoverable.add(verinfo)
        else:
            highest_recoverable_seqnum = max(seqnum,
                                             highest_recoverable_seqnum)

    newversions = {}
    for verinfo in unrecoverable:
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        if seqnum > highest_recoverable_seqnum:
            newversions[verinfo] = healths[verinfo]

    return newversions
def needs_merge(self):
    """Return True if there are multiple recoverable versions, meaning
    that MutableFileNode.read_best_version is not giving you the whole
    story, and that using its data to do a subsequent publish will lose
    information.
    """
    # the comparison already yields a bool; the original wrapped it in a
    # redundant bool(...) call
    return len(self.recoverable_versions()) > 1
class ServermapUpdater:
    # NOTE(review): several lines of this class are elided from this view,
    # including class-level constants referenced later (e.g. self.EPSILON)
    # and parts of __init__ (e.g. whatever sets self.mode and self._running,
    # both read by later methods).
    def __init__(self, filenode, servermap, mode=MODE_READ):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._servermap = servermap
        self._storage_index = filenode.get_storage_index()
        self._last_failure = None  # most recent query Failure, for reporting

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        # * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        # * but we can't read from negative offsets
        # * the offset table tells us the 'ish', also the positive offset
        # A future version of the SMDF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 2000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 2000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True

        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)
def get_status(self):
    # NOTE(review): the body (presumably `return self._status`) is elided
    # from this view.
def log(self, *args, **kwargs):
    """Forward to log.msg, defaulting `parent` to this updater's own log
    entry so all of our messages nest under it."""
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# NOTE(review): body of update() -- the `def update(self):` line is elided
# from this view, as are several statements (among them the initialization
# of self._queries_outstanding and the `else:` branch header that the
# final _build_initial_querylist() call below belongs to).
"""Update the servermap to reflect current conditions. Returns a
Deferred that fires with the servermap once the update has finished."""
self._started = time.time()
self._status.set_active(True)

# self._valid_versions is a set of validated verinfo tuples. We just
# use it to remember which versions had valid signatures, so we can
# avoid re-checking the signatures for each share.
self._valid_versions = set()

# self.versionmap maps verinfo tuples to sets of (shnum, peerid,
# timestamp) tuples. This is used to figure out which versions might
# be retrievable, and to make the eventual data download faster.
self.versionmap = DictOfSets()

self._done_deferred = defer.Deferred()

# first, which peers should be talk to? Any that were in our old
# servermap, plus "enough" others.

self._queries_completed = 0

client = self._node._client
full_peerlist = client.get_permuted_peers("storage",
                                          self._node._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable
self.extra_peers = full_peerlist[:] # peers are removed as we use them
self._good_peers = set() # peers who had some shares
self._empty_peers = set() # peers who don't have any shares
self._bad_peers = set() # peers to whom our queries failed

k = self._node.get_required_shares()
# NOTE(review): N is assigned from get_required_shares(); the name N and
# its later use as `N + self.EPSILON` (total-shares search target) suggest
# this should be the total-shares count -- confirm against the original.
N = self._node.get_required_shares()

# we want to send queries to at least this many peers (although we
# might not wait for all of their answers to come back)
self.num_peers_to_query = k + self.EPSILON

if self.mode == MODE_CHECK:
    # ask everybody in the permuted list
    initial_peers_to_query = dict(full_peerlist)
    must_query = set(initial_peers_to_query.keys())
    self.extra_peers = []
elif self.mode == MODE_WRITE:
    # we're planning to replace all the shares, so we want a good
    # chance of finding them all. We will keep searching until we've
    # seen epsilon that don't have a share.
    self.num_peers_to_query = N + self.EPSILON
    initial_peers_to_query, must_query = self._build_initial_querylist()
    self.required_num_empty_peers = self.EPSILON

    # TODO: arrange to read lots of data from k-ish servers, to avoid
    # the extra round trip required to read large directories. This
    # might also avoid the round trip required to read the encrypted
    # private key.
# NOTE(review): the next line belongs to an elided `else:` branch
# (the default MODE_READ/MODE_ANYTHING path).
initial_peers_to_query, must_query = self._build_initial_querylist()

# this is a set of peers that we are required to get responses from:
# they are peers who used to have a share, so we need to know where
# they currently stand, even if that means we have to wait for a
# silently-lost TCP connection to time out. We remove peers from this
# set as we get responses.
self._must_query = must_query

# now initial_peers_to_query contains the peers that we should ask,
# self.must_query contains the peers that we must have heard from
# before we can consider ourselves finished, and self.extra_peers
# contains the overflow (peers that we should tap if we don't get
# enough responses)

self._send_initial_requests(initial_peers_to_query)
self._status.timings["setup"] = time.time() - self._started
return self._done_deferred
def _build_initial_querylist(self):
    """Choose the initial peers to query: everyone from the old servermap,
    topped up from extra_peers until num_peers_to_query is reached."""
    # NOTE(review): the `must_query = set()` initialization appears elided
    # from this view; must_query is populated below and returned.
    initial_peers_to_query = {}
    for peerid in self._servermap.all_peers():
        ss = self._servermap.connections[peerid]
        # we send queries to everyone who was already in the sharemap
        initial_peers_to_query[peerid] = ss
        # and we must wait for responses from them
        must_query.add(peerid)

    while ((self.num_peers_to_query > len(initial_peers_to_query))
           and self.extra_peers):
        (peerid, ss) = self.extra_peers.pop(0)
        initial_peers_to_query[peerid] = ss

    return initial_peers_to_query, must_query
def _send_initial_requests(self, peerlist):
    """Fire off one read query to each (peerid, ss) pair in peerlist."""
    self._status.set_status("Sending %d initial queries" % len(peerlist))
    self._queries_outstanding = set()
    self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
    for (peerid, ss) in peerlist.items():
        self._queries_outstanding.add(peerid)
        self._do_query(ss, peerid, self._storage_index, self._read_size)

    # control flow beyond this point: state machine. Receiving responses
    # from queries is the input. We might send out more queries, or we
    # might produce a result.
def _do_query(self, ss, peerid, storage_index, readsize):
    """Send one slot_readv query and wire its Deferred into the state
    machine (_got_results / _query_failed / _check_for_done)."""
    # NOTE(review): continuation lines of the two calls below (e.g. the
    # closing `readsize=readsize)` of self.log and the `started)` argument
    # of addCallback) are elided from this view, leaving them
    # syntactically incomplete here.
    self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
             peerid=idlib.shortnodeid_b2a(peerid),
    self._servermap.connections[peerid] = ss
    started = time.time()
    self._queries_outstanding.add(peerid)
    d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
    d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
    d.addErrback(self._query_failed, peerid)
    # errors that aren't handled by _query_failed (and errors caused by
    # _query_failed) get logged, but we still want to check for doneness.
    d.addErrback(log.err)
    d.addBoth(self._check_for_done)
    d.addErrback(self._fatal_error)
481 def _do_read(self, ss, peerid, storage_index, shnums, readv):
482 d = ss.callRemote("slot_readv", storage_index, shnums, readv)
def _got_results(self, datavs, peerid, readsize, stuff, started):
    """Callback for an initial query: account the peer, process each
    returned share, and optionally follow up with a privkey read."""
    # NOTE(review): several statements are elided from this view, among
    # them `now = time.time()`, the early `return` after recording a late
    # response, the `if datavs:`/`else:` headers around the good/empty
    # bookkeeping, the `try:` that pairs with the `except` below, and the
    # initialization of last_verinfo/last_shnum.
    lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                  peerid=idlib.shortnodeid_b2a(peerid),
                  numshares=len(datavs),
    elapsed = now - started
    self._queries_outstanding.discard(peerid)
    self._must_query.discard(peerid)
    self._queries_completed += 1
    if not self._running:
        self.log("but we're not running, so we'll ignore it", parent=lp)
        self._status.add_per_server_time(peerid, "late", started, elapsed)
    self._status.add_per_server_time(peerid, "query", started, elapsed)
    # peers with at least one share vs. peers with none (branch headers
    # elided):
    self._good_peers.add(peerid)
    self._empty_peers.add(peerid)
    for shnum,datav in datavs.items():
        verinfo = self._got_results_one_share(shnum, data, peerid)
        last_verinfo = verinfo
        except CorruptShareError, e:
            # log it and give the other shares a chance to be processed
            f = failure.Failure()
            self.log("bad share: %s %s" % (f, f.value),
                     parent=lp, level=log.WEIRD)
            self._bad_peers.add(peerid)
            self._last_failure = f
            self._servermap.problems.append(f)
    self._status.timings["cumulative_verify"] += (time.time() - now)

    if self._need_privkey and last_verinfo:
        # send them a request for the privkey. We send one request per
        # responding peer (continuation of this comment elided).
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = last_verinfo
        o = dict(offsets_tuple)
        self._queries_outstanding.add(peerid)
        readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
        ss = self._servermap.connections[peerid]
        privkey_started = time.time()
        d = self._do_read(ss, peerid, self._storage_index,
        d.addCallback(self._got_privkey_results, peerid, last_shnum,
        d.addErrback(self._privkey_query_failed, peerid, last_shnum)
        d.addErrback(log.err)
        d.addCallback(self._check_for_done)
        d.addErrback(self._fatal_error)

    self.log("_got_results done", parent=lp)
def _got_results_one_share(self, shnum, data, peerid):
    """Validate a single fetched share prefix and record it in the
    servermap. Raises CorruptShareError on pubkey/signature mismatch."""
    # NOTE(review): several lines are elided from this view, including the
    # `shnum=shnum,` argument of the log call, the closing
    # `offsets_tuple)` of the verinfo tuple, the `if not valid:` guard
    # before the signature CorruptShareError, and the `return verinfo`
    # statements.
    lp = self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                  peerid=idlib.shortnodeid_b2a(peerid))
    # this might raise NeedMoreDataError, if the pubkey and signature
    # live at some weird offset. That shouldn't happen, so I'm going to
    # treat it as a bad share.
    (seqnum, root_hash, IV, k, N, segsize, datalength,
     pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

    if not self._node._pubkey:
        # first pubkey we've seen: check it against the cap's fingerprint
        # before trusting it
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node._fingerprint:
            raise CorruptShareError(peerid, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._pubkey = self._deserialize_pubkey(pubkey_s)

    if self._need_privkey:
        self._try_to_extract_privkey(data, peerid, shnum)

    (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
     ig_segsize, ig_datalen, offsets) = unpack_header(data)
    # tuple-of-pairs form so verinfo stays hashable
    offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

    verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,

    if verinfo not in self._valid_versions:
        # it's a new pair. Verify the signature.
        valid = self._node._pubkey.verify(prefix, signature)
        raise CorruptShareError(peerid, shnum, "signature is invalid")

        # ok, it's a valid verinfo. Add it to the list of validated
        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                 % (seqnum, base32.b2a(root_hash)[:4],
                    idlib.shortnodeid_b2a(peerid), shnum,
                    k, N, segsize, datalength),
        self._valid_versions.add(verinfo)
    # We now know that this is a valid candidate verinfo.

    if (peerid, shnum) in self._servermap.bad_shares:
        # we've been told that the rest of the data in this share is
        # unusable, so don't add it to the servermap.
        self.log("but we've been told this is a bad share",
                 parent=lp, level=log.UNUSUAL)

    # Add the info to our servermap.
    timestamp = time.time()
    self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
    self.versionmap.add(verinfo, (shnum, peerid, timestamp))
def _deserialize_pubkey(self, pubkey_s):
    """Parse a serialized RSA public key and return the verifying-key
    object (the caller stores it on the node)."""
    verifier = rsa.create_verifying_key_from_string(pubkey_s)
    # the caller assigns our result to self._node._pubkey; the visible
    # source dropped this return
    return verifier
def _try_to_extract_privkey(self, data, peerid, shnum):
    """If the fetched share data happens to contain the encrypted private
    key, validate it and install it on the node."""
    # NOTE(review): the `try:` that pairs with the `except` below, the
    # tail of the NOTE comment, and the `return` that ends the
    # NeedMoreDataError branch are elided from this view. Python 2
    # `except X, e` syntax.
    r = unpack_share(data)
    except NeedMoreDataError, e:
        # this share won't help us. oh well.
        offset = e.encprivkey_offset
        length = e.encprivkey_length
        self.log("shnum %d on peerid %s: share was too short (%dB) "
                 "to get the encprivkey; [%d:%d] ought to hold it" %
                 (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                  offset, offset+length))
        # NOTE: if uncoordinated writes are taking place, someone might
        # change the share (and most probably move the encprivkey) before
        # we get a chance to do one of these reads and fetch it. This
        # will cause us to see a NotEnoughSharesError(unable to fetch
        # privkey) instead of an UncoordinatedWriteError . This is a
        # nuisance, but it will go away when we move to DSA-based mutable
        # files (since the privkey will be small enough to fit in the
        # share itself).

    (seqnum, root_hash, IV, k, N, segsize, datalen,
     pubkey, signature, share_hash_chain, block_hash_tree,
     share_data, enc_privkey) = r

    return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
    """Decrypt an encrypted private key fetched from a share and, if its
    writekey hash matches our cap, install it on the node."""
    alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
    alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
    if alleged_writekey != self._node.get_writekey():
        self.log("invalid privkey from %s shnum %d" %
                 (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
        # without this return (dropped in the visible source) we would
        # fall through and install the corrupt key anyway
        return

    # it's good
    self.log("got valid privkey from shnum %d on peerid %s" %
             (shnum, idlib.shortnodeid_b2a(peerid)))
    privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
    self._node._populate_encprivkey(enc_privkey)
    self._node._populate_privkey(privkey)
    self._need_privkey = False
    self._status.set_privkey_from(peerid)
def _query_failed(self, f, peerid):
    """Errback for an initial query: record the failure and account the
    peer as bad/unreachable."""
    self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
    if not self._running:
        # late failure after we've already finished: ignore it (the bare
        # `if` had no body in the visible source)
        return
    self._must_query.discard(peerid)
    self._queries_outstanding.discard(peerid)
    self._bad_peers.add(peerid)
    self._servermap.problems.append(f)
    self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
    self._queries_completed += 1
    self._last_failure = f
671 def _got_privkey_results(self, datavs, peerid, shnum, started):
673 elapsed = now - started
674 self._status.add_per_server_time(peerid, "privkey", started, elapsed)
675 self._queries_outstanding.discard(peerid)
676 if not self._need_privkey:
678 if shnum not in datavs:
679 self.log("privkey wasn't there when we asked it", level=log.WEIRD)
681 datav = datavs[shnum]
682 enc_privkey = datav[0]
683 self._try_to_validate_privkey(enc_privkey, peerid, shnum)
def _privkey_query_failed(self, f, peerid, shnum):
    """Errback for the follow-up privkey read: log and record the
    failure."""
    self._queries_outstanding.discard(peerid)
    self.log("error during privkey query: %s %s" % (f, f.value),
             level=log.WEIRD)
    if not self._running:
        # late failure after completion: nothing more to record (the bare
        # `if` had no body in the visible source)
        return
    self._queries_outstanding.discard(peerid)
    self._servermap.problems.append(f)
    self._last_failure = f
def _check_for_done(self, res):
    """Decide, per mode, whether the update is finished, needs more
    queries, or should keep waiting for outstanding responses."""
    # NOTE(review): many lines of this state machine are elided from this
    # view: closing arguments of several self.log() calls, a number of
    # `return`/`return self._done()` statements, the `if self._must_query:`
    # header, the MAX_IN_FLIGHT constant definition, and the MODE_WRITE
    # boundary-search bookkeeping (the `states` list, `num_not_found`,
    # `last_found`, and the `else:` for unanswered peers). The visible
    # lines are annotated as-is; some constructs are syntactically
    # incomplete here.
    # exit paths:
    #  return self._send_more_queries(outstanding) : send some more queries
    #  return self._done() : all done
    #  return : keep waiting, no new queries

    lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                          "%(outstanding)d queries outstanding, "
                          "%(extra)d extra peers available, "
                          "%(must)d 'must query' peers left"
                  outstanding=len(self._queries_outstanding),
                  extra=len(self.extra_peers),
                  must=len(self._must_query),

    if not self._running:
        self.log("but we're not running", parent=lp, level=log.NOISY)

    # (this belongs under an elided `if self._must_query:` guard)
    # we are still waiting for responses from peers that used to have
    # a share, so we must continue to wait. No additional queries are
    # required at this time.
    self.log("%d 'must query' peers left" % len(self._must_query),

    if (not self._queries_outstanding and not self.extra_peers):
        # all queries have retired, and we have no peers left to ask. No
        # more progress can be made, therefore we are done.
        self.log("all queries are retired, no extra peers: done",

    recoverable_versions = self._servermap.recoverable_versions()
    unrecoverable_versions = self._servermap.unrecoverable_versions()

    # what is our completion policy? how hard should we work?

    if self.mode == MODE_ANYTHING:
        if recoverable_versions:
            self.log("%d recoverable versions: done"
                     % len(recoverable_versions),

    if self.mode == MODE_CHECK:
        # we used self._must_query, and we know there aren't any
        # responses still waiting, so that means we must be done
        self.log("done", parent=lp)

    if self.mode == MODE_READ:
        # if we've queried k+epsilon servers, and we see a recoverable
        # version, and we haven't seen any unrecoverable higher-seqnum'ed
        # versions, then we're done.
        if self._queries_completed < self.num_peers_to_query:
            self.log(format="%(completed)d completed, %(query)d to query: need more",
                     completed=self._queries_completed,
                     query=self.num_peers_to_query,
            return self._send_more_queries(MAX_IN_FLIGHT)
        if not recoverable_versions:
            self.log("no recoverable versions: need more",
            return self._send_more_queries(MAX_IN_FLIGHT)
        highest_recoverable = max(recoverable_versions)
        highest_recoverable_seqnum = highest_recoverable[0]
        for unrec_verinfo in unrecoverable_versions:
            if unrec_verinfo[0] > highest_recoverable_seqnum:
                # there is evidence of a higher-seqnum version, but we
                # don't yet see enough shares to recover it. Try harder.
                # TODO: consider sending more queries.
                # TODO: consider limiting the search distance
                self.log("evidence of higher seqnum: need more")
                return self._send_more_queries(MAX_IN_FLIGHT)
        # all the unrecoverable versions were old or concurrent with a
        # recoverable version. Good enough.
        self.log("no higher-seqnum: done", parent=lp)

    if self.mode == MODE_WRITE:
        # we want to keep querying until we've seen a few that don't have
        # any shares, to be sufficiently confident that we've seen all
        # the shares. This is still less work than MODE_CHECK, which asks
        # every server in the world.

        if not recoverable_versions:
            self.log("no recoverable versions: need more", parent=lp)
            return self._send_more_queries(MAX_IN_FLIGHT)

        last_not_responded = -1
        num_not_responded = 0
        found_boundary = False

        # walk the permuted peerlist classifying each peer, looking for
        # EPSILON consecutive-ish empty peers past the last share
        for i,(peerid,ss) in enumerate(self.full_peerlist):
            if peerid in self._bad_peers:
                #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
            elif peerid in self._empty_peers:
                #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                if num_not_found >= self.EPSILON:
                    self.log("found our boundary, %s" %
                    found_boundary = True
            elif peerid in self._good_peers:
                #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                # (this tail belongs to an elided `else:` branch for
                # peers that have not yet answered)
                #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                last_not_responded = i
                num_not_responded += 1

        # we need to know that we've gotten answers from
        # everybody to the left of here
        if last_not_responded == -1:
            self.log("have all our answers",
            # .. unless we're still waiting on the privkey
            if self._need_privkey:
                self.log("but we're still waiting for the privkey",
                # if we found the boundary but we haven't yet found
                # the privkey, we may need to look further. If
                # somehow all the privkeys were corrupted (but the
                # shares were readable), then this is likely to do an
                return self._send_more_queries(MAX_IN_FLIGHT)

            # still waiting for somebody
            return self._send_more_queries(num_not_responded)

        # if we hit here, we didn't find our boundary, so we're still
        self.log("no boundary yet, %s" % "".join(states), parent=lp)
        return self._send_more_queries(MAX_IN_FLIGHT)

    # otherwise, keep up to 5 queries in flight. TODO: this is pretty
    # arbitrary, really I want this to be something like k -
    # max(known_version_sharecounts) + some extra
    self.log("catchall: need more", parent=lp)
    return self._send_more_queries(MAX_IN_FLIGHT)
def _send_more_queries(self, num_outstanding):
    """Top up the in-flight query count toward num_outstanding using
    peers from self.extra_peers."""
    # NOTE(review): elided from this view: the `more_queries = []`
    # initialization, the loop header that fills it (the bare `if`
    # statements below belong to its break conditions), and the trailing
    # return.
    self.log(format=" there are %(outstanding)d queries outstanding",
             outstanding=len(self._queries_outstanding),
    active_queries = len(self._queries_outstanding) + len(more_queries)
    if active_queries >= num_outstanding:
    if not self.extra_peers:
    more_queries.append(self.extra_peers.pop(0))

    self.log(format="sending %(more)d more queries: %(who)s",
             more=len(more_queries),
             who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                           for (peerid,ss) in more_queries]),

    for (peerid, ss) in more_queries:
        self._do_query(ss, peerid, self._storage_index, self._read_size)
    # we'll retrigger when those queries come back
# NOTE(review): body of _done() -- the `def _done(self):` line, the
# `now = time.time()` assignment, and the `return` under the running
# check are elided from this view.
if not self._running:
self._running = False
elapsed = now - self._started
self._status.set_finished(now)
self._status.timings["total"] = elapsed
self._status.set_progress(1.0)
self._status.set_status("Done")
self._status.set_active(False)

self._servermap.last_update_mode = self.mode
self._servermap.last_update_time = self._started
# the servermap will not be touched after this
self.log("servermap: %s" % self._servermap.summarize_versions())
# fire the caller's Deferred on a later turn, not reentrantly
eventually(self._done_deferred.callback, self._servermap)
def _fatal_error(self, f):
    """Last-ditch errback: log the unexpected failure and error out the
    caller's Deferred."""
    self.log("fatal error", failure=f, level=log.WEIRD)
    self._done_deferred.errback(f)