1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually
8 from allmydata.util import base32, hashutil, idlib, log
9 from allmydata.storage.server import si_b2a
10 from allmydata.interfaces import IServermapUpdaterStatus
11 from pycryptopp.publickey import rsa
12
13 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
14      DictOfSets, CorruptShareError, NeedMoreDataError
15 from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
16      SIGNED_PREFIX_LENGTH
17
18 class UpdateStatus:
19     implements(IServermapUpdaterStatus)
20     statusid_counter = count(0)
21     def __init__(self):
22         self.timings = {}
23         self.timings["per_server"] = {}
24         self.timings["cumulative_verify"] = 0.0
25         self.privkey_from = None
26         self.problems = {}
27         self.active = True
28         self.storage_index = None
29         self.mode = "?"
30         self.status = "Not started"
31         self.progress = 0.0
32         self.counter = self.statusid_counter.next()
33         self.started = time.time()
34         self.finished = None
35
36     def add_per_server_time(self, peerid, op, sent, elapsed):
37         assert op in ("query", "late", "privkey")
38         if peerid not in self.timings["per_server"]:
39             self.timings["per_server"][peerid] = []
40         self.timings["per_server"][peerid].append((op,sent,elapsed))
41
42     def get_started(self):
43         return self.started
44     def get_finished(self):
45         return self.finished
46     def get_storage_index(self):
47         return self.storage_index
48     def get_mode(self):
49         return self.mode
50     def get_servermap(self):
51         return self.servermap
52     def get_privkey_from(self):
53         return self.privkey_from
54     def using_helper(self):
55         return False
56     def get_size(self):
57         return "-NA-"
58     def get_status(self):
59         return self.status
60     def get_progress(self):
61         return self.progress
62     def get_active(self):
63         return self.active
64     def get_counter(self):
65         return self.counter
66
67     def set_storage_index(self, si):
68         self.storage_index = si
69     def set_mode(self, mode):
70         self.mode = mode
71     def set_privkey_from(self, peerid):
72         self.privkey_from = peerid
73     def set_status(self, status):
74         self.status = status
75     def set_progress(self, value):
76         self.progress = value
77     def set_active(self, value):
78         self.active = value
79     def set_finished(self, when):
80         self.finished = when
81
82 class ServerMap:
83     """I record the placement of mutable shares.
84
85     This object records which shares (of various versions) are located on
86     which servers.
87
88     One purpose I serve is to inform callers about which versions of the
89     mutable file are recoverable and 'current'.
90
91     A second purpose is to serve as a state marker for test-and-set
92     operations. I am passed out of retrieval operations and back into publish
93     operations, which means 'publish this new version, but only if nothing
94     has changed since I last retrieved this data'. This reduces the chances
95     of clobbering a simultaneous (uncoordinated) write.
96
97     @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
98                      (versionid, timestamp) tuple. Each 'versionid' is a
99                      tuple of (seqnum, root_hash, IV, segsize, datalength,
100                      k, N, signed_prefix, offsets)
101
102     @ivar connections: maps peerid to a RemoteReference
103
104     @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
105                       shares that I should ignore (because a previous user of
106                       the servermap determined that they were invalid). The
107                       updater only locates a certain number of shares: if
108                       some of these turn out to have integrity problems and
109                       are unusable, the caller will need to mark those shares
110                       as bad, then re-update the servermap, then try again.
111                       The dict maps (peerid, shnum) tuple to old checkstring.
112     """
113
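    # Illustrative sketch (not part of the original code): roughly how the
    # servermap dict is shaped and how a caller might walk it. The peerid,
    # shnum, and timestamp values below are made-up placeholders.
    #
    #   sm = ServerMap()
    #   sm.servermap[(peerid, 0)] = (verinfo, time.time())
    #   # where verinfo is the 9-tuple described above:
    #   #   (seqnum, root_hash, IV, segsize, datalength, k, N, signed_prefix,
    #   #    offsets_tuple)
    #   for ((peerid, shnum), (verinfo, when)) in sm.servermap.items():
    #       seqnum = verinfo[0]  # e.g. pull out the sequence number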
114     def __init__(self):
115         self.servermap = {}
116         self.connections = {}
117         self.unreachable_peers = set() # peerids that didn't respond to queries
118         self.reachable_peers = set() # peerids that did respond to queries
119         self.problems = [] # mostly for debugging
120         self.bad_shares = {} # maps (peerid,shnum) to old checkstring
121         self.last_update_mode = None
122         self.last_update_time = 0
123
124     def copy(self):
125         s = ServerMap()
126         s.servermap = self.servermap.copy() # tuple->tuple
127         s.connections = self.connections.copy() # str->RemoteReference
128         s.unreachable_peers = set(self.unreachable_peers)
129         s.reachable_peers = set(self.reachable_peers)
130         s.problems = self.problems[:]
131         s.bad_shares = self.bad_shares.copy() # tuple->str
132         s.last_update_mode = self.last_update_mode
133         s.last_update_time = self.last_update_time
134         return s
135
136     def mark_bad_share(self, peerid, shnum, checkstring):
137         """This share was found to be bad, either in the checkstring or
138         signature (detected during mapupdate), or deeper in the share
139         (detected at retrieve time). Remove it from our list of useful
140         shares, and remember that it is bad so we don't add it back again
141         later. We record the share's old checkstring (which might be
142         corrupted or badly signed) so that a repair operation can do the
143         test-and-set using it as a reference.
144         """
145         key = (peerid, shnum) # record checkstring
146         self.bad_shares[key] = checkstring
147         self.servermap.pop(key, None)
148
149     def add_new_share(self, peerid, shnum, verinfo, timestamp):
150         """We've written a new share out, replacing any that was there
151         before."""
152         key = (peerid, shnum)
153         self.bad_shares.pop(key, None)
154         self.servermap[key] = (verinfo, timestamp)
155
156     def dump(self, out=sys.stdout):
157         print >>out, "servermap:"
158
159         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
160             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
161              offsets_tuple) = verinfo
162             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
163                           (idlib.shortnodeid_b2a(peerid), shnum,
164                            seqnum, base32.b2a(root_hash)[:4], k, N,
165                            datalength))
166         if self.problems:
167             print >>out, "%d PROBLEMS" % len(self.problems)
168             for f in self.problems:
169                 print >>out, str(f)
170         return out
171
172     def all_peers(self):
173         return set([peerid
174                     for (peerid, shnum)
175                     in self.servermap])
176
177     def all_peers_for_version(self, verinfo):
178         """Return a set of peerids that hold shares for the given version."""
179         return set([peerid
180                     for ( (peerid, shnum), (verinfo2, timestamp) )
181                     in self.servermap.items()
182                     if verinfo == verinfo2])
183
184     def make_sharemap(self):
185         """Return a dict that maps shnum to a set of peerids that hold it."""
186         sharemap = DictOfSets()
187         for (peerid, shnum) in self.servermap:
188             sharemap.add(shnum, peerid)
189         return sharemap
190
191     def make_versionmap(self):
192         """Return a dict that maps versionid to sets of (shnum, peerid,
193         timestamp) tuples."""
194         versionmap = DictOfSets()
195         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
196             versionmap.add(verinfo, (shnum, peerid, timestamp))
197         return versionmap
198
199     def shares_on_peer(self, peerid):
200         return set([shnum
201                     for (s_peerid, shnum)
202                     in self.servermap
203                     if s_peerid == peerid])
204
205     def version_on_peer(self, peerid, shnum):
206         key = (peerid, shnum)
207         if key in self.servermap:
208             (verinfo, timestamp) = self.servermap[key]
209             return verinfo
210         return None
211
212     def shares_available(self):
213         """Return a dict that maps verinfo to a
214         (num_distinct_shares, k, N) tuple."""
215         versionmap = self.make_versionmap()
216         all_shares = {}
217         for verinfo, shares in versionmap.items():
218             s = set()
219             for (shnum, peerid, timestamp) in shares:
220                 s.add(shnum)
221             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
222              offsets_tuple) = verinfo
223             all_shares[verinfo] = (len(s), k, N)
224         return all_shares
225
226     def highest_seqnum(self):
227         available = self.shares_available()
228         seqnums = [verinfo[0]
229                    for verinfo in available.keys()]
230         seqnums.append(0)
231         return max(seqnums)
232
233     def summarize_version(self, verinfo):
234         """Take a versionid, return a string that describes it."""
235         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
236          offsets_tuple) = verinfo
237         return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
238
239     def summarize_versions(self):
240         """Return a string describing which versions we know about."""
241         versionmap = self.make_versionmap()
242         bits = []
243         for (verinfo, shares) in versionmap.items():
244             vstr = self.summarize_version(verinfo)
245             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
246             bits.append("%d*%s" % (len(shnums), vstr))
247         return "/".join(bits)
248
249     def recoverable_versions(self):
250         """Return a set of versionids, one for each version that is currently
251         recoverable."""
252         versionmap = self.make_versionmap()
253
254         recoverable_versions = set()
255         for (verinfo, shares) in versionmap.items():
256             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
257              offsets_tuple) = verinfo
258             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
259             if len(shnums) >= k:
260                 # this one is recoverable
261                 recoverable_versions.add(verinfo)
262
263         return recoverable_versions
264
265     def unrecoverable_versions(self):
266         """Return a set of versionids, one for each version that is currently
267         unrecoverable."""
268         versionmap = self.make_versionmap()
269
270         unrecoverable_versions = set()
271         for (verinfo, shares) in versionmap.items():
272             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
273              offsets_tuple) = verinfo
274             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
275             if len(shnums) < k:
276                 unrecoverable_versions.add(verinfo)
277
278         return unrecoverable_versions
279
280     def best_recoverable_version(self):
281         """Return a single versionid, for the so-called 'best' recoverable
282         version. Sequence number is the primary sort criterion, followed by
283         root hash. Returns None if there are no recoverable versions."""
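        # Illustrative note (values are made up): verinfo tuples begin with
        # (seqnum, root_hash, ...), so a plain tuple sort orders primarily by
        # seqnum and then by root_hash; e.g. (3, "b...") sorts after
        # (3, "a..."), which sorts after (2, "z..."), and recoverable[-1] is
        # the 'best' version.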
284         recoverable = list(self.recoverable_versions())
285         recoverable.sort()
286         if recoverable:
287             return recoverable[-1]
288         return None
289
290     def size_of_version(self, verinfo):
291         """Given a versionid (perhaps returned by best_recoverable_version),
292         return the size of the file in bytes."""
293         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
294          offsets_tuple) = verinfo
295         return datalength
296
297     def unrecoverable_newer_versions(self):
298         # Return a dict of versionid -> health, for versions that are
299         # unrecoverable and have later seqnums than any recoverable versions.
300         # These indicate that a write will lose data.
301         versionmap = self.make_versionmap()
302         healths = {} # maps verinfo to (found,k)
303         unrecoverable = set()
304         highest_recoverable_seqnum = -1
305         for (verinfo, shares) in versionmap.items():
306             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
307              offsets_tuple) = verinfo
308             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
309             healths[verinfo] = (len(shnums),k)
310             if len(shnums) < k:
311                 unrecoverable.add(verinfo)
312             else:
313                 highest_recoverable_seqnum = max(seqnum,
314                                                  highest_recoverable_seqnum)
315
316         newversions = {}
317         for verinfo in unrecoverable:
318             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
319              offsets_tuple) = verinfo
320             if seqnum > highest_recoverable_seqnum:
321                 newversions[verinfo] = healths[verinfo]
322
323         return newversions
324
325
326     def needs_merge(self):
327         # return True if there are multiple recoverable versions with the
328         # same seqnum, meaning that MutableFileNode.read_best_version is not
329         # giving you the whole story, and that using its data to do a
330         # subsequent publish will lose information.
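        # Illustrative example (made-up situation): if two recoverable
        # versions both carry seqnum 4 but different root hashes (say, after
        # an uncoordinated write), recoverable_seqnums would contain 4 twice
        # and this method returns True.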
331         recoverable_seqnums = [verinfo[0]
332                                for verinfo in self.recoverable_versions()]
333         for seqnum in recoverable_seqnums:
334             if recoverable_seqnums.count(seqnum) > 1:
335                 return True
336         return False
337
338
339 class ServermapUpdater:
340     def __init__(self, filenode, storage_broker, monitor, servermap,
341                  mode=MODE_READ, add_lease=False):
342         """I update a servermap, locating a sufficient number of useful
343         shares and remembering where they are located.
344
345         """
346
347         self._node = filenode
348         self._storage_broker = storage_broker
349         self._monitor = monitor
350         self._servermap = servermap
351         self.mode = mode
352         self._add_lease = add_lease
353         self._running = True
354
355         self._storage_index = filenode.get_storage_index()
356         self._last_failure = None
357
358         self._status = UpdateStatus()
359         self._status.set_storage_index(self._storage_index)
360         self._status.set_progress(0.0)
361         self._status.set_mode(mode)
362
363         self._servers_responded = set()
364
365         # how much data should we read?
366         #  * if we only need the checkstring, then [0:75]
367         #  * if we need to validate the checkstring sig, then [543ish:799ish]
368         #  * if we need the verification key, then [107:436ish]
369         #   * the offset table at [75:107] tells us about the 'ish'
370         #  * if we need the encrypted private key, we want [-1216ish:]
371         #   * but we can't read from negative offsets
372         #   * the offset table tells us the 'ish', also the positive offset
373         # A future version of the SMDF slot format should consider using
374         # fixed-size slots so we can retrieve less data. For now, we'll just
375         # read 4000 bytes (the value of self._read_size below), which also
376         # happens to read enough actual data to pre-fetch a small dirnode.
377         self._read_size = 4000
378         if mode == MODE_CHECK:
379             # we use unpack_prefix_and_signature, so we need 1k
380             self._read_size = 1000
381         self._need_privkey = False
382         if mode == MODE_WRITE and not self._node.get_privkey():
383             self._need_privkey = True
384         # check+repair: repair requires the privkey, so if we didn't happen
385         # to ask for it during the check, we'll have problems doing the
386         # publish.
387
388         prefix = si_b2a(self._storage_index)[:5]
389         self._log_number = log.msg(format="ServermapUpdater(%(si)s): starting (%(mode)s)",
390                                    si=prefix, mode=mode)
391
392     def get_status(self):
393         return self._status
394
395     def log(self, *args, **kwargs):
396         if "parent" not in kwargs:
397             kwargs["parent"] = self._log_number
398         if "facility" not in kwargs:
399             kwargs["facility"] = "tahoe.mutable.mapupdate"
400         return log.msg(*args, **kwargs)
401
402     def update(self):
403         """Update the servermap to reflect current conditions. Returns a
404         Deferred that fires with the servermap once the update has finished."""
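        # A rough usage sketch (assumptions: 'node' is a mutable filenode,
        # 'sb' is a storage broker, and Monitor comes from allmydata.monitor;
        # none of these names are defined in this module):
        #
        #   u = ServermapUpdater(node, sb, Monitor(), ServerMap(),
        #                        mode=MODE_READ)
        #   d = u.update()
        #   d.addCallback(lambda smap: smap.best_recoverable_version())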
405         self._started = time.time()
406         self._status.set_active(True)
407
408         # self._valid_versions is a set of validated verinfo tuples. We just
409         # use it to remember which versions had valid signatures, so we can
410         # avoid re-checking the signatures for each share.
411         self._valid_versions = set()
412
413         # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
414         # timestamp) tuples. This is used to figure out which versions might
415         # be retrievable, and to make the eventual data download faster.
416         self.versionmap = DictOfSets()
417
418         self._done_deferred = defer.Deferred()
419
420         # first, which peers should we talk to? Any that were in our old
421         # servermap, plus "enough" others.
422
423         self._queries_completed = 0
424
425         sb = self._storage_broker
426         full_peerlist = sb.get_servers_for_index(self._storage_index)
427         self.full_peerlist = full_peerlist # for use later, immutable
428         self.extra_peers = full_peerlist[:] # peers are removed as we use them
429         self._good_peers = set() # peers who had some shares
430         self._empty_peers = set() # peers who don't have any shares
431         self._bad_peers = set() # peers to whom our queries failed
432
433         k = self._node.get_required_shares()
434         if k is None:
435             # make a guess
436             k = 3
437         N = self._node.get_total_shares()
438         if N is None:
439             N = 10
440         self.EPSILON = k
441         # we want to send queries to at least this many peers (although we
442         # might not wait for all of their answers to come back)
443         self.num_peers_to_query = k + self.EPSILON
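        # Worked example (hypothetical numbers): with k=3 and N=10, EPSILON is
        # 3, so MODE_READ/MODE_ANYTHING start by targeting k + EPSILON = 6
        # peers, while MODE_WRITE below raises this to N + EPSILON = 13.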
444
445         if self.mode == MODE_CHECK:
446             initial_peers_to_query = dict(full_peerlist)
447             must_query = set(initial_peers_to_query.keys())
448             self.extra_peers = []
449         elif self.mode == MODE_WRITE:
450             # we're planning to replace all the shares, so we want a good
451             # chance of finding them all. We will keep searching until we've
452             # seen epsilon that don't have a share.
453             self.num_peers_to_query = N + self.EPSILON
454             initial_peers_to_query, must_query = self._build_initial_querylist()
455             self.required_num_empty_peers = self.EPSILON
456
457             # TODO: arrange to read lots of data from k-ish servers, to avoid
458             # the extra round trip required to read large directories. This
459             # might also avoid the round trip required to read the encrypted
460             # private key.
461
462         else:
463             initial_peers_to_query, must_query = self._build_initial_querylist()
464
465         # this is a set of peers that we are required to get responses from:
466         # they are peers who used to have a share, so we need to know where
467         # they currently stand, even if that means we have to wait for a
468         # silently-lost TCP connection to time out. We remove peers from this
469         # set as we get responses.
470         self._must_query = must_query
471
472         # now initial_peers_to_query contains the peers that we should ask,
473         # self._must_query contains the peers that we must have heard from
474         # before we can consider ourselves finished, and self.extra_peers
475         # contains the overflow (peers that we should tap if we don't get
476         # enough responses)
477
478         self._send_initial_requests(initial_peers_to_query)
479         self._status.timings["initial_queries"] = time.time() - self._started
480         return self._done_deferred
481
482     def _build_initial_querylist(self):
483         initial_peers_to_query = {}
484         must_query = set()
485         for peerid in self._servermap.all_peers():
486             ss = self._servermap.connections[peerid]
487             # we send queries to everyone who was already in the sharemap
488             initial_peers_to_query[peerid] = ss
489             # and we must wait for responses from them
490             must_query.add(peerid)
491
492         while ((self.num_peers_to_query > len(initial_peers_to_query))
493                and self.extra_peers):
494             (peerid, ss) = self.extra_peers.pop(0)
495             initial_peers_to_query[peerid] = ss
496
497         return initial_peers_to_query, must_query
498
499     def _send_initial_requests(self, peerlist):
500         self._status.set_status("Sending %d initial queries" % len(peerlist))
501         self._queries_outstanding = set()
502         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
503         for (peerid, ss) in peerlist.items():
504             self._queries_outstanding.add(peerid)
505             self._do_query(ss, peerid, self._storage_index, self._read_size)
506
507         if not peerlist:
508             # there is nobody to ask, so we need to short-circuit the state
509             # machine.
510             d = defer.maybeDeferred(self._check_for_done, None)
511             d.addErrback(self._fatal_error)
512
513         # control flow beyond this point: state machine. Receiving responses
514         # from queries is the input. We might send out more queries, or we
515         # might produce a result.
516         return None
517
518     def _do_query(self, ss, peerid, storage_index, readsize):
519         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
520                  peerid=idlib.shortnodeid_b2a(peerid),
521                  readsize=readsize,
522                  level=log.NOISY)
523         self._servermap.connections[peerid] = ss
524         started = time.time()
525         self._queries_outstanding.add(peerid)
526         d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
527         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
528                       started)
529         d.addErrback(self._query_failed, peerid)
530         # errors that aren't handled by _query_failed (and errors caused by
531         # _query_failed) get logged, but we still want to check for doneness.
532         d.addErrback(log.err)
533         d.addBoth(self._check_for_done)
534         d.addErrback(self._fatal_error)
535         return d
536
537     def _do_read(self, ss, peerid, storage_index, shnums, readv):
538         if self._add_lease:
539             # send an add-lease message in parallel. The results are handled
540             # separately. This is sent before the slot_readv() so that we can
541             # be sure the add_lease is retired by the time slot_readv comes
542             # back (this relies upon our knowledge that the server code for
543             # add_lease is synchronous).
544             renew_secret = self._node.get_renewal_secret(peerid)
545             cancel_secret = self._node.get_cancel_secret(peerid)
546             d2 = ss.callRemote("add_lease", storage_index,
547                                renew_secret, cancel_secret)
548             # we ignore success
549             d2.addErrback(self._add_lease_failed, peerid, storage_index)
550         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
551         return d
552
553     def _got_results(self, datavs, peerid, readsize, stuff, started):
554         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
555                       peerid=idlib.shortnodeid_b2a(peerid),
556                       numshares=len(datavs),
557                       level=log.NOISY)
558         now = time.time()
559         elapsed = now - started
560         self._queries_outstanding.discard(peerid)
561         self._servermap.reachable_peers.add(peerid)
562         self._must_query.discard(peerid)
563         self._queries_completed += 1
564         if not self._running:
565             self.log("but we're not running, so we'll ignore it", parent=lp,
566                      level=log.NOISY)
567             self._status.add_per_server_time(peerid, "late", started, elapsed)
568             return
569         self._status.add_per_server_time(peerid, "query", started, elapsed)
570
571         if datavs:
572             self._good_peers.add(peerid)
573         else:
574             self._empty_peers.add(peerid)
575
576         last_verinfo = None
577         last_shnum = None
578         for shnum,datav in datavs.items():
579             data = datav[0]
580             try:
581                 verinfo = self._got_results_one_share(shnum, data, peerid, lp)
582                 last_verinfo = verinfo
583                 last_shnum = shnum
584                 self._node._add_to_cache(verinfo, shnum, 0, data, now)
585             except CorruptShareError, e:
586                 # log it and give the other shares a chance to be processed
587                 f = failure.Failure()
588                 self.log(format="bad share: %(f_value)s", f_value=str(f.value),
589                          failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
590                 self.notify_server_corruption(peerid, shnum, str(e))
591                 self._bad_peers.add(peerid)
592                 self._last_failure = f
593                 checkstring = data[:SIGNED_PREFIX_LENGTH]
594                 self._servermap.mark_bad_share(peerid, shnum, checkstring)
595                 self._servermap.problems.append(f)
596                 pass
597
598         self._status.timings["cumulative_verify"] += (time.time() - now)
599
600         if self._need_privkey and last_verinfo:
601             # send them a request for the privkey. We send one request per
602             # server.
603             lp2 = self.log("sending privkey request",
604                            parent=lp, level=log.NOISY)
605             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
606              offsets_tuple) = last_verinfo
607             o = dict(offsets_tuple)
608
609             self._queries_outstanding.add(peerid)
610             readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
611             ss = self._servermap.connections[peerid]
612             privkey_started = time.time()
613             d = self._do_read(ss, peerid, self._storage_index,
614                               [last_shnum], readv)
615             d.addCallback(self._got_privkey_results, peerid, last_shnum,
616                           privkey_started, lp2)
617             d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
618             d.addErrback(log.err)
619             d.addCallback(self._check_for_done)
620             d.addErrback(self._fatal_error)
621
622         # all done!
623         self.log("_got_results done", parent=lp, level=log.NOISY)
624
625     def notify_server_corruption(self, peerid, shnum, reason):
626         ss = self._servermap.connections[peerid]
627         ss.callRemoteOnly("advise_corrupt_share",
628                           "mutable", self._storage_index, shnum, reason)
629
630     def _got_results_one_share(self, shnum, data, peerid, lp):
631         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
632                  shnum=shnum,
633                  peerid=idlib.shortnodeid_b2a(peerid),
634                  level=log.NOISY,
635                  parent=lp)
636
637         # this might raise NeedMoreDataError, if the pubkey and signature
638         # live at some weird offset. That shouldn't happen, so I'm going to
639         # treat it as a bad share.
640         (seqnum, root_hash, IV, k, N, segsize, datalength,
641          pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
642
643         if not self._node.get_pubkey():
644             fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
645             assert len(fingerprint) == 32
646             if fingerprint != self._node.get_fingerprint():
647                 raise CorruptShareError(peerid, shnum,
648                                         "pubkey doesn't match fingerprint")
649             self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
650
651         if self._need_privkey:
652             self._try_to_extract_privkey(data, peerid, shnum, lp)
653
654         (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
655          ig_segsize, ig_datalen, offsets) = unpack_header(data)
656         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
657
658         verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
659                    offsets_tuple)
660
661         if verinfo not in self._valid_versions:
662             # it's a new pair. Verify the signature.
663             valid = self._node.get_pubkey().verify(prefix, signature)
664             if not valid:
665                 raise CorruptShareError(peerid, shnum, "signature is invalid")
666
667             # ok, it's a valid verinfo. Add it to the list of validated
668             # versions.
669             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
670                      % (seqnum, base32.b2a(root_hash)[:4],
671                         idlib.shortnodeid_b2a(peerid), shnum,
672                         k, N, segsize, datalength),
673                      parent=lp)
674             self._valid_versions.add(verinfo)
675         # We now know that this is a valid candidate verinfo.
676
677         if (peerid, shnum) in self._servermap.bad_shares:
678             # we've been told that the rest of the data in this share is
679             # unusable, so don't add it to the servermap.
680             self.log("but we've been told this is a bad share",
681                      parent=lp, level=log.UNUSUAL)
682             return verinfo
683
684         # Add the info to our servermap.
685         timestamp = time.time()
686         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
687         # and the versionmap
688         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
689         return verinfo
690
691     def _deserialize_pubkey(self, pubkey_s):
692         verifier = rsa.create_verifying_key_from_string(pubkey_s)
693         return verifier
694
695     def _try_to_extract_privkey(self, data, peerid, shnum, lp):
696         try:
697             r = unpack_share(data)
698         except NeedMoreDataError, e:
699             # this share won't help us. oh well.
700             offset = e.encprivkey_offset
701             length = e.encprivkey_length
702             self.log("shnum %d on peerid %s: share was too short (%dB) "
703                      "to get the encprivkey; [%d:%d] ought to hold it" %
704                      (shnum, idlib.shortnodeid_b2a(peerid), len(data),
705                       offset, offset+length),
706                      parent=lp)
707             # NOTE: if uncoordinated writes are taking place, someone might
708             # change the share (and most probably move the encprivkey) before
709             # we get a chance to do one of these reads and fetch it. This
710             # will cause us to see a NotEnoughSharesError(unable to fetch
711             # privkey) instead of an UncoordinatedWriteError . This is a
712             # nuisance, but it will go away when we move to DSA-based mutable
713             # files (since the privkey will be small enough to fit in the
714             # write cap).
715
716             return
717
718         (seqnum, root_hash, IV, k, N, segsize, datalen,
719          pubkey, signature, share_hash_chain, block_hash_tree,
720          share_data, enc_privkey) = r
721
722         return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
723
724     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
725
726         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
727         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
728         if alleged_writekey != self._node.get_writekey():
729             self.log("invalid privkey from %s shnum %d" %
730                      (idlib.nodeid_b2a(peerid)[:8], shnum),
731                      parent=lp, level=log.WEIRD, umid="aJVccw")
732             return
733
734         # it's good
735         self.log("got valid privkey from shnum %d on peerid %s" %
736                  (shnum, idlib.shortnodeid_b2a(peerid)),
737                  parent=lp)
738         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
739         self._node._populate_encprivkey(enc_privkey)
740         self._node._populate_privkey(privkey)
741         self._need_privkey = False
742         self._status.set_privkey_from(peerid)
743
744
745     def _add_lease_failed(self, f, peerid, storage_index):
746         # Older versions of Tahoe didn't handle the add-lease message very
747         # well: <=1.1.0 throws a NameError because it doesn't implement
748         # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
749         # (which is most of them, since we send add-lease to everybody,
750         # before we know whether or not they have any shares for us), and
751         # 1.2.0 throws KeyError even on known buckets due to an internal bug
752         # in the latency-measuring code.
753
754         # we want to ignore the known-harmless errors and log the others. In
755         # particular we want to log any local errors caused by coding
756         # problems.
757
758         if f.check(DeadReferenceError):
759             return
760         if f.check(RemoteException):
761             if f.value.failure.check(KeyError, IndexError, NameError):
762                 # this may ignore a bit too much, but that only hurts us
763                 # during debugging
764                 return
765             self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
766                      peerid=idlib.shortnodeid_b2a(peerid),
767                      f_value=str(f.value),
768                      failure=f,
769                      level=log.WEIRD, umid="iqg3mw")
770             return
771         # local errors are cause for alarm
772         log.err(f,
773                 format="local error in add_lease to [%(peerid)s]: %(f_value)s",
774                 peerid=idlib.shortnodeid_b2a(peerid),
775                 f_value=str(f.value),
776                 level=log.WEIRD, umid="ZWh6HA")
777
778     def _query_failed(self, f, peerid):
779         if not self._running:
780             return
781         level = log.WEIRD
782         if f.check(DeadReferenceError):
783             level = log.UNUSUAL
784         self.log(format="error during query: %(f_value)s",
785                  f_value=str(f.value), failure=f,
786                  level=level, umid="IHXuQg")
787         self._must_query.discard(peerid)
788         self._queries_outstanding.discard(peerid)
789         self._bad_peers.add(peerid)
790         self._servermap.problems.append(f)
791         # a peerid could be in both ServerMap.reachable_peers and
792         # .unreachable_peers if they responded to our query, but then an
793         # exception was raised in _got_results.
794         self._servermap.unreachable_peers.add(peerid)
795         self._queries_completed += 1
796         self._last_failure = f
797
798     def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
799         now = time.time()
800         elapsed = now - started
801         self._status.add_per_server_time(peerid, "privkey", started, elapsed)
802         self._queries_outstanding.discard(peerid)
803         if not self._need_privkey:
804             return
805         if shnum not in datavs:
806             self.log("privkey wasn't there when we asked for it",
807                      level=log.WEIRD, umid="VA9uDQ")
808             return
809         datav = datavs[shnum]
810         enc_privkey = datav[0]
811         self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
812
813     def _privkey_query_failed(self, f, peerid, shnum, lp):
814         self._queries_outstanding.discard(peerid)
815         if not self._running:
816             return
817         level = log.WEIRD
818         if f.check(DeadReferenceError):
819             level = log.UNUSUAL
820         self.log(format="error during privkey query: %(f_value)s",
821                  f_value=str(f.value), failure=f,
822                  parent=lp, level=level, umid="McoJ5w")
823         self._servermap.problems.append(f)
824         self._last_failure = f
825
826     def _check_for_done(self, res):
827         # exit paths:
828         #  return self._send_more_queries(outstanding) : send some more queries
829         #  return self._done() : all done
830         #  return : keep waiting, no new queries
831
832         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
833                               "%(outstanding)d queries outstanding, "
834                               "%(extra)d extra peers available, "
835                               "%(must)d 'must query' peers left, "
836                               "need_privkey=%(need_privkey)s"
837                               ),
838                       mode=self.mode,
839                       outstanding=len(self._queries_outstanding),
840                       extra=len(self.extra_peers),
841                       must=len(self._must_query),
842                       need_privkey=self._need_privkey,
843                       level=log.NOISY,
844                       )
845
846         if not self._running:
847             self.log("but we're not running", parent=lp, level=log.NOISY)
848             return
849
850         if self._must_query:
851             # we are still waiting for responses from peers that used to have
852             # a share, so we must continue to wait. No additional queries are
853             # required at this time.
854             self.log("%d 'must query' peers left" % len(self._must_query),
855                      level=log.NOISY, parent=lp)
856             return
857
858         if (not self._queries_outstanding and not self.extra_peers):
859             # all queries have retired, and we have no peers left to ask. No
860             # more progress can be made, therefore we are done.
861             self.log("all queries are retired, no extra peers: done",
862                      parent=lp)
863             return self._done()
864
865         recoverable_versions = self._servermap.recoverable_versions()
866         unrecoverable_versions = self._servermap.unrecoverable_versions()
867
868         # what is our completion policy? how hard should we work?
869
870         if self.mode == MODE_ANYTHING:
871             if recoverable_versions:
872                 self.log("%d recoverable versions: done"
873                          % len(recoverable_versions),
874                          parent=lp)
875                 return self._done()
876
877         if self.mode == MODE_CHECK:
878             # we used self._must_query, and we know there aren't any
879             # responses still waiting, so that means we must be done
880             self.log("done", parent=lp)
881             return self._done()
882
883         MAX_IN_FLIGHT = 5
884         if self.mode == MODE_READ:
885             # if we've queried k+epsilon servers, and we see a recoverable
886             # version, and we haven't seen any unrecoverable higher-seqnum'ed
887             # versions, then we're done.
888
889             if self._queries_completed < self.num_peers_to_query:
890                 self.log(format="%(completed)d completed, %(query)d to query: need more",
891                          completed=self._queries_completed,
892                          query=self.num_peers_to_query,
893                          level=log.NOISY, parent=lp)
894                 return self._send_more_queries(MAX_IN_FLIGHT)
895             if not recoverable_versions:
896                 self.log("no recoverable versions: need more",
897                          level=log.NOISY, parent=lp)
898                 return self._send_more_queries(MAX_IN_FLIGHT)
899             highest_recoverable = max(recoverable_versions)
900             highest_recoverable_seqnum = highest_recoverable[0]
901             for unrec_verinfo in unrecoverable_versions:
902                 if unrec_verinfo[0] > highest_recoverable_seqnum:
903                     # there is evidence of a higher-seqnum version, but we
904                     # don't yet see enough shares to recover it. Try harder.
905                     # TODO: consider sending more queries.
906                     # TODO: consider limiting the search distance
907                     self.log("evidence of higher seqnum: need more",
908                              level=log.UNUSUAL, parent=lp)
909                     return self._send_more_queries(MAX_IN_FLIGHT)
910             # all the unrecoverable versions were old or concurrent with a
911             # recoverable version. Good enough.
912             self.log("no higher-seqnum: done", parent=lp)
913             return self._done()
914
915         if self.mode == MODE_WRITE:
916             # we want to keep querying until we've seen a few that don't have
917             # any shares, to be sufficiently confident that we've seen all
918             # the shares. This is still less work than MODE_CHECK, which asks
919             # every server in the world.
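            # Illustrative boundary example (made-up responses): with
            # EPSILON=3, a response pattern of "1 1 0 1 0 0 0" over
            # full_peerlist means the last share was seen at index 3 and
            # three share-less peers followed it, so the loop below would
            # set found_boundary and stop extending the search.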
920
921             if not recoverable_versions:
922                 self.log("no recoverable versions: need more", parent=lp,
923                          level=log.NOISY)
924                 return self._send_more_queries(MAX_IN_FLIGHT)
925
926             last_found = -1
927             last_not_responded = -1
928             num_not_responded = 0
929             num_not_found = 0
930             states = []
931             found_boundary = False
932
933             for i,(peerid,ss) in enumerate(self.full_peerlist):
934                 if peerid in self._bad_peers:
935                     # query failed
936                     states.append("x")
937                     #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
938                 elif peerid in self._empty_peers:
939                     # no shares
940                     states.append("0")
941                     #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
942                     if last_found != -1:
943                         num_not_found += 1
944                         if num_not_found >= self.EPSILON:
945                             self.log("found our boundary, %s" %
946                                      "".join(states),
947                                      parent=lp, level=log.NOISY)
948                             found_boundary = True
949                             break
950
951                 elif peerid in self._good_peers:
952                     # yes shares
953                     states.append("1")
954                     #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
955                     last_found = i
956                     num_not_found = 0
957                 else:
958                     # not responded yet
959                     states.append("?")
960                     #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
961                     last_not_responded = i
962                     num_not_responded += 1
963
964             if found_boundary:
965                 # we need to know that we've gotten answers from
966                 # everybody to the left of here
967                 if last_not_responded == -1:
968                     # we're done
969                     self.log("have all our answers",
970                              parent=lp, level=log.NOISY)
971                     # .. unless we're still waiting on the privkey
972                     if self._need_privkey:
973                         self.log("but we're still waiting for the privkey",
974                                  parent=lp, level=log.NOISY)
975                         # if we found the boundary but we haven't yet found
976                         # the privkey, we may need to look further. If
977                         # somehow all the privkeys were corrupted (but the
978                         # shares were readable), then this is likely to do an
979                         # exhaustive search.
980                         return self._send_more_queries(MAX_IN_FLIGHT)
981                     return self._done()
982                 # still waiting for somebody
983                 return self._send_more_queries(num_not_responded)
984
985             # if we hit here, we didn't find our boundary, so we're still
986             # waiting for peers
987             self.log("no boundary yet, %s" % "".join(states), parent=lp,
988                      level=log.NOISY)
989             return self._send_more_queries(MAX_IN_FLIGHT)
990
991         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
992         # arbitrary; really I want this to be something like k -
993         # max(known_version_sharecounts) + some extra
994         self.log("catchall: need more", parent=lp, level=log.NOISY)
995         return self._send_more_queries(MAX_IN_FLIGHT)
996
997     def _send_more_queries(self, num_outstanding):
998         more_queries = []
999
1000         while True:
1001             self.log(format=" there are %(outstanding)d queries outstanding",
1002                      outstanding=len(self._queries_outstanding),
1003                      level=log.NOISY)
1004             active_queries = len(self._queries_outstanding) + len(more_queries)
1005             if active_queries >= num_outstanding:
1006                 break
1007             if not self.extra_peers:
1008                 break
1009             more_queries.append(self.extra_peers.pop(0))
1010
1011         self.log(format="sending %(more)d more queries: %(who)s",
1012                  more=len(more_queries),
1013                  who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
1014                                for (peerid,ss) in more_queries]),
1015                  level=log.NOISY)
1016
1017         for (peerid, ss) in more_queries:
1018             self._do_query(ss, peerid, self._storage_index, self._read_size)
1019             # we'll retrigger when those queries come back
1020
1021     def _done(self):
1022         if not self._running:
1023             return
1024         self._running = False
1025         now = time.time()
1026         elapsed = now - self._started
1027         self._status.set_finished(now)
1028         self._status.timings["total"] = elapsed
1029         self._status.set_progress(1.0)
1030         self._status.set_status("Finished")
1031         self._status.set_active(False)
1032
1033         self._servermap.last_update_mode = self.mode
1034         self._servermap.last_update_time = self._started
1035         # the servermap will not be touched after this
1036         self.log("servermap: %s" % self._servermap.summarize_versions())
1037         eventually(self._done_deferred.callback, self._servermap)
1038
1039     def _fatal_error(self, f):
1040         self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
1041         self._done_deferred.errback(f)
1042
1043