import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
                      The dict maps (peerid, shnum) tuple to old checkstring.
    """

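    # Illustrative sketch only (example values invented here, not produced by
    # this code): a single servermap entry might look like
    #   self.servermap[(peerid, 3)] = (verinfo, 1229483424.0)
    # where verinfo is the 9-tuple described above:
    #   (seqnum, root_hash, IV, segsize, datalength, k, N, signed_prefix,
    #    offsets_tuple)
    # and the timestamp records when that share was observed.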
    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

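    # Illustrative sketch (invented values): for a 3-of-10 file where seven
    # distinct shares of one version were located, shares_available() would
    # return something like
    #   {verinfo: (7, 3, 10)}
    # and recoverable_versions() below would count that version as
    # recoverable, since 7 >= k.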
    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions
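    # Illustrative sketch (invented values): if the best recoverable version
    # has seqnum 4, but we also saw only 2 shares of a seqnum-5 version of a
    # 3-of-10 file, unrecoverable_newer_versions() would return
    #   {verinfo_for_seq5: (2, 3)}
    # warning the caller that publishing over seqnum 4 would clobber whatever
    # was written as seqnum 5.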


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


class ServermapUpdater:
    def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
                 add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        """

        self._node = filenode
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SMDF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 4000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON
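        # (Worked example using the guessed defaults above: k=3 and N=10 give
        # EPSILON=3, so MODE_READ asks at least 3+3=6 peers, and the
        # MODE_WRITE branch below asks at least 10+3=13.)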

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self._must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
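    # Usage sketch (illustrative only; 'filenode' and 'monitor' stand in for
    # the MutableFileNode and Monitor that real callers would supply):
    #   u = ServermapUpdater(filenode, monitor, ServerMap(), mode=MODE_READ)
    #   d = u.update()
    #   d.addCallback(lambda smap: smap.best_recoverable_version())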
    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        dl = []
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        if self._add_lease:
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(readv_success, readv_result),
                 (addlease_success, addlease_result)] = res
                # ignore remote IndexError on the add_lease call. Propagate
                # local errors and remote non-IndexErrors
                if addlease_success:
                    return readv_result
                if not addlease_result.check(RemoteException):
                    # Propagate local errors
                    return addlease_result
                if addlease_result.value.failure.check(IndexError):
                    # tahoe=1.3.0 raised IndexError on non-existent
                    # buckets, which we ignore
                    return readv_result
                # propagate remote errors that aren't IndexError, including
                # the unfortunate internal KeyError bug that <1.3.0 had.
                return addlease_result
            dl.addCallback(_done)
            return dl
        return d
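    # Note on the shape of the results handled below: slot_readv returns a
    # dict mapping each share number found on that server to a list of data
    # strings, one per (offset, length) pair in readv. Since _do_query asks
    # for a single [(0, readsize)] vector, _got_results sees something like
    #   {0: ["<first readsize bytes of sh0>"], 3: ["<first readsize bytes>"]}
    # (invented example), and an empty dict from peers holding no shares.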
    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._cache.add(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)
                pass

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY,
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).

            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):

        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

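            # (Descriptive note for the loop below: 'states' collects one
            # character per peer in permuted order -- 'x' for a failed query,
            # '0' for no shares, '1' for shares found, '?' for no response
            # yet. The boundary is declared found once EPSILON '0's have been
            # seen after the last '1', e.g. "1101000" with EPSILON=3.)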
            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)