mutable: remove some dead code, rearrange use of populate_pubkey
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.eventual import eventually
8 from allmydata.util import base32, hashutil, idlib, log
9 from allmydata import storage
10 from allmydata.interfaces import IServermapUpdaterStatus
11 from pycryptopp.publickey import rsa
12
13 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
14      DictOfSets, CorruptShareError, NeedMoreDataError
15 from layout import unpack_prefix_and_signature, unpack_header, unpack_share
16
17 class UpdateStatus:
18     implements(IServermapUpdaterStatus)
19     statusid_counter = count(0)
20     def __init__(self):
21         self.timings = {}
22         self.timings["per_server"] = {}
23         self.timings["cumulative_verify"] = 0.0
24         self.privkey_from = None
25         self.problems = {}
26         self.active = True
27         self.storage_index = None
28         self.mode = "?"
29         self.status = "Not started"
30         self.progress = 0.0
31         self.counter = self.statusid_counter.next()
32         self.started = time.time()
33         self.finished = None
34
35     def add_per_server_time(self, peerid, op, sent, elapsed):
36         assert op in ("query", "late", "privkey")
37         if peerid not in self.timings["per_server"]:
38             self.timings["per_server"][peerid] = []
39         self.timings["per_server"][peerid].append((op,sent,elapsed))
40
41     def get_started(self):
42         return self.started
43     def get_finished(self):
44         return self.finished
45     def get_storage_index(self):
46         return self.storage_index
47     def get_mode(self):
48         return self.mode
49     def get_servermap(self):
50         return self.servermap
51     def get_privkey_from(self):
52         return self.privkey_from
53     def using_helper(self):
54         return False
55     def get_size(self):
56         return "-NA-"
57     def get_status(self):
58         return self.status
59     def get_progress(self):
60         return self.progress
61     def get_active(self):
62         return self.active
63     def get_counter(self):
64         return self.counter
65
66     def set_storage_index(self, si):
67         self.storage_index = si
68     def set_mode(self, mode):
69         self.mode = mode
70     def set_privkey_from(self, peerid):
71         self.privkey_from = peerid
72     def set_status(self, status):
73         self.status = status
74     def set_progress(self, value):
75         self.progress = value
76     def set_active(self, value):
77         self.active = value
78     def set_finished(self, when):
79         self.finished = when
80
81 class ServerMap:
82     """I record the placement of mutable shares.
83
84     This object records which shares (of various versions) are located on
85     which servers.
86
87     One purpose I serve is to inform callers about which versions of the
88     mutable file are recoverable and 'current'.
89
90     A second purpose is to serve as a state marker for test-and-set
91     operations. I am passed out of retrieval operations and back into publish
92     operations, which means 'publish this new version, but only if nothing
93     has changed since I last retrieved this data'. This reduces the chances
94     of clobbering a simultaneous (uncoordinated) write.
95
96     @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
97                      (versionid, timestamp) tuple. Each 'versionid' is a
98                      tuple of (seqnum, root_hash, IV, segsize, datalength,
99                      k, N, signed_prefix, offsets)
100
101     @ivar connections: maps peerid to a RemoteReference
102
103     @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
104                       shares that I should ignore (because a previous user of
105                       the servermap determined that they were invalid). The
106                       updater only locates a certain number of shares: if
107                       some of these turn out to have integrity problems and
108                       are unusable, the caller will need to mark those shares
109                       as bad, then re-update the servermap, then try again.
110     """
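    # Illustrative sketch (added commentary, not part of the original module)
    # of the data this class records; the peerid, verinfo, and timestamp
    # below are made-up placeholders:
    #
    #   sm = ServerMap()
    #   sm.add_new_share(peerid, 0, verinfo, timestamp)
    #   # sm.servermap now holds roughly:
    #   #   { (peerid, 0): (verinfo, timestamp) }
    #   # where verinfo is the (seqnum, root_hash, IV, segsize, datalength,
    #   #                       k, N, signed_prefix, offsets) tuple above.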
111
112     def __init__(self):
113         self.servermap = {}
114         self.connections = {}
115         self.unreachable_peers = set() # peerids that didn't respond to queries
116         self.problems = [] # mostly for debugging
117         self.bad_shares = set()
118         self.last_update_mode = None
119         self.last_update_time = 0
120
121     def mark_bad_share(self, peerid, shnum):
122         """This share was found to be bad, not in the checkstring or
123         signature, but deeper in the share, detected at retrieve time. Remove
124         it from our list of useful shares, and remember that it is bad so we
125         don't add it back again later.
126         """
127         key = (peerid, shnum)
128         self.bad_shares.add(key)
129         self.servermap.pop(key, None)
130
131     def add_new_share(self, peerid, shnum, verinfo, timestamp):
132         """We've written a new share out, replacing any that was there
133         before."""
134         key = (peerid, shnum)
135         self.bad_shares.discard(key)
136         self.servermap[key] = (verinfo, timestamp)
137
138     def dump(self, out=sys.stdout):
139         print >>out, "servermap:"
140
141         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
142             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
143              offsets_tuple) = verinfo
144             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
145                           (idlib.shortnodeid_b2a(peerid), shnum,
146                            seqnum, base32.b2a(root_hash)[:4], k, N,
147                            datalength))
148         if self.problems:
149             print >>out, "%d PROBLEMS" % len(self.problems)
150             for f in self.problems:
151                 print >>out, str(f)
152         return out
153
154     def all_peers(self):
155         return set([peerid
156                     for (peerid, shnum)
157                     in self.servermap])
158
159     def make_sharemap(self):
160     """Return a dict that maps shnum to a set of peerids that hold it."""
161         sharemap = DictOfSets()
162         for (peerid, shnum) in self.servermap:
163             sharemap.add(shnum, peerid)
164         return sharemap
165
166     def make_versionmap(self):
167         """Return a dict that maps versionid to sets of (shnum, peerid,
168         timestamp) tuples."""
169         versionmap = DictOfSets()
170         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
171             versionmap.add(verinfo, (shnum, peerid, timestamp))
172         return versionmap
173
174     def shares_on_peer(self, peerid):
175         return set([shnum
176                     for (s_peerid, shnum)
177                     in self.servermap
178                     if s_peerid == peerid])
179
180     def version_on_peer(self, peerid, shnum):
181         key = (peerid, shnum)
182         if key in self.servermap:
183             (verinfo, timestamp) = self.servermap[key]
184             return verinfo
185         return None
186
187     def shares_available(self):
188     """Return a dict that maps verinfo to (num_distinct_shares, k)
189     tuples."""
190         versionmap = self.make_versionmap()
191         all_shares = {}
192         for verinfo, shares in versionmap.items():
193             s = set()
194             for (shnum, peerid, timestamp) in shares:
195                 s.add(shnum)
196             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
197              offsets_tuple) = verinfo
198             all_shares[verinfo] = (len(s), k)
199         return all_shares
200
201     def highest_seqnum(self):
202         available = self.shares_available()
203         seqnums = [verinfo[0]
204                    for verinfo in available.keys()]
205         seqnums.append(0)
206         return max(seqnums)
207
208     def summarize_versions(self):
209         """Return a string describing which versions we know about."""
210         versionmap = self.make_versionmap()
211         bits = []
212         for (verinfo, shares) in versionmap.items():
213             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
214              offsets_tuple) = verinfo
215             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
216             bits.append("%d*seq%d-%s" %
217                         (len(shnums), seqnum, base32.b2a(root_hash)[:4]))
218         return "/".join(bits)
219
220     def recoverable_versions(self):
221         """Return a set of versionids, one for each version that is currently
222         recoverable."""
223         versionmap = self.make_versionmap()
224
225         recoverable_versions = set()
226         for (verinfo, shares) in versionmap.items():
227             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
228              offsets_tuple) = verinfo
229             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
230             if len(shnums) >= k:
231                 # this one is recoverable
232                 recoverable_versions.add(verinfo)
233
234         return recoverable_versions
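        # Illustrative example (assumed numbers): for a 3-of-10 file (k=3), a
        # version with shares {0, 4, 7} located is recoverable, while one
        # with only shares {0, 4} is not and will appear in
        # unrecoverable_versions() instead.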
235
236     def unrecoverable_versions(self):
237         """Return a set of versionids, one for each version that is currently
238         unrecoverable."""
239         versionmap = self.make_versionmap()
240
241         unrecoverable_versions = set()
242         for (verinfo, shares) in versionmap.items():
243             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
244              offsets_tuple) = verinfo
245             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
246             if len(shnums) < k:
247                 unrecoverable_versions.add(verinfo)
248
249         return unrecoverable_versions
250
251     def best_recoverable_version(self):
252         """Return a single versionid, for the so-called 'best' recoverable
253     version. Sequence number is the primary sort criterion, followed by
254         root hash. Returns None if there are no recoverable versions."""
255         recoverable = list(self.recoverable_versions())
256         recoverable.sort()
257         if recoverable:
258             return recoverable[-1]
259         return None
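        # Added note: a plain sort suffices because verinfo tuples begin with
        # (seqnum, root_hash, ...), so tuple comparison orders by sequence
        # number first and root hash second, and recoverable[-1] is the
        # highest-seqnum (then highest-root-hash) recoverable version.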
260
261     def unrecoverable_newer_versions(self):
262         # Return a dict of versionid -> health, for versions that are
263         # unrecoverable and have later seqnums than any recoverable versions.
264         # These indicate that a write will lose data.
265         versionmap = self.make_versionmap()
266         healths = {} # maps verinfo to (found,k)
267         unrecoverable = set()
268         highest_recoverable_seqnum = -1
269         for (verinfo, shares) in versionmap.items():
270             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
271              offsets_tuple) = verinfo
272             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
273             healths[verinfo] = (len(shnums),k)
274             if len(shnums) < k:
275                 unrecoverable.add(verinfo)
276             else:
277                 highest_recoverable_seqnum = max(seqnum,
278                                                  highest_recoverable_seqnum)
279
280         newversions = {}
281         for verinfo in unrecoverable:
282             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
283              offsets_tuple) = verinfo
284             if seqnum > highest_recoverable_seqnum:
285                 newversions[verinfo] = healths[verinfo]
286
287         return newversions
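        # Illustrative example (assumed numbers): if seq4 is recoverable but
        # we have also seen 2 shares of a 3-of-N seq5 version, this returns
        # roughly {seq5_verinfo: (2, 3)}, warning the caller that publishing
        # on top of seq4 would clobber the newer (unrecoverable) seq5 data.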
288
289
290     def needs_merge(self):
291         # return True if there are multiple recoverable versions with the
292         # same seqnum, meaning that MutableFileNode.read_best_version is not
293         # giving you the whole story, and that using its data to do a
294         # subsequent publish will lose information.
295         return bool(len(self.recoverable_versions()) > 1)
296
297
298 class ServermapUpdater:
299     def __init__(self, filenode, servermap, mode=MODE_READ):
300         """I update a servermap, locating a sufficient number of useful
301         shares and remembering where they are located.
302
303         """
304
305         self._node = filenode
306         self._servermap = servermap
307         self.mode = mode
308         self._running = True
309
310         self._storage_index = filenode.get_storage_index()
311         self._last_failure = None
312
313         self._status = UpdateStatus()
314         self._status.set_storage_index(self._storage_index)
315         self._status.set_progress(0.0)
316         self._status.set_mode(mode)
317
318         # how much data should we read?
319         #  * if we only need the checkstring, then [0:75]
320         #  * if we need to validate the checkstring sig, then [543ish:799ish]
321         #  * if we need the verification key, then [107:436ish]
322         #   * the offset table at [75:107] tells us about the 'ish'
323         #  * if we need the encrypted private key, we want [-1216ish:]
324         #   * but we can't read from negative offsets
325         #   * the offset table tells us the 'ish', also the positive offset
326         # A future version of the SMDF slot format should consider using
327         # fixed-size slots so we can retrieve less data. For now, we'll just
328         # read 2000 bytes, which also happens to read enough actual data to
329         # pre-fetch a 9-entry dirnode.
330         self._read_size = 2000
331         if mode == MODE_CHECK:
332             # we use unpack_prefix_and_signature, so we need 1k
333             self._read_size = 1000
334         self._need_privkey = False
335         if mode == MODE_WRITE and not self._node._privkey:
336             self._need_privkey = True
337
338         prefix = storage.si_b2a(self._storage_index)[:5]
339         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
340                                    si=prefix, mode=mode)
341
342     def get_status(self):
343         return self._status
344
345     def log(self, *args, **kwargs):
346         if "parent" not in kwargs:
347             kwargs["parent"] = self._log_number
348         return log.msg(*args, **kwargs)
349
350     def update(self):
351         """Update the servermap to reflect current conditions. Returns a
352         Deferred that fires with the servermap once the update has finished."""
353         self._started = time.time()
354         self._status.set_active(True)
355
356         # self._valid_versions is a set of validated verinfo tuples. We just
357         # use it to remember which versions had valid signatures, so we can
358         # avoid re-checking the signatures for each share.
359         self._valid_versions = set()
360
361         # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
362         # timestamp) tuples. This is used to figure out which versions might
363         # be retrievable, and to make the eventual data download faster.
364         self.versionmap = DictOfSets()
365
366         self._done_deferred = defer.Deferred()
367
368         # first, which peers should we talk to? Any that were in our old
369         # servermap, plus "enough" others.
370
371         self._queries_completed = 0
372
373         client = self._node._client
374         full_peerlist = client.get_permuted_peers("storage",
375                                                   self._node._storage_index)
376         self.full_peerlist = full_peerlist # for use later, immutable
377         self.extra_peers = full_peerlist[:] # peers are removed as we use them
378         self._good_peers = set() # peers who had some shares
379         self._empty_peers = set() # peers who don't have any shares
380         self._bad_peers = set() # peers to whom our queries failed
381
382         k = self._node.get_required_shares()
383         if k is None:
384             # make a guess
385             k = 3
386         N = self._node.get_total_shares()
387         if N is None:
388             N = 10
389         self.EPSILON = k
390         # we want to send queries to at least this many peers (although we
391         # might not wait for all of their answers to come back)
392         self.num_peers_to_query = k + self.EPSILON
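        # Worked example using the guessed defaults above: k=3 gives
        # EPSILON=3, so MODE_READ and MODE_ANYTHING aim to query 3+3=6 peers;
        # the MODE_WRITE branch below raises this to N+EPSILON = 10+3 = 13.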
393
394         if self.mode == MODE_CHECK:
395             initial_peers_to_query = dict(full_peerlist)
396             must_query = set(initial_peers_to_query.keys())
397             self.extra_peers = []
398         elif self.mode == MODE_WRITE:
399             # we're planning to replace all the shares, so we want a good
400             # chance of finding them all. We will keep searching until we've
401             # seen epsilon that don't have a share.
402             self.num_peers_to_query = N + self.EPSILON
403             initial_peers_to_query, must_query = self._build_initial_querylist()
404             self.required_num_empty_peers = self.EPSILON
405
406             # TODO: arrange to read lots of data from k-ish servers, to avoid
407             # the extra round trip required to read large directories. This
408             # might also avoid the round trip required to read the encrypted
409             # private key.
410
411         else:
412             initial_peers_to_query, must_query = self._build_initial_querylist()
413
414         # this is a set of peers that we are required to get responses from:
415         # they are peers who used to have a share, so we need to know where
416         # they currently stand, even if that means we have to wait for a
417         # silently-lost TCP connection to time out. We remove peers from this
418         # set as we get responses.
419         self._must_query = must_query
420
421         # now initial_peers_to_query contains the peers that we should ask,
422         # self._must_query contains the peers that we must have heard from
423         # before we can consider ourselves finished, and self.extra_peers
424         # contains the overflow (peers that we should tap if we don't get
425         # enough responses)
426
427         self._send_initial_requests(initial_peers_to_query)
428         self._status.timings["initial_queries"] = time.time() - self._started
429         return self._done_deferred
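        # Illustrative usage sketch (added commentary): a caller typically
        # drives this class like
        #
        #   smap = ServerMap()
        #   updater = ServermapUpdater(filenode, smap, mode=MODE_READ)
        #   d = updater.update()
        #   d.addCallback(lambda sm: sm.best_recoverable_version())
        #
        # where 'filenode' is assumed to be a MutableFileNode.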
430
431     def _build_initial_querylist(self):
432         initial_peers_to_query = {}
433         must_query = set()
434         for peerid in self._servermap.all_peers():
435             ss = self._servermap.connections[peerid]
436             # we send queries to everyone who was already in the sharemap
437             initial_peers_to_query[peerid] = ss
438             # and we must wait for responses from them
439             must_query.add(peerid)
440
441         while ((self.num_peers_to_query > len(initial_peers_to_query))
442                and self.extra_peers):
443             (peerid, ss) = self.extra_peers.pop(0)
444             initial_peers_to_query[peerid] = ss
445
446         return initial_peers_to_query, must_query
447
448     def _send_initial_requests(self, peerlist):
449         self._status.set_status("Sending %d initial queries" % len(peerlist))
450         self._queries_outstanding = set()
451         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
452         dl = []
453         for (peerid, ss) in peerlist.items():
454             self._queries_outstanding.add(peerid)
455             self._do_query(ss, peerid, self._storage_index, self._read_size)
456
457         # control flow beyond this point: state machine. Receiving responses
458         # from queries is the input. We might send out more queries, or we
459         # might produce a result.
460         return None
461
462     def _do_query(self, ss, peerid, storage_index, readsize):
463         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
464                  peerid=idlib.shortnodeid_b2a(peerid),
465                  readsize=readsize,
466                  level=log.NOISY)
467         self._servermap.connections[peerid] = ss
468         started = time.time()
469         self._queries_outstanding.add(peerid)
470         d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
471         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
472                       started)
473         d.addErrback(self._query_failed, peerid)
474         # errors that aren't handled by _query_failed (and errors caused by
475         # _query_failed) get logged, but we still want to check for doneness.
476         d.addErrback(log.err)
477         d.addBoth(self._check_for_done)
478         d.addErrback(self._fatal_error)
479         return d
480
481     def _do_read(self, ss, peerid, storage_index, shnums, readv):
482         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
483         return d
484
485     def _got_results(self, datavs, peerid, readsize, stuff, started):
486         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
487                      peerid=idlib.shortnodeid_b2a(peerid),
488                      numshares=len(datavs),
489                      level=log.NOISY)
490         now = time.time()
491         elapsed = now - started
492         self._queries_outstanding.discard(peerid)
493         self._must_query.discard(peerid)
494         self._queries_completed += 1
495         if not self._running:
496             self.log("but we're not running, so we'll ignore it", parent=lp)
497             self._status.add_per_server_time(peerid, "late", started, elapsed)
498             return
499         self._status.add_per_server_time(peerid, "query", started, elapsed)
500
501         if datavs:
502             self._good_peers.add(peerid)
503         else:
504             self._empty_peers.add(peerid)
505
506         last_verinfo = None
507         last_shnum = None
508         for shnum,datav in datavs.items():
509             data = datav[0]
510             try:
511                 verinfo = self._got_results_one_share(shnum, data, peerid)
512                 last_verinfo = verinfo
513                 last_shnum = shnum
514                 self._node._cache.add(verinfo, shnum, 0, data, now)
515             except CorruptShareError, e:
516                 # log it and give the other shares a chance to be processed
517                 f = failure.Failure()
518                 self.log("bad share: %s %s" % (f, f.value),
519                          parent=lp, level=log.WEIRD)
520                 self._bad_peers.add(peerid)
521                 self._last_failure = f
522                 self._servermap.problems.append(f)
523                 pass
524
525         self._status.timings["cumulative_verify"] += (time.time() - now)
526
527         if self._need_privkey and last_verinfo:
528             # send them a request for the privkey. We send one request per
529             # server.
530             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
531              offsets_tuple) = last_verinfo
532             o = dict(offsets_tuple)
533
534             self._queries_outstanding.add(peerid)
535             readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
536             ss = self._servermap.connections[peerid]
537             privkey_started = time.time()
538             d = self._do_read(ss, peerid, self._storage_index,
539                               [last_shnum], readv)
540             d.addCallback(self._got_privkey_results, peerid, last_shnum,
541                           privkey_started)
542             d.addErrback(self._privkey_query_failed, peerid, last_shnum)
543             d.addErrback(log.err)
544             d.addCallback(self._check_for_done)
545             d.addErrback(self._fatal_error)
546
547         # all done!
548         self.log("_got_results done", parent=lp)
549
550     def _got_results_one_share(self, shnum, data, peerid):
551         lp = self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
552                       shnum=shnum,
553                       peerid=idlib.shortnodeid_b2a(peerid))
554
555         # this might raise NeedMoreDataError, if the pubkey and signature
556         # live at some weird offset. That shouldn't happen, so I'm going to
557         # treat it as a bad share.
558         (seqnum, root_hash, IV, k, N, segsize, datalength,
559          pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
560
561         if not self._node.get_pubkey():
562             fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
563             assert len(fingerprint) == 32
564             if fingerprint != self._node._fingerprint:
565                 raise CorruptShareError(peerid, shnum,
566                                         "pubkey doesn't match fingerprint")
567             self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
568
569         if self._need_privkey:
570             self._try_to_extract_privkey(data, peerid, shnum)
571
572         (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
573          ig_segsize, ig_datalen, offsets) = unpack_header(data)
574         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
575
576         verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
577                    offsets_tuple)
578
579         if verinfo not in self._valid_versions:
580             # it's a new pair. Verify the signature.
581             valid = self._node._pubkey.verify(prefix, signature)
582             if not valid:
583                 raise CorruptShareError(peerid, shnum, "signature is invalid")
584
585             # ok, it's a valid verinfo. Add it to the list of validated
586             # versions.
587             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
588                      % (seqnum, base32.b2a(root_hash)[:4],
589                         idlib.shortnodeid_b2a(peerid), shnum,
590                         k, N, segsize, datalength),
591                      parent=lp)
592             self._valid_versions.add(verinfo)
593         # We now know that this is a valid candidate verinfo.
594
595         if (peerid, shnum) in self._servermap.bad_shares:
596             # we've been told that the rest of the data in this share is
597             # unusable, so don't add it to the servermap.
598             self.log("but we've been told this is a bad share",
599                      parent=lp, level=log.UNUSUAL)
600             return verinfo
601
602         # Add the info to our servermap.
603         timestamp = time.time()
604         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
605         # and the versionmap
606         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
607         return verinfo
608
609     def _deserialize_pubkey(self, pubkey_s):
610         verifier = rsa.create_verifying_key_from_string(pubkey_s)
611         return verifier
612
613     def _try_to_extract_privkey(self, data, peerid, shnum):
614         try:
615             r = unpack_share(data)
616         except NeedMoreDataError, e:
617             # this share won't help us. oh well.
618             offset = e.encprivkey_offset
619             length = e.encprivkey_length
620             self.log("shnum %d on peerid %s: share was too short (%dB) "
621                      "to get the encprivkey; [%d:%d] ought to hold it" %
622                      (shnum, idlib.shortnodeid_b2a(peerid), len(data),
623                       offset, offset+length))
624             # NOTE: if uncoordinated writes are taking place, someone might
625             # change the share (and most probably move the encprivkey) before
626             # we get a chance to do one of these reads and fetch it. This
627             # will cause us to see a NotEnoughSharesError(unable to fetch
628             # privkey) instead of an UncoordinatedWriteError . This is a
629             # nuisance, but it will go away when we move to DSA-based mutable
630             # files (since the privkey will be small enough to fit in the
631             # write cap).
632
633             return
634
635         (seqnum, root_hash, IV, k, N, segsize, datalen,
636          pubkey, signature, share_hash_chain, block_hash_tree,
637          share_data, enc_privkey) = r
638
639         return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
640
641     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
642
643         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
644         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
645         if alleged_writekey != self._node.get_writekey():
646             self.log("invalid privkey from %s shnum %d" %
647                      (idlib.nodeid_b2a(peerid)[:8], shnum), level=log.WEIRD)
648             return
649
650         # it's good
651         self.log("got valid privkey from shnum %d on peerid %s" %
652                  (shnum, idlib.shortnodeid_b2a(peerid)))
653         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
654         self._node._populate_encprivkey(enc_privkey)
655         self._node._populate_privkey(privkey)
656         self._need_privkey = False
657         self._status.set_privkey_from(peerid)
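        # Added note: the check above works because the mutable-file writekey
        # is derived by hashing the RSA signing key, so hashing the decrypted
        # candidate with ssk_writekey_hash and comparing it against
        # self._node.get_writekey() shows the share carried our real privkey.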
658
659
660     def _query_failed(self, f, peerid):
661         self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)
662         if not self._running:
663             return
664         self._must_query.discard(peerid)
665         self._queries_outstanding.discard(peerid)
666         self._bad_peers.add(peerid)
667         self._servermap.problems.append(f)
668         self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
669         self._queries_completed += 1
670         self._last_failure = f
671
672     def _got_privkey_results(self, datavs, peerid, shnum, started):
673         now = time.time()
674         elapsed = now - started
675         self._status.add_per_server_time(peerid, "privkey", started, elapsed)
676         self._queries_outstanding.discard(peerid)
677         if not self._need_privkey:
678             return
679         if shnum not in datavs:
680             self.log("privkey wasn't there when we asked for it", level=log.WEIRD)
681             return
682         datav = datavs[shnum]
683         enc_privkey = datav[0]
684         self._try_to_validate_privkey(enc_privkey, peerid, shnum)
685
686     def _privkey_query_failed(self, f, peerid, shnum):
687         self._queries_outstanding.discard(peerid)
688         self.log("error during privkey query: %s %s" % (f, f.value),
689                  level=log.WEIRD)
690         if not self._running:
691             return
692         self._queries_outstanding.discard(peerid)
693         self._servermap.problems.append(f)
694         self._last_failure = f
695
696     def _check_for_done(self, res):
697         # exit paths:
698         #  return self._send_more_queries(outstanding) : send some more queries
699         #  return self._done() : all done
700         #  return : keep waiting, no new queries
701
702         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
703                               "%(outstanding)d queries outstanding, "
704                               "%(extra)d extra peers available, "
705                               "%(must)d 'must query' peers left"
706                               ),
707                       mode=self.mode,
708                       outstanding=len(self._queries_outstanding),
709                       extra=len(self.extra_peers),
710                       must=len(self._must_query),
711                       level=log.NOISY,
712                       )
713
714         if not self._running:
715             self.log("but we're not running", parent=lp, level=log.NOISY)
716             return
717
718         if self._must_query:
719             # we are still waiting for responses from peers that used to have
720             # a share, so we must continue to wait. No additional queries are
721             # required at this time.
722             self.log("%d 'must query' peers left" % len(self._must_query),
723                      parent=lp)
724             return
725
726         if (not self._queries_outstanding and not self.extra_peers):
727             # all queries have retired, and we have no peers left to ask. No
728             # more progress can be made, therefore we are done.
729             self.log("all queries are retired, no extra peers: done",
730                      parent=lp)
731             return self._done()
732
733         recoverable_versions = self._servermap.recoverable_versions()
734         unrecoverable_versions = self._servermap.unrecoverable_versions()
735
736         # what is our completion policy? how hard should we work?
737
738         if self.mode == MODE_ANYTHING:
739             if recoverable_versions:
740                 self.log("%d recoverable versions: done"
741                          % len(recoverable_versions),
742                          parent=lp)
743                 return self._done()
744
745         if self.mode == MODE_CHECK:
746             # we used self._must_query, and we know there aren't any
747             # responses still waiting, so that means we must be done
748             self.log("done", parent=lp)
749             return self._done()
750
751         MAX_IN_FLIGHT = 5
752         if self.mode == MODE_READ:
753             # if we've queried k+epsilon servers, and we see a recoverable
754             # version, and we haven't seen any unrecoverable higher-seqnum'ed
755             # versions, then we're done.
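            # Illustrative example (assumed numbers): with k=3 and EPSILON=3,
            # we stop once 6 queries have completed, some version has >= 3
            # shares, and no unrecoverable version carries a higher seqnum;
            # otherwise we keep up to MAX_IN_FLIGHT more queries going.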
756
757             if self._queries_completed < self.num_peers_to_query:
758                 self.log(format="%(completed)d completed, %(query)d to query: need more",
759                          completed=self._queries_completed,
760                          query=self.num_peers_to_query,
761                          parent=lp)
762                 return self._send_more_queries(MAX_IN_FLIGHT)
763             if not recoverable_versions:
764                 self.log("no recoverable versions: need more",
765                          parent=lp)
766                 return self._send_more_queries(MAX_IN_FLIGHT)
767             highest_recoverable = max(recoverable_versions)
768             highest_recoverable_seqnum = highest_recoverable[0]
769             for unrec_verinfo in unrecoverable_versions:
770                 if unrec_verinfo[0] > highest_recoverable_seqnum:
771                     # there is evidence of a higher-seqnum version, but we
772                     # don't yet see enough shares to recover it. Try harder.
773                     # TODO: consider sending more queries.
774                     # TODO: consider limiting the search distance
775                     self.log("evidence of higher seqnum: need more")
776                     return self._send_more_queries(MAX_IN_FLIGHT)
777             # all the unrecoverable versions were old or concurrent with a
778             # recoverable version. Good enough.
779             self.log("no higher-seqnum: done", parent=lp)
780             return self._done()
781
782         if self.mode == MODE_WRITE:
783             # we want to keep querying until we've seen a few that don't have
784             # any shares, to be sufficiently confident that we've seen all
785             # the shares. This is still less work than MODE_CHECK, which asks
786             # every server in the world.
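            # Illustrative example (assumed answers): with EPSILON=3, a
            # permuted peerlist whose answers so far read "1101000" (1=has
            # shares, 0=empty) hits the boundary at the third consecutive
            # empty peer after the last share, and the loop below can stop.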
787
788             if not recoverable_versions:
789                 self.log("no recoverable versions: need more", parent=lp)
790                 return self._send_more_queries(MAX_IN_FLIGHT)
791
792             last_found = -1
793             last_not_responded = -1
794             num_not_responded = 0
795             num_not_found = 0
796             states = []
797             found_boundary = False
798
799             for i,(peerid,ss) in enumerate(self.full_peerlist):
800                 if peerid in self._bad_peers:
801                     # query failed
802                     states.append("x")
803                     #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
804                 elif peerid in self._empty_peers:
805                     # no shares
806                     states.append("0")
807                     #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
808                     if last_found != -1:
809                         num_not_found += 1
810                         if num_not_found >= self.EPSILON:
811                             self.log("found our boundary, %s" %
812                                      "".join(states),
813                                      parent=lp)
814                             found_boundary = True
815                             break
816
817                 elif peerid in self._good_peers:
818                     # yes shares
819                     states.append("1")
820                     #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
821                     last_found = i
822                     num_not_found = 0
823                 else:
824                     # not responded yet
825                     states.append("?")
826                     #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
827                     last_not_responded = i
828                     num_not_responded += 1
829
830             if found_boundary:
831                 # we need to know that we've gotten answers from
832                 # everybody to the left of here
833                 if last_not_responded == -1:
834                     # we're done
835                     self.log("have all our answers",
836                              parent=lp)
837                     # .. unless we're still waiting on the privkey
838                     if self._need_privkey:
839                         self.log("but we're still waiting for the privkey",
840                                  parent=lp)
841                         # if we found the boundary but we haven't yet found
842                         # the privkey, we may need to look further. If
843                         # somehow all the privkeys were corrupted (but the
844                         # shares were readable), then this is likely to do an
845                         # exhaustive search.
846                         return self._send_more_queries(MAX_IN_FLIGHT)
847                     return self._done()
848                 # still waiting for somebody
849                 return self._send_more_queries(num_not_responded)
850
851             # if we hit here, we didn't find our boundary, so we're still
852             # waiting for peers
853             self.log("no boundary yet, %s" % "".join(states), parent=lp)
854             return self._send_more_queries(MAX_IN_FLIGHT)
855
856         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
857         # arbitrary, really I want this to be something like k -
858         # max(known_version_sharecounts) + some extra
859         self.log("catchall: need more", parent=lp)
860         return self._send_more_queries(MAX_IN_FLIGHT)
861
862     def _send_more_queries(self, num_outstanding):
863         more_queries = []
864
865         while True:
866             self.log(format=" there are %(outstanding)d queries outstanding",
867                      outstanding=len(self._queries_outstanding),
868                      level=log.NOISY)
869             active_queries = len(self._queries_outstanding) + len(more_queries)
870             if active_queries >= num_outstanding:
871                 break
872             if not self.extra_peers:
873                 break
874             more_queries.append(self.extra_peers.pop(0))
875
876         self.log(format="sending %(more)d more queries: %(who)s",
877                  more=len(more_queries),
878                  who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
879                                for (peerid,ss) in more_queries]),
880                  level=log.NOISY)
881
882         for (peerid, ss) in more_queries:
883             self._do_query(ss, peerid, self._storage_index, self._read_size)
884             # we'll retrigger when those queries come back
885
886     def _done(self):
887         if not self._running:
888             return
889         self._running = False
890         now = time.time()
891         elapsed = now - self._started
892         self._status.set_finished(now)
893         self._status.timings["total"] = elapsed
894         self._status.set_progress(1.0)
895         self._status.set_status("Done")
896         self._status.set_active(False)
897
898         self._servermap.last_update_mode = self.mode
899         self._servermap.last_update_time = self._started
900         # the servermap will not be touched after this
901         self.log("servermap: %s" % self._servermap.summarize_versions())
902         eventually(self._done_deferred.callback, self._servermap)
903
904     def _fatal_error(self, f):
905         self.log("fatal error", failure=f, level=log.WEIRD)
906         self._done_deferred.errback(f)
907
908