]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/servermap.py
mutable WIP: re-enable publish/retrieve status
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from twisted.internet import defer
4 from twisted.python import failure
5 from foolscap.eventual import eventually
6 from allmydata.util import base32, hashutil, idlib, log
7 from allmydata import storage
8 from pycryptopp.publickey import rsa
9
10 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
11      DictOfSets, CorruptShareError, NeedMoreDataError
12 from layout import unpack_prefix_and_signature, unpack_header, unpack_share
13
14 class ServerMap:
15     """I record the placement of mutable shares.
16
17     This object records which shares (of various versions) are located on
18     which servers.
19
20     One purpose I serve is to inform callers about which versions of the
21     mutable file are recoverable and 'current'.
22
23     A second purpose is to serve as a state marker for test-and-set
24     operations. I am passed out of retrieval operations and back into publish
25     operations, which means 'publish this new version, but only if nothing
26     has changed since I last retrieved this data'. This reduces the chances
27     of clobbering a simultaneous (uncoordinated) write.
28
29     @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
30                      (versionid, timestamp) tuple. Each 'versionid' is a
31                      tuple of (seqnum, root_hash, IV, segsize, datalength,
32                      k, N, signed_prefix, offsets)
33
34     @ivar connections: maps peerid to a RemoteReference
35
36     @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
37                       shares that I should ignore (because a previous user of
38                       the servermap determined that they were invalid). The
39                       updater only locates a certain number of shares: if
40                       some of these turn out to have integrity problems and
41                       are unusable, the caller will need to mark those shares
42                       as bad, then re-update the servermap, then try again.
43     """
44
45     def __init__(self):
46         self.servermap = {}
47         self.connections = {}
48         self.unreachable_peers = set() # peerids that didn't respond to queries
49         self.problems = [] # mostly for debugging
50         self.bad_shares = set()
51         self.last_update_mode = None
52         self.last_update_time = 0
53
54     def mark_bad_share(self, peerid, shnum):
55         """This share was found to be bad, not in the checkstring or
56         signature, but deeper in the share, detected at retrieve time. Remove
57         it from our list of useful shares, and remember that it is bad so we
58         don't add it back again later.
59         """
60         key = (peerid, shnum)
61         self.bad_shares.add(key)
62         self.servermap.pop(key, None)
63
64     def add_new_share(self, peerid, shnum, verinfo, timestamp):
65         """We've written a new share out, replacing any that was there
66         before."""
67         key = (peerid, shnum)
68         self.bad_shares.discard(key)
69         self.servermap[key] = (verinfo, timestamp)
70
71     def dump(self, out=sys.stdout):
72         print >>out, "servermap:"
73
74         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
75             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
76              offsets_tuple) = verinfo
77             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
78                           (idlib.shortnodeid_b2a(peerid), shnum,
79                            seqnum, base32.b2a(root_hash)[:4], k, N,
80                            datalength))
81         return out
82
83     def all_peers(self):
84         return set([peerid
85                     for (peerid, shnum)
86                     in self.servermap])
87
88     def make_sharemap(self):
89         """Return a dict that maps shnum to a set of peerds that hold it."""
90         sharemap = DictOfSets()
91         for (peerid, shnum) in self.servermap:
92             sharemap.add(shnum, peerid)
93         return sharemap
94
95     def make_versionmap(self):
96         """Return a dict that maps versionid to sets of (shnum, peerid,
97         timestamp) tuples."""
98         versionmap = DictOfSets()
99         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
100             versionmap.add(verinfo, (shnum, peerid, timestamp))
101         return versionmap
102
103     def shares_on_peer(self, peerid):
104         return set([shnum
105                     for (s_peerid, shnum)
106                     in self.servermap
107                     if s_peerid == peerid])
108
109     def version_on_peer(self, peerid, shnum):
110         key = (peerid, shnum)
111         if key in self.servermap:
112             (verinfo, timestamp) = self.servermap[key]
113             return verinfo
114         return None
115         return None
116
117     def shares_available(self):
118         """Return a dict that maps verinfo to tuples of
119         (num_distinct_shares, k) tuples."""
120         versionmap = self.make_versionmap()
121         all_shares = {}
122         for verinfo, shares in versionmap.items():
123             s = set()
124             for (shnum, peerid, timestamp) in shares:
125                 s.add(shnum)
126             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
127              offsets_tuple) = verinfo
128             all_shares[verinfo] = (len(s), k)
129         return all_shares
130
131     def highest_seqnum(self):
132         available = self.shares_available()
133         seqnums = [verinfo[0]
134                    for verinfo in available.keys()]
135         seqnums.append(0)
136         return max(seqnums)
137
138     def summarize_versions(self):
139         """Return a string describing which versions we know about."""
140         versionmap = self.make_versionmap()
141         bits = []
142         for (verinfo, shares) in versionmap.items():
143             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
144              offsets_tuple) = verinfo
145             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
146             bits.append("%d*seq%d-%s" %
147                         (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
148         return "/".join(bits)
149
150     def recoverable_versions(self):
151         """Return a set of versionids, one for each version that is currently
152         recoverable."""
153         versionmap = self.make_versionmap()
154
155         recoverable_versions = set()
156         for (verinfo, shares) in versionmap.items():
157             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
158              offsets_tuple) = verinfo
159             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
160             if len(shnums) >= k:
161                 # this one is recoverable
162                 recoverable_versions.add(verinfo)
163
164         return recoverable_versions
165
166     def unrecoverable_versions(self):
167         """Return a set of versionids, one for each version that is currently
168         unrecoverable."""
169         versionmap = self.make_versionmap()
170
171         unrecoverable_versions = set()
172         for (verinfo, shares) in versionmap.items():
173             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
174              offsets_tuple) = verinfo
175             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
176             if len(shnums) < k:
177                 unrecoverable_versions.add(verinfo)
178
179         return unrecoverable_versions
180
181     def best_recoverable_version(self):
182         """Return a single versionid, for the so-called 'best' recoverable
183         version. Sequence number is the primary sort criteria, followed by
184         root hash. Returns None if there are no recoverable versions."""
185         recoverable = list(self.recoverable_versions())
186         recoverable.sort()
187         if recoverable:
188             return recoverable[-1]
189         return None
190
191     def unrecoverable_newer_versions(self):
192         # Return a dict of versionid -> health, for versions that are
193         # unrecoverable and have later seqnums than any recoverable versions.
194         # These indicate that a write will lose data.
195         pass
196
197     def needs_merge(self):
198         # return True if there are multiple recoverable versions with the
199         # same seqnum, meaning that MutableFileNode.read_best_version is not
200         # giving you the whole story, and that using its data to do a
201         # subsequent publish will lose information.
202         pass
203
204 class ServermapUpdater:
205     def __init__(self, filenode, servermap, mode=MODE_READ):
206         """I update a servermap, locating a sufficient number of useful
207         shares and remembering where they are located.
208
209         """
210
211         self._node = filenode
212         self._servermap = servermap
213         self.mode = mode
214         self._running = True
215
216         self._storage_index = filenode.get_storage_index()
217         self._last_failure = None
218
219         # how much data should we read?
220         #  * if we only need the checkstring, then [0:75]
221         #  * if we need to validate the checkstring sig, then [543ish:799ish]
222         #  * if we need the verification key, then [107:436ish]
223         #   * the offset table at [75:107] tells us about the 'ish'
224         #  * if we need the encrypted private key, we want [-1216ish:]
225         #   * but we can't read from negative offsets
226         #   * the offset table tells us the 'ish', also the positive offset
227         # A future version of the SMDF slot format should consider using
228         # fixed-size slots so we can retrieve less data. For now, we'll just
229         # read 2000 bytes, which also happens to read enough actual data to
230         # pre-fetch a 9-entry dirnode.
231         self._read_size = 2000
232         if mode == MODE_CHECK:
233             # we use unpack_prefix_and_signature, so we need 1k
234             self._read_size = 1000
235         self._need_privkey = False
236         if mode == MODE_WRITE and not self._node._privkey:
237             self._need_privkey = True
238
239         prefix = storage.si_b2a(self._storage_index)[:5]
240         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
241                                    si=prefix, mode=mode)
242
243     def log(self, *args, **kwargs):
244         if "parent" not in kwargs:
245             kwargs["parent"] = self._log_number
246         return log.msg(*args, **kwargs)
247
248     def update(self):
249         """Update the servermap to reflect current conditions. Returns a
250         Deferred that fires with the servermap once the update has finished."""
251
252         # self._valid_versions is a set of validated verinfo tuples. We just
253         # use it to remember which versions had valid signatures, so we can
254         # avoid re-checking the signatures for each share.
255         self._valid_versions = set()
256
257         # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
258         # timestamp) tuples. This is used to figure out which versions might
259         # be retrievable, and to make the eventual data download faster.
260         self.versionmap = DictOfSets()
261
262         self._started = time.time()
263         self._done_deferred = defer.Deferred()
264
265         # first, which peers should be talk to? Any that were in our old
266         # servermap, plus "enough" others.
267
268         self._queries_completed = 0
269
270         client = self._node._client
271         full_peerlist = client.get_permuted_peers("storage",
272                                                   self._node._storage_index)
273         self.full_peerlist = full_peerlist # for use later, immutable
274         self.extra_peers = full_peerlist[:] # peers are removed as we use them
275         self._good_peers = set() # peers who had some shares
276         self._empty_peers = set() # peers who don't have any shares
277         self._bad_peers = set() # peers to whom our queries failed
278
279         k = self._node.get_required_shares()
280         if k is None:
281             # make a guess
282             k = 3
283         N = self._node.get_required_shares()
284         if N is None:
285             N = 10
286         self.EPSILON = k
287         # we want to send queries to at least this many peers (although we
288         # might not wait for all of their answers to come back)
289         self.num_peers_to_query = k + self.EPSILON
290
291         if self.mode == MODE_CHECK:
292             initial_peers_to_query = dict(full_peerlist)
293             must_query = set(initial_peers_to_query.keys())
294             self.extra_peers = []
295         elif self.mode == MODE_WRITE:
296             # we're planning to replace all the shares, so we want a good
297             # chance of finding them all. We will keep searching until we've
298             # seen epsilon that don't have a share.
299             self.num_peers_to_query = N + self.EPSILON
300             initial_peers_to_query, must_query = self._build_initial_querylist()
301             self.required_num_empty_peers = self.EPSILON
302
303             # TODO: arrange to read lots of data from k-ish servers, to avoid
304             # the extra round trip required to read large directories. This
305             # might also avoid the round trip required to read the encrypted
306             # private key.
307
308         else:
309             initial_peers_to_query, must_query = self._build_initial_querylist()
310
311         # this is a set of peers that we are required to get responses from:
312         # they are peers who used to have a share, so we need to know where
313         # they currently stand, even if that means we have to wait for a
314         # silently-lost TCP connection to time out. We remove peers from this
315         # set as we get responses.
316         self._must_query = must_query
317
318         # now initial_peers_to_query contains the peers that we should ask,
319         # self.must_query contains the peers that we must have heard from
320         # before we can consider ourselves finished, and self.extra_peers
321         # contains the overflow (peers that we should tap if we don't get
322         # enough responses)
323
324         self._send_initial_requests(initial_peers_to_query)
325         return self._done_deferred
326
327     def _build_initial_querylist(self):
328         initial_peers_to_query = {}
329         must_query = set()
330         for peerid in self._servermap.all_peers():
331             ss = self._servermap.connections[peerid]
332             # we send queries to everyone who was already in the sharemap
333             initial_peers_to_query[peerid] = ss
334             # and we must wait for responses from them
335             must_query.add(peerid)
336
337         while ((self.num_peers_to_query > len(initial_peers_to_query))
338                and self.extra_peers):
339             (peerid, ss) = self.extra_peers.pop(0)
340             initial_peers_to_query[peerid] = ss
341
342         return initial_peers_to_query, must_query
343
344     def _send_initial_requests(self, peerlist):
345         self._queries_outstanding = set()
346         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
347         dl = []
348         for (peerid, ss) in peerlist.items():
349             self._queries_outstanding.add(peerid)
350             self._do_query(ss, peerid, self._storage_index, self._read_size)
351
352         # control flow beyond this point: state machine. Receiving responses
353         # from queries is the input. We might send out more queries, or we
354         # might produce a result.
355         return None
356
357     def _do_query(self, ss, peerid, storage_index, readsize):
358         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
359                  peerid=idlib.shortnodeid_b2a(peerid),
360                  readsize=readsize,
361                  level=log.NOISY)
362         self._servermap.connections[peerid] = ss
363         started = time.time()
364         self._queries_outstanding.add(peerid)
365         d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
366         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
367                       started)
368         d.addErrback(self._query_failed, peerid)
369         # errors that aren't handled by _query_failed (and errors caused by
370         # _query_failed) get logged, but we still want to check for doneness.
371         d.addErrback(log.err)
372         d.addBoth(self._check_for_done)
373         d.addErrback(self._fatal_error)
374         return d
375
376     def _do_read(self, ss, peerid, storage_index, shnums, readv):
377         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
378         return d
379
380     def _got_results(self, datavs, peerid, readsize, stuff, started):
381         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
382                      peerid=idlib.shortnodeid_b2a(peerid),
383                      numshares=len(datavs),
384                      level=log.NOISY)
385         self._queries_outstanding.discard(peerid)
386         self._must_query.discard(peerid)
387         self._queries_completed += 1
388         if not self._running:
389             self.log("but we're not running, so we'll ignore it", parent=lp)
390             return
391
392         if datavs:
393             self._good_peers.add(peerid)
394         else:
395             self._empty_peers.add(peerid)
396
397         last_verinfo = None
398         last_shnum = None
399         for shnum,datav in datavs.items():
400             data = datav[0]
401             try:
402                 verinfo = self._got_results_one_share(shnum, data, peerid)
403                 last_verinfo = verinfo
404                 last_shnum = shnum
405             except CorruptShareError, e:
406                 # log it and give the other shares a chance to be processed
407                 f = failure.Failure()
408                 self.log("bad share: %s %s" % (f, f.value),
409                          parent=lp, level=log.WEIRD)
410                 self._bad_peers.add(peerid)
411                 self._last_failure = f
412                 self._servermap.problems.append(f)
413                 pass
414
415         if self._need_privkey and last_verinfo:
416             # send them a request for the privkey. We send one request per
417             # server.
418             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
419              offsets_tuple) = last_verinfo
420             o = dict(offsets_tuple)
421
422             self._queries_outstanding.add(peerid)
423             readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
424             ss = self._servermap.connections[peerid]
425             d = self._do_read(ss, peerid, self._storage_index,
426                               [last_shnum], readv)
427             d.addCallback(self._got_privkey_results, peerid, last_shnum)
428             d.addErrback(self._privkey_query_failed, peerid, last_shnum)
429             d.addErrback(log.err)
430             d.addCallback(self._check_for_done)
431             d.addErrback(self._fatal_error)
432
433         # all done!
434         self.log("_got_results done", parent=lp)
435
436     def _got_results_one_share(self, shnum, data, peerid):
437         lp = self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
438                       shnum=shnum,
439                       peerid=idlib.shortnodeid_b2a(peerid))
440
441         # this might raise NeedMoreDataError, if the pubkey and signature
442         # live at some weird offset. That shouldn't happen, so I'm going to
443         # treat it as a bad share.
444         (seqnum, root_hash, IV, k, N, segsize, datalength,
445          pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
446
447         if not self._node._pubkey:
448             fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
449             assert len(fingerprint) == 32
450             if fingerprint != self._node._fingerprint:
451                 raise CorruptShareError(peerid, shnum,
452                                         "pubkey doesn't match fingerprint")
453             self._node._pubkey = self._deserialize_pubkey(pubkey_s)
454
455         if self._need_privkey:
456             self._try_to_extract_privkey(data, peerid, shnum)
457
458         (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
459          ig_segsize, ig_datalen, offsets) = unpack_header(data)
460         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
461
462         verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
463                    offsets_tuple)
464
465         if verinfo not in self._valid_versions:
466             # it's a new pair. Verify the signature.
467             valid = self._node._pubkey.verify(prefix, signature)
468             if not valid:
469                 raise CorruptShareError(peerid, shnum, "signature is invalid")
470
471             # ok, it's a valid verinfo. Add it to the list of validated
472             # versions.
473             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
474                      % (seqnum, base32.b2a(root_hash)[:4],
475                         idlib.shortnodeid_b2a(peerid), shnum,
476                         k, N, segsize, datalength),
477                      parent=lp)
478             self._valid_versions.add(verinfo)
479         # We now know that this is a valid candidate verinfo.
480
481         if (peerid, shnum) in self._servermap.bad_shares:
482             # we've been told that the rest of the data in this share is
483             # unusable, so don't add it to the servermap.
484             self.log("but we've been told this is a bad share",
485                      parent=lp, level=log.UNUSUAL)
486             return verinfo
487
488         # Add the info to our servermap.
489         timestamp = time.time()
490         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
491         # and the versionmap
492         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
493         return verinfo
494
495     def _deserialize_pubkey(self, pubkey_s):
496         verifier = rsa.create_verifying_key_from_string(pubkey_s)
497         return verifier
498
499     def _try_to_extract_privkey(self, data, peerid, shnum):
500         try:
501             r = unpack_share(data)
502         except NeedMoreDataError, e:
503             # this share won't help us. oh well.
504             offset = e.encprivkey_offset
505             length = e.encprivkey_length
506             self.log("shnum %d on peerid %s: share was too short (%dB) "
507                      "to get the encprivkey; [%d:%d] ought to hold it" %
508                      (shnum, idlib.shortnodeid_b2a(peerid), len(data),
509                       offset, offset+length))
510             # NOTE: if uncoordinated writes are taking place, someone might
511             # change the share (and most probably move the encprivkey) before
512             # we get a chance to do one of these reads and fetch it. This
513             # will cause us to see a NotEnoughSharesError(unable to fetch
514             # privkey) instead of an UncoordinatedWriteError . This is a
515             # nuisance, but it will go away when we move to DSA-based mutable
516             # files (since the privkey will be small enough to fit in the
517             # write cap).
518
519             return
520
521         (seqnum, root_hash, IV, k, N, segsize, datalen,
522          pubkey, signature, share_hash_chain, block_hash_tree,
523          share_data, enc_privkey) = r
524
525         return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
526
527     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
528
529         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
530         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
531         if alleged_writekey != self._node.get_writekey():
532             self.log("invalid privkey from %s shnum %d" %
533                      (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
534             return
535
536         # it's good
537         self.log("got valid privkey from shnum %d on peerid %s" %
538                  (shnum, idlib.shortnodeid_b2a(peerid)))
539         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
540         self._node._populate_encprivkey(enc_privkey)
541         self._node._populate_privkey(privkey)
542         self._need_privkey = False
543
544
545     def _query_failed(self, f, peerid):
546         self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
547         if not self._running:
548             return
549         self._must_query.discard(peerid)
550         self._queries_outstanding.discard(peerid)
551         self._bad_peers.add(peerid)
552         self._servermap.problems.append(f)
553         self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
554         self._queries_completed += 1
555         self._last_failure = f
556
557     def _got_privkey_results(self, datavs, peerid, shnum):
558         self._queries_outstanding.discard(peerid)
559         if not self._need_privkey:
560             return
561         if shnum not in datavs:
562             self.log("privkey wasn't there when we asked it", level=log.WEIRD)
563             return
564         datav = datavs[shnum]
565         enc_privkey = datav[0]
566         self._try_to_validate_privkey(enc_privkey, peerid, shnum)
567
568     def _privkey_query_failed(self, f, peerid, shnum):
569         self._queries_outstanding.discard(peerid)
570         self.log("error during privkey query: %s %s" % (f, f.value),
571                  level=log.WEIRD)
572         if not self._running:
573             return
574         self._queries_outstanding.discard(peerid)
575         self._servermap.problems.append(f)
576         self._last_failure = f
577
578     def _check_for_done(self, res):
579         # exit paths:
580         #  return self._send_more_queries(outstanding) : send some more queries
581         #  return self._done() : all done
582         #  return : keep waiting, no new queries
583
584         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
585                               "%(outstanding)d queries outstanding, "
586                               "%(extra)d extra peers available, "
587                               "%(must)d 'must query' peers left"
588                               ),
589                       mode=self.mode,
590                       outstanding=len(self._queries_outstanding),
591                       extra=len(self.extra_peers),
592                       must=len(self._must_query),
593                       level=log.NOISY,
594                       )
595
596         if not self._running:
597             self.log("but we're not running", parent=lp, level=log.NOISY)
598             return
599
600         if self._must_query:
601             # we are still waiting for responses from peers that used to have
602             # a share, so we must continue to wait. No additional queries are
603             # required at this time.
604             self.log("%d 'must query' peers left" % len(self._must_query),
605                      parent=lp)
606             return
607
608         if (not self._queries_outstanding and not self.extra_peers):
609             # all queries have retired, and we have no peers left to ask. No
610             # more progress can be made, therefore we are done.
611             self.log("all queries are retired, no extra peers: done",
612                      parent=lp)
613             return self._done()
614
615         recoverable_versions = self._servermap.recoverable_versions()
616         unrecoverable_versions = self._servermap.unrecoverable_versions()
617
618         # what is our completion policy? how hard should we work?
619
620         if self.mode == MODE_ANYTHING:
621             if recoverable_versions:
622                 self.log("%d recoverable versions: done"
623                          % len(recoverable_versions),
624                          parent=lp)
625                 return self._done()
626
627         if self.mode == MODE_CHECK:
628             # we used self._must_query, and we know there aren't any
629             # responses still waiting, so that means we must be done
630             self.log("done", parent=lp)
631             return self._done()
632
633         MAX_IN_FLIGHT = 5
634         if self.mode == MODE_READ:
635             # if we've queried k+epsilon servers, and we see a recoverable
636             # version, and we haven't seen any unrecoverable higher-seqnum'ed
637             # versions, then we're done.
638
639             if self._queries_completed < self.num_peers_to_query:
640                 self.log(format="%(completed)d completed, %(query)d to query: need more",
641                          completed=self._queries_completed,
642                          query=self.num_peers_to_query,
643                          parent=lp)
644                 return self._send_more_queries(MAX_IN_FLIGHT)
645             if not recoverable_versions:
646                 self.log("no recoverable versions: need more",
647                          parent=lp)
648                 return self._send_more_queries(MAX_IN_FLIGHT)
649             highest_recoverable = max(recoverable_versions)
650             highest_recoverable_seqnum = highest_recoverable[0]
651             for unrec_verinfo in unrecoverable_versions:
652                 if unrec_verinfo[0] > highest_recoverable_seqnum:
653                     # there is evidence of a higher-seqnum version, but we
654                     # don't yet see enough shares to recover it. Try harder.
655                     # TODO: consider sending more queries.
656                     # TODO: consider limiting the search distance
657                     self.log("evidence of higher seqnum: need more")
658                     return self._send_more_queries(MAX_IN_FLIGHT)
659             # all the unrecoverable versions were old or concurrent with a
660             # recoverable version. Good enough.
661             self.log("no higher-seqnum: done", parent=lp)
662             return self._done()
663
664         if self.mode == MODE_WRITE:
665             # we want to keep querying until we've seen a few that don't have
666             # any shares, to be sufficiently confident that we've seen all
667             # the shares. This is still less work than MODE_CHECK, which asks
668             # every server in the world.
669
670             if not recoverable_versions:
671                 self.log("no recoverable versions: need more", parent=lp)
672                 return self._send_more_queries(MAX_IN_FLIGHT)
673
674             last_found = -1
675             last_not_responded = -1
676             num_not_responded = 0
677             num_not_found = 0
678             states = []
679             found_boundary = False
680
681             for i,(peerid,ss) in enumerate(self.full_peerlist):
682                 if peerid in self._bad_peers:
683                     # query failed
684                     states.append("x")
685                     #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
686                 elif peerid in self._empty_peers:
687                     # no shares
688                     states.append("0")
689                     #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
690                     if last_found != -1:
691                         num_not_found += 1
692                         if num_not_found >= self.EPSILON:
693                             self.log("found our boundary, %s" %
694                                      "".join(states),
695                                      parent=lp)
696                             found_boundary = True
697                             break
698
699                 elif peerid in self._good_peers:
700                     # yes shares
701                     states.append("1")
702                     #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
703                     last_found = i
704                     num_not_found = 0
705                 else:
706                     # not responded yet
707                     states.append("?")
708                     #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
709                     last_not_responded = i
710                     num_not_responded += 1
711
712             if found_boundary:
713                 # we need to know that we've gotten answers from
714                 # everybody to the left of here
715                 if last_not_responded == -1:
716                     # we're done
717                     self.log("have all our answers",
718                              parent=lp)
719                     # .. unless we're still waiting on the privkey
720                     if self._need_privkey:
721                         self.log("but we're still waiting for the privkey",
722                                  parent=lp)
723                         # if we found the boundary but we haven't yet found
724                         # the privkey, we may need to look further. If
725                         # somehow all the privkeys were corrupted (but the
726                         # shares were readable), then this is likely to do an
727                         # exhaustive search.
728                         return self._send_more_queries(MAX_IN_FLIGHT)
729                     return self._done()
730                 # still waiting for somebody
731                 return self._send_more_queries(num_not_responded)
732
733             # if we hit here, we didn't find our boundary, so we're still
734             # waiting for peers
735             self.log("no boundary yet, %s" % "".join(states), parent=lp)
736             return self._send_more_queries(MAX_IN_FLIGHT)
737
738         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
739         # arbitrary, really I want this to be something like k -
740         # max(known_version_sharecounts) + some extra
741         self.log("catchall: need more", parent=lp)
742         return self._send_more_queries(MAX_IN_FLIGHT)
743
744     def _send_more_queries(self, num_outstanding):
745         more_queries = []
746
747         while True:
748             self.log(format=" there are %(outstanding)d queries outstanding",
749                      outstanding=len(self._queries_outstanding),
750                      level=log.NOISY)
751             active_queries = len(self._queries_outstanding) + len(more_queries)
752             if active_queries >= num_outstanding:
753                 break
754             if not self.extra_peers:
755                 break
756             more_queries.append(self.extra_peers.pop(0))
757
758         self.log(format="sending %(more)d more queries: %(who)s",
759                  more=len(more_queries),
760                  who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
761                                for (peerid,ss) in more_queries]),
762                  level=log.NOISY)
763
764         for (peerid, ss) in more_queries:
765             self._do_query(ss, peerid, self._storage_index, self._read_size)
766             # we'll retrigger when those queries come back
767
768     def _done(self):
769         if not self._running:
770             return
771         self._running = False
772         self._servermap.last_update_mode = self.mode
773         self._servermap.last_update_time = self._started
774         # the servermap will not be touched after this
775         self.log("servermap: %s" % self._servermap.summarize_versions())
776         eventually(self._done_deferred.callback, self._servermap)
777
778     def _fatal_error(self, f):
779         self.log("fatal error", failure=f, level=log.WEIRD)
780         self._done_deferred.errback(f)
781