3 from twisted.internet import defer
4 from twisted.python import failure
5 from foolscap.eventual import eventually
6 from allmydata.util import base32, hashutil, idlib, log
7 from allmydata import storage
8 from pycryptopp.publickey import rsa
10 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
11 DictOfSets, CorruptShareError, NeedMoreDataError
12 from layout import unpack_prefix_and_signature, unpack_header, unpack_share
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.

        # NOTE(review): the enclosing 'def __init__' (and the docstring
        # terminator above) are not visible in this excerpt.
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = set()      # (peerid, shnum) tuples to ignore (see mark_bad_share)
        self.last_update_mode = None # MODE_* of the most recent update (set by the updater)
        self.last_update_time = 0    # start time of the most recent update
    def mark_bad_share(self, peerid, shnum):
        """This share was found to be bad, not in the checkstring or
        signature, but deeper in the share, detected at retrieve time. Remove
        it from our list of useful shares, and remember that it is bad so we
        don't add it back again later.
        """
        # NOTE(review): 'key' is used below but its assignment (presumably
        # key = (peerid, shnum)) is missing from this excerpt.
        self.bad_shares.add(key)
        self.servermap.pop(key, None)
    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        # A freshly written share is by definition not bad: clear any stale
        # bad-share marker, then record the new version info.
        # NOTE(review): the assignment of 'key' (presumably (peerid, shnum))
        # is missing from this excerpt.
        self.bad_shares.discard(key)
        self.servermap[key] = (verinfo, timestamp)
    def dump(self, out=sys.stdout):
        """Print a human-readable summary of the servermap to 'out'."""
        # NOTE(review): 'sys' must be imported at the top of the file; the
        # import is not visible in this excerpt.
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
        # NOTE(review): the remainder of this print call (and the rest of
        # dump) is missing from this excerpt.
    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        # NOTE(review): 'return sharemap' appears to be missing from this
        # excerpt.
    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        # NOTE(review): 'return versionmap' appears to be missing from this
        # excerpt.
    def shares_on_peer(self, peerid):
        # Presumably returns the set of shnums held by 'peerid' -- confirm.
        # NOTE(review): the head of this expression (likely
        # 'return set([shnum') and the 'in self.servermap' clause are
        # missing from this excerpt.
                    for (s_peerid, shnum)
                    if s_peerid == peerid])
    def version_on_peer(self, peerid, shnum):
        """Look up the version info recorded for the share at
        (peerid, shnum)."""
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
        # NOTE(review): the return statements (presumably 'return verinfo'
        # and a fallthrough 'return None') are missing from this excerpt.
    def shares_available(self):
        """Return a dict that maps verinfo to tuples of
        (num_distinct_shares, k) tuples."""
        versionmap = self.make_versionmap()
        # NOTE(review): initialization of the result dict ('all_shares') and
        # of 's' (the per-version set of distinct shnums), the inner loop
        # body, and the final return are all missing from this excerpt.
        for verinfo, shares in versionmap.items():
            for (shnum, peerid, timestamp) in shares:
                (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                 offsets_tuple) = verinfo
                all_shares[verinfo] = (len(s), k)
    def highest_seqnum(self):
        """Return the highest sequence number among all known versions."""
        available = self.shares_available()
        # verinfo[0] is the seqnum (see the class docstring's versionid
        # layout)
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        # NOTE(review): the tail of this method (presumably a default entry
        # and 'return max(seqnums)') is missing from this excerpt.
    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        # NOTE(review): the initialization 'bits = []' appears to be missing
        # from this excerpt.
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            # one "COUNT*seqNUM-ROOTHASH" token per known version
            bits.append("%d*seq%d-%s" %
                        (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
        return "/".join(bits)
    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            # NOTE(review): the recoverability test (presumably
            # 'if len(shnums) >= k:') is missing from this excerpt.
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions
    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            # NOTE(review): the unrecoverability test (presumably
            # 'if len(shnums) < k:') is missing from this excerpt.
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions
    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criteria, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        # NOTE(review): the empty-check and the sort (presumably
        # 'if not recoverable: return None' and 'recoverable.sort()') are
        # missing from this excerpt; [-1] below relies on a sorted list.
        return recoverable[-1]
    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        # NOTE(review): the body of this method is missing from this
        # excerpt.
    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        # NOTE(review): the body of this method is missing from this
        # excerpt.
class ServermapUpdater:
    # NOTE(review): class-level constants referenced by the methods below
    # (e.g. EPSILON) are not visible in this excerpt.
    def __init__(self, filenode, servermap, mode=MODE_READ):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._servermap = servermap
        # NOTE(review): some assignments are missing from this excerpt here;
        # self.mode and self._running are read elsewhere in this class, so
        # they are presumably initialized in the missing lines.
        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        # * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        # * but we can't read from negative offsets
        # * the offset table tells us the 'ish', also the positive offset
        # A future version of the SMDF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 2000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 2000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True

        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)
243 def log(self, *args, **kwargs):
244 if "parent" not in kwargs:
245 kwargs["parent"] = self._log_number
246 return log.msg(*args, **kwargs)
    # NOTE(review): the 'def update(self):' line for this method is missing
    # from this excerpt.
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._started = time.time()
        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        # BUG(review): the next line assigns the *required*-shares count to
        # N, but N is used below (MODE_WRITE's num_peers_to_query) as the
        # *total* number of shares; this should almost certainly call
        # self._node.get_total_shares() instead.
        N = self._node.get_required_shares()

        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
        # NOTE(review): the tail of the comment above and the 'else:' branch
        # header for the remaining modes are missing from this excerpt.
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        return self._done_deferred
    def _build_initial_querylist(self):
        """Return (initial_peers_to_query, must_query): the peerid->connection
        dict of peers to ask first, and the set of peerids we must hear from
        before finishing."""
        initial_peers_to_query = {}
        # NOTE(review): the initialization 'must_query = set()' appears to be
        # missing from this excerpt.
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        # pad out to num_peers_to_query with fresh peers from the permuted
        # list
        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query
    def _send_initial_requests(self, peerlist):
        """Fire off one query per peer in 'peerlist' (a peerid->connection
        dict)."""
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
    def _do_query(self, ss, peerid, storage_index, readsize):
        """Send one slot_readv query to one peer and wire up the
        response/error handlers."""
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
        # NOTE(review): the remaining arguments / close of this log call are
        # missing from this excerpt.
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        # read [0:readsize] of every share on this peer (empty shnum list =
        # all shares)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
        # NOTE(review): the trailing argument(s) / close of this addCallback
        # are missing from this excerpt.
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        """Issue the remote slot_readv call; returns the resulting
        Deferred."""
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        # NOTE(review): 'return d' appears to be missing from this excerpt.
    def _got_results(self, datavs, peerid, readsize, stuff, started):
        """Process one peer's slot_readv answer: record each share, and
        optionally send a follow-up read for the encrypted private key."""
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
        # NOTE(review): the close of this log call is missing from this
        # excerpt.
        self._queries_outstanding.discard(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
        # NOTE(review): this guard's 'return', and the branch that sorts
        # this peer into _good_peers vs _empty_peers, are fragmentary here.
            self._good_peers.add(peerid)
            self._empty_peers.add(peerid)

        for shnum,datav in datavs.items():
            # NOTE(review): 'data = datav[0]', the initialization of
            # last_verinfo/last_shnum, and the 'try:' preceding the next
            # line are missing from this excerpt.
                verinfo = self._got_results_one_share(shnum, data, peerid)
                last_verinfo = verinfo
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log("bad share: %s %s" % (f, f.value),
                         parent=lp, level=log.WEIRD)
                self._bad_peers.add(peerid)
                self._last_failure = f
                self._servermap.problems.append(f)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            # the encrypted privkey occupies [enc_privkey:EOF] in the share
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            d = self._do_read(ss, peerid, self._storage_index,
            # NOTE(review): the remaining arguments to _do_read are missing
            # from this excerpt.
            d.addCallback(self._got_privkey_results, peerid, last_shnum)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp)
    def _got_results_one_share(self, shnum, data, peerid):
        """Parse and validate one share's data; records it in the servermap
        and versionmap. Raises CorruptShareError on a bad pubkey or
        signature."""
        lp = self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                      peerid=idlib.shortnodeid_b2a(peerid))
        # NOTE(review): a 'shnum=shnum' argument appears to be missing from
        # the log call above.

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        # lazily install the node's pubkey, but only after checking it
        # against the known fingerprint
        if not self._node._pubkey:
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._pubkey = self._deserialize_pubkey(pubkey_s)

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
        # NOTE(review): the final element (presumably offsets_tuple) and the
        # closing paren of this tuple are missing from this excerpt.

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            # NOTE(review): the 'if not valid:' guard appears to be missing
            # from this excerpt.
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
            # NOTE(review): the close of this log call is missing from this
            # excerpt.
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            # NOTE(review): this branch's early return appears to be missing
            # from this excerpt.

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)

        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        # NOTE(review): the final 'return verinfo' (callers in _got_results
        # use the return value) appears to be missing from this excerpt.
    def _deserialize_pubkey(self, pubkey_s):
        """Create an RSA verifying key from its serialized form."""
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        # NOTE(review): 'return verifier' appears to be missing from this
        # excerpt.
    def _try_to_extract_privkey(self, data, peerid, shnum):
        """Try to pull the encrypted private key out of a full share read;
        gives up quietly if 'data' is too short to contain it."""
        # NOTE(review): the 'try:' that must precede the next line is
        # missing from this excerpt.
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length))
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError . This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
        # NOTE(review): the tail of the comment above and this branch's
        # early return are missing from this excerpt.

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        # BUG(review): _try_to_validate_privkey is invoked through 'self',
        # so 'self' is already bound; passing it again as the first
        # positional argument shifts every parameter by one and raises a
        # TypeError. This should read:
        #     return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
        return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
        """Decrypt the fetched private key and, if it matches our writekey,
        install it (and its ciphertext) on the node."""
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
        # NOTE(review): this guard's early return (and possibly other
        # bookkeeping) is missing from this excerpt.

        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        # we have the key now, so later shares don't need to be probed
        self._need_privkey = False
    def _query_failed(self, f, peerid):
        """Errback for a slot_readv query: record the failure and mark the
        peer as bad/unreachable."""
        self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
        if not self._running:
        # NOTE(review): the body of this guard (presumably 'return') is
        # missing from this excerpt.
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
        # a failed query still counts toward the completion tally
        self._queries_completed += 1
        self._last_failure = f
    def _got_privkey_results(self, datavs, peerid, shnum):
        """Callback for the follow-up privkey read: validate whatever came
        back."""
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
        # NOTE(review): the bodies of this guard and the next (presumably
        # 'return' in each) are missing from this excerpt.
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked it", level=log.WEIRD)
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum)
    def _privkey_query_failed(self, f, peerid, shnum):
        """Errback for the follow-up privkey read: record the failure."""
        self._queries_outstanding.discard(peerid)
        self.log("error during privkey query: %s %s" % (f, f.value),
        # NOTE(review): the close of this log call, and the body of the
        # guard below (presumably 'return'), are missing from this excerpt.
        if not self._running:
        self._queries_outstanding.discard(peerid)
        self._servermap.problems.append(f)
        self._last_failure = f
    def _check_for_done(self, res):
        """The heart of the state machine: after each response, decide
        whether to send more queries, declare the update finished, or keep
        waiting, according to self.mode's completion policy."""
        # return self._send_more_queries(outstanding) : send some more queries
        # return self._done() : all done
        # return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left"
        # NOTE(review): the close of this format string is missing from this
        # excerpt.
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
        # NOTE(review): this guard's 'return', and the 'if self._must_query:'
        # test that should introduce the next stanza, are missing from this
        # excerpt.

            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
        # NOTE(review): the close of this log call and this stanza's
        # 'return' are missing from this excerpt.

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
        # NOTE(review): the close of this log call and a 'return
        # self._done()' are missing from this excerpt.

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
        # NOTE(review): the close of this log call and a 'return
        # self._done()' are missing from this excerpt.

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
        # NOTE(review): a 'return self._done()' is missing from this
        # excerpt, as is the assignment of MAX_IN_FLIGHT (presumably
        # 'MAX_IN_FLIGHT = 5' per the 'keep up to 5 queries in flight'
        # comment below) which the branches below rely on.

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
        # NOTE(review): the close of this log call is missing from this
        # excerpt.
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
        # NOTE(review): the close of this log call is missing from this
        # excerpt.
                return self._send_more_queries(MAX_IN_FLIGHT)
            # verinfo[0] is the seqnum, so max() finds the newest
            # recoverable version
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more")
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
        # NOTE(review): a 'return self._done()' is missing from this
        # excerpt.

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_not_responded = -1
            num_not_responded = 0
            # NOTE(review): the initializations of num_not_found and of
            # 'states' (the per-peer state string joined below) appear to be
            # missing from this excerpt.
            found_boundary = False

            # walk the permuted peerlist looking for EPSILON consecutive
            # empty peers past the last peer that held a share
            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                # NOTE(review): the state-recording statements inside each of
                # these branches (and the counter updates) are fragmentary
                # in this excerpt.
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if num_not_found >= self.EPSILON:
                        self.log("found our boundary, %s" %
                        found_boundary = True
                elif peerid in self._good_peers:
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
        # NOTE(review): the trailing 'else:' (peer has not yet responded)
        # appears to be missing from this excerpt.
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

        # NOTE(review): the 'if found_boundary:' test that should introduce
        # this stanza is missing from this excerpt.
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    self.log("have all our answers",
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        return self._send_more_queries(MAX_IN_FLIGHT)

                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            self.log("no boundary yet, %s" % "".join(states), parent=lp)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp)
        return self._send_more_queries(MAX_IN_FLIGHT)
    def _send_more_queries(self, num_outstanding):
        """Top the number of in-flight queries back up to 'num_outstanding',
        drawing fresh peers from self.extra_peers."""
        # NOTE(review): the initialization of 'more_queries' (and probably
        # an enclosing loop header) is missing from this excerpt.
        self.log(format=" there are %(outstanding)d queries outstanding",
                 outstanding=len(self._queries_outstanding),
        # NOTE(review): the close of this log call is missing from this
        # excerpt.
        active_queries = len(self._queries_outstanding) + len(more_queries)
        if active_queries >= num_outstanding:
        # NOTE(review): the bodies of these two guards (presumably 'break'
        # statements inside a loop) are missing from this excerpt.
            if not self.extra_peers:
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
        # NOTE(review): the close of this log call is missing from this
        # excerpt.

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
        # we'll retrigger when those queries come back
    # NOTE(review): the 'def _done(self):' line for this method (named
    # '_done' per the _check_for_done comments) is missing from this
    # excerpt.
        if not self._running:
        # NOTE(review): the body of this guard (presumably 'return') is
        # missing from this excerpt.
        self._running = False
        # freeze the mode/time bookkeeping into the servermap we hand back
        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        # fire the Deferred handed out by update(), in a later reactor turn
        eventually(self._done_deferred.callback, self._servermap)
    def _fatal_error(self, f):
        """Catch-all errback: log the unexpected failure and errback the
        Deferred that update() handed out."""
        self.log("fatal error", failure=f, level=log.WEIRD)
        self._done_deferred.errback(f)