1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
8                          fireEventually
9 from allmydata.util import base32, hashutil, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
14
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
16      MODE_READ, MODE_REPAIR, CorruptShareError
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
18
19 class UpdateStatus:
20     implements(IServermapUpdaterStatus)
21     statusid_counter = count(0)
22     def __init__(self):
23         self.timings = {}
24         self.timings["per_server"] = {}
25         self.timings["cumulative_verify"] = 0.0
26         self.privkey_from = None
27         self.problems = {}
28         self.active = True
29         self.storage_index = None
30         self.mode = "?"
31         self.status = "Not started"
32         self.progress = 0.0
33         self.counter = self.statusid_counter.next()
34         self.started = time.time()
35         self.finished = None
36
37     def add_per_server_time(self, server, op, sent, elapsed):
38         assert op in ("query", "late", "privkey")
39         if server not in self.timings["per_server"]:
40             self.timings["per_server"][server] = []
41         self.timings["per_server"][server].append((op,sent,elapsed))
42
43     def get_started(self):
44         return self.started
45     def get_finished(self):
46         return self.finished
47     def get_storage_index(self):
48         return self.storage_index
49     def get_mode(self):
50         return self.mode
51     def get_servermap(self):
52         return self.servermap
53     def get_privkey_from(self):
54         return self.privkey_from
55     def using_helper(self):
56         return False
57     def get_size(self):
58         return "-NA-"
59     def get_status(self):
60         return self.status
61     def get_progress(self):
62         return self.progress
63     def get_active(self):
64         return self.active
65     def get_counter(self):
66         return self.counter
67
68     def set_storage_index(self, si):
69         self.storage_index = si
70     def set_mode(self, mode):
71         self.mode = mode
72     def set_privkey_from(self, server):
73         self.privkey_from = server
74     def set_status(self, status):
75         self.status = status
76     def set_progress(self, value):
77         self.progress = value
78     def set_active(self, value):
79         self.active = value
80     def set_finished(self, when):
81         self.finished = when
82
83 class ServerMap:
84     """I record the placement of mutable shares.
85
86     This object records which shares (of various versions) are located on
87     which servers.
88
89     One purpose I serve is to inform callers about which versions of the
90     mutable file are recoverable and 'current'.
91
92     A second purpose is to serve as a state marker for test-and-set
93     operations. I am passed out of retrieval operations and back into publish
94     operations, which means 'publish this new version, but only if nothing
95     has changed since I last retrieved this data'. This reduces the chances
96     of clobbering a simultaneous (uncoordinated) write.
97
98     @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
99                         (versionid, timestamp) tuple. Each 'versionid' is a
100                         tuple of (seqnum, root_hash, IV, segsize, datalength,
101                         k, N, signed_prefix, offsets)
102
103     @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
104                        shares that I should ignore (because a previous user
105                        of the servermap determined that they were invalid).
106                        The updater only locates a certain number of shares:
107                        if some of these turn out to have integrity problems
108                        and are unusable, the caller will need to mark those
109                        shares as bad, then re-update the servermap, then try
110                        again. The dict maps (server, shnum) tuple to old
111                        checkstring.
112     """
113
114     def __init__(self):
115         self._known_shares = {}
116         self.unreachable_servers = set() # servers that didn't respond to queries
117         self.reachable_servers = set() # servers that did respond to queries
118         self._problems = [] # mostly for debugging
119         self._bad_shares = {} # maps (server,shnum) to old checkstring
120         self._last_update_mode = None
121         self._last_update_time = 0
122         self.proxies = {}
123         self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
124         # where blockhashes is a list of bytestrings (the result of
125         # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
126         # (block,salt) tuple-of-bytestrings from get_block_and_salt()
127
128     def copy(self):
129         s = ServerMap()
130         s._known_shares = self._known_shares.copy() # tuple->tuple
131         s.unreachable_servers = set(self.unreachable_servers)
132         s.reachable_servers = set(self.reachable_servers)
133         s._problems = self._problems[:]
134         s._bad_shares = self._bad_shares.copy() # tuple->str
135         s._last_update_mode = self._last_update_mode
136         s._last_update_time = self._last_update_time
137         return s
138
139     def get_reachable_servers(self):
140         return self.reachable_servers
141
142     def mark_server_reachable(self, server):
143         self.reachable_servers.add(server)
144
145     def mark_server_unreachable(self, server):
146         self.unreachable_servers.add(server)
147
148     def mark_bad_share(self, server, shnum, checkstring):
149         """This share was found to be bad, either in the checkstring or
150         signature (detected during mapupdate), or deeper in the share
151         (detected at retrieve time). Remove it from our list of useful
152         shares, and remember that it is bad so we don't add it back again
153         later. We record the share's old checkstring (which might be
154         corrupted or badly signed) so that a repair operation can do the
155         test-and-set using it as a reference.
156         """
157         key = (server, shnum) # record checkstring
158         self._bad_shares[key] = checkstring
159         self._known_shares.pop(key, None)
160
161     def get_bad_shares(self):
162         # key=(server,shnum) -> checkstring
163         return self._bad_shares
164
165     def add_new_share(self, server, shnum, verinfo, timestamp):
166         """We've written a new share out, replacing any that was there
167         before."""
168         key = (server, shnum)
169         self._bad_shares.pop(key, None)
170         self._known_shares[key] = (verinfo, timestamp)
171
172     def add_problem(self, f):
173         self._problems.append(f)
174     def get_problems(self):
175         return self._problems
176
177     def set_last_update(self, mode, when):
178         self._last_update_mode = mode
179         self._last_update_time = when
180     def get_last_update(self):
181         return (self._last_update_mode, self._last_update_time)
182
183     def dump(self, out=sys.stdout):
184         print >>out, "servermap:"
185
186         for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
187             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
188              offsets_tuple) = verinfo
189             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
190                           (server.get_name(), shnum,
191                            seqnum, base32.b2a(root_hash)[:4], k, N,
192                            datalength))
193         if self._problems:
194             print >>out, "%d PROBLEMS" % len(self._problems)
195             for f in self._problems:
196                 print >>out, str(f)
197         return out
198
199     def all_servers(self):
200         return set([server for (server, shnum) in self._known_shares])
201
202     def all_servers_for_version(self, verinfo):
203         """Return a set of servers that hold shares for the given version."""
204         return set([server
205                     for ( (server, shnum), (verinfo2, timestamp) )
206                     in self._known_shares.items()
207                     if verinfo == verinfo2])
208
209     def get_known_shares(self):
210         # maps (server,shnum) to (versionid,timestamp)
211         return self._known_shares
212
213     def make_sharemap(self):
214         """Return a dict that maps shnum to a set of servers that hold it."""
215         sharemap = DictOfSets()
216         for (server, shnum) in self._known_shares:
217             sharemap.add(shnum, server)
218         return sharemap
219
220     def make_versionmap(self):
221         """Return a dict that maps versionid to sets of (shnum, server,
222         timestamp) tuples."""
223         versionmap = DictOfSets()
224         for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
225             versionmap.add(verinfo, (shnum, server, timestamp))
226         return versionmap
227
228     def debug_shares_on_server(self, server): # used by tests
229         return set([shnum for (s, shnum) in self._known_shares if s == server])
230
231     def version_on_server(self, server, shnum):
232         key = (server, shnum)
233         if key in self._known_shares:
234             (verinfo, timestamp) = self._known_shares[key]
235             return verinfo
236         return None
237
238     def shares_available(self):
239         """Return a dict that maps verinfo to tuples of
240         (num_distinct_shares, k, N) tuples."""
241         versionmap = self.make_versionmap()
242         all_shares = {}
243         for verinfo, shares in versionmap.items():
244             s = set()
245             for (shnum, server, timestamp) in shares:
246                 s.add(shnum)
247             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
248              offsets_tuple) = verinfo
249             all_shares[verinfo] = (len(s), k, N)
250         return all_shares
251
252     def highest_seqnum(self):
253         available = self.shares_available()
254         seqnums = [verinfo[0]
255                    for verinfo in available.keys()]
256         seqnums.append(0)
257         return max(seqnums)
258
259     def summarize_version(self, verinfo):
260         """Take a versionid, return a string that describes it."""
261         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
262          offsets_tuple) = verinfo
263         return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
264
265     def summarize_versions(self):
266         """Return a string describing which versions we know about."""
267         versionmap = self.make_versionmap()
268         bits = []
269         for (verinfo, shares) in versionmap.items():
270             vstr = self.summarize_version(verinfo)
271             shnums = set([shnum for (shnum, server, timestamp) in shares])
272             bits.append("%d*%s" % (len(shnums), vstr))
273         return "/".join(bits)
274
275     def recoverable_versions(self):
276         """Return a set of versionids, one for each version that is currently
277         recoverable."""
278         versionmap = self.make_versionmap()
279         recoverable_versions = set()
280         for (verinfo, shares) in versionmap.items():
281             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
282              offsets_tuple) = verinfo
283             shnums = set([shnum for (shnum, server, timestamp) in shares])
284             if len(shnums) >= k:
285                 # this one is recoverable
286                 recoverable_versions.add(verinfo)
287
288         return recoverable_versions
289
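    # Editorial example of the recoverability rule used above and in
    # unrecoverable_versions() below: a version needs at least k *distinct*
    # share numbers; extra copies of the same shnum don't help.
    #
    #   shnums = set([shnum for (shnum, server, ts) in shares])
    #   k = verinfo[5]
    #   recoverable = (len(shnums) >= k)  # e.g. 4 distinct shares, k=3: True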
290     def unrecoverable_versions(self):
291         """Return a set of versionids, one for each version that is currently
292         unrecoverable."""
293         versionmap = self.make_versionmap()
294
295         unrecoverable_versions = set()
296         for (verinfo, shares) in versionmap.items():
297             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
298              offsets_tuple) = verinfo
299             shnums = set([shnum for (shnum, server, timestamp) in shares])
300             if len(shnums) < k:
301                 unrecoverable_versions.add(verinfo)
302
303         return unrecoverable_versions
304
305     def best_recoverable_version(self):
306         """Return a single versionid, for the so-called 'best' recoverable
307         version. Sequence number is the primary sort criterion, followed by
308         root hash. Returns None if there are no recoverable versions."""
309         recoverable = list(self.recoverable_versions())
310         recoverable.sort()
311         if recoverable:
312             return recoverable[-1]
313         return None
314
315     def size_of_version(self, verinfo):
316         """Given a versionid (perhaps returned by best_recoverable_version),
317         return the size of the file in bytes."""
318         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
319          offsets_tuple) = verinfo
320         return datalength
321
322     def unrecoverable_newer_versions(self):
323         # Return a dict of versionid -> health, for versions that are
324         # unrecoverable and have later seqnums than any recoverable versions.
325         # These indicate that a write will lose data.
326         versionmap = self.make_versionmap()
327         healths = {} # maps verinfo to (found,k)
328         unrecoverable = set()
329         highest_recoverable_seqnum = -1
330         for (verinfo, shares) in versionmap.items():
331             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
332              offsets_tuple) = verinfo
333             shnums = set([shnum for (shnum, server, timestamp) in shares])
334             healths[verinfo] = (len(shnums),k)
335             if len(shnums) < k:
336                 unrecoverable.add(verinfo)
337             else:
338                 highest_recoverable_seqnum = max(seqnum,
339                                                  highest_recoverable_seqnum)
340
341         newversions = {}
342         for verinfo in unrecoverable:
343             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
344              offsets_tuple) = verinfo
345             if seqnum > highest_recoverable_seqnum:
346                 newversions[verinfo] = healths[verinfo]
347
348         return newversions
349
350
351     def needs_merge(self):
352         # return True if there are multiple recoverable versions with the
353         # same seqnum, meaning that MutableFileNode.read_best_version is not
354         # giving you the whole story, and that using its data to do a
355         # subsequent publish will lose information.
356         recoverable_seqnums = [verinfo[0]
357                                for verinfo in self.recoverable_versions()]
358         for seqnum in recoverable_seqnums:
359             if recoverable_seqnums.count(seqnum) > 1:
360                 return True
361         return False
362
363
364     def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
365         """
366         I return the update data recorded for the given shnum and verinfo.
367         """
368         update_data = self.update_data[shnum]
369         update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
370         return update_datum
371
372
373     def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
374         """
375         I record the update data for the given shnum and verinfo.
376         """
377         self.update_data.setdefault(shnum, []).append((verinfo, data))
378
379
380 class ServermapUpdater:
381     def __init__(self, filenode, storage_broker, monitor, servermap,
382                  mode=MODE_READ, add_lease=False, update_range=None):
383         """I update a servermap, locating a sufficient number of useful
384         shares and remembering where they are located.
385
386         """
387
388         self._node = filenode
389         self._storage_broker = storage_broker
390         self._monitor = monitor
391         self._servermap = servermap
392         self.mode = mode
393         self._add_lease = add_lease
394         self._running = True
395
396         self._storage_index = filenode.get_storage_index()
397         self._last_failure = None
398
399         self._status = UpdateStatus()
400         self._status.set_storage_index(self._storage_index)
401         self._status.set_progress(0.0)
402         self._status.set_mode(mode)
403
404         self._servers_responded = set()
405
406         # how much data should we read?
407         # SDMF:
408         #  * if we only need the checkstring, then [0:75]
409         #  * if we need to validate the checkstring sig, then [543ish:799ish]
410         #  * if we need the verification key, then [107:436ish]
411         #   * the offset table at [75:107] tells us about the 'ish'
412         #  * if we need the encrypted private key, we want [-1216ish:]
413         #   * but we can't read from negative offsets
414         #   * the offset table tells us the 'ish', also the positive offset
415         # MDMF:
416         #  * Checkstring? [0:72]
417         #  * If we want to validate the checkstring, then [0:72], [143:?] --
418         #    the offset table will tell us for sure.
419         #  * If we need the verification key, we have to consult the offset
420         #    table as well.
421         # At this point, we don't know which we are. Our filenode can
422         # tell us, but it might be lying -- in some cases, we're
423         # responsible for telling it which kind of file it is.
424         self._read_size = 4000
425         if mode == MODE_CHECK:
426             # we use unpack_prefix_and_signature, so we need 1k
427             self._read_size = 1000
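        # Editorial sketch of the effect of the sizing above: the first query
        # issues one read vector starting at offset 0, so a single round trip
        # usually covers the checkstring, the offset table and, for SDMF, the
        # signed prefix and signature discussed in the comment block above:
        #
        #   readsize = 1000 if mode == MODE_CHECK else 4000
        #   d = self._do_read(server, storage_index, [], [(0, readsize)])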
428         self._need_privkey = False
429
430         if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
431             self._need_privkey = True
432         # check+repair: repair requires the privkey, so if we didn't happen
433         # to ask for it during the check, we'll have problems doing the
434         # publish.
435
436         self.fetch_update_data = False
437         if mode == MODE_WRITE and update_range:
438             # We're updating the servermap in preparation for an
439             # in-place file update, so we need to fetch some additional
440             # data from each share that we find.
441             assert len(update_range) == 2
442
443             self.start_segment = update_range[0]
444             self.end_segment = update_range[1]
445             self.fetch_update_data = True
446
447         prefix = si_b2a(self._storage_index)[:5]
448         self._log_number = log.msg(format="ServermapUpdater(%(si)s): starting (%(mode)s)",
449                                    si=prefix, mode=mode)
450
451     def get_status(self):
452         return self._status
453
454     def log(self, *args, **kwargs):
455         if "parent" not in kwargs:
456             kwargs["parent"] = self._log_number
457         if "facility" not in kwargs:
458             kwargs["facility"] = "tahoe.mutable.mapupdate"
459         return log.msg(*args, **kwargs)
460
461     def update(self):
462         """Update the servermap to reflect current conditions. Returns a
463         Deferred that fires with the servermap once the update has finished."""
464         self._started = time.time()
465         self._status.set_active(True)
466
467         # self._valid_versions is a set of validated verinfo tuples. We just
468         # use it to remember which versions had valid signatures, so we can
469         # avoid re-checking the signatures for each share.
470         self._valid_versions = set()
471
472         self._done_deferred = defer.Deferred()
473
474         # first, which servers should we talk to? Any that were in our old
475         # servermap, plus "enough" others.
476
477         self._queries_completed = 0
478
479         sb = self._storage_broker
480         # All of the servers, permuted by the storage index, as usual.
481         full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
482         self.full_serverlist = full_serverlist # for use later, immutable
483         self.extra_servers = full_serverlist[:] # servers are removed as we use them
484         self._good_servers = set() # servers who had some shares
485         self._empty_servers = set() # servers who don't have any shares
486         self._bad_servers = set() # servers to whom our queries failed
487
488         k = self._node.get_required_shares()
489         # k and N are None until the node has seen a share; guess 3-of-10.
490         if k is None:
491             # make a guess
492             k = 3
493         N = self._node.get_total_shares()
494         if N is None:
495             N = 10
496         self.EPSILON = k
497         # we want to send queries to at least this many servers (although we
498         # might not wait for all of their answers to come back)
499         self.num_servers_to_query = k + self.EPSILON
500
501         if self.mode in (MODE_CHECK, MODE_REPAIR):
502             # We want to query all of the servers.
503             initial_servers_to_query = list(full_serverlist)
504             must_query = set(initial_servers_to_query)
505             self.extra_servers = []
506         elif self.mode == MODE_WRITE:
507             # we're planning to replace all the shares, so we want a good
508             # chance of finding them all. We will keep searching until we've
509             # seen epsilon that don't have a share.
510             # We don't query all of the servers because that could take a while.
511             self.num_servers_to_query = N + self.EPSILON
512             initial_servers_to_query, must_query = self._build_initial_querylist()
513             self.required_num_empty_servers = self.EPSILON
514
515             # TODO: arrange to read lots of data from k-ish servers, to avoid
516             # the extra round trip required to read large directories. This
517             # might also avoid the round trip required to read the encrypted
518             # private key.
519
520         else: # MODE_READ, MODE_ANYTHING
521             # 2*k servers is good enough.
522             initial_servers_to_query, must_query = self._build_initial_querylist()
523
524         # this is a set of servers that we are required to get responses
525         # from: they are servers who used to have a share, so we need to know
526         # where they currently stand, even if that means we have to wait for
527         # a silently-lost TCP connection to time out. We remove servers from
528         # this set as we get responses.
529         self._must_query = set(must_query)
530
531         # now initial_servers_to_query contains the servers that we should
532         # ask, self._must_query contains the servers that we must have heard
533         # from before we can consider ourselves finished, and
534         # self.extra_servers contains the overflow (servers that we should
535         # tap if we don't get enough responses).
536         # self._must_query is always a subset of initial_servers_to_query,
537         # which the assertion below checks.
538         assert must_query.issubset(initial_servers_to_query)
539
540         self._send_initial_requests(initial_servers_to_query)
541         self._status.timings["initial_queries"] = time.time() - self._started
542         return self._done_deferred
543
544     def _build_initial_querylist(self):
545         # we send queries to everyone who was already in the sharemap
546         initial_servers_to_query = set(self._servermap.all_servers())
547         # and we must wait for responses from them
548         must_query = set(initial_servers_to_query)
549
550         while ((self.num_servers_to_query > len(initial_servers_to_query))
551                and self.extra_servers):
552             initial_servers_to_query.add(self.extra_servers.pop(0))
553
554         return initial_servers_to_query, must_query
555
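    # Editorial sketch of the query-sizing policy set up in update() above,
    # assuming the fallback 3-of-10 encoding:
    #
    #   k, N = 3, 10
    #   EPSILON = k                          # 3
    #   num_servers_to_query = k + EPSILON   # 6 for MODE_READ / MODE_ANYTHING
    #
    # MODE_WRITE raises the target to N + EPSILON and keeps querying until it
    # has seen EPSILON share-less servers past the last one holding shares;
    # MODE_CHECK and MODE_REPAIR query every server from get_servers_for_psi().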
556     def _send_initial_requests(self, serverlist):
557         self._status.set_status("Sending %d initial queries" % len(serverlist))
558         self._queries_outstanding = set()
559         for server in serverlist:
560             self._queries_outstanding.add(server)
561             self._do_query(server, self._storage_index, self._read_size)
562
563         if not serverlist:
564             # there is nobody to ask, so we need to short-circuit the state
565             # machine.
566             d = defer.maybeDeferred(self._check_for_done, None)
567             d.addErrback(self._fatal_error)
568
569         # control flow beyond this point: state machine. Receiving responses
570         # from queries is the input. We might send out more queries, or we
571         # might produce a result.
572         return None
573
574     def _do_query(self, server, storage_index, readsize):
575         self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
576                  name=server.get_name(),
577                  readsize=readsize,
578                  level=log.NOISY)
579         started = time.time()
580         self._queries_outstanding.add(server)
581         d = self._do_read(server, storage_index, [], [(0, readsize)])
582         d.addCallback(self._got_results, server, readsize, storage_index,
583                       started)
584         d.addErrback(self._query_failed, server)
585         # errors that aren't handled by _query_failed (and errors caused by
586         # _query_failed) get logged, but we still want to check for doneness.
587         d.addErrback(log.err)
588         d.addErrback(self._fatal_error)
589         d.addCallback(self._check_for_done)
590         return d
591
592     def _do_read(self, server, storage_index, shnums, readv):
593         ss = server.get_rref()
594         if self._add_lease:
595             # send an add-lease message in parallel. The results are handled
596             # separately. This is sent before the slot_readv() so that we can
597             # be sure the add_lease is retired by the time slot_readv comes
598             # back (this relies upon our knowledge that the server code for
599             # add_lease is synchronous).
600             renew_secret = self._node.get_renewal_secret(server)
601             cancel_secret = self._node.get_cancel_secret(server)
602             d2 = ss.callRemote("add_lease", storage_index,
603                                renew_secret, cancel_secret)
604             # we ignore success
605             d2.addErrback(self._add_lease_failed, server, storage_index)
606         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
607         return d
608
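    # Editorial sketch of the remote call above: shnums=[] means "every share
    # you hold for this storage index", and readv is a list of (offset,
    # length) ranges. The response, as consumed by _got_results below, maps
    # each share number to a list of data strings, one per requested range:
    #
    #   d = ss.callRemote("slot_readv", storage_index, [], [(0, 4000)])
    #   # fires with something like {0: ["<first 4000 bytes of sh0>"], ...}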
609
610     def _got_corrupt_share(self, e, shnum, server, data, lp):
611         """
612         I am called when a remote server returns a corrupt share in
613         response to one of our queries. By corrupt, I mean a share
614         without a valid signature. I then record the failure, notify the
615         server of the corruption, and record the share as bad.
616         """
617         f = failure.Failure(e)
618         self.log(format="bad share: %(f_value)s", f_value=str(f),
619                  failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
620         # Notify the server that its share is corrupt.
621         self.notify_server_corruption(server, shnum, str(e))
622         # By flagging this as a bad server, we won't count any of
623         # the other shares on that server as valid, though if we
624         # happen to find a valid version string amongst those
625         # shares, we'll keep track of it so that we don't need
626         # to validate the signature on those again.
627         self._bad_servers.add(server)
628         self._last_failure = f
629         # XXX: Use the reader for this?
630         checkstring = data[:SIGNED_PREFIX_LENGTH]
631         self._servermap.mark_bad_share(server, shnum, checkstring)
632         self._servermap.add_problem(f)
633
634
635     def _got_results(self, datavs, server, readsize, storage_index, started):
636         lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
637                       name=server.get_name(),
638                       numshares=len(datavs))
639         ss = server.get_rref()
640         now = time.time()
641         elapsed = now - started
642         def _done_processing(ignored=None):
643             self._queries_outstanding.discard(server)
644             self._servermap.mark_server_reachable(server)
645             self._must_query.discard(server)
646             self._queries_completed += 1
647         if not self._running:
648             self.log("but we're not running, so we'll ignore it", parent=lp)
649             _done_processing()
650             self._status.add_per_server_time(server, "late", started, elapsed)
651             return
652         self._status.add_per_server_time(server, "query", started, elapsed)
653
654         if datavs:
655             self._good_servers.add(server)
656         else:
657             self._empty_servers.add(server)
658
659         ds = []
660
661         for shnum,datav in datavs.items():
662             data = datav[0]
663             reader = MDMFSlotReadProxy(ss,
664                                        storage_index,
665                                        shnum,
666                                        data,
667                                        data_is_everything=(len(data) < readsize))
668
669             # our goal, with each response, is to validate the version
670             # information and share data as best we can at this point --
671             # we do this by validating the signature. To do this, we
672             # need to do the following:
673             #   - If we don't already have the public key, fetch the
674             #     public key. We use this to validate the signature.
675             if not self._node.get_pubkey():
676                 # fetch and set the public key.
677                 d = reader.get_verification_key()
678                 d.addCallback(lambda results, shnum=shnum:
679                               self._try_to_set_pubkey(results, server, shnum, lp))
680                 # XXX: Make self._pubkey_query_failed?
681                 d.addErrback(lambda error, shnum=shnum, data=data:
682                              self._got_corrupt_share(error, shnum, server, data, lp))
683             else:
684                 # we already have the public key.
685                 d = defer.succeed(None)
686
687             # Neither of these two branches return anything of
688             # consequence, so the first entry in our deferredlist will
689             # be None.
690
691             # - Next, we need the version information. We almost
692             #   certainly got this by reading the first thousand or so
693             #   bytes of the share on the storage server, so we
694             #   shouldn't need to fetch anything at this step.
695             d2 = reader.get_verinfo()
696             d2.addErrback(lambda error, shnum=shnum, data=data:
697                           self._got_corrupt_share(error, shnum, server, data, lp))
698             # - Next, we need the signature. For an SDMF share, it is
699             #   likely that we fetched this when doing our initial fetch
700             #   to get the version information. In MDMF, this lives at
701             #   the end of the share, so unless the file is quite small,
702             #   we'll need to do a remote fetch to get it.
703             d3 = reader.get_signature()
704             d3.addErrback(lambda error, shnum=shnum, data=data:
705                           self._got_corrupt_share(error, shnum, server, data, lp))
706             #  Once we have all three of these responses, we can move on
707             #  to validating the signature
708
709             # Does the node already have a privkey? If not, we'll try to
710             # fetch it here.
711             if self._need_privkey:
712                 d4 = reader.get_encprivkey()
713                 d4.addCallback(lambda results, shnum=shnum:
714                                self._try_to_validate_privkey(results, server, shnum, lp))
715                 d4.addErrback(lambda error, shnum=shnum:
716                               self._privkey_query_failed(error, server, shnum, lp))
717             else:
718                 d4 = defer.succeed(None)
719
720
721             if self.fetch_update_data:
722                 # fetch the block hash tree and first + last segment, as
723                 # configured earlier.
724                 # Then record them in the servermap (see
725                 # _got_update_results_one_share).
726                 update_ds = [] # separate list; don't clobber the outer 'ds'
727                 # XXX: We fetch get_verinfo() above, too. Is there a good
728                 # way to make the two routines share the value without
729                 # introducing more roundtrips?
730                 update_ds.append(reader.get_verinfo())
731                 update_ds.append(reader.get_blockhashes())
732                 update_ds.append(reader.get_block_and_salt(self.start_segment))
733                 update_ds.append(reader.get_block_and_salt(self.end_segment))
734                 d5 = deferredutil.gatherResults(update_ds)
735                 d5.addCallback(self._got_update_results_one_share, shnum)
736             else:
737                 d5 = defer.succeed(None)
738
739             dl = defer.DeferredList([d, d2, d3, d4, d5])
740             def _append_proxy(passthrough, shnum=shnum, reader=reader):
741                 # Store the proxy (with its cache) keyed by serverid and
742                 # version.
743                 _, (_,verinfo), _, _, _ = passthrough
744                 verinfo = self._make_verinfo_hashable(verinfo)
745                 self._servermap.proxies[(verinfo,
746                                          server.get_serverid(),
747                                          storage_index, shnum)] = reader
748                 return passthrough
749             dl.addCallback(_append_proxy)
750             dl.addBoth(self._turn_barrier)
751             dl.addCallback(lambda results, shnum=shnum:
752                            self._got_signature_one_share(results, shnum, server, lp))
753             dl.addErrback(lambda error, shnum=shnum, data=data:
754                           self._got_corrupt_share(error, shnum, server, data, lp))
755             ds.append(dl)
756         # dl is a deferred list that will fire when all of the shares
757         # that we found on this server are done processing. When dl fires,
758         # we know that processing is done, so we can decrement the
759         # semaphore-like thing that we incremented earlier.
760         dl = defer.DeferredList(ds, fireOnOneErrback=True)
761         # Are we done? Done means that there are no more queries to
762         # send, that there are no outstanding queries, and that we
763         # haven't received any queries that are still processing. If we
764         # are done, self._check_for_done will cause the done deferred
765         # that we returned to our caller to fire, which tells them that
766         # they have a complete servermap, and that we won't be touching
767         # the servermap anymore.
768         dl.addCallback(_done_processing)
769         dl.addCallback(self._check_for_done)
770         dl.addErrback(self._fatal_error)
771         # all done!
772         self.log("_got_results done", parent=lp, level=log.NOISY)
773         return dl
774
775
776     def _turn_barrier(self, result):
777         """
778         I help the servermap updater avoid the recursion limit issues
779         discussed in #237.
780         """
781         return fireEventually(result)
782
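    # Editorial note: fireEventually(result) returns a Deferred that fires
    # with `result` on a later reactor turn, so inserting _turn_barrier via
    # dl.addBoth() above breaks what would otherwise be a long synchronous
    # chain of already-fired Deferreds (the recursion problem from #237):
    #
    #   d = defer.succeed(None)
    #   d.addBoth(self._turn_barrier)   # downstream callbacks run next turn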
783
784     def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
785         if self._node.get_pubkey():
786             return # don't go through this again if we don't have to
787         fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
788         assert len(fingerprint) == 32
789         if fingerprint != self._node.get_fingerprint():
790             raise CorruptShareError(server, shnum,
791                                     "pubkey doesn't match fingerprint")
792         self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
793         assert self._node.get_pubkey()
794
795
796     def notify_server_corruption(self, server, shnum, reason):
797         rref = server.get_rref()
798         rref.callRemoteOnly("advise_corrupt_share",
799                             "mutable", self._storage_index, shnum, reason)
800
801
802     def _got_signature_one_share(self, results, shnum, server, lp):
803         # It is our job to give versioninfo to our caller. We need to
804         # raise CorruptShareError if the share is corrupt for any
805         # reason, something that our caller will handle.
806         self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
807                  shnum=shnum,
808                  name=server.get_name(),
809                  level=log.NOISY,
810                  parent=lp)
811         if not self._running:
812             # We can't process the results, since we can't touch the
813             # servermap anymore.
814             self.log("but we're not running anymore.")
815             return None
816
817         _, verinfo, signature, __, ___ = results
818         verinfo = self._make_verinfo_hashable(verinfo[1])
819
820         # This tuple uniquely identifies a share on the grid; we use it
821         # to keep track of the ones that we've already seen.
822         (seqnum,
823          root_hash,
824          saltish,
825          segsize,
826          datalen,
827          k,
828          n,
829          prefix,
830          offsets_tuple) = verinfo
831
832
833         if verinfo not in self._valid_versions:
834             # This is a new version tuple, and we need to validate it
835             # against the public key before keeping track of it.
836             assert self._node.get_pubkey()
837             valid = self._node.get_pubkey().verify(prefix, signature[1])
838             if not valid:
839                 raise CorruptShareError(server, shnum,
840                                         "signature is invalid")
841
842         # ok, it's a valid verinfo. Add it to the list of validated
843         # versions.
844         self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
845                  % (seqnum, base32.b2a(root_hash)[:4],
846                     server.get_name(), shnum,
847                     k, n, segsize, datalen),
848                     parent=lp)
849         self._valid_versions.add(verinfo)
850         # We now know that this is a valid candidate verinfo. Whether or
851         # not this instance of it is valid is a matter for the next
852         # statement; at this point, we just know that if we see this
853         # version info again, that its signature checks out and that
854         # we're okay to skip the signature-checking step.
855
856         # (server, shnum) are bound in the method invocation.
857         if (server, shnum) in self._servermap.get_bad_shares():
858             # we've been told that the rest of the data in this share is
859             # unusable, so don't add it to the servermap.
860             self.log("but we've been told this is a bad share",
861                      parent=lp, level=log.UNUSUAL)
862             return verinfo
863
864         # Add the info to our servermap.
865         timestamp = time.time()
866         self._servermap.add_new_share(server, shnum, verinfo, timestamp)
867
868         return verinfo
869
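    # Condensed editorial sketch of the flow above: verify the signed prefix
    # once per distinct verinfo, cache the result, and only then admit the
    # share to the servermap (unless it was previously marked bad):
    #
    #   if verinfo not in self._valid_versions:
    #       if not self._node.get_pubkey().verify(prefix, signature[1]):
    #           raise CorruptShareError(server, shnum, "signature is invalid")
    #       self._valid_versions.add(verinfo)
    #   if (server, shnum) not in self._servermap.get_bad_shares():
    #       self._servermap.add_new_share(server, shnum, verinfo, time.time())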
870     def _make_verinfo_hashable(self, verinfo):
871         (seqnum,
872          root_hash,
873          saltish,
874          segsize,
875          datalen,
876          k,
877          n,
878          prefix,
879          offsets) = verinfo
880
881         offsets_tuple = tuple([(key, value) for key, value in offsets.items()])
882
883         verinfo = (seqnum,
884                    root_hash,
885                    saltish,
886                    segsize,
887                    datalen,
888                    k,
889                    n,
890                    prefix,
891                    offsets_tuple)
892         return verinfo
893
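    # Editorial note: the offsets dict is flattened into a tuple of pairs
    # because verinfo is used as a set member and dict key (_valid_versions,
    # ServerMap.make_versionmap(), servermap.proxies), and dicts aren't
    # hashable:
    #
    #   offsets = {"signature": 399, "share_data": 1073}   # hypothetical
    #   offsets_tuple = tuple(offsets.items())
    #   # a verinfo containing offsets_tuple can now go in a set or dict key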
894     def _got_update_results_one_share(self, results, share):
895         """
896         I record the update data carried in 'results' in the servermap.
897         """
898         assert len(results) == 4
899         verinfo, blockhashes, start, end = results
900         verinfo = self._make_verinfo_hashable(verinfo)
901         update_data = (blockhashes, start, end)
902         self._servermap.set_update_data_for_share_and_verinfo(share,
903                                                               verinfo,
904                                                               update_data)
905
906
907     def _deserialize_pubkey(self, pubkey_s):
908         verifier = rsa.create_verifying_key_from_string(pubkey_s)
909         return verifier
910
911
912     def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
913         """
914         Given an encrypted privkey from a remote server, I check that its
915         writekey matches the writekey stored in my node. If it does, I set
916         the privkey and encprivkey properties of the node.
917         """
918         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
919         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
920         if alleged_writekey != self._node.get_writekey():
921             self.log("invalid privkey from %s shnum %d" %
922                      (server.get_name(), shnum),
923                      parent=lp, level=log.WEIRD, umid="aJVccw")
924             return
925
926         # it's good
927         self.log("got valid privkey from shnum %d on serverid %s" %
928                  (shnum, server.get_name()),
929                  parent=lp)
930         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
931         self._node._populate_encprivkey(enc_privkey)
932         self._node._populate_privkey(privkey)
933         self._need_privkey = False
934         self._status.set_privkey_from(server)
935
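    # Editorial sketch of the check above: the server's copy of the encrypted
    # privkey is only trusted once the writekey derived from it matches the
    # writekey the node already knows:
    #
    #   alleged = self._node._decrypt_privkey(enc_privkey)
    #   if hashutil.ssk_writekey_hash(alleged) == self._node.get_writekey():
    #       pass  # safe to adopt this privkey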
936
937     def _add_lease_failed(self, f, server, storage_index):
938         # Older versions of Tahoe didn't handle the add-lease message very
939         # well: <=1.1.0 throws a NameError because it doesn't implement
940         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
941         # (which is most of them, since we send add-lease to everybody,
942         # before we know whether or not they have any shares for us), and
943         # 1.2.0 throws KeyError even on known buckets due to an internal bug
944         # in the latency-measuring code.
945
946         # we want to ignore the known-harmless errors and log the others. In
947         # particular we want to log any local errors caused by coding
948         # problems.
949
950         if f.check(DeadReferenceError):
951             return
952         if f.check(RemoteException):
953             if f.value.failure.check(KeyError, IndexError, NameError):
954                 # this may ignore a bit too much, but that only hurts us
955                 # during debugging
956                 return
957             self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
958                      name=server.get_name(),
959                      f_value=str(f.value),
960                      failure=f,
961                      level=log.WEIRD, umid="iqg3mw")
962             return
963         # local errors are cause for alarm
964         log.err(f,
965                 format="local error in add_lease to [%(name)s]: %(f_value)s",
966                 name=server.get_name(),
967                 f_value=str(f.value),
968                 level=log.WEIRD, umid="ZWh6HA")
969
970     def _query_failed(self, f, server):
971         if not self._running:
972             return
973         level = log.WEIRD
974         if f.check(DeadReferenceError):
975             level = log.UNUSUAL
976         self.log(format="error during query: %(f_value)s",
977                  f_value=str(f.value), failure=f,
978                  level=level, umid="IHXuQg")
979         self._must_query.discard(server)
980         self._queries_outstanding.discard(server)
981         self._bad_servers.add(server)
982         self._servermap.add_problem(f)
983         # a server could be in both ServerMap.reachable_servers and
984         # .unreachable_servers if they responded to our query, but then an
985         # exception was raised in _got_results.
986         self._servermap.mark_server_unreachable(server)
987         self._queries_completed += 1
988         self._last_failure = f
989
990
991     def _privkey_query_failed(self, f, server, shnum, lp):
992         self._queries_outstanding.discard(server)
993         if not self._running:
994             return
995         level = log.WEIRD
996         if f.check(DeadReferenceError):
997             level = log.UNUSUAL
998         self.log(format="error during privkey query: %(f_value)s",
999                  f_value=str(f.value), failure=f,
1000                  parent=lp, level=level, umid="McoJ5w")
1001         self._servermap.add_problem(f)
1002         self._last_failure = f
1003
1004
1005     def _check_for_done(self, res):
1006         # exit paths:
1007         #  return self._send_more_queries(outstanding) : send some more queries
1008         #  return self._done() : all done
1009         #  return : keep waiting, no new queries
1010         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
1011                               "%(outstanding)d queries outstanding, "
1012                               "%(extra)d extra servers available, "
1013                               "%(must)d 'must query' servers left, "
1014                               "need_privkey=%(need_privkey)s"
1015                               ),
1016                       mode=self.mode,
1017                       outstanding=len(self._queries_outstanding),
1018                       extra=len(self.extra_servers),
1019                       must=len(self._must_query),
1020                       need_privkey=self._need_privkey,
1021                       level=log.NOISY,
1022                       )
1023
1024         if not self._running:
1025             self.log("but we're not running", parent=lp, level=log.NOISY)
1026             return
1027
1028         if self._must_query:
1029             # we are still waiting for responses from servers that used to have
1030             # a share, so we must continue to wait. No additional queries are
1031             # required at this time.
1032             self.log("%d 'must query' servers left" % len(self._must_query),
1033                      level=log.NOISY, parent=lp)
1034             return
1035
1036         if (not self._queries_outstanding and not self.extra_servers):
1037             # all queries have retired, and we have no servers left to ask. No
1038             # more progress can be made, therefore we are done.
1039             self.log("all queries are retired, no extra servers: done",
1040                      parent=lp)
1041             return self._done()
1042
1043         recoverable_versions = self._servermap.recoverable_versions()
1044         unrecoverable_versions = self._servermap.unrecoverable_versions()
1045
1046         # what is our completion policy? how hard should we work?
1047
1048         if self.mode == MODE_ANYTHING:
1049             if recoverable_versions:
1050                 self.log("%d recoverable versions: done"
1051                          % len(recoverable_versions),
1052                          parent=lp)
1053                 return self._done()
1054
1055         if self.mode in (MODE_CHECK, MODE_REPAIR):
1056             # we used self._must_query, and we know there aren't any
1057             # responses still waiting, so that means we must be done
1058             self.log("done", parent=lp)
1059             return self._done()
1060
1061         MAX_IN_FLIGHT = 5
1062         if self.mode == MODE_READ:
1063             # if we've queried k+epsilon servers, and we see a recoverable
1064             # version, and we haven't seen any unrecoverable higher-seqnum'ed
1065             # versions, then we're done.
1066
1067             if self._queries_completed < self.num_servers_to_query:
1068                 self.log(format="%(completed)d completed, %(query)d to query: need more",
1069                          completed=self._queries_completed,
1070                          query=self.num_servers_to_query,
1071                          level=log.NOISY, parent=lp)
1072                 return self._send_more_queries(MAX_IN_FLIGHT)
1073             if not recoverable_versions:
1074                 self.log("no recoverable versions: need more",
1075                          level=log.NOISY, parent=lp)
1076                 return self._send_more_queries(MAX_IN_FLIGHT)
1077             highest_recoverable = max(recoverable_versions)
1078             highest_recoverable_seqnum = highest_recoverable[0]
1079             for unrec_verinfo in unrecoverable_versions:
1080                 if unrec_verinfo[0] > highest_recoverable_seqnum:
1081                     # there is evidence of a higher-seqnum version, but we
1082                     # don't yet see enough shares to recover it. Try harder.
1083                     # TODO: consider sending more queries.
1084                     # TODO: consider limiting the search distance
1085                     self.log("evidence of higher seqnum: need more",
1086                              level=log.UNUSUAL, parent=lp)
1087                     return self._send_more_queries(MAX_IN_FLIGHT)
1088             # all the unrecoverable versions were old or concurrent with a
1089             # recoverable version. Good enough.
1090             self.log("no higher-seqnum: done", parent=lp)
1091             return self._done()
1092
1093         if self.mode == MODE_WRITE:
1094             # we want to keep querying until we've seen a few that don't have
1095             # any shares, to be sufficiently confident that we've seen all
1096             # the shares. This is still less work than MODE_CHECK, which asks
1097             # every server in the world.
1098
1099             if not recoverable_versions:
1100                 self.log("no recoverable versions: need more", parent=lp,
1101                          level=log.NOISY)
1102                 return self._send_more_queries(MAX_IN_FLIGHT)
1103
1104             last_found = -1
1105             last_not_responded = -1
1106             num_not_responded = 0
1107             num_not_found = 0
1108             states = []
1109             found_boundary = False
1110
1111             for i,server in enumerate(self.full_serverlist):
1112                 if server in self._bad_servers:
1113                     # query failed
1114                     states.append("x")
1115                     #self.log("loop [%s]: x" % server.get_name()
1116                 elif server in self._empty_servers:
1117                     # no shares
1118                     states.append("0")
1119                     #self.log("loop [%s]: 0" % server.get_name()
1120                     if last_found != -1:
1121                         num_not_found += 1
1122                         if num_not_found >= self.EPSILON:
1123                             self.log("found our boundary, %s" %
1124                                      "".join(states),
1125                                      parent=lp, level=log.NOISY)
1126                             found_boundary = True
1127                             break
1128
1129                 elif server in self._good_servers:
1130                     # yes shares
1131                     states.append("1")
1132                     #self.log("loop [%s]: 1" % server.get_name()
1133                     last_found = i
1134                     num_not_found = 0
1135                 else:
1136                     # not responded yet
1137                     states.append("?")
1138                     #self.log("loop [%s]: ?" % server.get_name()
1139                     last_not_responded = i
1140                     num_not_responded += 1
1141
1142             if found_boundary:
1143                 # we need to know that we've gotten answers from
1144                 # everybody to the left of here
1145                 if last_not_responded == -1:
1146                     # we're done
1147                     self.log("have all our answers",
1148                              parent=lp, level=log.NOISY)
1149                     # .. unless we're still waiting on the privkey
1150                     if self._need_privkey:
1151                         self.log("but we're still waiting for the privkey",
1152                                  parent=lp, level=log.NOISY)
1153                         # if we found the boundary but we haven't yet found
1154                         # the privkey, we may need to look further. If
1155                         # somehow all the privkeys were corrupted (but the
1156                         # shares were readable), then this is likely to do an
1157                         # exhaustive search.
1158                         return self._send_more_queries(MAX_IN_FLIGHT)
1159                     return self._done()
1160                 # still waiting for somebody
1161                 return self._send_more_queries(num_not_responded)
1162
1163             # if we hit here, we didn't find our boundary, so we're still
1164             # waiting for servers
1165             self.log("no boundary yet, %s" % "".join(states), parent=lp,
1166                      level=log.NOISY)
1167             return self._send_more_queries(MAX_IN_FLIGHT)
1168
1169         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
1170         # arbitrary, really I want this to be something like k -
1171         # max(known_version_sharecounts) + some extra
1172         self.log("catchall: need more", parent=lp, level=log.NOISY)
1173         return self._send_more_queries(MAX_IN_FLIGHT)
1174
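    # Editorial example of the MODE_WRITE boundary search above, with
    # EPSILON = 3. One state character is recorded per server in permuted
    # order ("1" has shares, "0" empty, "x" query failed, "?" no answer yet):
    #
    #   states = ["1", "1", "0", "1", "0", "0", "0"]   # hypothetical run
    #
    # The boundary is the EPSILON'th consecutive "0" after the last "1". Once
    # it is found, the update finishes only if every server to its left has
    # answered (no "?"); otherwise we keep waiting or send more queries.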
1175     def _send_more_queries(self, num_outstanding):
1176         more_queries = []
1177
1178         while True:
1179             self.log(format=" there are %(outstanding)d queries outstanding",
1180                      outstanding=len(self._queries_outstanding),
1181                      level=log.NOISY)
1182             active_queries = len(self._queries_outstanding) + len(more_queries)
1183             if active_queries >= num_outstanding:
1184                 break
1185             if not self.extra_servers:
1186                 break
1187             more_queries.append(self.extra_servers.pop(0))
1188
1189         self.log(format="sending %(more)d more queries: %(who)s",
1190                  more=len(more_queries),
1191                  who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
1192                  level=log.NOISY)
1193
1194         for server in more_queries:
1195             self._do_query(server, self._storage_index, self._read_size)
1196             # we'll retrigger when those queries come back
1197
1198     def _done(self):
1199         if not self._running:
1200             self.log("not running; we're already done")
1201             return
1202         self._running = False
1203         now = time.time()
1204         elapsed = now - self._started
1205         self._status.set_finished(now)
1206         self._status.timings["total"] = elapsed
1207         self._status.set_progress(1.0)
1208         self._status.set_status("Finished")
1209         self._status.set_active(False)
1210
1211         self._servermap.set_last_update(self.mode, self._started)
1212         # the servermap will not be touched after this
1213         self.log("servermap: %s" % self._servermap.summarize_versions())
1214
1215         eventually(self._done_deferred.callback, self._servermap)
1216
1217     def _fatal_error(self, f):
1218         self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
1219         self._done_deferred.errback(f)
1220
1221