]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/servermap.py
IServer refactoring: pass IServer instances around, instead of peerids
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
8                          fireEventually
9 from allmydata.util import base32, hashutil, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
14
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
16      CorruptShareError
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
18
class UpdateStatus:
    """Track the progress, timings, and outcome of one servermap update.

    I implement IServermapUpdaterStatus so that a status display can report
    what each ServermapUpdater is doing. All times are seconds-since-epoch
    floats from time.time().
    """
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        # per_server maps serverid -> list of (op, sent, elapsed) tuples
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None # serverid that supplied the privkey, if any
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        # unique id for this status object (py2 iterator protocol)
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None
        # The servermap attribute is assigned externally once the update
        # produces a map. Initialize it here so get_servermap() returns
        # None instead of raising AttributeError if queried early.
        self.servermap = None

    def add_per_server_time(self, server, op, sent, elapsed):
        """Record the timing of one operation ('query', 'late', or
        'privkey') against a single server."""
        serverid = server.get_serverid()
        assert op in ("query", "late", "privkey")
        if serverid not in self.timings["per_server"]:
            self.timings["per_server"][serverid] = []
        self.timings["per_server"][serverid].append((op,sent,elapsed))

    # simple read accessors, one per tracked attribute
    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        # mapupdate never uses a helper
        return False
    def get_size(self):
        # size is not meaningful for a mapupdate operation
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    # simple write accessors used by ServermapUpdater
    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server.get_serverid()
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when
83
class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
                         (versionid, timestamp) tuple. Each 'versionid' is a
                         tuple of (seqnum, root_hash, IV, segsize, datalength,
                         k, N, signed_prefix, offsets)

    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
                       shares that I should ignore (because a previous user
                       of the servermap determined that they were invalid).
                       The updater only locates a certain number of shares:
                       if some of these turn out to have integrity problems
                       and are unusable, the caller will need to mark those
                       shares as bad, then re-update the servermap, then try
                       again. The dict maps (server, shnum) tuple to old
                       checkstring.
    """

    def __init__(self):
        self._known_shares = {} # (server, shnum) -> (verinfo, timestamp)
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        # maps shnum to a list of (verinfo, data) pairs; see
        # set_update_data_for_share_and_verinfo
        self.update_data = {}

    def copy(self):
        """Return a copy of this map, suitable for test-and-set publishes.

        NOTE: update_data is not carried over -- the copy starts with an
        empty update-data cache. Confirm callers do not rely on it.
        """
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        return s

    def get_reachable_servers(self):
        """Return the set of servers that responded to our queries."""
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        """Return the dict of known-bad shares: (server,shnum) -> checkstring."""
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        # a freshly-written share supersedes any earlier badness record
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        """Remember a Failure encountered during the update (for debugging)."""
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        """Return (mode, when) of the most recent update of this map."""
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        """Write a human-readable description of this map to 'out' and
        return 'out'. Uses out.write() rather than 'print >>' so that any
        file-like object works (and the code parses on both py2 and py3)."""
        out.write("servermap:\n")

        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            out.write("[%s]: sh#%d seq%d-%s %d-of-%d len%d\n" %
                      (server.get_name(), shnum,
                       seqnum, base32.b2a(root_hash)[:4], k, N,
                       datalength))
        if self._problems:
            out.write("%d PROBLEMS\n" % len(self._problems))
            for f in self._problems:
                out.write("%s\n" % str(f))
        return out

    def all_servers(self):
        """Return the set of servers that hold at least one known share."""
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        """Return the verinfo for the share at (server, shnum), or None if
        we do not know about that share."""
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to tuples of
        (num_distinct_shares, k, N) tuples."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        """Return the highest sequence number of any known version, or 0 if
        no versions are known."""
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable (i.e. at least k distinct shares are known)."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable (fewer than k distinct shares known)."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criteria, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions


    def needs_merge(self):
        """Return True if there are multiple recoverable versions with the
        same seqnum, meaning that MutableFileNode.read_best_version is not
        giving you the whole story, and that using its data to do a
        subsequent publish will lose information."""
        seqnums = [verinfo[0]
                   for verinfo in self.recoverable_versions()]
        # a duplicate seqnum exists iff deduplication shrinks the list;
        # this is O(n) instead of the O(n**2) count()-per-element scan
        return len(seqnums) != len(set(seqnums))


    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum and verinfo.

        Raises KeyError if nothing was recorded for shnum, and IndexError
        if none of the recorded entries match verinfo.
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum


    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the block hash tree for the given shnum.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))
376
377 class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        @param filenode: the mutable file node whose shares are being mapped
        @param storage_broker: provides get_servers_for_psi(), the permuted
                               candidate server list
        @param monitor: operation monitor (cancellation/progress)
        @param servermap: the ServerMap instance to bring up to date
        @param mode: one of MODE_CHECK/MODE_ANYTHING/MODE_WRITE/MODE_READ;
                     controls how many servers are queried and how much data
                     is read from each
        @param add_lease: if True, send an add-lease message along with each
                          query
        @param update_range: optional 2-tuple (start_segment, end_segment);
                             when given in MODE_WRITE, extra per-share data
                             is fetched to support an in-place update
        """

        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        # status object for progress reporting / web status display
        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        # in MODE_WRITE we will need the privkey to sign the new version
        if mode == MODE_WRITE and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        # short human-readable storage-index prefix for log messages
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)
447
448     def get_status(self):
449         return self._status
450
451     def log(self, *args, **kwargs):
452         if "parent" not in kwargs:
453             kwargs["parent"] = self._log_number
454         if "facility" not in kwargs:
455             kwargs["facility"] = "tahoe.mutable.mapupdate"
456         return log.msg(*args, **kwargs)
457
    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # fired (by the response-handling state machine) when the update is
        # complete
        self._done_deferred = defer.Deferred()

        # first, which servers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # For what cases can these conditions work?
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
540
541     def _build_initial_querylist(self):
542         # we send queries to everyone who was already in the sharemap
543         initial_servers_to_query = set(self._servermap.all_servers())
544         # and we must wait for responses from them
545         must_query = set(initial_servers_to_query)
546
547         while ((self.num_servers_to_query > len(initial_servers_to_query))
548                and self.extra_servers):
549             initial_servers_to_query.add(self.extra_servers.pop(0))
550
551         return initial_servers_to_query, must_query
552
553     def _send_initial_requests(self, serverlist):
554         self._status.set_status("Sending %d initial queries" % len(serverlist))
555         self._queries_outstanding = set()
556         for server in serverlist:
557             self._queries_outstanding.add(server)
558             self._do_query(server, self._storage_index, self._read_size)
559
560         if not serverlist:
561             # there is nobody to ask, so we need to short-circuit the state
562             # machine.
563             d = defer.maybeDeferred(self._check_for_done, None)
564             d.addErrback(self._fatal_error)
565
566         # control flow beyond this point: state machine. Receiving responses
567         # from queries is the input. We might send out more queries, or we
568         # might produce a result.
569         return None
570
    def _do_query(self, server, storage_index, readsize):
        """Send one read query to a single server and wire up the response
        handlers. Returns the Deferred for the full callback chain.

        The errback ordering matters: _query_failed handles expected
        failures, then log.err records anything unexpected, then
        _fatal_error aborts the update; _check_for_done runs last in all
        cases so the state machine always advances.
        """
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        # read the first 'readsize' bytes of every share on this server
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d
588
589     def _do_read(self, server, storage_index, shnums, readv):
590         ss = server.get_rref()
591         if self._add_lease:
592             # send an add-lease message in parallel. The results are handled
593             # separately. This is sent before the slot_readv() so that we can
594             # be sure the add_lease is retired by the time slot_readv comes
595             # back (this relies upon our knowledge that the server code for
596             # add_lease is synchronous).
597             renew_secret = self._node.get_renewal_secret(server)
598             cancel_secret = self._node.get_cancel_secret(server)
599             d2 = ss.callRemote("add_lease", storage_index,
600                                renew_secret, cancel_secret)
601             # we ignore success
602             d2.addErrback(self._add_lease_failed, server, storage_index)
603         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
604         return d
605
606
607     def _got_corrupt_share(self, e, shnum, server, data, lp):
608         """
609         I am called when a remote server returns a corrupt share in
610         response to one of our queries. By corrupt, I mean a share
611         without a valid signature. I then record the failure, notify the
612         server of the corruption, and record the share as bad.
613         """
614         f = failure.Failure(e)
615         self.log(format="bad share: %(f_value)s", f_value=str(f),
616                  failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
617         # Notify the server that its share is corrupt.
618         self.notify_server_corruption(server, shnum, str(e))
619         # By flagging this as a bad server, we won't count any of
620         # the other shares on that server as valid, though if we
621         # happen to find a valid version string amongst those
622         # shares, we'll keep track of it so that we don't need
623         # to validate the signature on those again.
624         self._bad_servers.add(server)
625         self._last_failure = f
626         # XXX: Use the reader for this?
627         checkstring = data[:SIGNED_PREFIX_LENGTH]
628         self._servermap.mark_bad_share(server, shnum, checkstring)
629         self._servermap.add_problem(f)
630
631
632     def _cache_good_sharedata(self, verinfo, shnum, now, data):
633         """
634         If one of my queries returns successfully (which means that we
635         were able to and successfully did validate the signature), I
636         cache the data that we initially fetched from the storage
637         server. This will help reduce the number of roundtrips that need
638         to occur when the file is downloaded, or when the file is
639         updated.
640         """
641         if verinfo:
642             self._node._add_to_cache(verinfo, shnum, 0, data)
643
644
645     def _got_results(self, datavs, server, readsize, storage_index, started):
646         lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
647                       name=server.get_name(),
648                       numshares=len(datavs))
649         ss = server.get_rref()
650         now = time.time()
651         elapsed = now - started
652         def _done_processing(ignored=None):
653             self._queries_outstanding.discard(server)
654             self._servermap.mark_server_reachable(server)
655             self._must_query.discard(server)
656             self._queries_completed += 1
657         if not self._running:
658             self.log("but we're not running, so we'll ignore it", parent=lp)
659             _done_processing()
660             self._status.add_per_server_time(server, "late", started, elapsed)
661             return
662         self._status.add_per_server_time(server, "query", started, elapsed)
663
664         if datavs:
665             self._good_servers.add(server)
666         else:
667             self._empty_servers.add(server)
668
669         ds = []
670
671         for shnum,datav in datavs.items():
672             data = datav[0]
673             reader = MDMFSlotReadProxy(ss,
674                                        storage_index,
675                                        shnum,
676                                        data)
677             # our goal, with each response, is to validate the version
678             # information and share data as best we can at this point --
679             # we do this by validating the signature. To do this, we
680             # need to do the following:
681             #   - If we don't already have the public key, fetch the
682             #     public key. We use this to validate the signature.
683             if not self._node.get_pubkey():
684                 # fetch and set the public key.
685                 d = reader.get_verification_key()
686                 d.addCallback(lambda results, shnum=shnum:
687                               self._try_to_set_pubkey(results, server, shnum, lp))
688                 # XXX: Make self._pubkey_query_failed?
689                 d.addErrback(lambda error, shnum=shnum, data=data:
690                              self._got_corrupt_share(error, shnum, server, data, lp))
691             else:
692                 # we already have the public key.
693                 d = defer.succeed(None)
694
695             # Neither of these two branches return anything of
696             # consequence, so the first entry in our deferredlist will
697             # be None.
698
699             # - Next, we need the version information. We almost
700             #   certainly got this by reading the first thousand or so
701             #   bytes of the share on the storage server, so we
702             #   shouldn't need to fetch anything at this step.
703             d2 = reader.get_verinfo()
704             d2.addErrback(lambda error, shnum=shnum, data=data:
705                           self._got_corrupt_share(error, shnum, server, data, lp))
706             # - Next, we need the signature. For an SDMF share, it is
707             #   likely that we fetched this when doing our initial fetch
708             #   to get the version information. In MDMF, this lives at
709             #   the end of the share, so unless the file is quite small,
710             #   we'll need to do a remote fetch to get it.
711             d3 = reader.get_signature()
712             d3.addErrback(lambda error, shnum=shnum, data=data:
713                           self._got_corrupt_share(error, shnum, server, data, lp))
714             #  Once we have all three of these responses, we can move on
715             #  to validating the signature
716
717             # Does the node already have a privkey? If not, we'll try to
718             # fetch it here.
719             if self._need_privkey:
720                 d4 = reader.get_encprivkey()
721                 d4.addCallback(lambda results, shnum=shnum:
722                                self._try_to_validate_privkey(results, server, shnum, lp))
723                 d4.addErrback(lambda error, shnum=shnum:
724                               self._privkey_query_failed(error, server, shnum, lp))
725             else:
726                 d4 = defer.succeed(None)
727
728
729             if self.fetch_update_data:
730                 # fetch the block hash tree and first + last segment, as
731                 # configured earlier.
732                 # Then set them in wherever we happen to want to set
733                 # them.
734                 ds = []
735                 # XXX: We do this above, too. Is there a good way to
736                 # make the two routines share the value without
737                 # introducing more roundtrips?
738                 ds.append(reader.get_verinfo())
739                 ds.append(reader.get_blockhashes())
740                 ds.append(reader.get_block_and_salt(self.start_segment))
741                 ds.append(reader.get_block_and_salt(self.end_segment))
742                 d5 = deferredutil.gatherResults(ds)
743                 d5.addCallback(self._got_update_results_one_share, shnum)
744             else:
745                 d5 = defer.succeed(None)
746
747             dl = defer.DeferredList([d, d2, d3, d4, d5])
748             dl.addBoth(self._turn_barrier)
749             dl.addCallback(lambda results, shnum=shnum:
750                            self._got_signature_one_share(results, shnum, server, lp))
751             dl.addErrback(lambda error, shnum=shnum, data=data:
752                           self._got_corrupt_share(error, shnum, server, data, lp))
753             dl.addCallback(lambda verinfo, shnum=shnum, data=data:
754                            self._cache_good_sharedata(verinfo, shnum, now, data))
755             ds.append(dl)
756         # dl is a deferred list that will fire when all of the shares
757         # that we found on this server are done processing. When dl fires,
758         # we know that processing is done, so we can decrement the
759         # semaphore-like thing that we incremented earlier.
760         dl = defer.DeferredList(ds, fireOnOneErrback=True)
761         # Are we done? Done means that there are no more queries to
762         # send, that there are no outstanding queries, and that we
763         # haven't received any queries that are still processing. If we
764         # are done, self._check_for_done will cause the done deferred
765         # that we returned to our caller to fire, which tells them that
766         # they have a complete servermap, and that we won't be touching
767         # the servermap anymore.
768         dl.addCallback(_done_processing)
769         dl.addCallback(self._check_for_done)
770         dl.addErrback(self._fatal_error)
771         # all done!
772         self.log("_got_results done", parent=lp, level=log.NOISY)
773         return dl
774
775
776     def _turn_barrier(self, result):
777         """
778         I help the servermap updater avoid the recursion limit issues
779         discussed in #237.
780         """
781         return fireEventually(result)
782
783
784     def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
785         if self._node.get_pubkey():
786             return # don't go through this again if we don't have to
787         fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
788         assert len(fingerprint) == 32
789         if fingerprint != self._node.get_fingerprint():
790             raise CorruptShareError(server, shnum,
791                                     "pubkey doesn't match fingerprint")
792         self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
793         assert self._node.get_pubkey()
794
795
796     def notify_server_corruption(self, server, shnum, reason):
797         rref = server.get_rref()
798         rref.callRemoteOnly("advise_corrupt_share",
799                             "mutable", self._storage_index, shnum, reason)
800
801
    def _got_signature_one_share(self, results, shnum, server, lp):
        """
        Validate one share's version info and signature, record the
        version in the servermap, and return the verinfo tuple.

        'results' is the output of the DeferredList assembled in
        _got_results: a list of (success, value) pairs for (pubkey fetch,
        verinfo, signature, privkey fetch, update data) — which is why
        verinfo[1] and signature[1] are indexed below. Raises
        CorruptShareError (handled by our caller's errback) if the
        signature does not verify against the node's public key.
        """
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 level=log.NOISY,
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        # only verinfo and signature carry data; the other three slots of
        # the DeferredList hold None (or privkey/update side effects)
        _, verinfo, signature, __, ___ = results
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo[1]
        # convert the mutable offsets dict into a hashable tuple so that
        # verinfo can be used in sets and as a dict key
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        # XXX: This should be done for us in the method, so
        # presumably you can go in there and fix it.
        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.

        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

        # ok, it's a valid verinfo. Add it to the list of validated
        # versions.
        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                 % (seqnum, base32.b2a(root_hash)[:4],
                    server.get_name(), shnum,
                    k, n, segsize, datalen),
                    parent=lp)
        self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)

        return verinfo
