import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     CorruptShareError, NeedMoreDataError
from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.storage_index = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op, sent, elapsed))
    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap:
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
                     (versionid, timestamp) tuple. Each 'versionid' is a
                     tuple of (seqnum, root_hash, IV, segsize, datalength,
                     k, N, signed_prefix, offsets)

    @ivar connections: maps peerid to a RemoteReference

    @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
                      shares that I should ignore (because a previous user of
                      the servermap determined that they were invalid). The
                      updater only locates a certain number of shares: if
                      some of these turn out to have integrity problems and
                      are unusable, the caller will need to mark those shares
                      as bad, then re-update the servermap, then try again.
                      The dict maps a (peerid, shnum) tuple to the old
                      checkstring.
    """
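
    # Illustrative sketch (hypothetical caller API names): a typical
    # test-and-set cycle threads a ServerMap from a read back into a write,
    # so the write fails cleanly if the grid changed in between:
    #
    #   d = node.get_servermap(MODE_WRITE)            # -> ServerMap smap
    #   d.addCallback(lambda smap: node.upload(new_contents, smap))
    #
    # If another writer raced us, the test-and-set publish is rejected and
    # the caller can re-update the map and retry.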

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)
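
    # For example (hypothetical ids): if a retrieve finds that share 3 on
    # peer xgru5 fails its hash checks, it calls
    #   smap.mark_bad_share(peerid_xgru5, 3, old_checkstring)
    # and a later repair can present that recorded checkstring as the
    # expected value in its test-and-set write, replacing exactly the bytes
    # it examined.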

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        print >>out, "%d PROBLEMS" % len(self.problems)
        for f in self.problems:
            print >>out, str(f)
        return out
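
    # A typical dump() line looks like (hypothetical ids and hashes):
    #   [xgru5adv]: sh#2 seq4-7vnz 3-of-10 len1216
    # i.e. that peer holds share 2 of the 3-of-10, seqnum-4 version whose
    # root hash starts with "7vnz" and whose data is 1216 bytes long.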

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares
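
    # For example (hypothetical grid state): with 3-of-10 encoding, a version
    # seen only as shnums {0, 4} maps to (2, 3, 10), which is unrecoverable
    # (2 < k=3); five distinct shnums would give (5, 3, 10), recoverable.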

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0) # the empty map reports a highest seqnum of 0
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)
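
    # e.g. a grid holding 8 distinct shares of seq4-7vnz plus 2 shares of a
    # stale seq3-gm4f would summarize as "8*seq4-7vnz/2*seq3-gm4f"
    # (hashes hypothetical).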

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        if not recoverable:
            return None
        recoverable.sort()
        return recoverable[-1]
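
    # Plain tuple comparison does the work here: verinfo begins with
    # (seqnum, root_hash, ...), so e.g. (4, "aaaa...") outranks
    # (3, "zzzz..."), and a seqnum tie falls through to the root hash
    # (illustrative values).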

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False
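
    # Example (hypothetical race): two uncoordinated writers both publish
    # seqnum 5 with different root hashes, and both versions end up
    # recoverable. recoverable_versions() then holds two verinfo tuples with
    # verinfo[0] == 5, so needs_merge() returns True and the caller should
    # merge the two before publishing seqnum 6.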

MAX_IN_FLIGHT = 5 # cap on concurrent queries; matches the "keep up to 5
                  # queries in flight" note in _check_for_done below

class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()
        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        # * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        # * but we can't read from negative offsets
        # * the offset table tells us the 'ish', also the positive offset
        # A future version of the SDMF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 4000 bytes, which also happens to read enough actual data to
        # pre-fetch an 18-entry dirnode.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
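        # (Per the offset sketch above, the signed prefix occupies [0:75] and
        # the signature ends around byte 799ish, so a 1000-byte read should
        # comfortably cover everything unpack_prefix_and_signature needs.)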
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        full_peerlist = sb.get_servers_for_index(self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses from the first batch).

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query
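
    # Sketch (hypothetical sizes): with k=3, EPSILON=3, and an old servermap
    # naming 4 peers, those 4 peers form the base of the query list (all of
    # them in must_query) and the while-loop pops 2 more from extra_peers to
    # reach num_peers_to_query = 6.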

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)

            d2.addErrback(self._add_lease_failed, peerid, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d
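
    # The slot_readv wire call takes a list of share numbers (the empty list
    # used by _do_query means "all shares on this storage index") and a list
    # of (offset, length) read vectors, so the initial probe is effectively:
    #   ss.callRemote("slot_readv", storage_index, [], [(0, 4000)])
    # (4000 being the default self._read_size set in __init__).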

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._add_to_cache(verinfo, shnum, 0, data)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 parent=lp, level=log.NOISY)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node.get_fingerprint():
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node.get_pubkey().verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))

        return verinfo
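
    # The offsets dict is flattened into offsets_tuple so that the whole
    # verinfo is hashable and can act as a set member and dict key;
    # dict(offsets_tuple) recovers the table on demand, as _got_results does
    # with o['enc_privkey'] and o['EOF'] when fetching the private key.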

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp, level=log.NOISY)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write-cap).
            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)
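
    # The check above works because the writekey is derived from the privkey:
    # schematically, a candidate is accepted only if
    #   ssk_writekey_hash(_decrypt_privkey(enc_privkey)) == node.get_writekey()
    # so a corrupted or substituted privkey cannot masquerade as the real one.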

    def _add_lease_failed(self, f, peerid, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(peerid)s]: %(f_value)s",
                peerid=idlib.shortnodeid_b2a(peerid),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked for it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)
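
        # Example (illustrative, EPSILON=3): a states string of "111000"
        # means three peers with shares followed by three empty ones, so the
        # boundary was found; "11100?" keeps waiting on the '?', and empty
        # peers seen before the first share-holder don't count toward the
        # boundary.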

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)
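
        # note: foolscap's eventually() fires the Deferred in a later reactor
        # turn, so callers never see the callback run re-entrantly from
        # inside the state machine that called _done().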

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)