import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata import storage
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op, sent, elapsed))
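        # Each per_server entry is a list of (op, sent_timestamp, elapsed)
        # tuples, where op is one of "query", "late", or "privkey".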

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
    """

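    # Illustrative entry (peerid and numbers are hypothetical):
    #   servermap[(peerid_A, 3)] = ((5, root_hash, IV, segsize, datalength,
    #                                3, 10, signed_prefix, offsets), timestamp)
    # i.e. peer A holds share #3 of the seqnum-5, 3-of-10 version.
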
    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = set()
        self.last_update_mode = None
        self.last_update_time = 0

    def mark_bad_share(self, peerid, shnum):
        """This share was found to be bad at retrieve time: not in the
        checkstring or signature, but deeper in the share. Remove it from our
        list of useful shares, and remember that it is bad so we don't add it
        back again later.
        """
        key = (peerid, shnum)
        self.bad_shares.add(key)
        self.servermap.pop(key, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.discard(key)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k)
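        # Illustrative result (hypothetical values): seeing distinct shares
        # 0, 1, and 4 of a k=3 version yields (3, 3) for that verinfo, which
        # is enough to recover it; a version mapped to (2, 3) is not yet
        # recoverable.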
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*seq%d-%s" %
                        (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found, k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


class ServermapUpdater:
    def __init__(self, filenode, servermap, mode=MODE_READ):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """

        self._node = filenode
        self._servermap = servermap
        self.mode = mode
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        # how much data should we read?
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SDMF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 2000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 2000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True

        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="ServermapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON
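        # For example, with the default guess of k=3 this sends 6 initial
        # queries (k + EPSILON). MODE_WRITE raises the target to N + EPSILON
        # below, and additionally keeps searching until it has seen EPSILON
        # consecutive peers without shares.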

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self._must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["setup"] = time.time() - self._started
        return self._done_deferred
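
    # Typical driving pattern (a sketch, not an exact caller API; 'node' is
    # assumed to be a mutable file node that provides the attributes used
    # above):
    #   u = ServermapUpdater(node, ServerMap(), mode=MODE_READ)
    #   d = u.update()
    #   d.addCallback(lambda smap: smap.best_recoverable_version())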

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
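        # 'shnums' is a list of share numbers to read (the initial queries
        # above pass an empty list and receive every share the peer holds),
        # and 'readv' is a list of (offset, length) read vectors. The
        # response, as consumed in _got_results, is a dict mapping shnum to
        # a list of data strings, one per read vector.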
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum, datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid)
                last_verinfo = verinfo
                last_shnum = shnum
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log("bad share: %s %s" % (f, f.value),
                         parent=lp, level=log.WEIRD)
                self._bad_peers.add(peerid)
                self._last_failure = f
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
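            # the encrypted private key occupies the span from the
            # 'enc_privkey' offset to the end of the share ('EOF'), so read
            # exactly that range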
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        # all done!
        self.log("_got_results done", parent=lp)

    def _got_results_one_share(self, shnum, data, peerid):
        lp = self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                      shnum=shnum,
                      peerid=idlib.shortnodeid_b2a(peerid))

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node._pubkey:
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._pubkey = self._deserialize_pubkey(pubkey_s)

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key, value) for key, value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length))
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).

            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _query_failed(self, f, peerid):
        self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
        if not self._running:
            return
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked for it",
                     level=log.WEIRD)
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum)

    def _privkey_query_failed(self, f, peerid, shnum):
        self._queries_outstanding.discard(peerid)
        self.log("error during privkey query: %s %s" % (f, f.value),
                 level=log.WEIRD)
        if not self._running:
            return
        self._queries_outstanding.discard(peerid)
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more")
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

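            # Walk the peers in permuted order and record one character per
            # peer: "1" = has shares, "0" = responded with no shares,
            # "x" = query failed, "?" = no response yet. The boundary is
            # found once EPSILON consecutive "0"s follow the last "1", and
            # we are done once everything to the left of that boundary has
            # answered.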
            for i, (peerid, ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid, ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD)
        self._done_deferred.errback(f)