879
880
881     def _got_update_results_one_share(self, results, share):
882         """
883         I record the update results in results.
884         """
885         assert len(results) == 4
886         verinfo, blockhashes, start, end = results
887         (seqnum,
888          root_hash,
889          saltish,
890          segsize,
891          datalen,
892          k,
893          n,
894          prefix,
895          offsets) = verinfo
896         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
897
898         # XXX: This should be done for us in the method, so
899         # presumably you can go in there and fix it.
900         verinfo = (seqnum,
901                    root_hash,
902                    saltish,
903                    segsize,
904                    datalen,
905                    k,
906                    n,
907                    prefix,
908                    offsets_tuple)
909
910         update_data = (blockhashes, start, end)
911         self._servermap.set_update_data_for_share_and_verinfo(share,
912                                                               verinfo,
913                                                               update_data)
914
915
916     def _deserialize_pubkey(self, pubkey_s):
917         verifier = rsa.create_verifying_key_from_string(pubkey_s)
918         return verifier
919
920
921     def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
922         """
923         Given a writekey from a remote server, I validate it against the
924         writekey stored in my node. If it is valid, then I set the
925         privkey and encprivkey properties of the node.
926         """
927         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
928         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
929         if alleged_writekey != self._node.get_writekey():
930             self.log("invalid privkey from %s shnum %d" %
931                      (server.get_name(), shnum),
932                      parent=lp, level=log.WEIRD, umid="aJVccw")
933             return
934
935         # it's good
936         self.log("got valid privkey from shnum %d on serverid %s" %
937                  (shnum, server.get_name()),
938                  parent=lp)
939         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
940         self._node._populate_encprivkey(enc_privkey)
941         self._node._populate_privkey(privkey)
942         self._need_privkey = False
943         self._status.set_privkey_from(server)
944
945
946     def _add_lease_failed(self, f, server, storage_index):
947         # Older versions of Tahoe didn't handle the add-lease message very
948         # well: <=1.1.0 throws a NameError because it doesn't implement
949         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
950         # (which is most of them, since we send add-lease to everybody,
951         # before we know whether or not they have any shares for us), and
952         # 1.2.0 throws KeyError even on known buckets due to an internal bug
953         # in the latency-measuring code.
954
955         # we want to ignore the known-harmless errors and log the others. In
956         # particular we want to log any local errors caused by coding
957         # problems.
958
959         if f.check(DeadReferenceError):
960             return
961         if f.check(RemoteException):
962             if f.value.failure.check(KeyError, IndexError, NameError):
963                 # this may ignore a bit too much, but that only hurts us
964                 # during debugging
965                 return
966             self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
967                      name=server.get_name(),
968                      f_value=str(f.value),
969                      failure=f,
970                      level=log.WEIRD, umid="iqg3mw")
971             return
972         # local errors are cause for alarm
973         log.err(f,
974                 format="local error in add_lease to [%(name)s]: %(f_value)s",
975                 name=server.get_name(),
976                 f_value=str(f.value),
977                 level=log.WEIRD, umid="ZWh6HA")
978
979     def _query_failed(self, f, server):
980         if not self._running:
981             return
982         level = log.WEIRD
983         if f.check(DeadReferenceError):
984             level = log.UNUSUAL
985         self.log(format="error during query: %(f_value)s",
986                  f_value=str(f.value), failure=f,
987                  level=level, umid="IHXuQg")
988         self._must_query.discard(server)
989         self._queries_outstanding.discard(server)
990         self._bad_servers.add(server)
991         self._servermap.add_problem(f)
992         # a server could be in both ServerMap.reachable_servers and
993         # .unreachable_servers if they responded to our query, but then an
994         # exception was raised in _got_results.
995         self._servermap.mark_server_unreachable(server)
996         self._queries_completed += 1
997         self._last_failure = f
998
999
1000     def _privkey_query_failed(self, f, server, shnum, lp):
1001         self._queries_outstanding.discard(server)
1002         if not self._running:
1003             return
1004         level = log.WEIRD
1005         if f.check(DeadReferenceError):
1006             level = log.UNUSUAL
1007         self.log(format="error during privkey query: %(f_value)s",
1008                  f_value=str(f.value), failure=f,
1009                  parent=lp, level=level, umid="McoJ5w")
1010         self._servermap.add_problem(f)
1011         self._last_failure = f
1012
1013
    def _check_for_done(self, res):
        """
        Decide, based on the current mode and the responses seen so far,
        whether to finish the update (_done), send more queries, or just
        keep waiting for outstanding queries to retire.
        """
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            # any recoverable version at all satisfies MODE_ANYTHING
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        # cap on simultaneous queries requested by the branches below
        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            # one state character per server, in permuted order:
            # x=failed, 0=no shares, 1=has shares, ?=no response yet
            states = []
            found_boundary = False

            for i,server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        # the boundary is EPSILON consecutive share-less
                        # servers after the last server that had shares
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._good_servers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)
1183
1184     def _send_more_queries(self, num_outstanding):
1185         more_queries = []
1186
1187         while True:
1188             self.log(format=" there are %(outstanding)d queries outstanding",
1189                      outstanding=len(self._queries_outstanding),
1190                      level=log.NOISY)
1191             active_queries = len(self._queries_outstanding) + len(more_queries)
1192             if active_queries >= num_outstanding:
1193                 break
1194             if not self.extra_servers:
1195                 break
1196             more_queries.append(self.extra_servers.pop(0))
1197
1198         self.log(format="sending %(more)d more queries: %(who)s",
1199                  more=len(more_queries),
1200                  who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
1201                  level=log.NOISY)
1202
1203         for server in more_queries:
1204             self._do_query(server, self._storage_index, self._read_size)
1205             # we'll retrigger when those queries come back
1206
1207     def _done(self):
1208         if not self._running:
1209             self.log("not running; we're already done")
1210             return
1211         self._running = False
1212         now = time.time()
1213         elapsed = now - self._started
1214         self._status.set_finished(now)
1215         self._status.timings["total"] = elapsed
1216         self._status.set_progress(1.0)
1217         self._status.set_status("Finished")
1218         self._status.set_active(False)
1219
1220         self._servermap.set_last_update(self.mode, self._started)
1221         # the servermap will not be touched after this
1222         self.log("servermap: %s" % self._servermap.summarize_versions())
1223
1224         eventually(self._done_deferred.callback, self._servermap)
1225
    def _fatal_error(self, f):
        # An unexpected failure escaped our per-query error handling: log
        # it and abort the whole update by errbacking the deferred we
        # handed to our caller.
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)
1229
1230