]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/servermap.py
e908e42598da1d78964b3c37db035349b9480041
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
8                          fireEventually
9 from allmydata.util import base32, hashutil, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
14
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
16      MODE_READ, MODE_REPAIR, CorruptShareError
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
18
class UpdateStatus:
    """Status-tracking object for a single servermap update operation.

    I implement IServermapUpdaterStatus, recording per-server timings,
    progress, mode, and outcome so that status displays (and tests) can
    observe a mapupdate in flight.
    """
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {} # server -> [(op, sent, elapsed), ..]
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None # server the privkey was fetched from, if any
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        # BUGFIX: 'servermap' was never initialized, so get_servermap()
        # raised AttributeError unless external code assigned the attribute
        # first (this class has no setter for it). Default to None so the
        # getter is safe to call before/without such an assignment.
        self.servermap = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        """Record the timing of one per-server operation.

        'op' is one of 'query', 'late', or 'privkey'; 'sent' is the time
        the request went out, 'elapsed' the round-trip duration.
        """
        assert op in ("query", "late", "privkey")
        self.timings["per_server"].setdefault(server, []).append(
            (op, sent, elapsed))

    # accessors required by IServermapUpdaterStatus
    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    # mutators used by ServermapUpdater as the operation proceeds
    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when
82
class ServerMap:
    """I track which mutable shares live on which servers.

    My first job is to let callers ask which versions of a mutable file
    are currently recoverable, and which of those is 'best'.

    My second job is to act as a test-and-set marker: I am handed out by
    retrieve operations and handed back into publish operations, meaning
    'publish this new version, but only if nothing has changed since I
    was built'. That narrows the window for clobbering a simultaneous
    (uncoordinated) write.

    @ivar _known_shares: dict mapping a (server, shnum) tuple to a
                         (versionid, timestamp) tuple. Each 'versionid' is
                         a tuple of (seqnum, root_hash, IV, segsize,
                         datalength, k, N, signed_prefix, offsets)

    @ivar _bad_shares: dict mapping (server, shnum) -> old checkstring,
                       for shares I should ignore (a previous user of this
                       servermap found them to be invalid). The updater
                       only locates a limited number of shares, so when
                       some prove unusable the caller marks them bad here,
                       re-updates the servermap, and retries.
    """

    def __init__(self):
        self._known_shares = {} # (server, shnum) -> (verinfo, timestamp)
        self.unreachable_servers = set() # servers that didn't answer queries
        self.reachable_servers = set() # servers that did answer queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # (server, shnum) -> old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        # shnum -> [(verinfo, (blockhashes, start, end)), ..] where
        # blockhashes comes from layout.MDMFSlotReadProxy.get_blockhashes
        # and start/end are (block, salt) pairs from get_block_and_salt()
        self.update_data = {}

    def copy(self):
        """Return a new ServerMap holding a shallow copy of my state.
        (update_data is deliberately not carried over, matching historical
        behavior.)"""
        clone = ServerMap()
        clone._known_shares = dict(self._known_shares)
        clone.unreachable_servers = set(self.unreachable_servers)
        clone.reachable_servers = set(self.reachable_servers)
        clone._problems = list(self._problems)
        clone._bad_shares = dict(self._bad_shares)
        clone._last_update_mode = self._last_update_mode
        clone._last_update_time = self._last_update_time
        return clone

    def get_reachable_servers(self):
        """Return the set of servers that responded to queries."""
        return self.reachable_servers

    def mark_server_reachable(self, server):
        """Note that 'server' answered a query."""
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        """Note that 'server' failed to answer a query."""
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """Record that this share is bad (wrong checkstring/signature found
        during mapupdate, or deeper corruption found at retrieve time).

        The share is dropped from the useful set and remembered as bad so
        it will not be re-added later; its old checkstring (possibly
        corrupt or badly signed) is kept so a repair operation can use it
        as the test-and-set reference.
        """
        badkey = (server, shnum)
        self._bad_shares[badkey] = checkstring
        self._known_shares.pop(badkey, None)

    def get_bad_shares(self):
        """Return the dict mapping (server, shnum) -> old checkstring."""
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """Record a newly-written share, replacing any previous record
        (including a bad-share record) for the same (server, shnum)."""
        newkey = (server, shnum)
        self._bad_shares.pop(newkey, None)
        self._known_shares[newkey] = (verinfo, timestamp)

    def add_problem(self, f):
        """Remember a Failure, mostly for debugging."""
        self._problems.append(f)

    def get_problems(self):
        """Return the list of recorded Failures."""
        return self._problems

    def set_last_update(self, mode, when):
        """Record the mode and time of the most recent map update."""
        self._last_update_mode = mode
        self._last_update_time = when

    def get_last_update(self):
        """Return (mode, time) of the most recent map update."""
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        """Write a human-readable description of my contents to 'out' and
        return 'out'."""
        out.write("servermap:\n")
        for ((server, shnum), (verinfo, timestamp)) in self._known_shares.items():
            # verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N,
            #            prefix, offsets_tuple)
            out.write(("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                       (server.get_name(), shnum,
                        verinfo[0], base32.b2a(verinfo[1])[:4],
                        verinfo[5], verinfo[6], verinfo[4])) + "\n")
        if self._problems:
            out.write("%d PROBLEMS\n" % len(self._problems))
            for f in self._problems:
                out.write("%s\n" % str(f))
        return out

    def all_servers(self):
        """Return the set of servers known to hold at least one share."""
        return set(server for (server, shnum) in self._known_shares)

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        servers = set()
        for ((server, shnum), (v, timestamp)) in self._known_shares.items():
            if v == verinfo:
                servers.add(server)
        return servers

    def get_known_shares(self):
        """Return the dict mapping (server, shnum) -> (versionid, timestamp)."""
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares.keys():
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for (key, value) in self._known_shares.items():
            (server, shnum) = key
            (verinfo, timestamp) = value
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

    def debug_shares_on_server(self, server): # used by tests
        return set(shnum for (s, shnum) in self._known_shares if s == server)

    def version_on_server(self, server, shnum):
        """Return the verinfo for the share at (server, shnum), or None if
        no such share is known."""
        try:
            (verinfo, timestamp) = self._known_shares[(server, shnum)]
        except KeyError:
            return None
        return verinfo

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        result = {}
        for (verinfo, shares) in self.make_versionmap().items():
            shnums = set(shnum for (shnum, server, timestamp) in shares)
            # k and N live at indices 5 and 6 of the verinfo tuple
            result[verinfo] = (len(shnums), verinfo[5], verinfo[6])
        return result

    def highest_seqnum(self):
        """Return the highest seqnum of any known version, or 0 if no
        versions are known."""
        seqnums = [verinfo[0] for verinfo in self.shares_available()]
        return max(seqnums + [0])

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        return "seq%d-%s" % (verinfo[0], base32.b2a(verinfo[1])[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        pieces = []
        for (verinfo, shares) in self.make_versionmap().items():
            shnums = set(shnum for (shnum, server, timestamp) in shares)
            pieces.append("%d*%s" % (len(shnums),
                                     self.summarize_version(verinfo)))
        return "/".join(pieces)

    def recoverable_versions(self):
        """Return the set of versionids that are currently recoverable:
        at least k distinct shares are present."""
        recoverable = set()
        for (verinfo, shares) in self.make_versionmap().items():
            shnums = set(shnum for (shnum, server, timestamp) in shares)
            if len(shnums) >= verinfo[5]: # k
                recoverable.add(verinfo)
        return recoverable

    def unrecoverable_versions(self):
        """Return the set of versionids that are currently unrecoverable:
        fewer than k distinct shares are present."""
        unrecoverable = set()
        for (verinfo, shares) in self.make_versionmap().items():
            shnums = set(shnum for (shnum, server, timestamp) in shares)
            if len(shnums) < verinfo[5]: # k
                unrecoverable.add(verinfo)
        return unrecoverable

    def best_recoverable_version(self):
        """Return the versionid of the 'best' recoverable version: sequence
        number is the primary sort criterion, followed by root hash.
        Returns None if nothing is recoverable."""
        ordered = sorted(self.recoverable_versions())
        if not ordered:
            return None
        return ordered[-1]

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        # datalength is the fifth element of the verinfo tuple
        return verinfo[4]

    def unrecoverable_newer_versions(self):
        """Return a dict of versionid -> (found, k) for versions that are
        unrecoverable and have later seqnums than any recoverable version.
        These indicate that a write will lose data."""
        healths = {} # verinfo -> (found, k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in self.make_versionmap().items():
            shnums = set(shnum for (shnum, server, timestamp) in shares)
            healths[verinfo] = (len(shnums), verinfo[5])
            if len(shnums) < verinfo[5]:
                unrecoverable.add(verinfo)
            elif verinfo[0] > highest_recoverable_seqnum:
                highest_recoverable_seqnum = verinfo[0]
        return dict((verinfo, healths[verinfo])
                    for verinfo in unrecoverable
                    if verinfo[0] > highest_recoverable_seqnum)

    def needs_merge(self):
        """Return True if there are multiple recoverable versions with the
        same seqnum, meaning MutableFileNode.read_best_version is not
        telling the whole story, and that a publish based on its data will
        lose information."""
        seqnums = [verinfo[0] for verinfo in self.recoverable_versions()]
        return len(seqnums) != len(set(seqnums))

    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """Return the update data recorded for (shnum, verinfo); raises if
        none was recorded."""
        matches = [datum for (v, datum) in self.update_data[shnum]
                   if v == verinfo]
        return matches[0]

    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """Record update data (the block hash tree etc.) for the given
        shnum and verinfo."""
        self.update_data.setdefault(shnum, []).append((verinfo, data))
377
378
379 class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        @param filenode: the mutable filenode whose shares we are mapping
        @param storage_broker: used later to enumerate (permuted) servers
        @param monitor: operation monitor, held for the caller
        @param servermap: the ServerMap to bring up to date
        @param mode: one of MODE_CHECK/MODE_ANYTHING/MODE_WRITE/MODE_READ/
                     MODE_REPAIR; controls how many servers get queried and
                     how much data is read from each
        @param add_lease: if True, send an add-lease message alongside each
                          query
        @param update_range: optional (start_segment, end_segment) tuple;
                             in MODE_WRITE this makes us fetch additional
                             update data from each share we find
        """

        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True  # cleared when the update finishes/aborts

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None  # most recent query Failure, if any

        # status object for progress reporting
        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)
449
450     def get_status(self):
451         return self._status
452
453     def log(self, *args, **kwargs):
454         if "parent" not in kwargs:
455             kwargs["parent"] = self._log_number
456         if "facility" not in kwargs:
457             kwargs["facility"] = "tahoe.mutable.mapupdate"
458         return log.msg(*args, **kwargs)
459
    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # fired with the servermap when the state machine decides we're done
        self._done_deferred = defer.Deferred()

        # first, which servers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # If the filenode doesn't yet know its encoding parameters (no
        # version fetched yet), guess 3-of-10 until shares tell us otherwise.
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
542
543     def _build_initial_querylist(self):
544         # we send queries to everyone who was already in the sharemap
545         initial_servers_to_query = set(self._servermap.all_servers())
546         # and we must wait for responses from them
547         must_query = set(initial_servers_to_query)
548
549         while ((self.num_servers_to_query > len(initial_servers_to_query))
550                and self.extra_servers):
551             initial_servers_to_query.add(self.extra_servers.pop(0))
552
553         return initial_servers_to_query, must_query
554
    def _send_initial_requests(self, serverlist):
        """Send the first round of queries, one per server in serverlist.
        If serverlist is empty, schedule a _check_for_done call directly so
        the state machine can still terminate."""
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            # mark outstanding before querying: the query's callback chain
            # may fire synchronously in tests
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None
572
    def _do_query(self, server, storage_index, readsize):
        """Send one slot_readv query (the first 'readsize' bytes of every
        share) to a single server, and wire up the response/error handlers.

        @return: the Deferred for the full callback chain (results feed
                 _got_results, then _check_for_done advances the state
                 machine)
        """
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        # [] means "all shares"; read a single (0, readsize) vector
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d
590
    def _do_read(self, server, storage_index, shnums, readv):
        """Issue a slot_readv to one server, optionally preceded by an
        add-lease message.

        @param shnums: share numbers to read ([] means all shares)
        @param readv: list of (offset, length) read vectors
        @return: Deferred firing with the slot_readv results
        """
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore success
            d2.addErrback(self._add_lease_failed, server, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d
607
608
    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.

        @param e: the exception (or Failure) describing the corruption
        @param shnum: the share number of the corrupt share
        @param data: the raw share bytes fetched from the server; the
                     leading SIGNED_PREFIX_LENGTH bytes hold the
                     checkstring we record for later test-and-set repair
        @param lp: log parent, for correlated logging
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)
632
633
634     def _cache_good_sharedata(self, verinfo, shnum, now, data):
635         """
636         If one of my queries returns successfully (which means that we
637         were able to and successfully did validate the signature), I
638         cache the data that we initially fetched from the storage
639         server. This will help reduce the number of roundtrips that need
640         to occur when the file is downloaded, or when the file is
641         updated.
642         """
643         if verinfo:
644             self._node._add_to_cache(verinfo, shnum, 0, data)
645
646
647     def _got_results(self, datavs, server, readsize, storage_index, started):
648         lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
649                       name=server.get_name(),
650                       numshares=len(datavs))
651         ss = server.get_rref()
652         now = time.time()
653         elapsed = now - started
654         def _done_processing(ignored=None):
655             self._queries_outstanding.discard(server)
656             self._servermap.mark_server_reachable(server)
657             self._must_query.discard(server)
658             self._queries_completed += 1
659         if not self._running:
660             self.log("but we're not running, so we'll ignore it", parent=lp)
661             _done_processing()
662             self._status.add_per_server_time(server, "late", started, elapsed)
663             return
664         self._status.add_per_server_time(server, "query", started, elapsed)
665
666         if datavs:
667             self._good_servers.add(server)
668         else:
669             self._empty_servers.add(server)
670
671         ds = []
672
673         for shnum,datav in datavs.items():
674             data = datav[0]
675             reader = MDMFSlotReadProxy(ss,
676                                        storage_index,
677                                        shnum,
678                                        data)
679             # our goal, with each response, is to validate the version
680             # information and share data as best we can at this point --
681             # we do this by validating the signature. To do this, we
682             # need to do the following:
683             #   - If we don't already have the public key, fetch the
684             #     public key. We use this to validate the signature.
685             if not self._node.get_pubkey():
686                 # fetch and set the public key.
687                 d = reader.get_verification_key()
688                 d.addCallback(lambda results, shnum=shnum:
689                               self._try_to_set_pubkey(results, server, shnum, lp))
690                 # XXX: Make self._pubkey_query_failed?
691                 d.addErrback(lambda error, shnum=shnum, data=data:
692                              self._got_corrupt_share(error, shnum, server, data, lp))
693             else:
694                 # we already have the public key.
695                 d = defer.succeed(None)
696
697             # Neither of these two branches return anything of
698             # consequence, so the first entry in our deferredlist will
699             # be None.
700
701             # - Next, we need the version information. We almost
702             #   certainly got this by reading the first thousand or so
703             #   bytes of the share on the storage server, so we
704             #   shouldn't need to fetch anything at this step.
705             d2 = reader.get_verinfo()
706             d2.addErrback(lambda error, shnum=shnum, data=data:
707                           self._got_corrupt_share(error, shnum, server, data, lp))
708             # - Next, we need the signature. For an SDMF share, it is
709             #   likely that we fetched this when doing our initial fetch
710             #   to get the version information. In MDMF, this lives at
711             #   the end of the share, so unless the file is quite small,
712             #   we'll need to do a remote fetch to get it.
713             d3 = reader.get_signature()
714             d3.addErrback(lambda error, shnum=shnum, data=data:
715                           self._got_corrupt_share(error, shnum, server, data, lp))
716             #  Once we have all three of these responses, we can move on
717             #  to validating the signature
718
719             # Does the node already have a privkey? If not, we'll try to
720             # fetch it here.
721             if self._need_privkey:
722                 d4 = reader.get_encprivkey()
723                 d4.addCallback(lambda results, shnum=shnum:
724                                self._try_to_validate_privkey(results, server, shnum, lp))
725                 d4.addErrback(lambda error, shnum=shnum:
726                               self._privkey_query_failed(error, server, shnum, lp))
727             else:
728                 d4 = defer.succeed(None)
729
730
731             if self.fetch_update_data:
732                 # fetch the block hash tree and first + last segment, as
733                 # configured earlier.
734                 # Then set them in wherever we happen to want to set
735                 # them.
736                 ds = []
737                 # XXX: We do this above, too. Is there a good way to
738                 # make the two routines share the value without
739                 # introducing more roundtrips?
740                 ds.append(reader.get_verinfo())
741                 ds.append(reader.get_blockhashes())
742                 ds.append(reader.get_block_and_salt(self.start_segment))
743                 ds.append(reader.get_block_and_salt(self.end_segment))
744                 d5 = deferredutil.gatherResults(ds)
745                 d5.addCallback(self._got_update_results_one_share, shnum)
746             else:
747                 d5 = defer.succeed(None)
748
749             dl = defer.DeferredList([d, d2, d3, d4, d5])
750             dl.addBoth(self._turn_barrier)
751             dl.addCallback(lambda results, shnum=shnum:
752                            self._got_signature_one_share(results, shnum, server, lp))
753             dl.addErrback(lambda error, shnum=shnum, data=data:
754                           self._got_corrupt_share(error, shnum, server, data, lp))
755             dl.addCallback(lambda verinfo, shnum=shnum, data=data:
756                            self._cache_good_sharedata(verinfo, shnum, now, data))
757             ds.append(dl)
758         # dl is a deferred list that will fire when all of the shares
759         # that we found on this server are done processing. When dl fires,
760         # we know that processing is done, so we can decrement the
761         # semaphore-like thing that we incremented earlier.
762         dl = defer.DeferredList(ds, fireOnOneErrback=True)
763         # Are we done? Done means that there are no more queries to
764         # send, that there are no outstanding queries, and that we
765         # haven't received any queries that are still processing. If we
766         # are done, self._check_for_done will cause the done deferred
767         # that we returned to our caller to fire, which tells them that
768         # they have a complete servermap, and that we won't be touching
769         # the servermap anymore.
770         dl.addCallback(_done_processing)
771         dl.addCallback(self._check_for_done)
772         dl.addErrback(self._fatal_error)
773         # all done!
774         self.log("_got_results done", parent=lp, level=log.NOISY)
775         return dl
776
777
778     def _turn_barrier(self, result):
779         """
780         I help the servermap updater avoid the recursion limit issues
781         discussed in #237.
782         """
783         return fireEventually(result)
784
785
786     def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
787         if self._node.get_pubkey():
788             return # don't go through this again if we don't have to
789         fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
790         assert len(fingerprint) == 32
791         if fingerprint != self._node.get_fingerprint():
792             raise CorruptShareError(server, shnum,
793                                     "pubkey doesn't match fingerprint")
794         self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
795         assert self._node.get_pubkey()
796
797
798     def notify_server_corruption(self, server, shnum, reason):
799         rref = server.get_rref()
800         rref.callRemoteOnly("advise_corrupt_share",
801                             "mutable", self._storage_index, shnum, reason)
802
803
    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        #
        # 'results' is the output of the DeferredList built in
        # _got_results: a list of (success, value) pairs where slot 1 is
        # the verinfo fetch and slot 2 is the signature fetch; slots 0,
        # 3, and 4 (pubkey/privkey/update-data fetches) carry nothing we
        # need here.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 level=log.NOISY,
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        # verinfo is a (success, value) pair; unpack the value.
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo[1]
        # offsets is a dict; rebuild it as a tuple of (key, value) pairs
        # so the verinfo tuple constructed below is hashable (we store
        # it in the self._valid_versions set).
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        # XXX: This should be done for us in the method, so
        # presumably you can go in there and fix it.
        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.

        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            # signature is also a (success, value) pair; check the
            # signed prefix against its value.
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

        # ok, it's a valid verinfo. Add it to the list of validated
        # versions.
        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                 % (seqnum, base32.b2a(root_hash)[:4],
                    server.get_name(), shnum,
                    k, n, segsize, datalen),
                    parent=lp)
        self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)

        return verinfo
881
882
883     def _got_update_results_one_share(self, results, share):
884         """
885         I record the update results in results.
886         """
887         assert len(results) == 4
888         verinfo, blockhashes, start, end = results
889         (seqnum,
890          root_hash,
891          saltish,
892          segsize,
893          datalen,
894          k,
895          n,
896          prefix,
897          offsets) = verinfo
898         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
899
900         # XXX: This should be done for us in the method, so
901         # presumably you can go in there and fix it.
902         verinfo = (seqnum,
903                    root_hash,
904                    saltish,
905                    segsize,
906                    datalen,
907                    k,
908                    n,
909                    prefix,
910                    offsets_tuple)
911
912         update_data = (blockhashes, start, end)
913         self._servermap.set_update_data_for_share_and_verinfo(share,
914                                                               verinfo,
915                                                               update_data)
916
917
918     def _deserialize_pubkey(self, pubkey_s):
919         verifier = rsa.create_verifying_key_from_string(pubkey_s)
920         return verifier
921
922
923     def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
924         """
925         Given a writekey from a remote server, I validate it against the
926         writekey stored in my node. If it is valid, then I set the
927         privkey and encprivkey properties of the node.
928         """
929         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
930         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
931         if alleged_writekey != self._node.get_writekey():
932             self.log("invalid privkey from %s shnum %d" %
933                      (server.get_name(), shnum),
934                      parent=lp, level=log.WEIRD, umid="aJVccw")
935             return
936
937         # it's good
938         self.log("got valid privkey from shnum %d on serverid %s" %
939                  (shnum, server.get_name()),
940                  parent=lp)
941         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
942         self._node._populate_encprivkey(enc_privkey)
943         self._node._populate_privkey(privkey)
944         self._need_privkey = False
945         self._status.set_privkey_from(server)
946
947
948     def _add_lease_failed(self, f, server, storage_index):
949         # Older versions of Tahoe didn't handle the add-lease message very
950         # well: <=1.1.0 throws a NameError because it doesn't implement
951         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
952         # (which is most of them, since we send add-lease to everybody,
953         # before we know whether or not they have any shares for us), and
954         # 1.2.0 throws KeyError even on known buckets due to an internal bug
955         # in the latency-measuring code.
956
957         # we want to ignore the known-harmless errors and log the others. In
958         # particular we want to log any local errors caused by coding
959         # problems.
960
961         if f.check(DeadReferenceError):
962             return
963         if f.check(RemoteException):
964             if f.value.failure.check(KeyError, IndexError, NameError):
965                 # this may ignore a bit too much, but that only hurts us
966                 # during debugging
967                 return
968             self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
969                      name=server.get_name(),
970                      f_value=str(f.value),
971                      failure=f,
972                      level=log.WEIRD, umid="iqg3mw")
973             return
974         # local errors are cause for alarm
975         log.err(f,
976                 format="local error in add_lease to [%(name)s]: %(f_value)s",
977                 name=server.get_name(),
978                 f_value=str(f.value),
979                 level=log.WEIRD, umid="ZWh6HA")
980
981     def _query_failed(self, f, server):
982         if not self._running:
983             return
984         level = log.WEIRD
985         if f.check(DeadReferenceError):
986             level = log.UNUSUAL
987         self.log(format="error during query: %(f_value)s",
988                  f_value=str(f.value), failure=f,
989                  level=level, umid="IHXuQg")
990         self._must_query.discard(server)
991         self._queries_outstanding.discard(server)
992         self._bad_servers.add(server)
993         self._servermap.add_problem(f)
994         # a server could be in both ServerMap.reachable_servers and
995         # .unreachable_servers if they responded to our query, but then an
996         # exception was raised in _got_results.
997         self._servermap.mark_server_unreachable(server)
998         self._queries_completed += 1
999         self._last_failure = f
1000
1001
1002     def _privkey_query_failed(self, f, server, shnum, lp):
1003         self._queries_outstanding.discard(server)
1004         if not self._running:
1005             return
1006         level = log.WEIRD
1007         if f.check(DeadReferenceError):
1008             level = log.UNUSUAL
1009         self.log(format="error during privkey query: %(f_value)s",
1010                  f_value=str(f.value), failure=f,
1011                  parent=lp, level=level, umid="McoJ5w")
1012         self._servermap.add_problem(f)
1013         self._last_failure = f
1014
1015
1016     def _check_for_done(self, res):
1017         # exit paths:
1018         #  return self._send_more_queries(outstanding) : send some more queries
1019         #  return self._done() : all done
1020         #  return : keep waiting, no new queries
1021         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
1022                               "%(outstanding)d queries outstanding, "
1023                               "%(extra)d extra servers available, "
1024                               "%(must)d 'must query' servers left, "
1025                               "need_privkey=%(need_privkey)s"
1026                               ),
1027                       mode=self.mode,
1028                       outstanding=len(self._queries_outstanding),
1029                       extra=len(self.extra_servers),
1030                       must=len(self._must_query),
1031                       need_privkey=self._need_privkey,
1032                       level=log.NOISY,
1033                       )
1034
1035         if not self._running:
1036             self.log("but we're not running", parent=lp, level=log.NOISY)
1037             return
1038
1039         if self._must_query:
1040             # we are still waiting for responses from servers that used to have
1041             # a share, so we must continue to wait. No additional queries are
1042             # required at this time.
1043             self.log("%d 'must query' servers left" % len(self._must_query),
1044                      level=log.NOISY, parent=lp)
1045             return
1046
1047         if (not self._queries_outstanding and not self.extra_servers):
1048             # all queries have retired, and we have no servers left to ask. No
1049             # more progress can be made, therefore we are done.
1050             self.log("all queries are retired, no extra servers: done",
1051                      parent=lp)
1052             return self._done()
1053
1054         recoverable_versions = self._servermap.recoverable_versions()
1055         unrecoverable_versions = self._servermap.unrecoverable_versions()
1056
1057         # what is our completion policy? how hard should we work?
1058
1059         if self.mode == MODE_ANYTHING:
1060             if recoverable_versions:
1061                 self.log("%d recoverable versions: done"
1062                          % len(recoverable_versions),
1063                          parent=lp)
1064                 return self._done()
1065
1066         if self.mode == (MODE_CHECK, MODE_REPAIR):
1067             # we used self._must_query, and we know there aren't any
1068             # responses still waiting, so that means we must be done
1069             self.log("done", parent=lp)
1070             return self._done()
1071
1072         MAX_IN_FLIGHT = 5
1073         if self.mode == MODE_READ:
1074             # if we've queried k+epsilon servers, and we see a recoverable
1075             # version, and we haven't seen any unrecoverable higher-seqnum'ed
1076             # versions, then we're done.
1077
1078             if self._queries_completed < self.num_servers_to_query:
1079                 self.log(format="%(completed)d completed, %(query)d to query: need more",
1080                          completed=self._queries_completed,
1081                          query=self.num_servers_to_query,
1082                          level=log.NOISY, parent=lp)
1083                 return self._send_more_queries(MAX_IN_FLIGHT)
1084             if not recoverable_versions:
1085                 self.log("no recoverable versions: need more",
1086                          level=log.NOISY, parent=lp)
1087                 return self._send_more_queries(MAX_IN_FLIGHT)
1088             highest_recoverable = max(recoverable_versions)
1089             highest_recoverable_seqnum = highest_recoverable[0]
1090             for unrec_verinfo in unrecoverable_versions:
1091                 if unrec_verinfo[0] > highest_recoverable_seqnum:
1092                     # there is evidence of a higher-seqnum version, but we
1093                     # don't yet see enough shares to recover it. Try harder.
1094                     # TODO: consider sending more queries.
1095                     # TODO: consider limiting the search distance
1096                     self.log("evidence of higher seqnum: need more",
1097                              level=log.UNUSUAL, parent=lp)
1098                     return self._send_more_queries(MAX_IN_FLIGHT)
1099             # all the unrecoverable versions were old or concurrent with a
1100             # recoverable version. Good enough.
1101             self.log("no higher-seqnum: done", parent=lp)
1102             return self._done()
1103
1104         if self.mode == MODE_WRITE:
1105             # we want to keep querying until we've seen a few that don't have
1106             # any shares, to be sufficiently confident that we've seen all
1107             # the shares. This is still less work than MODE_CHECK, which asks
1108             # every server in the world.
1109
1110             if not recoverable_versions:
1111                 self.log("no recoverable versions: need more", parent=lp,
1112                          level=log.NOISY)
1113                 return self._send_more_queries(MAX_IN_FLIGHT)
1114
1115             last_found = -1
1116             last_not_responded = -1
1117             num_not_responded = 0
1118             num_not_found = 0
1119             states = []
1120             found_boundary = False
1121
1122             for i,server in enumerate(self.full_serverlist):
1123                 if server in self._bad_servers:
1124                     # query failed
1125                     states.append("x")
1126                     #self.log("loop [%s]: x" % server.get_name()
1127                 elif server in self._empty_servers:
1128                     # no shares
1129                     states.append("0")
1130                     #self.log("loop [%s]: 0" % server.get_name()
1131                     if last_found != -1:
1132                         num_not_found += 1
1133                         if num_not_found >= self.EPSILON:
1134                             self.log("found our boundary, %s" %
1135                                      "".join(states),
1136                                      parent=lp, level=log.NOISY)
1137                             found_boundary = True
1138                             break
1139
1140                 elif server in self._good_servers:
1141                     # yes shares
1142                     states.append("1")
1143                     #self.log("loop [%s]: 1" % server.get_name()
1144                     last_found = i
1145                     num_not_found = 0
1146                 else:
1147                     # not responded yet
1148                     states.append("?")
1149                     #self.log("loop [%s]: ?" % server.get_name()
1150                     last_not_responded = i
1151                     num_not_responded += 1
1152
1153             if found_boundary:
1154                 # we need to know that we've gotten answers from
1155                 # everybody to the left of here
1156                 if last_not_responded == -1:
1157                     # we're done
1158                     self.log("have all our answers",
1159                              parent=lp, level=log.NOISY)
1160                     # .. unless we're still waiting on the privkey
1161                     if self._need_privkey:
1162                         self.log("but we're still waiting for the privkey",
1163                                  parent=lp, level=log.NOISY)
1164                         # if we found the boundary but we haven't yet found
1165                         # the privkey, we may need to look further. If
1166                         # somehow all the privkeys were corrupted (but the
1167                         # shares were readable), then this is likely to do an
1168                         # exhaustive search.
1169                         return self._send_more_queries(MAX_IN_FLIGHT)
1170                     return self._done()
1171                 # still waiting for somebody
1172                 return self._send_more_queries(num_not_responded)
1173
1174             # if we hit here, we didn't find our boundary, so we're still
1175             # waiting for servers
1176             self.log("no boundary yet, %s" % "".join(states), parent=lp,
1177                      level=log.NOISY)
1178             return self._send_more_queries(MAX_IN_FLIGHT)
1179
1180         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
1181         # arbitrary, really I want this to be something like k -
1182         # max(known_version_sharecounts) + some extra
1183         self.log("catchall: need more", parent=lp, level=log.NOISY)
1184         return self._send_more_queries(MAX_IN_FLIGHT)
1185
1186     def _send_more_queries(self, num_outstanding):
1187         more_queries = []
1188
1189         while True:
1190             self.log(format=" there are %(outstanding)d queries outstanding",
1191                      outstanding=len(self._queries_outstanding),
1192                      level=log.NOISY)
1193             active_queries = len(self._queries_outstanding) + len(more_queries)
1194             if active_queries >= num_outstanding:
1195                 break
1196             if not self.extra_servers:
1197                 break
1198             more_queries.append(self.extra_servers.pop(0))
1199
1200         self.log(format="sending %(more)d more queries: %(who)s",
1201                  more=len(more_queries),
1202                  who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
1203                  level=log.NOISY)
1204
1205         for server in more_queries:
1206             self._do_query(server, self._storage_index, self._read_size)
1207             # we'll retrigger when those queries come back
1208
1209     def _done(self):
1210         if not self._running:
1211             self.log("not running; we're already done")
1212             return
1213         self._running = False
1214         now = time.time()
1215         elapsed = now - self._started
1216         self._status.set_finished(now)
1217         self._status.timings["total"] = elapsed
1218         self._status.set_progress(1.0)
1219         self._status.set_status("Finished")
1220         self._status.set_active(False)
1221
1222         self._servermap.set_last_update(self.mode, self._started)
1223         # the servermap will not be touched after this
1224         self.log("servermap: %s" % self._servermap.summarize_versions())
1225
1226         eventually(self._done_deferred.callback, self._servermap)
1227
    def _fatal_error(self, f):
        """
        I report an unexpected failure and errback the done deferred so
        our caller learns that the update cannot complete.
        """
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)
1231
1232