]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/servermap.py
cb93fc5ddaa74d074765ec2f76b00caf6f9d12f3
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
8                          fireEventually
9 from allmydata.util import base32, hashutil, idlib, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
14
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
16      CorruptShareError
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
18
19 class UpdateStatus:
20     implements(IServermapUpdaterStatus)
21     statusid_counter = count(0)
22     def __init__(self):
23         self.timings = {}
24         self.timings["per_server"] = {}
25         self.timings["cumulative_verify"] = 0.0
26         self.privkey_from = None
27         self.problems = {}
28         self.active = True
29         self.storage_index = None
30         self.mode = "?"
31         self.status = "Not started"
32         self.progress = 0.0
33         self.counter = self.statusid_counter.next()
34         self.started = time.time()
35         self.finished = None
36
37     def add_per_server_time(self, peerid, op, sent, elapsed):
38         assert op in ("query", "late", "privkey")
39         if peerid not in self.timings["per_server"]:
40             self.timings["per_server"][peerid] = []
41         self.timings["per_server"][peerid].append((op,sent,elapsed))
42
43     def get_started(self):
44         return self.started
45     def get_finished(self):
46         return self.finished
47     def get_storage_index(self):
48         return self.storage_index
49     def get_mode(self):
50         return self.mode
51     def get_servermap(self):
52         return self.servermap
53     def get_privkey_from(self):
54         return self.privkey_from
55     def using_helper(self):
56         return False
57     def get_size(self):
58         return "-NA-"
59     def get_status(self):
60         return self.status
61     def get_progress(self):
62         return self.progress
63     def get_active(self):
64         return self.active
65     def get_counter(self):
66         return self.counter
67
68     def set_storage_index(self, si):
69         self.storage_index = si
70     def set_mode(self, mode):
71         self.mode = mode
72     def set_privkey_from(self, peerid):
73         self.privkey_from = peerid
74     def set_status(self, status):
75         self.status = status
76     def set_progress(self, value):
77         self.progress = value
78     def set_active(self, value):
79         self.active = value
80     def set_finished(self, when):
81         self.finished = when
82
83 class ServerMap:
84     """I record the placement of mutable shares.
85
86     This object records which shares (of various versions) are located on
87     which servers.
88
89     One purpose I serve is to inform callers about which versions of the
90     mutable file are recoverable and 'current'.
91
92     A second purpose is to serve as a state marker for test-and-set
93     operations. I am passed out of retrieval operations and back into publish
94     operations, which means 'publish this new version, but only if nothing
95     has changed since I last retrieved this data'. This reduces the chances
96     of clobbering a simultaneous (uncoordinated) write.
97
98     @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
99                      (versionid, timestamp) tuple. Each 'versionid' is a
100                      tuple of (seqnum, root_hash, IV, segsize, datalength,
101                      k, N, signed_prefix, offsets)
102
103     @ivar connections: maps peerid to a RemoteReference
104
105     @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
106                       shares that I should ignore (because a previous user of
107                       the servermap determined that they were invalid). The
108                       updater only locates a certain number of shares: if
109                       some of these turn out to have integrity problems and
110                       are unusable, the caller will need to mark those shares
111                       as bad, then re-update the servermap, then try again.
112                       The dict maps (peerid, shnum) tuple to old checkstring.
113     """
114
115     def __init__(self):
116         self.servermap = {}
117         self.connections = {}
118         self.unreachable_peers = set() # peerids that didn't respond to queries
119         self.reachable_peers = set() # peerids that did respond to queries
120         self.problems = [] # mostly for debugging
121         self.bad_shares = {} # maps (peerid,shnum) to old checkstring
122         self.last_update_mode = None
123         self.last_update_time = 0
124         self.update_data = {} # (verinfo,shnum) => data
125
126     def copy(self):
127         s = ServerMap()
128         s.servermap = self.servermap.copy() # tuple->tuple
129         s.connections = self.connections.copy() # str->RemoteReference
130         s.unreachable_peers = set(self.unreachable_peers)
131         s.reachable_peers = set(self.reachable_peers)
132         s.problems = self.problems[:]
133         s.bad_shares = self.bad_shares.copy() # tuple->str
134         s.last_update_mode = self.last_update_mode
135         s.last_update_time = self.last_update_time
136         return s
137
138     def mark_bad_share(self, peerid, shnum, checkstring):
139         """This share was found to be bad, either in the checkstring or
140         signature (detected during mapupdate), or deeper in the share
141         (detected at retrieve time). Remove it from our list of useful
142         shares, and remember that it is bad so we don't add it back again
143         later. We record the share's old checkstring (which might be
144         corrupted or badly signed) so that a repair operation can do the
145         test-and-set using it as a reference.
146         """
147         key = (peerid, shnum) # record checkstring
148         self.bad_shares[key] = checkstring
149         self.servermap.pop(key, None)
150
151     def add_new_share(self, peerid, shnum, verinfo, timestamp):
152         """We've written a new share out, replacing any that was there
153         before."""
154         key = (peerid, shnum)
155         self.bad_shares.pop(key, None)
156         self.servermap[key] = (verinfo, timestamp)
157
158     def dump(self, out=sys.stdout):
159         print >>out, "servermap:"
160
161         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
162             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
163              offsets_tuple) = verinfo
164             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
165                           (idlib.shortnodeid_b2a(peerid), shnum,
166                            seqnum, base32.b2a(root_hash)[:4], k, N,
167                            datalength))
168         if self.problems:
169             print >>out, "%d PROBLEMS" % len(self.problems)
170             for f in self.problems:
171                 print >>out, str(f)
172         return out
173
174     def all_peers(self):
175         return set([peerid
176                     for (peerid, shnum)
177                     in self.servermap])
178
179     def all_peers_for_version(self, verinfo):
180         """Return a set of peerids that hold shares for the given version."""
181         return set([peerid
182                     for ( (peerid, shnum), (verinfo2, timestamp) )
183                     in self.servermap.items()
184                     if verinfo == verinfo2])
185
186     def make_sharemap(self):
187         """Return a dict that maps shnum to a set of peerds that hold it."""
188         sharemap = DictOfSets()
189         for (peerid, shnum) in self.servermap:
190             sharemap.add(shnum, peerid)
191         return sharemap
192
193     def make_versionmap(self):
194         """Return a dict that maps versionid to sets of (shnum, peerid,
195         timestamp) tuples."""
196         versionmap = DictOfSets()
197         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
198             versionmap.add(verinfo, (shnum, peerid, timestamp))
199         return versionmap
200
201     def shares_on_peer(self, peerid):
202         return set([shnum
203                     for (s_peerid, shnum)
204                     in self.servermap
205                     if s_peerid == peerid])
206
207     def version_on_peer(self, peerid, shnum):
208         key = (peerid, shnum)
209         if key in self.servermap:
210             (verinfo, timestamp) = self.servermap[key]
211             return verinfo
212         return None
213
214     def shares_available(self):
215         """Return a dict that maps verinfo to tuples of
216         (num_distinct_shares, k, N) tuples."""
217         versionmap = self.make_versionmap()
218         all_shares = {}
219         for verinfo, shares in versionmap.items():
220             s = set()
221             for (shnum, peerid, timestamp) in shares:
222                 s.add(shnum)
223             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
224              offsets_tuple) = verinfo
225             all_shares[verinfo] = (len(s), k, N)
226         return all_shares
227
228     def highest_seqnum(self):
229         available = self.shares_available()
230         seqnums = [verinfo[0]
231                    for verinfo in available.keys()]
232         seqnums.append(0)
233         return max(seqnums)
234
235     def summarize_version(self, verinfo):
236         """Take a versionid, return a string that describes it."""
237         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
238          offsets_tuple) = verinfo
239         return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
240
241     def summarize_versions(self):
242         """Return a string describing which versions we know about."""
243         versionmap = self.make_versionmap()
244         bits = []
245         for (verinfo, shares) in versionmap.items():
246             vstr = self.summarize_version(verinfo)
247             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
248             bits.append("%d*%s" % (len(shnums), vstr))
249         return "/".join(bits)
250
251     def recoverable_versions(self):
252         """Return a set of versionids, one for each version that is currently
253         recoverable."""
254         versionmap = self.make_versionmap()
255         recoverable_versions = set()
256         for (verinfo, shares) in versionmap.items():
257             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
258              offsets_tuple) = verinfo
259             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
260             if len(shnums) >= k:
261                 # this one is recoverable
262                 recoverable_versions.add(verinfo)
263
264         return recoverable_versions
265
266     def unrecoverable_versions(self):
267         """Return a set of versionids, one for each version that is currently
268         unrecoverable."""
269         versionmap = self.make_versionmap()
270
271         unrecoverable_versions = set()
272         for (verinfo, shares) in versionmap.items():
273             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
274              offsets_tuple) = verinfo
275             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
276             if len(shnums) < k:
277                 unrecoverable_versions.add(verinfo)
278
279         return unrecoverable_versions
280
281     def best_recoverable_version(self):
282         """Return a single versionid, for the so-called 'best' recoverable
283         version. Sequence number is the primary sort criteria, followed by
284         root hash. Returns None if there are no recoverable versions."""
285         recoverable = list(self.recoverable_versions())
286         recoverable.sort()
287         if recoverable:
288             return recoverable[-1]
289         return None
290
291     def size_of_version(self, verinfo):
292         """Given a versionid (perhaps returned by best_recoverable_version),
293         return the size of the file in bytes."""
294         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
295          offsets_tuple) = verinfo
296         return datalength
297
298     def unrecoverable_newer_versions(self):
299         # Return a dict of versionid -> health, for versions that are
300         # unrecoverable and have later seqnums than any recoverable versions.
301         # These indicate that a write will lose data.
302         versionmap = self.make_versionmap()
303         healths = {} # maps verinfo to (found,k)
304         unrecoverable = set()
305         highest_recoverable_seqnum = -1
306         for (verinfo, shares) in versionmap.items():
307             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
308              offsets_tuple) = verinfo
309             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
310             healths[verinfo] = (len(shnums),k)
311             if len(shnums) < k:
312                 unrecoverable.add(verinfo)
313             else:
314                 highest_recoverable_seqnum = max(seqnum,
315                                                  highest_recoverable_seqnum)
316
317         newversions = {}
318         for verinfo in unrecoverable:
319             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
320              offsets_tuple) = verinfo
321             if seqnum > highest_recoverable_seqnum:
322                 newversions[verinfo] = healths[verinfo]
323
324         return newversions
325
326
327     def needs_merge(self):
328         # return True if there are multiple recoverable versions with the
329         # same seqnum, meaning that MutableFileNode.read_best_version is not
330         # giving you the whole story, and that using its data to do a
331         # subsequent publish will lose information.
332         recoverable_seqnums = [verinfo[0]
333                                for verinfo in self.recoverable_versions()]
334         for seqnum in recoverable_seqnums:
335             if recoverable_seqnums.count(seqnum) > 1:
336                 return True
337         return False
338
339
340     def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
341         """
342         I return the update data for the given shnum
343         """
344         update_data = self.update_data[shnum]
345         update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
346         return update_datum
347
348
349     def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
350         """
351         I record the block hash tree for the given shnum.
352         """
353         self.update_data.setdefault(shnum , []).append((verinfo, data))
354
355
356 class ServermapUpdater:
357     def __init__(self, filenode, storage_broker, monitor, servermap,
358                  mode=MODE_READ, add_lease=False, update_range=None):
359         """I update a servermap, locating a sufficient number of useful
360         shares and remembering where they are located.
361
362         """
363
364         self._node = filenode
365         self._storage_broker = storage_broker
366         self._monitor = monitor
367         self._servermap = servermap
368         self.mode = mode
369         self._add_lease = add_lease
370         self._running = True
371
372         self._storage_index = filenode.get_storage_index()
373         self._last_failure = None
374
375         self._status = UpdateStatus()
376         self._status.set_storage_index(self._storage_index)
377         self._status.set_progress(0.0)
378         self._status.set_mode(mode)
379
380         self._servers_responded = set()
381
382         # how much data should we read?
383         # SDMF:
384         #  * if we only need the checkstring, then [0:75]
385         #  * if we need to validate the checkstring sig, then [543ish:799ish]
386         #  * if we need the verification key, then [107:436ish]
387         #   * the offset table at [75:107] tells us about the 'ish'
388         #  * if we need the encrypted private key, we want [-1216ish:]
389         #   * but we can't read from negative offsets
390         #   * the offset table tells us the 'ish', also the positive offset
391         # MDMF:
392         #  * Checkstring? [0:72]
393         #  * If we want to validate the checkstring, then [0:72], [143:?] --
394         #    the offset table will tell us for sure.
395         #  * If we need the verification key, we have to consult the offset
396         #    table as well.
397         # At this point, we don't know which we are. Our filenode can
398         # tell us, but it might be lying -- in some cases, we're
399         # responsible for telling it which kind of file it is.
400         self._read_size = 4000
401         if mode == MODE_CHECK:
402             # we use unpack_prefix_and_signature, so we need 1k
403             self._read_size = 1000
404         self._need_privkey = False
405
406         if mode == MODE_WRITE and not self._node.get_privkey():
407             self._need_privkey = True
408         # check+repair: repair requires the privkey, so if we didn't happen
409         # to ask for it during the check, we'll have problems doing the
410         # publish.
411
412         self.fetch_update_data = False
413         if mode == MODE_WRITE and update_range:
414             # We're updating the servermap in preparation for an
415             # in-place file update, so we need to fetch some additional
416             # data from each share that we find.
417             assert len(update_range) == 2
418
419             self.start_segment = update_range[0]
420             self.end_segment = update_range[1]
421             self.fetch_update_data = True
422
423         prefix = si_b2a(self._storage_index)[:5]
424         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
425                                    si=prefix, mode=mode)
426
427     def get_status(self):
428         return self._status
429
430     def log(self, *args, **kwargs):
431         if "parent" not in kwargs:
432             kwargs["parent"] = self._log_number
433         if "facility" not in kwargs:
434             kwargs["facility"] = "tahoe.mutable.mapupdate"
435         return log.msg(*args, **kwargs)
436
437     def update(self):
438         """Update the servermap to reflect current conditions. Returns a
439         Deferred that fires with the servermap once the update has finished."""
440         self._started = time.time()
441         self._status.set_active(True)
442
443         # self._valid_versions is a set of validated verinfo tuples. We just
444         # use it to remember which versions had valid signatures, so we can
445         # avoid re-checking the signatures for each share.
446         self._valid_versions = set()
447
448         # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
449         # timestamp) tuples. This is used to figure out which versions might
450         # be retrievable, and to make the eventual data download faster.
451         self.versionmap = DictOfSets()
452
453         self._done_deferred = defer.Deferred()
454
455         # first, which peers should be talk to? Any that were in our old
456         # servermap, plus "enough" others.
457
458         self._queries_completed = 0
459
460         sb = self._storage_broker
461         # All of the peers, permuted by the storage index, as usual.
462         full_peerlist = [(s.get_serverid(), s.get_rref())
463                          for s in sb.get_servers_for_psi(self._storage_index)]
464         self.full_peerlist = full_peerlist # for use later, immutable
465         self.extra_peers = full_peerlist[:] # peers are removed as we use them
466         self._good_peers = set() # peers who had some shares
467         self._empty_peers = set() # peers who don't have any shares
468         self._bad_peers = set() # peers to whom our queries failed
469         self._readers = {} # peerid -> dict(sharewriters), filled in
470                            # after responses come in.
471
472         k = self._node.get_required_shares()
473         # For what cases can these conditions work?
474         if k is None:
475             # make a guess
476             k = 3
477         N = self._node.get_total_shares()
478         if N is None:
479             N = 10
480         self.EPSILON = k
481         # we want to send queries to at least this many peers (although we
482         # might not wait for all of their answers to come back)
483         self.num_peers_to_query = k + self.EPSILON
484
485         if self.mode == MODE_CHECK:
486             # We want to query all of the peers.
487             initial_peers_to_query = dict(full_peerlist)
488             must_query = set(initial_peers_to_query.keys())
489             self.extra_peers = []
490         elif self.mode == MODE_WRITE:
491             # we're planning to replace all the shares, so we want a good
492             # chance of finding them all. We will keep searching until we've
493             # seen epsilon that don't have a share.
494             # We don't query all of the peers because that could take a while.
495             self.num_peers_to_query = N + self.EPSILON
496             initial_peers_to_query, must_query = self._build_initial_querylist()
497             self.required_num_empty_peers = self.EPSILON
498
499             # TODO: arrange to read lots of data from k-ish servers, to avoid
500             # the extra round trip required to read large directories. This
501             # might also avoid the round trip required to read the encrypted
502             # private key.
503
504         else: # MODE_READ, MODE_ANYTHING
505             # 2k peers is good enough.
506             initial_peers_to_query, must_query = self._build_initial_querylist()
507
508         # this is a set of peers that we are required to get responses from:
509         # they are peers who used to have a share, so we need to know where
510         # they currently stand, even if that means we have to wait for a
511         # silently-lost TCP connection to time out. We remove peers from this
512         # set as we get responses.
513         self._must_query = must_query
514
515         # now initial_peers_to_query contains the peers that we should ask,
516         # self.must_query contains the peers that we must have heard from
517         # before we can consider ourselves finished, and self.extra_peers
518         # contains the overflow (peers that we should tap if we don't get
519         # enough responses)
520         # I guess that self._must_query is a subset of
521         # initial_peers_to_query?
522         assert set(must_query).issubset(set(initial_peers_to_query))
523
524         self._send_initial_requests(initial_peers_to_query)
525         self._status.timings["initial_queries"] = time.time() - self._started
526         return self._done_deferred
527
528     def _build_initial_querylist(self):
529         initial_peers_to_query = {}
530         must_query = set()
531         for peerid in self._servermap.all_peers():
532             ss = self._servermap.connections[peerid]
533             # we send queries to everyone who was already in the sharemap
534             initial_peers_to_query[peerid] = ss
535             # and we must wait for responses from them
536             must_query.add(peerid)
537
538         while ((self.num_peers_to_query > len(initial_peers_to_query))
539                and self.extra_peers):
540             (peerid, ss) = self.extra_peers.pop(0)
541             initial_peers_to_query[peerid] = ss
542
543         return initial_peers_to_query, must_query
544
545     def _send_initial_requests(self, peerlist):
546         self._status.set_status("Sending %d initial queries" % len(peerlist))
547         self._queries_outstanding = set()
548         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
549         for (peerid, ss) in peerlist.items():
550             self._queries_outstanding.add(peerid)
551             self._do_query(ss, peerid, self._storage_index, self._read_size)
552
553         if not peerlist:
554             # there is nobody to ask, so we need to short-circuit the state
555             # machine.
556             d = defer.maybeDeferred(self._check_for_done, None)
557             d.addErrback(self._fatal_error)
558
559         # control flow beyond this point: state machine. Receiving responses
560         # from queries is the input. We might send out more queries, or we
561         # might produce a result.
562         return None
563
564     def _do_query(self, ss, peerid, storage_index, readsize):
565         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
566                  peerid=idlib.shortnodeid_b2a(peerid),
567                  readsize=readsize,
568                  level=log.NOISY)
569         self._servermap.connections[peerid] = ss
570         started = time.time()
571         self._queries_outstanding.add(peerid)
572         d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
573         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
574                       started)
575         d.addErrback(self._query_failed, peerid)
576         # errors that aren't handled by _query_failed (and errors caused by
577         # _query_failed) get logged, but we still want to check for doneness.
578         d.addErrback(log.err)
579         d.addErrback(self._fatal_error)
580         d.addCallback(self._check_for_done)
581         return d
582
583     def _do_read(self, ss, peerid, storage_index, shnums, readv):
584         if self._add_lease:
585             # send an add-lease message in parallel. The results are handled
586             # separately. This is sent before the slot_readv() so that we can
587             # be sure the add_lease is retired by the time slot_readv comes
588             # back (this relies upon our knowledge that the server code for
589             # add_lease is synchronous).
590             renew_secret = self._node.get_renewal_secret(peerid)
591             cancel_secret = self._node.get_cancel_secret(peerid)
592             d2 = ss.callRemote("add_lease", storage_index,
593                                renew_secret, cancel_secret)
594             # we ignore success
595             d2.addErrback(self._add_lease_failed, peerid, storage_index)
596         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
597         return d
598
599
600     def _got_corrupt_share(self, e, shnum, peerid, data, lp):
601         """
602         I am called when a remote server returns a corrupt share in
603         response to one of our queries. By corrupt, I mean a share
604         without a valid signature. I then record the failure, notify the
605         server of the corruption, and record the share as bad.
606         """
607         f = failure.Failure(e)
608         self.log(format="bad share: %(f_value)s", f_value=str(f),
609                  failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
610         # Notify the server that its share is corrupt.
611         self.notify_server_corruption(peerid, shnum, str(e))
612         # By flagging this as a bad peer, we won't count any of
613         # the other shares on that peer as valid, though if we
614         # happen to find a valid version string amongst those
615         # shares, we'll keep track of it so that we don't need
616         # to validate the signature on those again.
617         self._bad_peers.add(peerid)
618         self._last_failure = f
619         # XXX: Use the reader for this?
620         checkstring = data[:SIGNED_PREFIX_LENGTH]
621         self._servermap.mark_bad_share(peerid, shnum, checkstring)
622         self._servermap.problems.append(f)
623
624
625     def _cache_good_sharedata(self, verinfo, shnum, now, data):
626         """
627         If one of my queries returns successfully (which means that we
628         were able to and successfully did validate the signature), I
629         cache the data that we initially fetched from the storage
630         server. This will help reduce the number of roundtrips that need
631         to occur when the file is downloaded, or when the file is
632         updated.
633         """
634         if verinfo:
635             self._node._add_to_cache(verinfo, shnum, 0, data)
636
637
638     def _got_results(self, datavs, peerid, readsize, stuff, started):
639         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
640                       peerid=idlib.shortnodeid_b2a(peerid),
641                       numshares=len(datavs))
642         now = time.time()
643         elapsed = now - started
644         def _done_processing(ignored=None):
645             self._queries_outstanding.discard(peerid)
646             self._servermap.reachable_peers.add(peerid)
647             self._must_query.discard(peerid)
648             self._queries_completed += 1
649         if not self._running:
650             self.log("but we're not running, so we'll ignore it", parent=lp)
651             _done_processing()
652             self._status.add_per_server_time(peerid, "late", started, elapsed)
653             return
654         self._status.add_per_server_time(peerid, "query", started, elapsed)
655
656         if datavs:
657             self._good_peers.add(peerid)
658         else:
659             self._empty_peers.add(peerid)
660
661         ss, storage_index = stuff
662         ds = []
663
664         for shnum,datav in datavs.items():
665             data = datav[0]
666             reader = MDMFSlotReadProxy(ss,
667                                        storage_index,
668                                        shnum,
669                                        data)
670             self._readers.setdefault(peerid, dict())[shnum] = reader
671             # our goal, with each response, is to validate the version
672             # information and share data as best we can at this point --
673             # we do this by validating the signature. To do this, we
674             # need to do the following:
675             #   - If we don't already have the public key, fetch the
676             #     public key. We use this to validate the signature.
677             if not self._node.get_pubkey():
678                 # fetch and set the public key.
679                 d = reader.get_verification_key(queue=True)
680                 d.addCallback(lambda results, shnum=shnum, peerid=peerid:
681                     self._try_to_set_pubkey(results, peerid, shnum, lp))
682                 # XXX: Make self._pubkey_query_failed?
683                 d.addErrback(lambda error, shnum=shnum, peerid=peerid:
684                     self._got_corrupt_share(error, shnum, peerid, data, lp))
685             else:
686                 # we already have the public key.
687                 d = defer.succeed(None)
688
689             # Neither of these two branches return anything of
690             # consequence, so the first entry in our deferredlist will
691             # be None.
692
693             # - Next, we need the version information. We almost
694             #   certainly got this by reading the first thousand or so
695             #   bytes of the share on the storage server, so we
696             #   shouldn't need to fetch anything at this step.
697             d2 = reader.get_verinfo()
698             d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
699                 self._got_corrupt_share(error, shnum, peerid, data, lp))
700             # - Next, we need the signature. For an SDMF share, it is
701             #   likely that we fetched this when doing our initial fetch
702             #   to get the version information. In MDMF, this lives at
703             #   the end of the share, so unless the file is quite small,
704             #   we'll need to do a remote fetch to get it.
705             d3 = reader.get_signature(queue=True)
706             d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
707                 self._got_corrupt_share(error, shnum, peerid, data, lp))
708             #  Once we have all three of these responses, we can move on
709             #  to validating the signature
710
711             # Does the node already have a privkey? If not, we'll try to
712             # fetch it here.
713             if self._need_privkey:
714                 d4 = reader.get_encprivkey(queue=True)
715                 d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
716                     self._try_to_validate_privkey(results, peerid, shnum, lp))
717                 d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
718                     self._privkey_query_failed(error, shnum, data, lp))
719             else:
720                 d4 = defer.succeed(None)
721
722
723             if self.fetch_update_data:
724                 # fetch the block hash tree and first + last segment, as
725                 # configured earlier.
726                 # Then set them in wherever we happen to want to set
727                 # them.
728                 ds = []
729                 # XXX: We do this above, too. Is there a good way to
730                 # make the two routines share the value without
731                 # introducing more roundtrips?
732                 ds.append(reader.get_verinfo())
733                 ds.append(reader.get_blockhashes(queue=True))
734                 ds.append(reader.get_block_and_salt(self.start_segment,
735                                                     queue=True))
736                 ds.append(reader.get_block_and_salt(self.end_segment,
737                                                     queue=True))
738                 d5 = deferredutil.gatherResults(ds)
739                 d5.addCallback(self._got_update_results_one_share, shnum)
740             else:
741                 d5 = defer.succeed(None)
742
743             dl = defer.DeferredList([d, d2, d3, d4, d5])
744             dl.addBoth(self._turn_barrier)
745             reader.flush()
746             dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
747                 self._got_signature_one_share(results, shnum, peerid, lp))
748             dl.addErrback(lambda error, shnum=shnum, data=data:
749                self._got_corrupt_share(error, shnum, peerid, data, lp))
750             dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
751                 self._cache_good_sharedata(verinfo, shnum, now, data))
752             ds.append(dl)
753         # dl is a deferred list that will fire when all of the shares
754         # that we found on this peer are done processing. When dl fires,
755         # we know that processing is done, so we can decrement the
756         # semaphore-like thing that we incremented earlier.
757         dl = defer.DeferredList(ds, fireOnOneErrback=True)
758         # Are we done? Done means that there are no more queries to
759         # send, that there are no outstanding queries, and that we
760         # haven't received any queries that are still processing. If we
761         # are done, self._check_for_done will cause the done deferred
762         # that we returned to our caller to fire, which tells them that
763         # they have a complete servermap, and that we won't be touching
764         # the servermap anymore.
765         dl.addCallback(_done_processing)
766         dl.addCallback(self._check_for_done)
767         dl.addErrback(self._fatal_error)
768         # all done!
769         self.log("_got_results done", parent=lp, level=log.NOISY)
770         return dl
771
772
773     def _turn_barrier(self, result):
774         """
775         I help the servermap updater avoid the recursion limit issues
776         discussed in #237.
777         """
778         return fireEventually(result)
779
780
781     def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
782         if self._node.get_pubkey():
783             return # don't go through this again if we don't have to
784         fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
785         assert len(fingerprint) == 32
786         if fingerprint != self._node.get_fingerprint():
787             raise CorruptShareError(peerid, shnum,
788                                 "pubkey doesn't match fingerprint")
789         self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
790         assert self._node.get_pubkey()
791
792
793     def notify_server_corruption(self, peerid, shnum, reason):
794         ss = self._servermap.connections[peerid]
795         ss.callRemoteOnly("advise_corrupt_share",
796                           "mutable", self._storage_index, shnum, reason)
797
798
799     def _got_signature_one_share(self, results, shnum, peerid, lp):
800         # It is our job to give versioninfo to our caller. We need to
801         # raise CorruptShareError if the share is corrupt for any
802         # reason, something that our caller will handle.
803         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
804                  shnum=shnum,
805                  peerid=idlib.shortnodeid_b2a(peerid),
806                  level=log.NOISY,
807                  parent=lp)
808         if not self._running:
809             # We can't process the results, since we can't touch the
810             # servermap anymore.
811             self.log("but we're not running anymore.")
812             return None
813
814         _, verinfo, signature, __, ___ = results
815         (seqnum,
816          root_hash,
817          saltish,
818          segsize,
819          datalen,
820          k,
821          n,
822          prefix,
823          offsets) = verinfo[1]
824         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
825
826         # XXX: This should be done for us in the method, so
827         # presumably you can go in there and fix it.
828         verinfo = (seqnum,
829                    root_hash,
830                    saltish,
831                    segsize,
832                    datalen,
833                    k,
834                    n,
835                    prefix,
836                    offsets_tuple)
837         # This tuple uniquely identifies a share on the grid; we use it
838         # to keep track of the ones that we've already seen.
839
840         if verinfo not in self._valid_versions:
841             # This is a new version tuple, and we need to validate it
842             # against the public key before keeping track of it.
843             assert self._node.get_pubkey()
844             valid = self._node.get_pubkey().verify(prefix, signature[1])
845             if not valid:
846                 raise CorruptShareError(peerid, shnum,
847                                         "signature is invalid")
848
849         # ok, it's a valid verinfo. Add it to the list of validated
850         # versions.
851         self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
852                  % (seqnum, base32.b2a(root_hash)[:4],
853                     idlib.shortnodeid_b2a(peerid), shnum,
854                     k, n, segsize, datalen),
855                     parent=lp)
856         self._valid_versions.add(verinfo)
857         # We now know that this is a valid candidate verinfo. Whether or
858         # not this instance of it is valid is a matter for the next
859         # statement; at this point, we just know that if we see this
860         # version info again, that its signature checks out and that
861         # we're okay to skip the signature-checking step.
862
863         # (peerid, shnum) are bound in the method invocation.
864         if (peerid, shnum) in self._servermap.bad_shares:
865             # we've been told that the rest of the data in this share is
866             # unusable, so don't add it to the servermap.
867             self.log("but we've been told this is a bad share",
868                      parent=lp, level=log.UNUSUAL)
869             return verinfo
870
871         # Add the info to our servermap.
872         timestamp = time.time()
873         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
874         # and the versionmap
875         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
876
877         return verinfo
878
879
880     def _got_update_results_one_share(self, results, share):
881         """
882         I record the update results in results.
883         """
884         assert len(results) == 4
885         verinfo, blockhashes, start, end = results
886         (seqnum,
887          root_hash,
888          saltish,
889          segsize,
890          datalen,
891          k,
892          n,
893          prefix,
894          offsets) = verinfo
895         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
896
897         # XXX: This should be done for us in the method, so
898         # presumably you can go in there and fix it.
899         verinfo = (seqnum,
900                    root_hash,
901                    saltish,
902                    segsize,
903                    datalen,
904                    k,
905                    n,
906                    prefix,
907                    offsets_tuple)
908
909         update_data = (blockhashes, start, end)
910         self._servermap.set_update_data_for_share_and_verinfo(share,
911                                                               verinfo,
912                                                               update_data)
913
914
915     def _deserialize_pubkey(self, pubkey_s):
916         verifier = rsa.create_verifying_key_from_string(pubkey_s)
917         return verifier
918
919
920     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
921         """
922         Given a writekey from a remote server, I validate it against the
923         writekey stored in my node. If it is valid, then I set the
924         privkey and encprivkey properties of the node.
925         """
926         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
927         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
928         if alleged_writekey != self._node.get_writekey():
929             self.log("invalid privkey from %s shnum %d" %
930                      (idlib.nodeid_b2a(peerid)[:8], shnum),
931                      parent=lp, level=log.WEIRD, umid="aJVccw")
932             return
933
934         # it's good
935         self.log("got valid privkey from shnum %d on peerid %s" %
936                  (shnum, idlib.shortnodeid_b2a(peerid)),
937                  parent=lp)
938         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
939         self._node._populate_encprivkey(enc_privkey)
940         self._node._populate_privkey(privkey)
941         self._need_privkey = False
942         self._status.set_privkey_from(peerid)
943
944
945     def _add_lease_failed(self, f, peerid, storage_index):
946         # Older versions of Tahoe didn't handle the add-lease message very
947         # well: <=1.1.0 throws a NameError because it doesn't implement
948         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
949         # (which is most of them, since we send add-lease to everybody,
950         # before we know whether or not they have any shares for us), and
951         # 1.2.0 throws KeyError even on known buckets due to an internal bug
952         # in the latency-measuring code.
953
954         # we want to ignore the known-harmless errors and log the others. In
955         # particular we want to log any local errors caused by coding
956         # problems.
957
958         if f.check(DeadReferenceError):
959             return
960         if f.check(RemoteException):
961             if f.value.failure.check(KeyError, IndexError, NameError):
962                 # this may ignore a bit too much, but that only hurts us
963                 # during debugging
964                 return
965             self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
966                      peerid=idlib.shortnodeid_b2a(peerid),
967                      f_value=str(f.value),
968                      failure=f,
969                      level=log.WEIRD, umid="iqg3mw")
970             return
971         # local errors are cause for alarm
972         log.err(f,
973                 format="local error in add_lease to [%(peerid)s]: %(f_value)s",
974                 peerid=idlib.shortnodeid_b2a(peerid),
975                 f_value=str(f.value),
976                 level=log.WEIRD, umid="ZWh6HA")
977
978     def _query_failed(self, f, peerid):
979         if not self._running:
980             return
981         level = log.WEIRD
982         if f.check(DeadReferenceError):
983             level = log.UNUSUAL
984         self.log(format="error during query: %(f_value)s",
985                  f_value=str(f.value), failure=f,
986                  level=level, umid="IHXuQg")
987         self._must_query.discard(peerid)
988         self._queries_outstanding.discard(peerid)
989         self._bad_peers.add(peerid)
990         self._servermap.problems.append(f)
991         # a peerid could be in both ServerMap.reachable_peers and
992         # .unreachable_peers if they responded to our query, but then an
993         # exception was raised in _got_results.
994         self._servermap.unreachable_peers.add(peerid)
995         self._queries_completed += 1
996         self._last_failure = f
997
998
999     def _privkey_query_failed(self, f, peerid, shnum, lp):
1000         self._queries_outstanding.discard(peerid)
1001         if not self._running:
1002             return
1003         level = log.WEIRD
1004         if f.check(DeadReferenceError):
1005             level = log.UNUSUAL
1006         self.log(format="error during privkey query: %(f_value)s",
1007                  f_value=str(f.value), failure=f,
1008                  parent=lp, level=level, umid="McoJ5w")
1009         self._servermap.problems.append(f)
1010         self._last_failure = f
1011
1012
1013     def _check_for_done(self, res):
1014         # exit paths:
1015         #  return self._send_more_queries(outstanding) : send some more queries
1016         #  return self._done() : all done
1017         #  return : keep waiting, no new queries
1018         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
1019                               "%(outstanding)d queries outstanding, "
1020                               "%(extra)d extra peers available, "
1021                               "%(must)d 'must query' peers left, "
1022                               "need_privkey=%(need_privkey)s"
1023                               ),
1024                       mode=self.mode,
1025                       outstanding=len(self._queries_outstanding),
1026                       extra=len(self.extra_peers),
1027                       must=len(self._must_query),
1028                       need_privkey=self._need_privkey,
1029                       level=log.NOISY,
1030                       )
1031
1032         if not self._running:
1033             self.log("but we're not running", parent=lp, level=log.NOISY)
1034             return
1035
1036         if self._must_query:
1037             # we are still waiting for responses from peers that used to have
1038             # a share, so we must continue to wait. No additional queries are
1039             # required at this time.
1040             self.log("%d 'must query' peers left" % len(self._must_query),
1041                      level=log.NOISY, parent=lp)
1042             return
1043
1044         if (not self._queries_outstanding and not self.extra_peers):
1045             # all queries have retired, and we have no peers left to ask. No
1046             # more progress can be made, therefore we are done.
1047             self.log("all queries are retired, no extra peers: done",
1048                      parent=lp)
1049             return self._done()
1050
1051         recoverable_versions = self._servermap.recoverable_versions()
1052         unrecoverable_versions = self._servermap.unrecoverable_versions()
1053
1054         # what is our completion policy? how hard should we work?
1055
1056         if self.mode == MODE_ANYTHING:
1057             if recoverable_versions:
1058                 self.log("%d recoverable versions: done"
1059                          % len(recoverable_versions),
1060                          parent=lp)
1061                 return self._done()
1062
1063         if self.mode == MODE_CHECK:
1064             # we used self._must_query, and we know there aren't any
1065             # responses still waiting, so that means we must be done
1066             self.log("done", parent=lp)
1067             return self._done()
1068
1069         MAX_IN_FLIGHT = 5
1070         if self.mode == MODE_READ:
1071             # if we've queried k+epsilon servers, and we see a recoverable
1072             # version, and we haven't seen any unrecoverable higher-seqnum'ed
1073             # versions, then we're done.
1074
1075             if self._queries_completed < self.num_peers_to_query:
1076                 self.log(format="%(completed)d completed, %(query)d to query: need more",
1077                          completed=self._queries_completed,
1078                          query=self.num_peers_to_query,
1079                          level=log.NOISY, parent=lp)
1080                 return self._send_more_queries(MAX_IN_FLIGHT)
1081             if not recoverable_versions:
1082                 self.log("no recoverable versions: need more",
1083                          level=log.NOISY, parent=lp)
1084                 return self._send_more_queries(MAX_IN_FLIGHT)
1085             highest_recoverable = max(recoverable_versions)
1086             highest_recoverable_seqnum = highest_recoverable[0]
1087             for unrec_verinfo in unrecoverable_versions:
1088                 if unrec_verinfo[0] > highest_recoverable_seqnum:
1089                     # there is evidence of a higher-seqnum version, but we
1090                     # don't yet see enough shares to recover it. Try harder.
1091                     # TODO: consider sending more queries.
1092                     # TODO: consider limiting the search distance
1093                     self.log("evidence of higher seqnum: need more",
1094                              level=log.UNUSUAL, parent=lp)
1095                     return self._send_more_queries(MAX_IN_FLIGHT)
1096             # all the unrecoverable versions were old or concurrent with a
1097             # recoverable version. Good enough.
1098             self.log("no higher-seqnum: done", parent=lp)
1099             return self._done()
1100
1101         if self.mode == MODE_WRITE:
1102             # we want to keep querying until we've seen a few that don't have
1103             # any shares, to be sufficiently confident that we've seen all
1104             # the shares. This is still less work than MODE_CHECK, which asks
1105             # every server in the world.
1106
1107             if not recoverable_versions:
1108                 self.log("no recoverable versions: need more", parent=lp,
1109                          level=log.NOISY)
1110                 return self._send_more_queries(MAX_IN_FLIGHT)
1111
1112             last_found = -1
1113             last_not_responded = -1
1114             num_not_responded = 0
1115             num_not_found = 0
1116             states = []
1117             found_boundary = False
1118
1119             for i,(peerid,ss) in enumerate(self.full_peerlist):
1120                 if peerid in self._bad_peers:
1121                     # query failed
1122                     states.append("x")
1123                     #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
1124                 elif peerid in self._empty_peers:
1125                     # no shares
1126                     states.append("0")
1127                     #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
1128                     if last_found != -1:
1129                         num_not_found += 1
1130                         if num_not_found >= self.EPSILON:
1131                             self.log("found our boundary, %s" %
1132                                      "".join(states),
1133                                      parent=lp, level=log.NOISY)
1134                             found_boundary = True
1135                             break
1136
1137                 elif peerid in self._good_peers:
1138                     # yes shares
1139                     states.append("1")
1140                     #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
1141                     last_found = i
1142                     num_not_found = 0
1143                 else:
1144                     # not responded yet
1145                     states.append("?")
1146                     #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
1147                     last_not_responded = i
1148                     num_not_responded += 1
1149
1150             if found_boundary:
1151                 # we need to know that we've gotten answers from
1152                 # everybody to the left of here
1153                 if last_not_responded == -1:
1154                     # we're done
1155                     self.log("have all our answers",
1156                              parent=lp, level=log.NOISY)
1157                     # .. unless we're still waiting on the privkey
1158                     if self._need_privkey:
1159                         self.log("but we're still waiting for the privkey",
1160                                  parent=lp, level=log.NOISY)
1161                         # if we found the boundary but we haven't yet found
1162                         # the privkey, we may need to look further. If
1163                         # somehow all the privkeys were corrupted (but the
1164                         # shares were readable), then this is likely to do an
1165                         # exhaustive search.
1166                         return self._send_more_queries(MAX_IN_FLIGHT)
1167                     return self._done()
1168                 # still waiting for somebody
1169                 return self._send_more_queries(num_not_responded)
1170
1171             # if we hit here, we didn't find our boundary, so we're still
1172             # waiting for peers
1173             self.log("no boundary yet, %s" % "".join(states), parent=lp,
1174                      level=log.NOISY)
1175             return self._send_more_queries(MAX_IN_FLIGHT)
1176
1177         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
1178         # arbitrary, really I want this to be something like k -
1179         # max(known_version_sharecounts) + some extra
1180         self.log("catchall: need more", parent=lp, level=log.NOISY)
1181         return self._send_more_queries(MAX_IN_FLIGHT)
1182
1183     def _send_more_queries(self, num_outstanding):
1184         more_queries = []
1185
1186         while True:
1187             self.log(format=" there are %(outstanding)d queries outstanding",
1188                      outstanding=len(self._queries_outstanding),
1189                      level=log.NOISY)
1190             active_queries = len(self._queries_outstanding) + len(more_queries)
1191             if active_queries >= num_outstanding:
1192                 break
1193             if not self.extra_peers:
1194                 break
1195             more_queries.append(self.extra_peers.pop(0))
1196
1197         self.log(format="sending %(more)d more queries: %(who)s",
1198                  more=len(more_queries),
1199                  who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
1200                                for (peerid,ss) in more_queries]),
1201                  level=log.NOISY)
1202
1203         for (peerid, ss) in more_queries:
1204             self._do_query(ss, peerid, self._storage_index, self._read_size)
1205             # we'll retrigger when those queries come back
1206
1207     def _done(self):
1208         if not self._running:
1209             self.log("not running; we're already done")
1210             return
1211         self._running = False
1212         now = time.time()
1213         elapsed = now - self._started
1214         self._status.set_finished(now)
1215         self._status.timings["total"] = elapsed
1216         self._status.set_progress(1.0)
1217         self._status.set_status("Finished")
1218         self._status.set_active(False)
1219
1220         self._servermap.last_update_mode = self.mode
1221         self._servermap.last_update_time = self._started
1222         # the servermap will not be touched after this
1223         self.log("servermap: %s" % self._servermap.summarize_versions())
1224
1225         eventually(self._done_deferred.callback, self._servermap)
1226
1227     def _fatal_error(self, f):
1228         self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
1229         self._done_deferred.errback(f)
1230
1231