import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

# cap on the number of queries we keep in flight at once. The catchall at
# the end of _check_for_done notes that 5 is fairly arbitrary.
MAX_IN_FLIGHT = 5

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op, sent, elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when
83 """I record the placement of mutable shares.
85 This object records which shares (of various versions) are located on
88 One purpose I serve is to inform callers about which versions of the
89 mutable file are recoverable and 'current'.
91 A second purpose is to serve as a state marker for test-and-set
92 operations. I am passed out of retrieval operations and back into publish
93 operations, which means 'publish this new version, but only if nothing
94 has changed since I last retrieved this data'. This reduces the chances
95 of clobbering a simultaneous (uncoordinated) write.
97 @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
98 (versionid, timestamp) tuple. Each 'versionid' is a
99 tuple of (seqnum, root_hash, IV, segsize, datalength,
100 k, N, signed_prefix, offsets)
102 @ivar connections: maps peerid to a RemoteReference
104 @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
105 shares that I should ignore (because a previous user of
106 the servermap determined that they were invalid). The
107 updater only locates a certain number of shares: if
108 some of these turn out to have integrity problems and
109 are unusable, the caller will need to mark those shares
110 as bad, then re-update the servermap, then try again.
111 The dict maps (peerid, shnum) tuple to old checkstring.
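
    # A rough sketch of the test-and-set cycle this class supports (the
    # retrieve and publish steps live elsewhere in allmydata.mutable and
    # are elided here):
    #
    #   d = ServermapUpdater(node, storage_broker, monitor,
    #                        ServerMap(), MODE_WRITE).update()
    #   def _modify(sm):
    #       best = sm.best_recoverable_version()
    #       ... retrieve 'best', compute the new contents ...
    #       ... publish against this same 'sm': the write is rejected if
    #           any share changed since 'sm' was built ...
    #   d.addCallback(_modify)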

    def __init__(self):
        self.servermap = {}
        self.connections = {}
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)
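
    # For example, when Retrieve discovers a bad block in the share at
    # (peerid, shnum), the caller is expected to do something like:
    #
    #   servermap.mark_bad_share(peerid, shnum, checkstring)
    #   ... re-run ServermapUpdater on this servermap, then retry ...
    #
    # so the share is excluded from the refreshed map, while its old
    # checkstring stays available for a repairer's test-and-set write.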

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))

        print >>out, "%d PROBLEMS" % len(self.problems)
        for f in self.problems:
            print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares
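
    # Worked example: if we have seen shares 0, 1, 2, and 5 of a 3-of-10
    # version 'v', shares_available() maps v to (4, 3, 10); since 4 >= k,
    # v will also appear in recoverable_versions() below.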

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None
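
    # Since verinfo tuples begin with (seqnum, root_hash, ...), the plain
    # tuple sort above yields exactly this ordering: e.g. a seqnum-7
    # version beats any seqnum-6 version, and two seqnum-7 versions are
    # tie-broken by a bytewise comparison of their root hashes.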

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found, k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False
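
    # e.g. two uncoordinated writers can both publish seqnum 5, yielding
    # recoverable versions seq5-aaaa and seq5-bbbb: needs_merge() then
    # returns True, and a caller should merge the two datasets before
    # publishing seqnum 6.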


class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        # * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        # * but we can't read from negative offsets
        # * the offset table tells us the 'ish', also the positive offset
        # A future version of the SMDF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 4000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)
403 """Update the servermap to reflect current conditions. Returns a
404 Deferred that fires with the servermap once the update has finished."""
405 self._started = time.time()
406 self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
        # timestamp) tuples. This is used to figure out which versions might
        # be retrievable, and to make the eventual data download faster.
        self.versionmap = DictOfSets()

        self._done_deferred = defer.Deferred()

        # first, which peers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        full_peerlist = sb.get_servers_for_index(self._storage_index)
        self.full_peerlist = full_peerlist # for use later, immutable
        self.extra_peers = full_peerlist[:] # peers are removed as we use them
        self._good_peers = set() # peers who had some shares
        self._empty_peers = set() # peers who don't have any shares
        self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough answers from the initial set).

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
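
    # A minimal sketch of driving an update ('node', 'sb', and 'monitor'
    # are assumed to be supplied by the surrounding application; this
    # module does not construct them itself):
    #
    #   smu = ServermapUpdater(node, sb, monitor, ServerMap(), MODE_READ)
    #   d = smu.update()
    #   d.addCallback(lambda sm: sm.best_recoverable_version())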

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # add_lease failures are logged (and mostly ignored) separately
            d2.addErrback(self._add_lease_failed, peerid, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum,datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._add_to_cache(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node.get_fingerprint():
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node.get_pubkey().verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))

        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp, level=log.NOISY)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError. This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # share itself).
            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _add_lease_failed(self, f, peerid, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # when talking to the old servers described above.
                return
            self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(peerid)s]: %(f_value)s",
                peerid=idlib.shortnodeid_b2a(peerid),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)
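
            # The scan below walks the permuted peerlist and records one
            # character per peer: "1"=has shares, "0"=no shares, "x"=query
            # failed, "?"=no answer yet. With EPSILON=3, a states string
            # like "111000" marks the boundary (three empty peers past the
            # last share found); any "?" to its left means we keep waiting.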

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i,(peerid,ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()

                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid,ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)