
import sys, time, copy
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
                         fireEventually
from allmydata.util import base32, hashutil, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
     MODE_READ, MODE_REPAIR, CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if server not in self.timings["per_server"]:
            self.timings["per_server"][server] = []
        self.timings["per_server"][server].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @var _known_shares: a dictionary, mapping a (server, shnum) tuple to a
                        (versionid, timestamp) tuple. Each 'versionid' is a
                        tuple of (seqnum, root_hash, IV, segsize, datalength,
                        k, N, signed_prefix, offsets)

    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
                       shares that I should ignore (because a previous user
                       of the servermap determined that they were invalid).
                       The updater only locates a certain number of shares:
                       if some of these turn out to have integrity problems
                       and are unusable, the caller will need to mark those
                       shares as bad, then re-update the servermap, then try
                       again. The dict maps (server, shnum) tuple to old
                       checkstring.
    """

    def __init__(self):
        self._known_shares = {}
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        self.proxies = {}
        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
        # where blockhashes is a list of bytestrings (the result of
        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
        # (block,salt) tuple-of-bytestrings from get_block_and_salt()
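        #
        # Illustrative shape only (hypothetical values, not real data):
        #   self.update_data = {
        #       0: [(verinfo, (blockhashes, (block, salt), (block, salt)))],
        #   }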

    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        s.update_data = copy.deepcopy(self.update_data)
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (server.get_name(), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self._problems:
            print >>out, "%d PROBLEMS" % len(self._problems)
            for f in self._problems:
                print >>out, str(f)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
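        # Illustrative shape only (hypothetical values, not real data):
        #   {verinfo_a: set([(0, server_1, 1234567890.1),
        #                    (3, server_2, 1234567890.2)]),
        #    verinfo_b: set([(1, server_3, 1234567891.0)])}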
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps each verinfo to a
        (num_distinct_shares, k, N) tuple."""
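        # For example (hypothetical values): an entry of (2, 3, 10) means
        # only two distinct shares of a 3-of-10 version were found, so that
        # version is not (yet) recoverable.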
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
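        # Note: verinfo tuples start with (seqnum, root_hash, ...), so the
        # plain tuple sort below orders candidates by seqnum first and then
        # by root hash; recoverable[-1] is therefore the 'best' version.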
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
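        # (For instance, hypothetically: two uncoordinated writers both
        # publish seqnum 5 with different root hashes, and both versions end
        # up recoverable; neither supersedes the other, so a higher layer
        # needs to merge their contents.)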
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum
        """
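        # update_data[shnum] is a list of (verinfo, data) pairs; pick out
        # the data recorded for the matching verinfo (exactly one match is
        # assumed).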
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum


    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the block hash tree for the given shnum.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))


class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        """
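        # Rough usage sketch (hedged; the real call sites live elsewhere in
        # allmydata.mutable, and 'monitor' is typically an
        # allmydata.monitor.Monitor):
        #
        #   u = ServermapUpdater(filenode, storage_broker, monitor,
        #                        ServerMap(), mode=MODE_READ)
        #   d = u.update()  # Deferred that fires with the populated ServerMap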

        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        self._done_deferred = defer.Deferred()

        # first, which servers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._servers_with_shares = set() # servers that we know have shares now
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # For what cases can these conditions work?
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON
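        # e.g. with the 3-of-10 defaults guessed above, this is 3 + 3 = 6
        # initial queries; MODE_WRITE raises the target to N + EPSILON
        # (10 + 3 = 13) below.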

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon servers that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query

    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore success
            d2.addErrback(self._add_lease_failed, server, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d


    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)


    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_rref()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)

        ds = []

        for shnum,datav in datavs.items():
            data = datav[0]
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data,
                                       data_is_everything=(len(data) < readsize))

            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches return anything of
            # consequence, so the first entry in our deferredlist will
            # be None.

            # - Next, we need the version information. We almost
            #   certainly got this by reading the first thousand or so
            #   bytes of the share on the storage server, so we
            #   shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # - Next, we need the signature. For an SDMF share, it is
            #   likely that we fetched this when doing our initial fetch
            #   to get the version information. In MDMF, this lives at
            #   the end of the share, so unless the file is quite small,
            #   we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            #  Once we have all three of these responses, we can move on
            #  to validating the signature

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)


            if self.fetch_update_data:
                # fetch the block hash tree and first + last segment, as
                # configured earlier.
                # Then set them in wherever we happen to want to set
                # them.
                ds = []
                # XXX: We do this above, too. Is there a good way to
                # make the two routines share the value without
                # introducing more roundtrips?
                ds.append(reader.get_verinfo())
                ds.append(reader.get_blockhashes())
                ds.append(reader.get_block_and_salt(self.start_segment))
                ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            def _append_proxy(passthrough, shnum=shnum, reader=reader):
                # Store the proxy (with its cache) keyed by serverid and
                # version.
                _, (_,verinfo), _, _, _ = passthrough
                verinfo = self._make_verinfo_hashable(verinfo)
                self._servermap.proxies[(verinfo,
                                         server.get_serverid(),
                                         storage_index, shnum)] = reader
                return passthrough
            dl.addCallback(_append_proxy)
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            ds.append(dl)
        # dl is a deferred list that will fire when all of the shares
        # that we found on this server are done processing. When dl fires,
        # we know that processing is done, so we can decrement the
        # semaphore-like thing that we incremented earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # aren't still processing any of the responses we've received. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)
        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl


    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        discussed in #237.
        """
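        # fireEventually(result) returns a Deferred that fires with 'result'
        # on a later turn of the event loop, so long synchronous callback
        # chains don't keep growing the Python stack.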
        return fireEventually(result)


    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()


    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)


    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 level=log.NOISY,
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        verinfo = self._make_verinfo_hashable(verinfo[1])

        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets_tuple) = verinfo


        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

        # ok, it's a valid verinfo. Add it to the list of validated
        # versions.
        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                 % (seqnum, base32.b2a(root_hash)[:4],
                    server.get_name(), shnum,
                    k, n, segsize, datalen),
                    parent=lp)
        self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)
        self._servers_with_shares.add(server)

        return verinfo

    def _make_verinfo_hashable(self, verinfo):
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo

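        # 'offsets' is a dict, which is not hashable; flattening it into a
        # tuple of (key, value) pairs lets the whole verinfo tuple be used
        # as a set member or dict key.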
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        return verinfo

    def _got_update_results_one_share(self, results, share):
        """
        I record the update results for the given share in the servermap.
        """
        assert len(results) == 4
        verinfo, blockhashes, start, end = results
        verinfo = self._make_verinfo_hashable(verinfo)
        update_data = (blockhashes, start, end)
        self._servermap.set_update_data_for_share_and_verinfo(share,
                                                              verinfo,
                                                              update_data)


    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier


    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
        """
        Given a writekey from a remote server, I validate it against the
        writekey stored in my node. If it is valid, then I set the
        privkey and encprivkey properties of the node.
        """
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (server.get_name(), shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on serverid %s" %
                 (shnum, server.get_name()),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(server)


    def _add_lease_failed(self, f, server, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # problems.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server.get_name(),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server.get_name(),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, server):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(server)
        self._queries_outstanding.discard(server)
        self._bad_servers.add(server)
        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f


    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f


    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i,server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._servers_with_shares:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

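            # At this point 'states' is a map of the permuted server list,
            # e.g. (hypothetically) "1110x00": three servers with shares,
            # then an empty one, a failed query, and two more empty ones.
            # With EPSILON=3, the third '0' past the last '1' marks the
            # boundary we were looking for.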
            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)
