import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     CorruptShareError, NeedMoreDataError
from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,sent,elapsed))
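        # self.timings ends up shaped roughly like:
        #   {"per_server": {peerid: [("query", sent, elapsed), ...]},
        #    "cumulative_verify": <seconds>,
        #    "initial_queries": <seconds>, "total": <seconds>}
        # (the last two keys are added later by ServermapUpdater)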

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
                      The dict maps (peerid, shnum) tuple to old checkstring.
    """
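    # Illustrative sketch only: a typical test-and-set cycle pairs a
    # mapupdate with a later publish against the same map. The
    # MutableFileNode methods shown here are assumptions for orientation,
    # not calls defined in this module:
    #
    #   d = node.get_servermap(MODE_WRITE)   # runs a ServermapUpdater
    #   d.addCallback(lambda smap: node.upload(new_contents, smap))
    #
    # If a share changed between the mapupdate and the publish, the publish
    # is expected to fail (e.g. with UncoordinatedWriteError) rather than
    # silently clobber the concurrent write.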

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to (num_distinct_shares, k, N)
        tuples."""
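        # For example, {verinfo: (2, 3, 10)} means two distinct shares of a
        # 3-of-10 encoding have been located so far, so one more share is
        # still needed before that version becomes recoverable.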
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions
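    # Worked example (illustrative): with a 3-of-10 encoding, if seqnum 4 is
    # recoverable but only two shares of seqnum 5 have been seen,
    # unrecoverable_newer_versions() returns {<seqnum-5 verinfo>: (2, 3)},
    # warning the caller that a publish based on seqnum 4 would clobber a
    # newer version that cannot currently be read.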


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        """

        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SMDF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 4000 bytes, which also happens to read enough actual data to
        # pre-fetch an 18-entry dirnode.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        full_peerlist = sb.get_servers_for_index(self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON
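        # (with the usual 3-of-10 encoding, or the k=3 guess above, this
        # works out to six peers; MODE_WRITE raises the target to
        # N + epsilon just below)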

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

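    # Illustrative driver (a sketch, not part of this class): callers
    # normally reach this machinery through MutableFileNode.get_servermap(),
    # but the updater can also be run directly. Monitor is assumed to come
    # from allmydata.monitor; everything else is defined in this module:
    #
    #   smap = ServerMap()
    #   u = ServermapUpdater(filenode, storage_broker, Monitor(), smap,
    #                        mode=MODE_READ)
    #   d = u.update()   # Deferred that fires with the populated ServerMap
    #   d.addCallback(lambda sm: sm.best_recoverable_version())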
    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore success
            d2.addErrback(self._add_lease_failed, peerid, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d
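        # readv is a list of (offset, length) tuples. The initial query in
        # _do_query passes shnums=[] and [(0, readsize)], which the storage
        # protocol treats (we assume) as "read the first readsize bytes of
        # every share of this storage index held by the server".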

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._add_to_cache(verinfo, shnum, 0, data)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)
                pass

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY,
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node.get_fingerprint():
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node.get_pubkey().verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError . This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).

            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):

        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _add_lease_failed(self, f, peerid, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # problems.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(peerid)s]: %(f_value)s",
                peerid=idlib.shortnodeid_b2a(peerid),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

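            # Worked example (illustrative): with EPSILON=3, a permuted peer
            # list whose answers so far read "1 1 0 1 0 0 0" has its boundary
            # at the third consecutive '0' after the last '1'. Any '?' to the
            # left of that boundary means we are still waiting for an answer,
            # and 'x' marks a peer whose query failed.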
            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)