import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap import DeadReferenceError
from foolscap.eventual import eventually
from allmydata.util import base32, hashutil, idlib, log, rrefutil
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.storage_index = None
        self.status = "Not started"
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
                      The dict maps (peerid, shnum) tuple to old checkstring.
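
    A purely illustrative (hypothetical) entry, showing the shape of the
    mapping described above (all values are placeholders, not real data):

      servermap[(peerid, 0)] = ((seqnum, root_hash, IV, segsize, datalength,
                                 k, N, signed_prefix, offsets),
                                timestamp)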
    """

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)
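
    # Illustrative use (hypothetical values), mirroring what the updater does
    # when a share fails validation: remember the bad share's old checkstring,
    # then re-run the mapupdate before retrying, e.g.
    #   servermap.mark_bad_share(peerid, shnum, data[:SIGNED_PREFIX_LENGTH])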

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
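        # For a hypothetical 3-of-10 file with nine distinct shares located,
        # the entry for that version would be (9, 3, 10).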
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
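            # e.g. a hypothetical map holding three shares of seq12 and one
            # share of seq11 would produce "3*seq12-xxxx/1*seq11-yyyy" below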
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
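        # Plain tuple ordering does the work here: verinfo tuples start with
        # (seqnum, root_hash, ...), so the sort compares seqnum first and root
        # hash second, leaving the 'best' version at the end of the list.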
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
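        # e.g. two hypothetical recoverable versions that both carry seq5 but
        # different root hashes would make this method return True.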
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False

class ServermapUpdater:
    def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
                 add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        # * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        # * but we can't read from negative offsets
        # * the offset table tells us the 'ish', also the positive offset
        # A future version of the SMDF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 2000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 2000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        client = self._node._client
        full_peerlist = client.get_permuted_peers("storage",
                                                  self._node._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        N = self._node.get_total_shares()
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.
        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
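
    # Hypothetical driver sketch (not part of this module), showing how the
    # pieces above fit together: a caller constructs an updater, fires
    # update(), and reads the best version out of the resulting map:
    #   u = ServermapUpdater(filenode, monitor, ServerMap(), mode=MODE_READ)
    #   d = u.update()
    #   d.addCallback(lambda smap: smap.best_recoverable_version())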

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        if self._add_lease:
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(readv_success, readv_result),
                 (addlease_success, addlease_result)] = res
                if (not addlease_success and
                    not rrefutil.check_remote(addlease_result, IndexError)):
                    # tahoe 1.3.0 raised IndexError on non-existent buckets,
                    # which we ignore. Unfortunately tahoe <1.3.0 had a bug
                    # and raised KeyError, which we report.
                    return addlease_result # propagate error
                return readv_result
            dl.addCallback(_done)
            return dl
        return d

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._cache.add(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
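            # i.e. read from the encrypted-privkey offset through the end of
            # the share, using the offset table unpacked from this verinfo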
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
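        # the offsets dict is flattened into a tuple of (name, value) pairs so
        # the verinfo built below stays hashable (it is used as a set member
        # and as a dict key elsewhere in this module)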

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError . This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).
            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False
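            # The loop below walks the permuted peerlist in order and records
            # one state character per peer: 'x' query failed, '0' no shares,
            # '1' has shares, '?' not yet responded. Once EPSILON peers
            # without shares have been seen past the last peer that had any,
            # we treat that as the boundary of the share distribution.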

            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break
                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)