]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/servermap.py
MDMFSlotReadProxy: remove the queue
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
8                          fireEventually
9 from allmydata.util import base32, hashutil, idlib, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
14
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
16      CorruptShareError
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
18
19 class UpdateStatus:
20     implements(IServermapUpdaterStatus)
21     statusid_counter = count(0)
22     def __init__(self):
23         self.timings = {}
24         self.timings["per_server"] = {}
25         self.timings["cumulative_verify"] = 0.0
26         self.privkey_from = None
27         self.problems = {}
28         self.active = True
29         self.storage_index = None
30         self.mode = "?"
31         self.status = "Not started"
32         self.progress = 0.0
33         self.counter = self.statusid_counter.next()
34         self.started = time.time()
35         self.finished = None
36
37     def add_per_server_time(self, peerid, op, sent, elapsed):
38         assert op in ("query", "late", "privkey")
39         if peerid not in self.timings["per_server"]:
40             self.timings["per_server"][peerid] = []
41         self.timings["per_server"][peerid].append((op,sent,elapsed))
42
43     def get_started(self):
44         return self.started
45     def get_finished(self):
46         return self.finished
47     def get_storage_index(self):
48         return self.storage_index
49     def get_mode(self):
50         return self.mode
51     def get_servermap(self):
52         return self.servermap
53     def get_privkey_from(self):
54         return self.privkey_from
55     def using_helper(self):
56         return False
57     def get_size(self):
58         return "-NA-"
59     def get_status(self):
60         return self.status
61     def get_progress(self):
62         return self.progress
63     def get_active(self):
64         return self.active
65     def get_counter(self):
66         return self.counter
67
68     def set_storage_index(self, si):
69         self.storage_index = si
70     def set_mode(self, mode):
71         self.mode = mode
72     def set_privkey_from(self, peerid):
73         self.privkey_from = peerid
74     def set_status(self, status):
75         self.status = status
76     def set_progress(self, value):
77         self.progress = value
78     def set_active(self, value):
79         self.active = value
80     def set_finished(self, when):
81         self.finished = when
82
83 class ServerMap:
84     """I record the placement of mutable shares.
85
86     This object records which shares (of various versions) are located on
87     which servers.
88
89     One purpose I serve is to inform callers about which versions of the
90     mutable file are recoverable and 'current'.
91
92     A second purpose is to serve as a state marker for test-and-set
93     operations. I am passed out of retrieval operations and back into publish
94     operations, which means 'publish this new version, but only if nothing
95     has changed since I last retrieved this data'. This reduces the chances
96     of clobbering a simultaneous (uncoordinated) write.
97
98     @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
99                      (versionid, timestamp) tuple. Each 'versionid' is a
100                      tuple of (seqnum, root_hash, IV, segsize, datalength,
101                      k, N, signed_prefix, offsets)
102
103     @ivar connections: maps peerid to a RemoteReference
104
105     @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
106                       shares that I should ignore (because a previous user of
107                       the servermap determined that they were invalid). The
108                       updater only locates a certain number of shares: if
109                       some of these turn out to have integrity problems and
110                       are unusable, the caller will need to mark those shares
111                       as bad, then re-update the servermap, then try again.
112                       The dict maps (peerid, shnum) tuple to old checkstring.
113     """
114
115     def __init__(self):
116         self.servermap = {}
117         self.connections = {}
118         self.unreachable_peers = set() # peerids that didn't respond to queries
119         self.reachable_peers = set() # peerids that did respond to queries
120         self.problems = [] # mostly for debugging
121         self.bad_shares = {} # maps (peerid,shnum) to old checkstring
122         self.last_update_mode = None
123         self.last_update_time = 0
124         self.update_data = {} # (verinfo,shnum) => data
125
126     def copy(self):
127         s = ServerMap()
128         s.servermap = self.servermap.copy() # tuple->tuple
129         s.connections = self.connections.copy() # str->RemoteReference
130         s.unreachable_peers = set(self.unreachable_peers)
131         s.reachable_peers = set(self.reachable_peers)
132         s.problems = self.problems[:]
133         s.bad_shares = self.bad_shares.copy() # tuple->str
134         s.last_update_mode = self.last_update_mode
135         s.last_update_time = self.last_update_time
136         return s
137
138     def mark_bad_share(self, peerid, shnum, checkstring):
139         """This share was found to be bad, either in the checkstring or
140         signature (detected during mapupdate), or deeper in the share
141         (detected at retrieve time). Remove it from our list of useful
142         shares, and remember that it is bad so we don't add it back again
143         later. We record the share's old checkstring (which might be
144         corrupted or badly signed) so that a repair operation can do the
145         test-and-set using it as a reference.
146         """
147         key = (peerid, shnum) # record checkstring
148         self.bad_shares[key] = checkstring
149         self.servermap.pop(key, None)
150
151     def add_new_share(self, peerid, shnum, verinfo, timestamp):
152         """We've written a new share out, replacing any that was there
153         before."""
154         key = (peerid, shnum)
155         self.bad_shares.pop(key, None)
156         self.servermap[key] = (verinfo, timestamp)
157
158     def dump(self, out=sys.stdout):
159         print >>out, "servermap:"
160
161         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
162             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
163              offsets_tuple) = verinfo
164             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
165                           (idlib.shortnodeid_b2a(peerid), shnum,
166                            seqnum, base32.b2a(root_hash)[:4], k, N,
167                            datalength))
168         if self.problems:
169             print >>out, "%d PROBLEMS" % len(self.problems)
170             for f in self.problems:
171                 print >>out, str(f)
172         return out
173
174     def all_peers(self):
175         return set([peerid
176                     for (peerid, shnum)
177                     in self.servermap])
178
179     def all_peers_for_version(self, verinfo):
180         """Return a set of peerids that hold shares for the given version."""
181         return set([peerid
182                     for ( (peerid, shnum), (verinfo2, timestamp) )
183                     in self.servermap.items()
184                     if verinfo == verinfo2])
185
186     def make_sharemap(self):
187         """Return a dict that maps shnum to a set of peerds that hold it."""
188         sharemap = DictOfSets()
189         for (peerid, shnum) in self.servermap:
190             sharemap.add(shnum, peerid)
191         return sharemap
192
193     def make_versionmap(self):
194         """Return a dict that maps versionid to sets of (shnum, peerid,
195         timestamp) tuples."""
196         versionmap = DictOfSets()
197         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
198             versionmap.add(verinfo, (shnum, peerid, timestamp))
199         return versionmap
200
201     def shares_on_peer(self, peerid):
202         return set([shnum
203                     for (s_peerid, shnum)
204                     in self.servermap
205                     if s_peerid == peerid])
206
207     def version_on_peer(self, peerid, shnum):
208         key = (peerid, shnum)
209         if key in self.servermap:
210             (verinfo, timestamp) = self.servermap[key]
211             return verinfo
212         return None
213
214     def shares_available(self):
215         """Return a dict that maps verinfo to tuples of
216         (num_distinct_shares, k, N) tuples."""
217         versionmap = self.make_versionmap()
218         all_shares = {}
219         for verinfo, shares in versionmap.items():
220             s = set()
221             for (shnum, peerid, timestamp) in shares:
222                 s.add(shnum)
223             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
224              offsets_tuple) = verinfo
225             all_shares[verinfo] = (len(s), k, N)
226         return all_shares
227
228     def highest_seqnum(self):
229         available = self.shares_available()
230         seqnums = [verinfo[0]
231                    for verinfo in available.keys()]
232         seqnums.append(0)
233         return max(seqnums)
234
235     def summarize_version(self, verinfo):
236         """Take a versionid, return a string that describes it."""
237         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
238          offsets_tuple) = verinfo
239         return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
240
241     def summarize_versions(self):
242         """Return a string describing which versions we know about."""
243         versionmap = self.make_versionmap()
244         bits = []
245         for (verinfo, shares) in versionmap.items():
246             vstr = self.summarize_version(verinfo)
247             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
248             bits.append("%d*%s" % (len(shnums), vstr))
249         return "/".join(bits)
250
251     def recoverable_versions(self):
252         """Return a set of versionids, one for each version that is currently
253         recoverable."""
254         versionmap = self.make_versionmap()
255         recoverable_versions = set()
256         for (verinfo, shares) in versionmap.items():
257             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
258              offsets_tuple) = verinfo
259             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
260             if len(shnums) >= k:
261                 # this one is recoverable
262                 recoverable_versions.add(verinfo)
263
264         return recoverable_versions
265
266     def unrecoverable_versions(self):
267         """Return a set of versionids, one for each version that is currently
268         unrecoverable."""
269         versionmap = self.make_versionmap()
270
271         unrecoverable_versions = set()
272         for (verinfo, shares) in versionmap.items():
273             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
274              offsets_tuple) = verinfo
275             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
276             if len(shnums) < k:
277                 unrecoverable_versions.add(verinfo)
278
279         return unrecoverable_versions
280
281     def best_recoverable_version(self):
282         """Return a single versionid, for the so-called 'best' recoverable
283         version. Sequence number is the primary sort criteria, followed by
284         root hash. Returns None if there are no recoverable versions."""
285         recoverable = list(self.recoverable_versions())
286         recoverable.sort()
287         if recoverable:
288             return recoverable[-1]
289         return None
290
291     def size_of_version(self, verinfo):
292         """Given a versionid (perhaps returned by best_recoverable_version),
293         return the size of the file in bytes."""
294         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
295          offsets_tuple) = verinfo
296         return datalength
297
298     def unrecoverable_newer_versions(self):
299         # Return a dict of versionid -> health, for versions that are
300         # unrecoverable and have later seqnums than any recoverable versions.
301         # These indicate that a write will lose data.
302         versionmap = self.make_versionmap()
303         healths = {} # maps verinfo to (found,k)
304         unrecoverable = set()
305         highest_recoverable_seqnum = -1
306         for (verinfo, shares) in versionmap.items():
307             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
308              offsets_tuple) = verinfo
309             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
310             healths[verinfo] = (len(shnums),k)
311             if len(shnums) < k:
312                 unrecoverable.add(verinfo)
313             else:
314                 highest_recoverable_seqnum = max(seqnum,
315                                                  highest_recoverable_seqnum)
316
317         newversions = {}
318         for verinfo in unrecoverable:
319             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
320              offsets_tuple) = verinfo
321             if seqnum > highest_recoverable_seqnum:
322                 newversions[verinfo] = healths[verinfo]
323
324         return newversions
325
326
327     def needs_merge(self):
328         # return True if there are multiple recoverable versions with the
329         # same seqnum, meaning that MutableFileNode.read_best_version is not
330         # giving you the whole story, and that using its data to do a
331         # subsequent publish will lose information.
332         recoverable_seqnums = [verinfo[0]
333                                for verinfo in self.recoverable_versions()]
334         for seqnum in recoverable_seqnums:
335             if recoverable_seqnums.count(seqnum) > 1:
336                 return True
337         return False
338
339
340     def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
341         """
342         I return the update data for the given shnum
343         """
344         update_data = self.update_data[shnum]
345         update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
346         return update_datum
347
348
349     def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
350         """
351         I record the block hash tree for the given shnum.
352         """
353         self.update_data.setdefault(shnum , []).append((verinfo, data))
354
355
356 class ServermapUpdater:
357     def __init__(self, filenode, storage_broker, monitor, servermap,
358                  mode=MODE_READ, add_lease=False, update_range=None):
359         """I update a servermap, locating a sufficient number of useful
360         shares and remembering where they are located.
361
362         """
363
364         self._node = filenode
365         self._storage_broker = storage_broker
366         self._monitor = monitor
367         self._servermap = servermap
368         self.mode = mode
369         self._add_lease = add_lease
370         self._running = True
371
372         self._storage_index = filenode.get_storage_index()
373         self._last_failure = None
374
375         self._status = UpdateStatus()
376         self._status.set_storage_index(self._storage_index)
377         self._status.set_progress(0.0)
378         self._status.set_mode(mode)
379
380         self._servers_responded = set()
381
382         # how much data should we read?
383         # SDMF:
384         #  * if we only need the checkstring, then [0:75]
385         #  * if we need to validate the checkstring sig, then [543ish:799ish]
386         #  * if we need the verification key, then [107:436ish]
387         #   * the offset table at [75:107] tells us about the 'ish'
388         #  * if we need the encrypted private key, we want [-1216ish:]
389         #   * but we can't read from negative offsets
390         #   * the offset table tells us the 'ish', also the positive offset
391         # MDMF:
392         #  * Checkstring? [0:72]
393         #  * If we want to validate the checkstring, then [0:72], [143:?] --
394         #    the offset table will tell us for sure.
395         #  * If we need the verification key, we have to consult the offset
396         #    table as well.
397         # At this point, we don't know which we are. Our filenode can
398         # tell us, but it might be lying -- in some cases, we're
399         # responsible for telling it which kind of file it is.
400         self._read_size = 4000
401         if mode == MODE_CHECK:
402             # we use unpack_prefix_and_signature, so we need 1k
403             self._read_size = 1000
404         self._need_privkey = False
405
406         if mode == MODE_WRITE and not self._node.get_privkey():
407             self._need_privkey = True
408         # check+repair: repair requires the privkey, so if we didn't happen
409         # to ask for it during the check, we'll have problems doing the
410         # publish.
411
412         self.fetch_update_data = False
413         if mode == MODE_WRITE and update_range:
414             # We're updating the servermap in preparation for an
415             # in-place file update, so we need to fetch some additional
416             # data from each share that we find.
417             assert len(update_range) == 2
418
419             self.start_segment = update_range[0]
420             self.end_segment = update_range[1]
421             self.fetch_update_data = True
422
423         prefix = si_b2a(self._storage_index)[:5]
424         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
425                                    si=prefix, mode=mode)
426
427     def get_status(self):
428         return self._status
429
430     def log(self, *args, **kwargs):
431         if "parent" not in kwargs:
432             kwargs["parent"] = self._log_number
433         if "facility" not in kwargs:
434             kwargs["facility"] = "tahoe.mutable.mapupdate"
435         return log.msg(*args, **kwargs)
436
437     def update(self):
438         """Update the servermap to reflect current conditions. Returns a
439         Deferred that fires with the servermap once the update has finished."""
440         self._started = time.time()
441         self._status.set_active(True)
442
443         # self._valid_versions is a set of validated verinfo tuples. We just
444         # use it to remember which versions had valid signatures, so we can
445         # avoid re-checking the signatures for each share.
446         self._valid_versions = set()
447
448         # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
449         # timestamp) tuples. This is used to figure out which versions might
450         # be retrievable, and to make the eventual data download faster.
451         self.versionmap = DictOfSets()
452
453         self._done_deferred = defer.Deferred()
454
455         # first, which peers should be talk to? Any that were in our old
456         # servermap, plus "enough" others.
457
458         self._queries_completed = 0
459
460         sb = self._storage_broker
461         # All of the peers, permuted by the storage index, as usual.
462         full_peerlist = [(s.get_serverid(), s.get_rref())
463                          for s in sb.get_servers_for_psi(self._storage_index)]
464         self.full_peerlist = full_peerlist # for use later, immutable
465         self.extra_peers = full_peerlist[:] # peers are removed as we use them
466         self._good_peers = set() # peers who had some shares
467         self._empty_peers = set() # peers who don't have any shares
468         self._bad_peers = set() # peers to whom our queries failed
469         self._readers = {} # peerid -> dict(sharewriters), filled in
470                            # after responses come in.
471
472         k = self._node.get_required_shares()
473         # For what cases can these conditions work?
474         if k is None:
475             # make a guess
476             k = 3
477         N = self._node.get_total_shares()
478         if N is None:
479             N = 10
480         self.EPSILON = k
481         # we want to send queries to at least this many peers (although we
482         # might not wait for all of their answers to come back)
483         self.num_peers_to_query = k + self.EPSILON
484
485         if self.mode == MODE_CHECK:
486             # We want to query all of the peers.
487             initial_peers_to_query = dict(full_peerlist)
488             must_query = set(initial_peers_to_query.keys())
489             self.extra_peers = []
490         elif self.mode == MODE_WRITE:
491             # we're planning to replace all the shares, so we want a good
492             # chance of finding them all. We will keep searching until we've
493             # seen epsilon that don't have a share.
494             # We don't query all of the peers because that could take a while.
495             self.num_peers_to_query = N + self.EPSILON
496             initial_peers_to_query, must_query = self._build_initial_querylist()
497             self.required_num_empty_peers = self.EPSILON
498
499             # TODO: arrange to read lots of data from k-ish servers, to avoid
500             # the extra round trip required to read large directories. This
501             # might also avoid the round trip required to read the encrypted
502             # private key.
503
504         else: # MODE_READ, MODE_ANYTHING
505             # 2k peers is good enough.
506             initial_peers_to_query, must_query = self._build_initial_querylist()
507
508         # this is a set of peers that we are required to get responses from:
509         # they are peers who used to have a share, so we need to know where
510         # they currently stand, even if that means we have to wait for a
511         # silently-lost TCP connection to time out. We remove peers from this
512         # set as we get responses.
513         self._must_query = must_query
514
515         # now initial_peers_to_query contains the peers that we should ask,
516         # self.must_query contains the peers that we must have heard from
517         # before we can consider ourselves finished, and self.extra_peers
518         # contains the overflow (peers that we should tap if we don't get
519         # enough responses)
520         # I guess that self._must_query is a subset of
521         # initial_peers_to_query?
522         assert set(must_query).issubset(set(initial_peers_to_query))
523
524         self._send_initial_requests(initial_peers_to_query)
525         self._status.timings["initial_queries"] = time.time() - self._started
526         return self._done_deferred
527
528     def _build_initial_querylist(self):
529         initial_peers_to_query = {}
530         must_query = set()
531         for peerid in self._servermap.all_peers():
532             ss = self._servermap.connections[peerid]
533             # we send queries to everyone who was already in the sharemap
534             initial_peers_to_query[peerid] = ss
535             # and we must wait for responses from them
536             must_query.add(peerid)
537
538         while ((self.num_peers_to_query > len(initial_peers_to_query))
539                and self.extra_peers):
540             (peerid, ss) = self.extra_peers.pop(0)
541             initial_peers_to_query[peerid] = ss
542
543         return initial_peers_to_query, must_query
544
545     def _send_initial_requests(self, peerlist):
546         self._status.set_status("Sending %d initial queries" % len(peerlist))
547         self._queries_outstanding = set()
548         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
549         for (peerid, ss) in peerlist.items():
550             self._queries_outstanding.add(peerid)
551             self._do_query(ss, peerid, self._storage_index, self._read_size)
552
553         if not peerlist:
554             # there is nobody to ask, so we need to short-circuit the state
555             # machine.
556             d = defer.maybeDeferred(self._check_for_done, None)
557             d.addErrback(self._fatal_error)
558
559         # control flow beyond this point: state machine. Receiving responses
560         # from queries is the input. We might send out more queries, or we
561         # might produce a result.
562         return None
563
564     def _do_query(self, ss, peerid, storage_index, readsize):
565         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
566                  peerid=idlib.shortnodeid_b2a(peerid),
567                  readsize=readsize,
568                  level=log.NOISY)
569         self._servermap.connections[peerid] = ss
570         started = time.time()
571         self._queries_outstanding.add(peerid)
572         d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
573         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
574                       started)
575         d.addErrback(self._query_failed, peerid)
576         # errors that aren't handled by _query_failed (and errors caused by
577         # _query_failed) get logged, but we still want to check for doneness.
578         d.addErrback(log.err)
579         d.addErrback(self._fatal_error)
580         d.addCallback(self._check_for_done)
581         return d
582
583     def _do_read(self, ss, peerid, storage_index, shnums, readv):
584         if self._add_lease:
585             # send an add-lease message in parallel. The results are handled
586             # separately. This is sent before the slot_readv() so that we can
587             # be sure the add_lease is retired by the time slot_readv comes
588             # back (this relies upon our knowledge that the server code for
589             # add_lease is synchronous).
590             renew_secret = self._node.get_renewal_secret(peerid)
591             cancel_secret = self._node.get_cancel_secret(peerid)
592             d2 = ss.callRemote("add_lease", storage_index,
593                                renew_secret, cancel_secret)
594             # we ignore success
595             d2.addErrback(self._add_lease_failed, peerid, storage_index)
596         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
597         return d
598
599
600     def _got_corrupt_share(self, e, shnum, peerid, data, lp):
601         """
602         I am called when a remote server returns a corrupt share in
603         response to one of our queries. By corrupt, I mean a share
604         without a valid signature. I then record the failure, notify the
605         server of the corruption, and record the share as bad.
606         """
607         f = failure.Failure(e)
608         self.log(format="bad share: %(f_value)s", f_value=str(f),
609                  failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
610         # Notify the server that its share is corrupt.
611         self.notify_server_corruption(peerid, shnum, str(e))
612         # By flagging this as a bad peer, we won't count any of
613         # the other shares on that peer as valid, though if we
614         # happen to find a valid version string amongst those
615         # shares, we'll keep track of it so that we don't need
616         # to validate the signature on those again.
617         self._bad_peers.add(peerid)
618         self._last_failure = f
619         # XXX: Use the reader for this?
620         checkstring = data[:SIGNED_PREFIX_LENGTH]
621         self._servermap.mark_bad_share(peerid, shnum, checkstring)
622         self._servermap.problems.append(f)
623
624
625     def _cache_good_sharedata(self, verinfo, shnum, now, data):
626         """
627         If one of my queries returns successfully (which means that we
628         were able to and successfully did validate the signature), I
629         cache the data that we initially fetched from the storage
630         server. This will help reduce the number of roundtrips that need
631         to occur when the file is downloaded, or when the file is
632         updated.
633         """
634         if verinfo:
635             self._node._add_to_cache(verinfo, shnum, 0, data)
636
637
638     def _got_results(self, datavs, peerid, readsize, stuff, started):
639         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
640                       peerid=idlib.shortnodeid_b2a(peerid),
641                       numshares=len(datavs))
642         now = time.time()
643         elapsed = now - started
644         def _done_processing(ignored=None):
645             self._queries_outstanding.discard(peerid)
646             self._servermap.reachable_peers.add(peerid)
647             self._must_query.discard(peerid)
648             self._queries_completed += 1
649         if not self._running:
650             self.log("but we're not running, so we'll ignore it", parent=lp)
651             _done_processing()
652             self._status.add_per_server_time(peerid, "late", started, elapsed)
653             return
654         self._status.add_per_server_time(peerid, "query", started, elapsed)
655
656         if datavs:
657             self._good_peers.add(peerid)
658         else:
659             self._empty_peers.add(peerid)
660
661         ss, storage_index = stuff
662         ds = []
663
664         for shnum,datav in datavs.items():
665             data = datav[0]
666             reader = MDMFSlotReadProxy(ss,
667                                        storage_index,
668                                        shnum,
669                                        data)
670             self._readers.setdefault(peerid, dict())[shnum] = reader
671             # our goal, with each response, is to validate the version
672             # information and share data as best we can at this point --
673             # we do this by validating the signature. To do this, we
674             # need to do the following:
675             #   - If we don't already have the public key, fetch the
676             #     public key. We use this to validate the signature.
677             if not self._node.get_pubkey():
678                 # fetch and set the public key.
679                 d = reader.get_verification_key()
680                 d.addCallback(lambda results, shnum=shnum, peerid=peerid:
681                     self._try_to_set_pubkey(results, peerid, shnum, lp))
682                 # XXX: Make self._pubkey_query_failed?
683                 d.addErrback(lambda error, shnum=shnum, peerid=peerid:
684                     self._got_corrupt_share(error, shnum, peerid, data, lp))
685             else:
686                 # we already have the public key.
687                 d = defer.succeed(None)
688
689             # Neither of these two branches return anything of
690             # consequence, so the first entry in our deferredlist will
691             # be None.
692
693             # - Next, we need the version information. We almost
694             #   certainly got this by reading the first thousand or so
695             #   bytes of the share on the storage server, so we
696             #   shouldn't need to fetch anything at this step.
697             d2 = reader.get_verinfo()
698             d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
699                 self._got_corrupt_share(error, shnum, peerid, data, lp))
700             # - Next, we need the signature. For an SDMF share, it is
701             #   likely that we fetched this when doing our initial fetch
702             #   to get the version information. In MDMF, this lives at
703             #   the end of the share, so unless the file is quite small,
704             #   we'll need to do a remote fetch to get it.
705             d3 = reader.get_signature()
706             d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
707                 self._got_corrupt_share(error, shnum, peerid, data, lp))
708             #  Once we have all three of these responses, we can move on
709             #  to validating the signature
710
711             # Does the node already have a privkey? If not, we'll try to
712             # fetch it here.
713             if self._need_privkey:
714                 d4 = reader.get_encprivkey()
715                 d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
716                     self._try_to_validate_privkey(results, peerid, shnum, lp))
717                 d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
718                     self._privkey_query_failed(error, shnum, data, lp))
719             else:
720                 d4 = defer.succeed(None)
721
722
723             if self.fetch_update_data:
724                 # fetch the block hash tree and first + last segment, as
725                 # configured earlier.
726                 # Then set them in wherever we happen to want to set
727                 # them.
728                 ds = []
729                 # XXX: We do this above, too. Is there a good way to
730                 # make the two routines share the value without
731                 # introducing more roundtrips?
732                 ds.append(reader.get_verinfo())
733                 ds.append(reader.get_blockhashes())
734                 ds.append(reader.get_block_and_salt(self.start_segment))
735                 ds.append(reader.get_block_and_salt(self.end_segment))
736                 d5 = deferredutil.gatherResults(ds)
737                 d5.addCallback(self._got_update_results_one_share, shnum)
738             else:
739                 d5 = defer.succeed(None)
740
741             dl = defer.DeferredList([d, d2, d3, d4, d5])
742             dl.addBoth(self._turn_barrier)
743             dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
744                 self._got_signature_one_share(results, shnum, peerid, lp))
745             dl.addErrback(lambda error, shnum=shnum, data=data:
746                self._got_corrupt_share(error, shnum, peerid, data, lp))
747             dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
748                 self._cache_good_sharedata(verinfo, shnum, now, data))
749             ds.append(dl)
750         # dl is a deferred list that will fire when all of the shares
751         # that we found on this peer are done processing. When dl fires,
752         # we know that processing is done, so we can decrement the
753         # semaphore-like thing that we incremented earlier.
754         dl = defer.DeferredList(ds, fireOnOneErrback=True)
755         # Are we done? Done means that there are no more queries to
756         # send, that there are no outstanding queries, and that we
757         # haven't received any queries that are still processing. If we
758         # are done, self._check_for_done will cause the done deferred
759         # that we returned to our caller to fire, which tells them that
760         # they have a complete servermap, and that we won't be touching
761         # the servermap anymore.
762         dl.addCallback(_done_processing)
763         dl.addCallback(self._check_for_done)
764         dl.addErrback(self._fatal_error)
765         # all done!
766         self.log("_got_results done", parent=lp, level=log.NOISY)
767         return dl
768
769
770     def _turn_barrier(self, result):
771         """
772         I help the servermap updater avoid the recursion limit issues
773         discussed in #237.
774         """
775         return fireEventually(result)
776
777
778     def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
779         if self._node.get_pubkey():
780             return # don't go through this again if we don't have to
781         fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
782         assert len(fingerprint) == 32
783         if fingerprint != self._node.get_fingerprint():
784             raise CorruptShareError(peerid, shnum,
785                                 "pubkey doesn't match fingerprint")
786         self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
787         assert self._node.get_pubkey()
788
789
790     def notify_server_corruption(self, peerid, shnum, reason):
791         ss = self._servermap.connections[peerid]
792         ss.callRemoteOnly("advise_corrupt_share",
793                           "mutable", self._storage_index, shnum, reason)
794
795
796     def _got_signature_one_share(self, results, shnum, peerid, lp):
797         # It is our job to give versioninfo to our caller. We need to
798         # raise CorruptShareError if the share is corrupt for any
799         # reason, something that our caller will handle.
800         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
801                  shnum=shnum,
802                  peerid=idlib.shortnodeid_b2a(peerid),
803                  level=log.NOISY,
804                  parent=lp)
805         if not self._running:
806             # We can't process the results, since we can't touch the
807             # servermap anymore.
808             self.log("but we're not running anymore.")
809             return None
810
811         _, verinfo, signature, __, ___ = results
812         (seqnum,
813          root_hash,
814          saltish,
815          segsize,
816          datalen,
817          k,
818          n,
819          prefix,
820          offsets) = verinfo[1]
821         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
822
823         # XXX: This should be done for us in the method, so
824         # presumably you can go in there and fix it.
825         verinfo = (seqnum,
826                    root_hash,
827                    saltish,
828                    segsize,
829                    datalen,
830                    k,
831                    n,
832                    prefix,
833                    offsets_tuple)
834         # This tuple uniquely identifies a share on the grid; we use it
835         # to keep track of the ones that we've already seen.
836
837         if verinfo not in self._valid_versions:
838             # This is a new version tuple, and we need to validate it
839             # against the public key before keeping track of it.
840             assert self._node.get_pubkey()
841             valid = self._node.get_pubkey().verify(prefix, signature[1])
842             if not valid:
843                 raise CorruptShareError(peerid, shnum,
844                                         "signature is invalid")
845
846         # ok, it's a valid verinfo. Add it to the list of validated
847         # versions.
848         self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
849                  % (seqnum, base32.b2a(root_hash)[:4],
850                     idlib.shortnodeid_b2a(peerid), shnum,
851                     k, n, segsize, datalen),
852                     parent=lp)
853         self._valid_versions.add(verinfo)
854         # We now know that this is a valid candidate verinfo. Whether or
855         # not this instance of it is valid is a matter for the next
856         # statement; at this point, we just know that if we see this
857         # version info again, that its signature checks out and that
858         # we're okay to skip the signature-checking step.
859
860         # (peerid, shnum) are bound in the method invocation.
861         if (peerid, shnum) in self._servermap.bad_shares:
862             # we've been told that the rest of the data in this share is
863             # unusable, so don't add it to the servermap.
864             self.log("but we've been told this is a bad share",
865                      parent=lp, level=log.UNUSUAL)
866             return verinfo
867
868         # Add the info to our servermap.
869         timestamp = time.time()
870         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
871         # and the versionmap
872         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
873
874         return verinfo
875
876
877     def _got_update_results_one_share(self, results, share):
878         """
879         I record the update results in results.
880         """
881         assert len(results) == 4
882         verinfo, blockhashes, start, end = results
883         (seqnum,
884          root_hash,
885          saltish,
886          segsize,
887          datalen,
888          k,
889          n,
890          prefix,
891          offsets) = verinfo
892         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
893
894         # XXX: This should be done for us in the method, so
895         # presumably you can go in there and fix it.
896         verinfo = (seqnum,
897                    root_hash,
898                    saltish,
899                    segsize,
900                    datalen,
901                    k,
902                    n,
903                    prefix,
904                    offsets_tuple)
905
906         update_data = (blockhashes, start, end)
907         self._servermap.set_update_data_for_share_and_verinfo(share,
908                                                               verinfo,
909                                                               update_data)
910
911
912     def _deserialize_pubkey(self, pubkey_s):
913         verifier = rsa.create_verifying_key_from_string(pubkey_s)
914         return verifier
915
916
917     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
918         """
919         Given a writekey from a remote server, I validate it against the
920         writekey stored in my node. If it is valid, then I set the
921         privkey and encprivkey properties of the node.
922         """
923         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
924         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
925         if alleged_writekey != self._node.get_writekey():
926             self.log("invalid privkey from %s shnum %d" %
927                      (idlib.nodeid_b2a(peerid)[:8], shnum),
928                      parent=lp, level=log.WEIRD, umid="aJVccw")
929             return
930
931         # it's good
932         self.log("got valid privkey from shnum %d on peerid %s" %
933                  (shnum, idlib.shortnodeid_b2a(peerid)),
934                  parent=lp)
935         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
936         self._node._populate_encprivkey(enc_privkey)
937         self._node._populate_privkey(privkey)
938         self._need_privkey = False
939         self._status.set_privkey_from(peerid)
940
941
942     def _add_lease_failed(self, f, peerid, storage_index):
943         # Older versions of Tahoe didn't handle the add-lease message very
944         # well: <=1.1.0 throws a NameError because it doesn't implement
945         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
946         # (which is most of them, since we send add-lease to everybody,
947         # before we know whether or not they have any shares for us), and
948         # 1.2.0 throws KeyError even on known buckets due to an internal bug
949         # in the latency-measuring code.
950
951         # we want to ignore the known-harmless errors and log the others. In
952         # particular we want to log any local errors caused by coding
953         # problems.
954
955         if f.check(DeadReferenceError):
956             return
957         if f.check(RemoteException):
958             if f.value.failure.check(KeyError, IndexError, NameError):
959                 # this may ignore a bit too much, but that only hurts us
960                 # during debugging
961                 return
962             self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
963                      peerid=idlib.shortnodeid_b2a(peerid),
964                      f_value=str(f.value),
965                      failure=f,
966                      level=log.WEIRD, umid="iqg3mw")
967             return
968         # local errors are cause for alarm
969         log.err(f,
970                 format="local error in add_lease to [%(peerid)s]: %(f_value)s",
971                 peerid=idlib.shortnodeid_b2a(peerid),
972                 f_value=str(f.value),
973                 level=log.WEIRD, umid="ZWh6HA")
974
975     def _query_failed(self, f, peerid):
976         if not self._running:
977             return
978         level = log.WEIRD
979         if f.check(DeadReferenceError):
980             level = log.UNUSUAL
981         self.log(format="error during query: %(f_value)s",
982                  f_value=str(f.value), failure=f,
983                  level=level, umid="IHXuQg")
984         self._must_query.discard(peerid)
985         self._queries_outstanding.discard(peerid)
986         self._bad_peers.add(peerid)
987         self._servermap.problems.append(f)
988         # a peerid could be in both ServerMap.reachable_peers and
989         # .unreachable_peers if they responded to our query, but then an
990         # exception was raised in _got_results.
991         self._servermap.unreachable_peers.add(peerid)
992         self._queries_completed += 1
993         self._last_failure = f
994
995
996     def _privkey_query_failed(self, f, peerid, shnum, lp):
997         self._queries_outstanding.discard(peerid)
998         if not self._running:
999             return
1000         level = log.WEIRD
1001         if f.check(DeadReferenceError):
1002             level = log.UNUSUAL
1003         self.log(format="error during privkey query: %(f_value)s",
1004                  f_value=str(f.value), failure=f,
1005                  parent=lp, level=level, umid="McoJ5w")
1006         self._servermap.problems.append(f)
1007         self._last_failure = f
1008
1009
1010     def _check_for_done(self, res):
1011         # exit paths:
1012         #  return self._send_more_queries(outstanding) : send some more queries
1013         #  return self._done() : all done
1014         #  return : keep waiting, no new queries
1015         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
1016                               "%(outstanding)d queries outstanding, "
1017                               "%(extra)d extra peers available, "
1018                               "%(must)d 'must query' peers left, "
1019                               "need_privkey=%(need_privkey)s"
1020                               ),
1021                       mode=self.mode,
1022                       outstanding=len(self._queries_outstanding),
1023                       extra=len(self.extra_peers),
1024                       must=len(self._must_query),
1025                       need_privkey=self._need_privkey,
1026                       level=log.NOISY,
1027                       )
1028
1029         if not self._running:
1030             self.log("but we're not running", parent=lp, level=log.NOISY)
1031             return
1032
1033         if self._must_query:
1034             # we are still waiting for responses from peers that used to have
1035             # a share, so we must continue to wait. No additional queries are
1036             # required at this time.
1037             self.log("%d 'must query' peers left" % len(self._must_query),
1038                      level=log.NOISY, parent=lp)
1039             return
1040
1041         if (not self._queries_outstanding and not self.extra_peers):
1042             # all queries have retired, and we have no peers left to ask. No
1043             # more progress can be made, therefore we are done.
1044             self.log("all queries are retired, no extra peers: done",
1045                      parent=lp)
1046             return self._done()
1047
1048         recoverable_versions = self._servermap.recoverable_versions()
1049         unrecoverable_versions = self._servermap.unrecoverable_versions()
1050
1051         # what is our completion policy? how hard should we work?
1052
1053         if self.mode == MODE_ANYTHING:
1054             if recoverable_versions:
1055                 self.log("%d recoverable versions: done"
1056                          % len(recoverable_versions),
1057                          parent=lp)
1058                 return self._done()
1059
1060         if self.mode == MODE_CHECK:
1061             # we used self._must_query, and we know there aren't any
1062             # responses still waiting, so that means we must be done
1063             self.log("done", parent=lp)
1064             return self._done()
1065
1066         MAX_IN_FLIGHT = 5
1067         if self.mode == MODE_READ:
1068             # if we've queried k+epsilon servers, and we see a recoverable
1069             # version, and we haven't seen any unrecoverable higher-seqnum'ed
1070             # versions, then we're done.
1071
1072             if self._queries_completed < self.num_peers_to_query:
1073                 self.log(format="%(completed)d completed, %(query)d to query: need more",
1074                          completed=self._queries_completed,
1075                          query=self.num_peers_to_query,
1076                          level=log.NOISY, parent=lp)
1077                 return self._send_more_queries(MAX_IN_FLIGHT)
1078             if not recoverable_versions:
1079                 self.log("no recoverable versions: need more",
1080                          level=log.NOISY, parent=lp)
1081                 return self._send_more_queries(MAX_IN_FLIGHT)
1082             highest_recoverable = max(recoverable_versions)
1083             highest_recoverable_seqnum = highest_recoverable[0]
1084             for unrec_verinfo in unrecoverable_versions:
1085                 if unrec_verinfo[0] > highest_recoverable_seqnum:
1086                     # there is evidence of a higher-seqnum version, but we
1087                     # don't yet see enough shares to recover it. Try harder.
1088                     # TODO: consider sending more queries.
1089                     # TODO: consider limiting the search distance
1090                     self.log("evidence of higher seqnum: need more",
1091                              level=log.UNUSUAL, parent=lp)
1092                     return self._send_more_queries(MAX_IN_FLIGHT)
1093             # all the unrecoverable versions were old or concurrent with a
1094             # recoverable version. Good enough.
1095             self.log("no higher-seqnum: done", parent=lp)
1096             return self._done()
1097
1098         if self.mode == MODE_WRITE:
1099             # we want to keep querying until we've seen a few that don't have
1100             # any shares, to be sufficiently confident that we've seen all
1101             # the shares. This is still less work than MODE_CHECK, which asks
1102             # every server in the world.
1103
1104             if not recoverable_versions:
1105                 self.log("no recoverable versions: need more", parent=lp,
1106                          level=log.NOISY)
1107                 return self._send_more_queries(MAX_IN_FLIGHT)
1108
1109             last_found = -1
1110             last_not_responded = -1
1111             num_not_responded = 0
1112             num_not_found = 0
1113             states = []
1114             found_boundary = False
1115
1116             for i,(peerid,ss) in enumerate(self.full_peerlist):
1117                 if peerid in self._bad_peers:
1118                     # query failed
1119                     states.append("x")
1120                     #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
1121                 elif peerid in self._empty_peers:
1122                     # no shares
1123                     states.append("0")
1124                     #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
1125                     if last_found != -1:
1126                         num_not_found += 1
1127                         if num_not_found >= self.EPSILON:
1128                             self.log("found our boundary, %s" %
1129                                      "".join(states),
1130                                      parent=lp, level=log.NOISY)
1131                             found_boundary = True
1132                             break
1133
1134                 elif peerid in self._good_peers:
1135                     # yes shares
1136                     states.append("1")
1137                     #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
1138                     last_found = i
1139                     num_not_found = 0
1140                 else:
1141                     # not responded yet
1142                     states.append("?")
1143                     #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
1144                     last_not_responded = i
1145                     num_not_responded += 1
1146
1147             if found_boundary:
1148                 # we need to know that we've gotten answers from
1149                 # everybody to the left of here
1150                 if last_not_responded == -1:
1151                     # we're done
1152                     self.log("have all our answers",
1153                              parent=lp, level=log.NOISY)
1154                     # .. unless we're still waiting on the privkey
1155                     if self._need_privkey:
1156                         self.log("but we're still waiting for the privkey",
1157                                  parent=lp, level=log.NOISY)
1158                         # if we found the boundary but we haven't yet found
1159                         # the privkey, we may need to look further. If
1160                         # somehow all the privkeys were corrupted (but the
1161                         # shares were readable), then this is likely to do an
1162                         # exhaustive search.
1163                         return self._send_more_queries(MAX_IN_FLIGHT)
1164                     return self._done()
1165                 # still waiting for somebody
1166                 return self._send_more_queries(num_not_responded)
1167
1168             # if we hit here, we didn't find our boundary, so we're still
1169             # waiting for peers
1170             self.log("no boundary yet, %s" % "".join(states), parent=lp,
1171                      level=log.NOISY)
1172             return self._send_more_queries(MAX_IN_FLIGHT)
1173
1174         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
1175         # arbitrary, really I want this to be something like k -
1176         # max(known_version_sharecounts) + some extra
1177         self.log("catchall: need more", parent=lp, level=log.NOISY)
1178         return self._send_more_queries(MAX_IN_FLIGHT)
1179
1180     def _send_more_queries(self, num_outstanding):
1181         more_queries = []
1182
1183         while True:
1184             self.log(format=" there are %(outstanding)d queries outstanding",
1185                      outstanding=len(self._queries_outstanding),
1186                      level=log.NOISY)
1187             active_queries = len(self._queries_outstanding) + len(more_queries)
1188             if active_queries >= num_outstanding:
1189                 break
1190             if not self.extra_peers:
1191                 break
1192             more_queries.append(self.extra_peers.pop(0))
1193
1194         self.log(format="sending %(more)d more queries: %(who)s",
1195                  more=len(more_queries),
1196                  who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
1197                                for (peerid,ss) in more_queries]),
1198                  level=log.NOISY)
1199
1200         for (peerid, ss) in more_queries:
1201             self._do_query(ss, peerid, self._storage_index, self._read_size)
1202             # we'll retrigger when those queries come back
1203
1204     def _done(self):
1205         if not self._running:
1206             self.log("not running; we're already done")
1207             return
1208         self._running = False
1209         now = time.time()
1210         elapsed = now - self._started
1211         self._status.set_finished(now)
1212         self._status.timings["total"] = elapsed
1213         self._status.set_progress(1.0)
1214         self._status.set_status("Finished")
1215         self._status.set_active(False)
1216
1217         self._servermap.last_update_mode = self.mode
1218         self._servermap.last_update_time = self._started
1219         # the servermap will not be touched after this
1220         self.log("servermap: %s" % self._servermap.summarize_versions())
1221
1222         eventually(self._done_deferred.callback, self._servermap)
1223
1224     def _fatal_error(self, f):
1225         self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
1226         self._done_deferred.errback(f)
1227
1228