
import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap import DeadReferenceError
from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata import storage
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
                      The dict maps (peerid, shnum) tuple to old checkstring.
    """
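    # Typical flow: a mapupdate builds a ServerMap, a retrieve reads the
    # version reported by best_recoverable_version(), and a later publish is
    # handed the same ServerMap back so that its recorded shares serve as
    # the test-and-set reference described above.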

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to (num_distinct_shares, k, N)
        tuples."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
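        # include 0 so that max() below is well-defined even when we know of
        # no versions at all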
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
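        # For example (assuming k=3): if seq4 is recoverable but we have only
        # seen two shares of seq5, then seq5 shows up here with health (2, 3).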
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
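        # (e.g. two uncoordinated writers both publish seq5 with different
        # roothashes, and each manages to place at least k shares)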
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


class ServermapUpdater:
    def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
                 add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """

        self._node = filenode
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SDMF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 2000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 2000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
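        # (with the default guess of k=3 this means at least 6 queries;
        # MODE_WRITE, below, raises the target to N+EPSILON instead)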
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self._must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        if self._add_lease:
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            dl = defer.DeferredList([d, d2])
            def _done(res):
                [(readv_success, readv_result),
                 (addlease_success, addlease_result)] = res
                if (not addlease_success and
                    not addlease_result.check(IndexError)):
                    # tahoe 1.3.0 raised IndexError on non-existent buckets,
                    # which we ignore. Unfortunately tahoe <1.3.0 had a bug
                    # and raised KeyError, which we report.
                    return addlease_result # propagate error
                return readv_result
            dl.addCallback(_done)
            return dl
        return d

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._cache.add(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY,
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
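        # convert the offsets dict into a tuple of pairs so that the verinfo
        # built below is hashable (it is used as a dict key and set member)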
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).

            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):

        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)


    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked for it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.
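            # For example, with EPSILON=3 the walk down the permuted peer
            # list below might record states like "1110111000": once three
            # share-less peers ("0") appear after the last peer that held a
            # share ("1"), we have found the boundary and can stop.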

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)