
import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
                      The dict maps (peerid, shnum) tuple to old checkstring.
    """
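
    # For reference, each servermap entry has this shape:
    #   servermap[(peerid, shnum)] = ((seqnum, root_hash, IV, segsize,
    #                                  datalength, k, N, signed_prefix,
    #                                  offsets_tuple), timestamp)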

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to (num_distinct_shares, k, N)
        tuples."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
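        # verinfo tuples begin with (seqnum, root_hash, ...), so a plain sort
        # orders versions by seqnum first and breaks ties on root hash; the
        # last element after sorting is therefore the 'best' version.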
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


class ServermapUpdater:
    def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
                 add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        """
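        # Typical usage, as a rough sketch (the 'filenode' and 'monitor'
        # arguments are supplied by the caller, e.g. a MutableFileNode and a
        # Monitor; neither is defined in this module):
        #
        #   u = ServermapUpdater(filenode, monitor, ServerMap(), MODE_READ)
        #   d = u.update()  # Deferred that fires with the populated ServerMap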

        self._node = filenode
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SDMF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 4000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._node._client.get_storage_broker()
        full_peerlist = sb.get_servers_for_index(self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
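        # EPSILON controls how far past the last known share we keep looking:
        # in MODE_WRITE, _check_for_done keeps querying until it has seen
        # EPSILON consecutive share-less peers after the last peer that
        # reported a share.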
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self._must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        dl = []
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
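        # slot_readv returns a dict mapping shnum to a list of strings, one
        # string per (offset, length) pair in readv; an empty 'shnums' list
        # means "read from every share you hold".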
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        if self._add_lease:
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(readv_success, readv_result),
                 (addlease_success, addlease_result)] = res
                # ignore remote IndexError on the add_lease call. Propagate
                # local errors and remote non-IndexErrors
                if addlease_success:
                    return readv_result
                if not addlease_result.check(RemoteException):
                    # Propagate local errors
                    return addlease_result
                if addlease_result.value.failure.check(IndexError):
                    # tahoe-1.3.0 raised IndexError on non-existent
                    # buckets, which we ignore
                    return readv_result
                # propagate remote errors that aren't IndexError, including
                # the unfortunate internal KeyError bug that <1.3.0 had.
                return addlease_result
            dl.addCallback(_done)
            return dl
        return d

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._cache.add(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)
                pass

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
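            # i.e. read from the start of the encrypted private key to the
            # end of the share: the privkey is the last field before EOF in
            # the SDMF share layout.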
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY,
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
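        # offsets is a dict; flattening it into a tuple of items keeps the
        # whole verinfo tuple hashable, since verinfo is used as a set member
        # and dict key elsewhere in this module.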

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).

            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):

        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)


    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked for it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

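            # Walk the permuted peer list and build up a picture of what each
            # peer told us, e.g. states == "1110??": "1" for peers with
            # shares, "0" for peers without shares, "x" for failed queries,
            # and "?" for peers that have not answered yet. Once we see
            # EPSILON consecutive "0"s after the last "1", we assume we have
            # walked past the end of the shares and call that the boundary.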
            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)