]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/servermap.py
logging cleanups: lower DeadReferenceError from WEIRD (which provokes Incidents)...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / servermap.py
1
2 import sys, time
3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.eventual import eventually
8 from allmydata.util import base32, hashutil, idlib, log
9 from allmydata import storage
10 from allmydata.interfaces import IServermapUpdaterStatus
11 from pycryptopp.publickey import rsa
12
13 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
14      DictOfSets, CorruptShareError, NeedMoreDataError
15 from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
16      SIGNED_PREFIX_LENGTH
17
18 class UpdateStatus:
19     implements(IServermapUpdaterStatus)
20     statusid_counter = count(0)
21     def __init__(self):
22         self.timings = {}
23         self.timings["per_server"] = {}
24         self.timings["cumulative_verify"] = 0.0
25         self.privkey_from = None
26         self.problems = {}
27         self.active = True
28         self.storage_index = None
29         self.mode = "?"
30         self.status = "Not started"
31         self.progress = 0.0
32         self.counter = self.statusid_counter.next()
33         self.started = time.time()
34         self.finished = None
35
36     def add_per_server_time(self, peerid, op, sent, elapsed):
37         assert op in ("query", "late", "privkey")
38         if peerid not in self.timings["per_server"]:
39             self.timings["per_server"][peerid] = []
40         self.timings["per_server"][peerid].append((op,sent,elapsed))
41
42     def get_started(self):
43         return self.started
44     def get_finished(self):
45         return self.finished
46     def get_storage_index(self):
47         return self.storage_index
48     def get_mode(self):
49         return self.mode
50     def get_servermap(self):
51         return self.servermap
52     def get_privkey_from(self):
53         return self.privkey_from
54     def using_helper(self):
55         return False
56     def get_size(self):
57         return "-NA-"
58     def get_status(self):
59         return self.status
60     def get_progress(self):
61         return self.progress
62     def get_active(self):
63         return self.active
64     def get_counter(self):
65         return self.counter
66
67     def set_storage_index(self, si):
68         self.storage_index = si
69     def set_mode(self, mode):
70         self.mode = mode
71     def set_privkey_from(self, peerid):
72         self.privkey_from = peerid
73     def set_status(self, status):
74         self.status = status
75     def set_progress(self, value):
76         self.progress = value
77     def set_active(self, value):
78         self.active = value
79     def set_finished(self, when):
80         self.finished = when
81
82 class ServerMap:
83     """I record the placement of mutable shares.
84
85     This object records which shares (of various versions) are located on
86     which servers.
87
88     One purpose I serve is to inform callers about which versions of the
89     mutable file are recoverable and 'current'.
90
91     A second purpose is to serve as a state marker for test-and-set
92     operations. I am passed out of retrieval operations and back into publish
93     operations, which means 'publish this new version, but only if nothing
94     has changed since I last retrieved this data'. This reduces the chances
95     of clobbering a simultaneous (uncoordinated) write.
96
97     @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
98                      (versionid, timestamp) tuple. Each 'versionid' is a
99                      tuple of (seqnum, root_hash, IV, segsize, datalength,
100                      k, N, signed_prefix, offsets)
101
102     @ivar connections: maps peerid to a RemoteReference
103
104     @ivar bad_shares: a sequence of (peerid, shnum) tuples, describing
105                       shares that I should ignore (because a previous user of
106                       the servermap determined that they were invalid). The
107                       updater only locates a certain number of shares: if
108                       some of these turn out to have integrity problems and
109                       are unusable, the caller will need to mark those shares
110                       as bad, then re-update the servermap, then try again.
111     """
112
113     def __init__(self):
114         self.servermap = {}
115         self.connections = {}
116         self.unreachable_peers = set() # peerids that didn't respond to queries
117         self.problems = [] # mostly for debugging
118         self.bad_shares = {} # maps (peerid,shnum) to old checkstring
119         self.last_update_mode = None
120         self.last_update_time = 0
121
122     def mark_bad_share(self, peerid, shnum, checkstring):
123         """This share was found to be bad, either in the checkstring or
124         signature (detected during mapupdate), or deeper in the share
125         (detected at retrieve time). Remove it from our list of useful
126         shares, and remember that it is bad so we don't add it back again
127         later. We record the share's old checkstring (which might be
128         corrupted or badly signed) so that a repair operation can do the
129         test-and-set using it as a reference.
130         """
131         key = (peerid, shnum) # record checkstring
132         self.bad_shares[key] = checkstring
133         self.servermap.pop(key, None)
134
135     def add_new_share(self, peerid, shnum, verinfo, timestamp):
136         """We've written a new share out, replacing any that was there
137         before."""
138         key = (peerid, shnum)
139         self.bad_shares.pop(key, None)
140         self.servermap[key] = (verinfo, timestamp)
141
142     def dump(self, out=sys.stdout):
143         print >>out, "servermap:"
144
145         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
146             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
147              offsets_tuple) = verinfo
148             print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
149                           (idlib.shortnodeid_b2a(peerid), shnum,
150                            seqnum, base32.b2a(root_hash)[:4], k, N,
151                            datalength))
152         if self.problems:
153             print >>out, "%d PROBLEMS" % len(self.problems)
154             for f in self.problems:
155                 print >>out, str(f)
156         return out
157
158     def all_peers(self):
159         return set([peerid
160                     for (peerid, shnum)
161                     in self.servermap])
162
163     def make_sharemap(self):
164         """Return a dict that maps shnum to a set of peerds that hold it."""
165         sharemap = DictOfSets()
166         for (peerid, shnum) in self.servermap:
167             sharemap.add(shnum, peerid)
168         return sharemap
169
170     def make_versionmap(self):
171         """Return a dict that maps versionid to sets of (shnum, peerid,
172         timestamp) tuples."""
173         versionmap = DictOfSets()
174         for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
175             versionmap.add(verinfo, (shnum, peerid, timestamp))
176         return versionmap
177
178     def shares_on_peer(self, peerid):
179         return set([shnum
180                     for (s_peerid, shnum)
181                     in self.servermap
182                     if s_peerid == peerid])
183
184     def version_on_peer(self, peerid, shnum):
185         key = (peerid, shnum)
186         if key in self.servermap:
187             (verinfo, timestamp) = self.servermap[key]
188             return verinfo
189         return None
190
191     def shares_available(self):
192         """Return a dict that maps verinfo to tuples of
193         (num_distinct_shares, k, N) tuples."""
194         versionmap = self.make_versionmap()
195         all_shares = {}
196         for verinfo, shares in versionmap.items():
197             s = set()
198             for (shnum, peerid, timestamp) in shares:
199                 s.add(shnum)
200             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
201              offsets_tuple) = verinfo
202             all_shares[verinfo] = (len(s), k, N)
203         return all_shares
204
205     def highest_seqnum(self):
206         available = self.shares_available()
207         seqnums = [verinfo[0]
208                    for verinfo in available.keys()]
209         seqnums.append(0)
210         return max(seqnums)
211
212     def summarize_version(self, verinfo):
213         """Take a versionid, return a string that describes it."""
214         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
215          offsets_tuple) = verinfo
216         return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
217
218     def summarize_versions(self):
219         """Return a string describing which versions we know about."""
220         versionmap = self.make_versionmap()
221         bits = []
222         for (verinfo, shares) in versionmap.items():
223             vstr = self.summarize_version(verinfo)
224             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
225             bits.append("%d*%s" % (len(shnums), vstr))
226         return "/".join(bits)
227
228     def recoverable_versions(self):
229         """Return a set of versionids, one for each version that is currently
230         recoverable."""
231         versionmap = self.make_versionmap()
232
233         recoverable_versions = set()
234         for (verinfo, shares) in versionmap.items():
235             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
236              offsets_tuple) = verinfo
237             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
238             if len(shnums) >= k:
239                 # this one is recoverable
240                 recoverable_versions.add(verinfo)
241
242         return recoverable_versions
243
244     def unrecoverable_versions(self):
245         """Return a set of versionids, one for each version that is currently
246         unrecoverable."""
247         versionmap = self.make_versionmap()
248
249         unrecoverable_versions = set()
250         for (verinfo, shares) in versionmap.items():
251             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
252              offsets_tuple) = verinfo
253             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
254             if len(shnums) < k:
255                 unrecoverable_versions.add(verinfo)
256
257         return unrecoverable_versions
258
259     def best_recoverable_version(self):
260         """Return a single versionid, for the so-called 'best' recoverable
261         version. Sequence number is the primary sort criteria, followed by
262         root hash. Returns None if there are no recoverable versions."""
263         recoverable = list(self.recoverable_versions())
264         recoverable.sort()
265         if recoverable:
266             return recoverable[-1]
267         return None
268
269     def size_of_version(self, verinfo):
270         """Given a versionid (perhaps returned by best_recoverable_version),
271         return the size of the file in bytes."""
272         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
273          offsets_tuple) = verinfo
274         return datalength
275
276     def unrecoverable_newer_versions(self):
277         # Return a dict of versionid -> health, for versions that are
278         # unrecoverable and have later seqnums than any recoverable versions.
279         # These indicate that a write will lose data.
280         versionmap = self.make_versionmap()
281         healths = {} # maps verinfo to (found,k)
282         unrecoverable = set()
283         highest_recoverable_seqnum = -1
284         for (verinfo, shares) in versionmap.items():
285             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
286              offsets_tuple) = verinfo
287             shnums = set([shnum for (shnum, peerid, timestamp) in shares])
288             healths[verinfo] = (len(shnums),k)
289             if len(shnums) < k:
290                 unrecoverable.add(verinfo)
291             else:
292                 highest_recoverable_seqnum = max(seqnum,
293                                                  highest_recoverable_seqnum)
294
295         newversions = {}
296         for verinfo in unrecoverable:
297             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
298              offsets_tuple) = verinfo
299             if seqnum > highest_recoverable_seqnum:
300                 newversions[verinfo] = healths[verinfo]
301
302         return newversions
303
304
305     def needs_merge(self):
306         # return True if there are multiple recoverable versions with the
307         # same seqnum, meaning that MutableFileNode.read_best_version is not
308         # giving you the whole story, and that using its data to do a
309         # subsequent publish will lose information.
310         return bool(len(self.recoverable_versions()) > 1)
311
312
313 class ServermapUpdater:
314     def __init__(self, filenode, servermap, mode=MODE_READ):
315         """I update a servermap, locating a sufficient number of useful
316         shares and remembering where they are located.
317
318         """
319
320         self._node = filenode
321         self._servermap = servermap
322         self.mode = mode
323         self._running = True
324
325         self._storage_index = filenode.get_storage_index()
326         self._last_failure = None
327
328         self._status = UpdateStatus()
329         self._status.set_storage_index(self._storage_index)
330         self._status.set_progress(0.0)
331         self._status.set_mode(mode)
332
333         # how much data should we read?
334         #  * if we only need the checkstring, then [0:75]
335         #  * if we need to validate the checkstring sig, then [543ish:799ish]
336         #  * if we need the verification key, then [107:436ish]
337         #   * the offset table at [75:107] tells us about the 'ish'
338         #  * if we need the encrypted private key, we want [-1216ish:]
339         #   * but we can't read from negative offsets
340         #   * the offset table tells us the 'ish', also the positive offset
341         # A future version of the SMDF slot format should consider using
342         # fixed-size slots so we can retrieve less data. For now, we'll just
343         # read 2000 bytes, which also happens to read enough actual data to
344         # pre-fetch a 9-entry dirnode.
345         self._read_size = 2000
346         if mode == MODE_CHECK:
347             # we use unpack_prefix_and_signature, so we need 1k
348             self._read_size = 1000
349         self._need_privkey = False
350         if mode == MODE_WRITE and not self._node._privkey:
351             self._need_privkey = True
352
353         prefix = storage.si_b2a(self._storage_index)[:5]
354         self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
355                                    si=prefix, mode=mode)
356
357     def get_status(self):
358         return self._status
359
360     def log(self, *args, **kwargs):
361         if "parent" not in kwargs:
362             kwargs["parent"] = self._log_number
363         if "facility" not in kwargs:
364             kwargs["facility"] = "tahoe.mutable.mapupdate"
365         return log.msg(*args, **kwargs)
366
367     def update(self):
368         """Update the servermap to reflect current conditions. Returns a
369         Deferred that fires with the servermap once the update has finished."""
370         self._started = time.time()
371         self._status.set_active(True)
372
373         # self._valid_versions is a set of validated verinfo tuples. We just
374         # use it to remember which versions had valid signatures, so we can
375         # avoid re-checking the signatures for each share.
376         self._valid_versions = set()
377
378         # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
379         # timestamp) tuples. This is used to figure out which versions might
380         # be retrievable, and to make the eventual data download faster.
381         self.versionmap = DictOfSets()
382
383         self._done_deferred = defer.Deferred()
384
385         # first, which peers should be talk to? Any that were in our old
386         # servermap, plus "enough" others.
387
388         self._queries_completed = 0
389
390         client = self._node._client
391         full_peerlist = client.get_permuted_peers("storage",
392                                                   self._node._storage_index)
393         self.full_peerlist = full_peerlist # for use later, immutable
394         self.extra_peers = full_peerlist[:] # peers are removed as we use them
395         self._good_peers = set() # peers who had some shares
396         self._empty_peers = set() # peers who don't have any shares
397         self._bad_peers = set() # peers to whom our queries failed
398
399         k = self._node.get_required_shares()
400         if k is None:
401             # make a guess
402             k = 3
403         N = self._node.get_required_shares()
404         if N is None:
405             N = 10
406         self.EPSILON = k
407         # we want to send queries to at least this many peers (although we
408         # might not wait for all of their answers to come back)
409         self.num_peers_to_query = k + self.EPSILON
410
411         if self.mode == MODE_CHECK:
412             initial_peers_to_query = dict(full_peerlist)
413             must_query = set(initial_peers_to_query.keys())
414             self.extra_peers = []
415         elif self.mode == MODE_WRITE:
416             # we're planning to replace all the shares, so we want a good
417             # chance of finding them all. We will keep searching until we've
418             # seen epsilon that don't have a share.
419             self.num_peers_to_query = N + self.EPSILON
420             initial_peers_to_query, must_query = self._build_initial_querylist()
421             self.required_num_empty_peers = self.EPSILON
422
423             # TODO: arrange to read lots of data from k-ish servers, to avoid
424             # the extra round trip required to read large directories. This
425             # might also avoid the round trip required to read the encrypted
426             # private key.
427
428         else:
429             initial_peers_to_query, must_query = self._build_initial_querylist()
430
431         # this is a set of peers that we are required to get responses from:
432         # they are peers who used to have a share, so we need to know where
433         # they currently stand, even if that means we have to wait for a
434         # silently-lost TCP connection to time out. We remove peers from this
435         # set as we get responses.
436         self._must_query = must_query
437
438         # now initial_peers_to_query contains the peers that we should ask,
439         # self.must_query contains the peers that we must have heard from
440         # before we can consider ourselves finished, and self.extra_peers
441         # contains the overflow (peers that we should tap if we don't get
442         # enough responses)
443
444         self._send_initial_requests(initial_peers_to_query)
445         self._status.timings["initial_queries"] = time.time() - self._started
446         return self._done_deferred
447
448     def _build_initial_querylist(self):
449         initial_peers_to_query = {}
450         must_query = set()
451         for peerid in self._servermap.all_peers():
452             ss = self._servermap.connections[peerid]
453             # we send queries to everyone who was already in the sharemap
454             initial_peers_to_query[peerid] = ss
455             # and we must wait for responses from them
456             must_query.add(peerid)
457
458         while ((self.num_peers_to_query > len(initial_peers_to_query))
459                and self.extra_peers):
460             (peerid, ss) = self.extra_peers.pop(0)
461             initial_peers_to_query[peerid] = ss
462
463         return initial_peers_to_query, must_query
464
465     def _send_initial_requests(self, peerlist):
466         self._status.set_status("Sending %d initial queries" % len(peerlist))
467         self._queries_outstanding = set()
468         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
469         dl = []
470         for (peerid, ss) in peerlist.items():
471             self._queries_outstanding.add(peerid)
472             self._do_query(ss, peerid, self._storage_index, self._read_size)
473
474         if not peerlist:
475             # there is nobody to ask, so we need to short-circuit the state
476             # machine.
477             d = defer.maybeDeferred(self._check_for_done, None)
478             d.addErrback(self._fatal_error)
479
480         # control flow beyond this point: state machine. Receiving responses
481         # from queries is the input. We might send out more queries, or we
482         # might produce a result.
483         return None
484
485     def _do_query(self, ss, peerid, storage_index, readsize):
486         self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
487                  peerid=idlib.shortnodeid_b2a(peerid),
488                  readsize=readsize,
489                  level=log.NOISY)
490         self._servermap.connections[peerid] = ss
491         started = time.time()
492         self._queries_outstanding.add(peerid)
493         d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
494         d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
495                       started)
496         d.addErrback(self._query_failed, peerid)
497         # errors that aren't handled by _query_failed (and errors caused by
498         # _query_failed) get logged, but we still want to check for doneness.
499         d.addErrback(log.err)
500         d.addBoth(self._check_for_done)
501         d.addErrback(self._fatal_error)
502         return d
503
504     def _do_read(self, ss, peerid, storage_index, shnums, readv):
505         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
506         return d
507
508     def _got_results(self, datavs, peerid, readsize, stuff, started):
509         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
510                       peerid=idlib.shortnodeid_b2a(peerid),
511                       numshares=len(datavs),
512                       level=log.NOISY)
513         now = time.time()
514         elapsed = now - started
515         self._queries_outstanding.discard(peerid)
516         self._must_query.discard(peerid)
517         self._queries_completed += 1
518         if not self._running:
519             self.log("but we're not running, so we'll ignore it", parent=lp,
520                      level=log.NOISY)
521             self._status.add_per_server_time(peerid, "late", started, elapsed)
522             return
523         self._status.add_per_server_time(peerid, "query", started, elapsed)
524
525         if datavs:
526             self._good_peers.add(peerid)
527         else:
528             self._empty_peers.add(peerid)
529
530         last_verinfo = None
531         last_shnum = None
532         for shnum,datav in datavs.items():
533             data = datav[0]
534             try:
535                 verinfo = self._got_results_one_share(shnum, data, peerid, lp)
536                 last_verinfo = verinfo
537                 last_shnum = shnum
538                 self._node._cache.add(verinfo, shnum, 0, data, now)
539             except CorruptShareError, e:
540                 # log it and give the other shares a chance to be processed
541                 f = failure.Failure()
542                 self.log(format="bad share: %(f_value)s", f_value=str(f.value),
543                          failure=f, parent=lp, level=log.WEIRD)
544                 self._bad_peers.add(peerid)
545                 self._last_failure = f
546                 checkstring = data[:SIGNED_PREFIX_LENGTH]
547                 self._servermap.mark_bad_share(peerid, shnum, checkstring)
548                 self._servermap.problems.append(f)
549                 pass
550
551         self._status.timings["cumulative_verify"] += (time.time() - now)
552
553         if self._need_privkey and last_verinfo:
554             # send them a request for the privkey. We send one request per
555             # server.
556             lp2 = self.log("sending privkey request",
557                            parent=lp, level=log.NOISY)
558             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
559              offsets_tuple) = last_verinfo
560             o = dict(offsets_tuple)
561
562             self._queries_outstanding.add(peerid)
563             readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
564             ss = self._servermap.connections[peerid]
565             privkey_started = time.time()
566             d = self._do_read(ss, peerid, self._storage_index,
567                               [last_shnum], readv)
568             d.addCallback(self._got_privkey_results, peerid, last_shnum,
569                           privkey_started, lp2)
570             d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
571             d.addErrback(log.err)
572             d.addCallback(self._check_for_done)
573             d.addErrback(self._fatal_error)
574
575         # all done!
576         self.log("_got_results done", parent=lp, level=log.NOISY)
577
578     def _got_results_one_share(self, shnum, data, peerid, lp):
579         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
580                  shnum=shnum,
581                  peerid=idlib.shortnodeid_b2a(peerid),
582                  level=log.NOISY,
583                  parent=lp)
584
585         # this might raise NeedMoreDataError, if the pubkey and signature
586         # live at some weird offset. That shouldn't happen, so I'm going to
587         # treat it as a bad share.
588         (seqnum, root_hash, IV, k, N, segsize, datalength,
589          pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
590
591         if not self._node.get_pubkey():
592             fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
593             assert len(fingerprint) == 32
594             if fingerprint != self._node._fingerprint:
595                 raise CorruptShareError(peerid, shnum,
596                                         "pubkey doesn't match fingerprint")
597             self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
598
599         if self._need_privkey:
600             self._try_to_extract_privkey(data, peerid, shnum, lp)
601
602         (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
603          ig_segsize, ig_datalen, offsets) = unpack_header(data)
604         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
605
606         verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
607                    offsets_tuple)
608
609         if verinfo not in self._valid_versions:
610             # it's a new pair. Verify the signature.
611             valid = self._node._pubkey.verify(prefix, signature)
612             if not valid:
613                 raise CorruptShareError(peerid, shnum, "signature is invalid")
614
615             # ok, it's a valid verinfo. Add it to the list of validated
616             # versions.
617             self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
618                      % (seqnum, base32.b2a(root_hash)[:4],
619                         idlib.shortnodeid_b2a(peerid), shnum,
620                         k, N, segsize, datalength),
621                      parent=lp)
622             self._valid_versions.add(verinfo)
623         # We now know that this is a valid candidate verinfo.
624
625         if (peerid, shnum) in self._servermap.bad_shares:
626             # we've been told that the rest of the data in this share is
627             # unusable, so don't add it to the servermap.
628             self.log("but we've been told this is a bad share",
629                      parent=lp, level=log.UNUSUAL)
630             return verinfo
631
632         # Add the info to our servermap.
633         timestamp = time.time()
634         self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
635         # and the versionmap
636         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
637         return verinfo
638
639     def _deserialize_pubkey(self, pubkey_s):
640         verifier = rsa.create_verifying_key_from_string(pubkey_s)
641         return verifier
642
643     def _try_to_extract_privkey(self, data, peerid, shnum, lp):
644         try:
645             r = unpack_share(data)
646         except NeedMoreDataError, e:
647             # this share won't help us. oh well.
648             offset = e.encprivkey_offset
649             length = e.encprivkey_length
650             self.log("shnum %d on peerid %s: share was too short (%dB) "
651                      "to get the encprivkey; [%d:%d] ought to hold it" %
652                      (shnum, idlib.shortnodeid_b2a(peerid), len(data),
653                       offset, offset+length),
654                      parent=lp)
655             # NOTE: if uncoordinated writes are taking place, someone might
656             # change the share (and most probably move the encprivkey) before
657             # we get a chance to do one of these reads and fetch it. This
658             # will cause us to see a NotEnoughSharesError(unable to fetch
659             # privkey) instead of an UncoordinatedWriteError . This is a
660             # nuisance, but it will go away when we move to DSA-based mutable
661             # files (since the privkey will be small enough to fit in the
662             # write cap).
663
664             return
665
666         (seqnum, root_hash, IV, k, N, segsize, datalen,
667          pubkey, signature, share_hash_chain, block_hash_tree,
668          share_data, enc_privkey) = r
669
670         return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
671
672     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
673
674         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
675         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
676         if alleged_writekey != self._node.get_writekey():
677             self.log("invalid privkey from %s shnum %d" %
678                      (idlib.nodeid_b2a(peerid)[:8], shnum),
679                      parent=lp, level=log.WEIRD)
680             return
681
682         # it's good
683         self.log("got valid privkey from shnum %d on peerid %s" %
684                  (shnum, idlib.shortnodeid_b2a(peerid)),
685                  parent=lp)
686         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
687         self._node._populate_encprivkey(enc_privkey)
688         self._node._populate_privkey(privkey)
689         self._need_privkey = False
690         self._status.set_privkey_from(peerid)
691
692
693     def _query_failed(self, f, peerid):
694         self.log(format="error during query: %(f_value)s",
695                  f_value=str(f.value), failure=f, level=log.WEIRD)
696         if not self._running:
697             return
698         self._must_query.discard(peerid)
699         self._queries_outstanding.discard(peerid)
700         self._bad_peers.add(peerid)
701         self._servermap.problems.append(f)
702         self._servermap.unreachable_peers.add(peerid) # TODO: overkill?
703         self._queries_completed += 1
704         self._last_failure = f
705
706     def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
707         now = time.time()
708         elapsed = now - started
709         self._status.add_per_server_time(peerid, "privkey", started, elapsed)
710         self._queries_outstanding.discard(peerid)
711         if not self._need_privkey:
712             return
713         if shnum not in datavs:
714             self.log("privkey wasn't there when we asked it", level=log.WEIRD)
715             return
716         datav = datavs[shnum]
717         enc_privkey = datav[0]
718         self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
719
720     def _privkey_query_failed(self, f, peerid, shnum, lp):
721         self._queries_outstanding.discard(peerid)
722         self.log(format="error during privkey query: %(f_value)s",
723                  f_value=str(f.value), failure=f,
724                  parent=lp, level=log.WEIRD)
725         if not self._running:
726             return
727         self._queries_outstanding.discard(peerid)
728         self._servermap.problems.append(f)
729         self._last_failure = f
730
731     def _check_for_done(self, res):
732         # exit paths:
733         #  return self._send_more_queries(outstanding) : send some more queries
734         #  return self._done() : all done
735         #  return : keep waiting, no new queries
736
737         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
738                               "%(outstanding)d queries outstanding, "
739                               "%(extra)d extra peers available, "
740                               "%(must)d 'must query' peers left, "
741                               "need_privkey=%(need_privkey)s"
742                               ),
743                       mode=self.mode,
744                       outstanding=len(self._queries_outstanding),
745                       extra=len(self.extra_peers),
746                       must=len(self._must_query),
747                       need_privkey=self._need_privkey,
748                       level=log.NOISY,
749                       )
750
751         if not self._running:
752             self.log("but we're not running", parent=lp, level=log.NOISY)
753             return
754
755         if self._must_query:
756             # we are still waiting for responses from peers that used to have
757             # a share, so we must continue to wait. No additional queries are
758             # required at this time.
759             self.log("%d 'must query' peers left" % len(self._must_query),
760                      level=log.NOISY, parent=lp)
761             return
762
763         if (not self._queries_outstanding and not self.extra_peers):
764             # all queries have retired, and we have no peers left to ask. No
765             # more progress can be made, therefore we are done.
766             self.log("all queries are retired, no extra peers: done",
767                      parent=lp)
768             return self._done()
769
770         recoverable_versions = self._servermap.recoverable_versions()
771         unrecoverable_versions = self._servermap.unrecoverable_versions()
772
773         # what is our completion policy? how hard should we work?
774
775         if self.mode == MODE_ANYTHING:
776             if recoverable_versions:
777                 self.log("%d recoverable versions: done"
778                          % len(recoverable_versions),
779                          parent=lp)
780                 return self._done()
781
782         if self.mode == MODE_CHECK:
783             # we used self._must_query, and we know there aren't any
784             # responses still waiting, so that means we must be done
785             self.log("done", parent=lp)
786             return self._done()
787
788         MAX_IN_FLIGHT = 5
789         if self.mode == MODE_READ:
790             # if we've queried k+epsilon servers, and we see a recoverable
791             # version, and we haven't seen any unrecoverable higher-seqnum'ed
792             # versions, then we're done.
793
794             if self._queries_completed < self.num_peers_to_query:
795                 self.log(format="%(completed)d completed, %(query)d to query: need more",
796                          completed=self._queries_completed,
797                          query=self.num_peers_to_query,
798                          level=log.NOISY, parent=lp)
799                 return self._send_more_queries(MAX_IN_FLIGHT)
800             if not recoverable_versions:
801                 self.log("no recoverable versions: need more",
802                          level=log.NOISY, parent=lp)
803                 return self._send_more_queries(MAX_IN_FLIGHT)
804             highest_recoverable = max(recoverable_versions)
805             highest_recoverable_seqnum = highest_recoverable[0]
806             for unrec_verinfo in unrecoverable_versions:
807                 if unrec_verinfo[0] > highest_recoverable_seqnum:
808                     # there is evidence of a higher-seqnum version, but we
809                     # don't yet see enough shares to recover it. Try harder.
810                     # TODO: consider sending more queries.
811                     # TODO: consider limiting the search distance
812                     self.log("evidence of higher seqnum: need more",
813                              level=log.UNUSUAL, parent=lp)
814                     return self._send_more_queries(MAX_IN_FLIGHT)
815             # all the unrecoverable versions were old or concurrent with a
816             # recoverable version. Good enough.
817             self.log("no higher-seqnum: done", parent=lp)
818             return self._done()
819
820         if self.mode == MODE_WRITE:
821             # we want to keep querying until we've seen a few that don't have
822             # any shares, to be sufficiently confident that we've seen all
823             # the shares. This is still less work than MODE_CHECK, which asks
824             # every server in the world.
825
826             if not recoverable_versions:
827                 self.log("no recoverable versions: need more", parent=lp,
828                          level=log.NOISY)
829                 return self._send_more_queries(MAX_IN_FLIGHT)
830
831             last_found = -1
832             last_not_responded = -1
833             num_not_responded = 0
834             num_not_found = 0
835             states = []
836             found_boundary = False
837
838             for i,(peerid,ss) in enumerate(self.full_peerlist):
839                 if peerid in self._bad_peers:
840                     # query failed
841                     states.append("x")
842                     #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
843                 elif peerid in self._empty_peers:
844                     # no shares
845                     states.append("0")
846                     #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
847                     if last_found != -1:
848                         num_not_found += 1
849                         if num_not_found >= self.EPSILON:
850                             self.log("found our boundary, %s" %
851                                      "".join(states),
852                                      parent=lp, level=log.NOISY)
853                             found_boundary = True
854                             break
855
856                 elif peerid in self._good_peers:
857                     # yes shares
858                     states.append("1")
859                     #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
860                     last_found = i
861                     num_not_found = 0
862                 else:
863                     # not responded yet
864                     states.append("?")
865                     #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
866                     last_not_responded = i
867                     num_not_responded += 1
868
869             if found_boundary:
870                 # we need to know that we've gotten answers from
871                 # everybody to the left of here
872                 if last_not_responded == -1:
873                     # we're done
874                     self.log("have all our answers",
875                              parent=lp, level=log.NOISY)
876                     # .. unless we're still waiting on the privkey
877                     if self._need_privkey:
878                         self.log("but we're still waiting for the privkey",
879                                  parent=lp, level=log.NOISY)
880                         # if we found the boundary but we haven't yet found
881                         # the privkey, we may need to look further. If
882                         # somehow all the privkeys were corrupted (but the
883                         # shares were readable), then this is likely to do an
884                         # exhaustive search.
885                         return self._send_more_queries(MAX_IN_FLIGHT)
886                     return self._done()
887                 # still waiting for somebody
888                 return self._send_more_queries(num_not_responded)
889
890             # if we hit here, we didn't find our boundary, so we're still
891             # waiting for peers
892             self.log("no boundary yet, %s" % "".join(states), parent=lp,
893                      level=log.NOISY)
894             return self._send_more_queries(MAX_IN_FLIGHT)
895
896         # otherwise, keep up to 5 queries in flight. TODO: this is pretty
897         # arbitrary, really I want this to be something like k -
898         # max(known_version_sharecounts) + some extra
899         self.log("catchall: need more", parent=lp, level=log.NOISY)
900         return self._send_more_queries(MAX_IN_FLIGHT)
901
902     def _send_more_queries(self, num_outstanding):
903         more_queries = []
904
905         while True:
906             self.log(format=" there are %(outstanding)d queries outstanding",
907                      outstanding=len(self._queries_outstanding),
908                      level=log.NOISY)
909             active_queries = len(self._queries_outstanding) + len(more_queries)
910             if active_queries >= num_outstanding:
911                 break
912             if not self.extra_peers:
913                 break
914             more_queries.append(self.extra_peers.pop(0))
915
916         self.log(format="sending %(more)d more queries: %(who)s",
917                  more=len(more_queries),
918                  who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
919                                for (peerid,ss) in more_queries]),
920                  level=log.NOISY)
921
922         for (peerid, ss) in more_queries:
923             self._do_query(ss, peerid, self._storage_index, self._read_size)
924             # we'll retrigger when those queries come back
925
926     def _done(self):
927         if not self._running:
928             return
929         self._running = False
930         now = time.time()
931         elapsed = now - self._started
932         self._status.set_finished(now)
933         self._status.timings["total"] = elapsed
934         self._status.set_progress(1.0)
935         self._status.set_status("Done")
936         self._status.set_active(False)
937
938         self._servermap.last_update_mode = self.mode
939         self._servermap.last_update_time = self._started
940         # the servermap will not be touched after this
941         self.log("servermap: %s" % self._servermap.summarize_versions())
942         eventually(self._done_deferred.callback, self._servermap)
943
944     def _fatal_error(self, f):
945         self.log("fatal error", failure=f, level=log.WEIRD)
946         self._done_deferred.errback(f)
947
948