3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually
8 from allmydata.util import base32, hashutil, idlib, log
9 from allmydata.storage.server import si_b2a
10 from allmydata.interfaces import IServermapUpdaterStatus
11 from pycryptopp.publickey import rsa
13 from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
14 DictOfSets, CorruptShareError, NeedMoreDataError
15 from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
19 implements(IServermapUpdaterStatus)
20 statusid_counter = count(0)
23 self.timings["per_server"] = {}
24 self.timings["cumulative_verify"] = 0.0
25 self.privkey_from = None
28 self.storage_index = None
30 self.status = "Not started"
32 self.counter = self.statusid_counter.next()
33 self.started = time.time()
36 def add_per_server_time(self, peerid, op, sent, elapsed):
37 assert op in ("query", "late", "privkey")
38 if peerid not in self.timings["per_server"]:
39 self.timings["per_server"][peerid] = []
40 self.timings["per_server"][peerid].append((op,sent,elapsed))
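# The per-server timing record therefore ends up shaped roughly like
# (values illustrative): { peerid: [("query", sent, elapsed), ("privkey", sent, elapsed), ...] }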
42 def get_started(self):
44 def get_finished(self):
46 def get_storage_index(self):
47 return self.storage_index
50 def get_servermap(self):
52 def get_privkey_from(self):
53 return self.privkey_from
54 def using_helper(self):
60 def get_progress(self):
64 def get_counter(self):
67 def set_storage_index(self, si):
68 self.storage_index = si
69 def set_mode(self, mode):
71 def set_privkey_from(self, peerid):
72 self.privkey_from = peerid
73 def set_status(self, status):
75 def set_progress(self, value):
77 def set_active(self, value):
79 def set_finished(self, when):
83 """I record the placement of mutable shares.
85 This object records which shares (of various versions) are located on
88 One purpose I serve is to inform callers about which versions of the
89 mutable file are recoverable and 'current'.
91 A second purpose is to serve as a state marker for test-and-set
92 operations. I am passed out of retrieval operations and back into publish
93 operations, which means 'publish this new version, but only if nothing
94 has changed since I last retrieved this data'. This reduces the chances
95 of clobbering a simultaneous (uncoordinated) write.
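In sketch form (the names below are illustrative, not the exact API):

  smap = mapupdate(node, MODE_WRITE)     # retrieval: learn current share state
  new_contents = modify(retrieve(node, smap))
  publish(node, new_contents, smap)      # only succeeds if the shares still
                                         # match what smap recorded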
97 @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
98 (versionid, timestamp) tuple. Each 'versionid' is a
99 tuple of (seqnum, root_hash, IV, segsize, datalength,
100 k, N, signed_prefix, offsets)
102 @ivar connections: maps peerid to a RemoteReference
104 @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
105 shares that I should ignore (because a previous user of
106 the servermap determined that they were invalid). The
107 updater only locates a certain number of shares: if
108 some of these turn out to have integrity problems and
109 are unusable, the caller will need to mark those shares
110 as bad, then re-update the servermap, then try again.
111 The dict maps (peerid, shnum) tuple to old checkstring.
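For example, a single servermap entry might look like (all values
illustrative):

  servermap[(peerid, 3)] = ((seqnum, root_hash, IV, segsize, datalength,
                             k, N, signed_prefix, offsets), timestamp)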
116 self.connections = {}
117 self.unreachable_peers = set() # peerids that didn't respond to queries
118 self.reachable_peers = set() # peerids that did respond to queries
119 self.problems = [] # mostly for debugging
120 self.bad_shares = {} # maps (peerid,shnum) to old checkstring
121 self.last_update_mode = None
122 self.last_update_time = 0
126 s.servermap = self.servermap.copy() # tuple->tuple
127 s.connections = self.connections.copy() # str->RemoteReference
128 s.unreachable_peers = set(self.unreachable_peers)
129 s.reachable_peers = set(self.reachable_peers)
130 s.problems = self.problems[:]
131 s.bad_shares = self.bad_shares.copy() # tuple->str
132 s.last_update_mode = self.last_update_mode
133 s.last_update_time = self.last_update_time
136 def mark_bad_share(self, peerid, shnum, checkstring):
137 """This share was found to be bad, either in the checkstring or
138 signature (detected during mapupdate), or deeper in the share
139 (detected at retrieve time). Remove it from our list of useful
140 shares, and remember that it is bad so we don't add it back again
141 later. We record the share's old checkstring (which might be
142 corrupted or badly signed) so that a repair operation can do the
143 test-and-set using it as a reference.
145 key = (peerid, shnum) # record checkstring
146 self.bad_shares[key] = checkstring
147 self.servermap.pop(key, None)
149 def add_new_share(self, peerid, shnum, verinfo, timestamp):
150 """We've written a new share out, replacing any that was there
152 key = (peerid, shnum)
153 self.bad_shares.pop(key, None)
154 self.servermap[key] = (verinfo, timestamp)
156 def dump(self, out=sys.stdout):
157 print >>out, "servermap:"
159 for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
160 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
161 offsets_tuple) = verinfo
162 print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
163 (idlib.shortnodeid_b2a(peerid), shnum,
164 seqnum, base32.b2a(root_hash)[:4], k, N,
167 print >>out, "%d PROBLEMS" % len(self.problems)
168 for f in self.problems:
177 def all_peers_for_version(self, verinfo):
178 """Return a set of peerids that hold shares for the given version."""
180 for ( (peerid, shnum), (verinfo2, timestamp) )
181 in self.servermap.items()
182 if verinfo == verinfo2])
184 def make_sharemap(self):
185 """Return a dict that maps shnum to a set of peerds that hold it."""
186 sharemap = DictOfSets()
187 for (peerid, shnum) in self.servermap:
188 sharemap.add(shnum, peerid)
191 def make_versionmap(self):
192 """Return a dict that maps versionid to sets of (shnum, peerid,
193 timestamp) tuples."""
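# Illustrative shape of the result:
#   { verinfo: set([(shnum, peerid, timestamp), ...]), ... }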
194 versionmap = DictOfSets()
195 for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
196 versionmap.add(verinfo, (shnum, peerid, timestamp))
199 def shares_on_peer(self, peerid):
201 for (s_peerid, shnum)
203 if s_peerid == peerid])
205 def version_on_peer(self, peerid, shnum):
206 key = (peerid, shnum)
207 if key in self.servermap:
208 (verinfo, timestamp) = self.servermap[key]
212 def shares_available(self):
213 """Return a dict that maps verinfo to tuples of
214 (num_distinct_shares, k, N) tuples."""
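# Illustrative return value: { verinfo: (5, 3, 10) } would mean five distinct
# shares of that version were located, and any 3 of its 10 shares suffice to
# recover it.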
215 versionmap = self.make_versionmap()
217 for verinfo, shares in versionmap.items():
219 for (shnum, peerid, timestamp) in shares:
221 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
222 offsets_tuple) = verinfo
223 all_shares[verinfo] = (len(s), k, N)
226 def highest_seqnum(self):
227 available = self.shares_available()
228 seqnums = [verinfo[0]
229 for verinfo in available.keys()]
233 def summarize_version(self, verinfo):
234 """Take a versionid, return a string that describes it."""
235 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
236 offsets_tuple) = verinfo
237 return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
239 def summarize_versions(self):
240 """Return a string describing which versions we know about."""
241 versionmap = self.make_versionmap()
243 for (verinfo, shares) in versionmap.items():
244 vstr = self.summarize_version(verinfo)
245 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
246 bits.append("%d*%s" % (len(shnums), vstr))
247 return "/".join(bits)
249 def recoverable_versions(self):
250 """Return a set of versionids, one for each version that is currently
252 versionmap = self.make_versionmap()
254 recoverable_versions = set()
255 for (verinfo, shares) in versionmap.items():
256 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
257 offsets_tuple) = verinfo
258 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
260 # this one is recoverable
261 recoverable_versions.add(verinfo)
263 return recoverable_versions
265 def unrecoverable_versions(self):
266 """Return a set of versionids, one for each version that is currently
268 versionmap = self.make_versionmap()
270 unrecoverable_versions = set()
271 for (verinfo, shares) in versionmap.items():
272 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
273 offsets_tuple) = verinfo
274 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
276 unrecoverable_versions.add(verinfo)
278 return unrecoverable_versions
280 def best_recoverable_version(self):
281 """Return a single versionid, for the so-called 'best' recoverable
282 version. Sequence number is the primary sort criterion, followed by
283 root hash. Returns None if there are no recoverable versions."""
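# verinfo tuples start with (seqnum, root_hash, ...), so sorting them as
# plain tuples orders them by seqnum first and root hash second; the last
# entry of the sorted list is therefore the 'best' version.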
284 recoverable = list(self.recoverable_versions())
287 return recoverable[-1]
290 def size_of_version(self, verinfo):
291 """Given a versionid (perhaps returned by best_recoverable_version),
292 return the size of the file in bytes."""
293 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
294 offsets_tuple) = verinfo
297 def unrecoverable_newer_versions(self):
298 # Return a dict of versionid -> health, for versions that are
299 # unrecoverable and have later seqnums than any recoverable versions.
300 # These indicate that a write will lose data.
301 versionmap = self.make_versionmap()
302 healths = {} # maps verinfo to (found,k)
303 unrecoverable = set()
304 highest_recoverable_seqnum = -1
305 for (verinfo, shares) in versionmap.items():
306 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
307 offsets_tuple) = verinfo
308 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
309 healths[verinfo] = (len(shnums),k)
311 unrecoverable.add(verinfo)
313 highest_recoverable_seqnum = max(seqnum,
314 highest_recoverable_seqnum)
317 for verinfo in unrecoverable:
318 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
319 offsets_tuple) = verinfo
320 if seqnum > highest_recoverable_seqnum:
321 newversions[verinfo] = healths[verinfo]
326 def needs_merge(self):
327 # return True if there are multiple recoverable versions with the
328 # same seqnum, meaning that MutableFileNode.read_best_version is not
329 # giving you the whole story, and that using its data to do a
330 # subsequent publish will lose information.
331 recoverable_seqnums = [verinfo[0]
332 for verinfo in self.recoverable_versions()]
333 for seqnum in recoverable_seqnums:
334 if recoverable_seqnums.count(seqnum) > 1:
339 class ServermapUpdater:
340 def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
342 """I update a servermap, locating a sufficient number of useful
343 shares and remembering where they are located.
347 self._node = filenode
348 self._monitor = monitor
349 self._servermap = servermap
351 self._add_lease = add_lease
354 self._storage_index = filenode.get_storage_index()
355 self._last_failure = None
357 self._status = UpdateStatus()
358 self._status.set_storage_index(self._storage_index)
359 self._status.set_progress(0.0)
360 self._status.set_mode(mode)
362 self._servers_responded = set()
364 # how much data should we read?
365 # * if we only need the checkstring, then [0:75]
366 # * if we need to validate the checkstring sig, then [543ish:799ish]
367 # * if we need the verification key, then [107:436ish]
368 # * the offset table at [75:107] tells us about the 'ish'
369 # * if we need the encrypted private key, we want [-1216ish:]
370 # * but we can't read from negative offsets
371 # * the offset table tells us the 'ish', also the positive offset
372 # A future version of the SMDF slot format should consider using
373 # fixed-size slots so we can retrieve less data. For now, we'll just
374 # read 4000 bytes, which also happens to read enough actual data to
375 # pre-fetch a 9-entry dirnode.
376 self._read_size = 4000
377 if mode == MODE_CHECK:
378 # we use unpack_prefix_and_signature, so we need 1k
379 self._read_size = 1000
380 self._need_privkey = False
381 if mode == MODE_WRITE and not self._node._privkey:
382 self._need_privkey = True
383 # check+repair: repair requires the privkey, so if we didn't happen
384 # to ask for it during the check, we'll have problems doing the
387 prefix = si_b2a(self._storage_index)[:5]
388 self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
389 si=prefix, mode=mode)
391 def get_status(self):
394 def log(self, *args, **kwargs):
395 if "parent" not in kwargs:
396 kwargs["parent"] = self._log_number
397 if "facility" not in kwargs:
398 kwargs["facility"] = "tahoe.mutable.mapupdate"
399 return log.msg(*args, **kwargs)
402 """Update the servermap to reflect current conditions. Returns a
403 Deferred that fires with the servermap once the update has finished."""
404 self._started = time.time()
405 self._status.set_active(True)
407 # self._valid_versions is a set of validated verinfo tuples. We just
408 # use it to remember which versions had valid signatures, so we can
409 # avoid re-checking the signatures for each share.
410 self._valid_versions = set()
412 # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
413 # timestamp) tuples. This is used to figure out which versions might
414 # be retrievable, and to make the eventual data download faster.
415 self.versionmap = DictOfSets()
417 self._done_deferred = defer.Deferred()
419 # first, which peers should we talk to? Any that were in our old
420 # servermap, plus "enough" others.
422 self._queries_completed = 0
424 sb = self._node._client.get_storage_broker()
425 full_peerlist = sb.get_servers_for_index(self._node._storage_index)
426 self.full_peerlist = full_peerlist # for use later, immutable
427 self.extra_peers = full_peerlist[:] # peers are removed as we use them
428 self._good_peers = set() # peers who had some shares
429 self._empty_peers = set() # peers who don't have any shares
430 self._bad_peers = set() # peers to whom our queries failed
432 k = self._node.get_required_shares()
436 N = self._node.get_total_shares()
440 # we want to send queries to at least this many peers (although we
441 # might not wait for all of their answers to come back)
442 self.num_peers_to_query = k + self.EPSILON
444 if self.mode == MODE_CHECK:
445 initial_peers_to_query = dict(full_peerlist)
446 must_query = set(initial_peers_to_query.keys())
447 self.extra_peers = []
448 elif self.mode == MODE_WRITE:
449 # we're planning to replace all the shares, so we want a good
450 # chance of finding them all. We will keep searching until we've
451 # seen epsilon that don't have a share.
452 self.num_peers_to_query = N + self.EPSILON
453 initial_peers_to_query, must_query = self._build_initial_querylist()
454 self.required_num_empty_peers = self.EPSILON
456 # TODO: arrange to read lots of data from k-ish servers, to avoid
457 # the extra round trip required to read large directories. This
458 # might also avoid the round trip required to read the encrypted
462 initial_peers_to_query, must_query = self._build_initial_querylist()
464 # this is a set of peers that we are required to get responses from:
465 # they are peers who used to have a share, so we need to know where
466 # they currently stand, even if that means we have to wait for a
467 # silently-lost TCP connection to time out. We remove peers from this
468 # set as we get responses.
469 self._must_query = must_query
471 # now initial_peers_to_query contains the peers that we should ask,
472 # self.must_query contains the peers that we must have heard from
473 # before we can consider ourselves finished, and self.extra_peers
474 # contains the overflow (peers that we should tap if we don't get
477 self._send_initial_requests(initial_peers_to_query)
478 self._status.timings["initial_queries"] = time.time() - self._started
479 return self._done_deferred
481 def _build_initial_querylist(self):
482 initial_peers_to_query = {}
484 for peerid in self._servermap.all_peers():
485 ss = self._servermap.connections[peerid]
486 # we send queries to everyone who was already in the sharemap
487 initial_peers_to_query[peerid] = ss
488 # and we must wait for responses from them
489 must_query.add(peerid)
491 while ((self.num_peers_to_query > len(initial_peers_to_query))
492 and self.extra_peers):
493 (peerid, ss) = self.extra_peers.pop(0)
494 initial_peers_to_query[peerid] = ss
496 return initial_peers_to_query, must_query
498 def _send_initial_requests(self, peerlist):
499 self._status.set_status("Sending %d initial queries" % len(peerlist))
500 self._queries_outstanding = set()
501 self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
503 for (peerid, ss) in peerlist.items():
504 self._queries_outstanding.add(peerid)
505 self._do_query(ss, peerid, self._storage_index, self._read_size)
508 # there is nobody to ask, so we need to short-circuit the state
510 d = defer.maybeDeferred(self._check_for_done, None)
511 d.addErrback(self._fatal_error)
513 # control flow beyond this point: state machine. Receiving responses
514 # from queries is the input. We might send out more queries, or we
515 # might produce a result.
518 def _do_query(self, ss, peerid, storage_index, readsize):
519 self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
520 peerid=idlib.shortnodeid_b2a(peerid),
523 self._servermap.connections[peerid] = ss
524 started = time.time()
525 self._queries_outstanding.add(peerid)
526 d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
527 d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
529 d.addErrback(self._query_failed, peerid)
530 # errors that aren't handled by _query_failed (and errors caused by
531 # _query_failed) get logged, but we still want to check for doneness.
532 d.addErrback(log.err)
533 d.addBoth(self._check_for_done)
534 d.addErrback(self._fatal_error)
537 def _do_read(self, ss, peerid, storage_index, shnums, readv):
538 d = ss.callRemote("slot_readv", storage_index, shnums, readv)
540 renew_secret = self._node.get_renewal_secret(peerid)
541 cancel_secret = self._node.get_cancel_secret(peerid)
542 d2 = ss.callRemote("add_lease", storage_index,
543 renew_secret, cancel_secret)
544 dl = defer.DeferredList([d, d2], consumeErrors=True)
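# With consumeErrors=True each DeferredList entry is a (success, result)
# pair, e.g. [(True, readv_result), (False, Failure)], so a failed
# add_lease call cannot errback the whole list.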
546 [(readv_success, readv_result),
547 (addlease_success, addlease_result)] = res
548 # ignore remote IndexError on the add_lease call. Propagate
549 # local errors and remote non-IndexErrors
552 if not addlease_result.check(RemoteException):
553 # Propagate local errors
554 return addlease_result
555 if addlease_result.value.failure.check(IndexError):
556 # tahoe=1.3.0 raised IndexError on non-existent
557 # buckets, which we ignore
559 # propagate remote errors that aren't IndexError, including
560 # the unfortunate internal KeyError bug that <1.3.0 had.
561 return addlease_result
562 dl.addCallback(_done)
566 def _got_results(self, datavs, peerid, readsize, stuff, started):
567 lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
568 peerid=idlib.shortnodeid_b2a(peerid),
569 numshares=len(datavs),
572 elapsed = now - started
573 self._queries_outstanding.discard(peerid)
574 self._servermap.reachable_peers.add(peerid)
575 self._must_query.discard(peerid)
576 self._queries_completed += 1
577 if not self._running:
578 self.log("but we're not running, so we'll ignore it", parent=lp,
580 self._status.add_per_server_time(peerid, "late", started, elapsed)
582 self._status.add_per_server_time(peerid, "query", started, elapsed)
585 self._good_peers.add(peerid)
587 self._empty_peers.add(peerid)
591 for shnum,datav in datavs.items():
594 verinfo = self._got_results_one_share(shnum, data, peerid, lp)
595 last_verinfo = verinfo
597 self._node._cache.add(verinfo, shnum, 0, data, now)
598 except CorruptShareError, e:
599 # log it and give the other shares a chance to be processed
600 f = failure.Failure()
601 self.log(format="bad share: %(f_value)s", f_value=str(f.value),
602 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
603 self.notify_server_corruption(peerid, shnum, str(e))
604 self._bad_peers.add(peerid)
605 self._last_failure = f
606 checkstring = data[:SIGNED_PREFIX_LENGTH]
607 self._servermap.mark_bad_share(peerid, shnum, checkstring)
608 self._servermap.problems.append(f)
611 self._status.timings["cumulative_verify"] += (time.time() - now)
613 if self._need_privkey and last_verinfo:
614 # send them a request for the privkey. We send one request per
616 lp2 = self.log("sending privkey request",
617 parent=lp, level=log.NOISY)
618 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
619 offsets_tuple) = last_verinfo
620 o = dict(offsets_tuple)
622 self._queries_outstanding.add(peerid)
623 readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
624 ss = self._servermap.connections[peerid]
625 privkey_started = time.time()
626 d = self._do_read(ss, peerid, self._storage_index,
628 d.addCallback(self._got_privkey_results, peerid, last_shnum,
629 privkey_started, lp2)
630 d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
631 d.addErrback(log.err)
632 d.addCallback(self._check_for_done)
633 d.addErrback(self._fatal_error)
636 self.log("_got_results done", parent=lp, level=log.NOISY)
638 def notify_server_corruption(self, peerid, shnum, reason):
639 ss = self._servermap.connections[peerid]
640 ss.callRemoteOnly("advise_corrupt_share",
641 "mutable", self._storage_index, shnum, reason)
643 def _got_results_one_share(self, shnum, data, peerid, lp):
644 self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
646 peerid=idlib.shortnodeid_b2a(peerid),
650 # this might raise NeedMoreDataError, if the pubkey and signature
651 # live at some weird offset. That shouldn't happen, so I'm going to
652 # treat it as a bad share.
653 (seqnum, root_hash, IV, k, N, segsize, datalength,
654 pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
656 if not self._node.get_pubkey():
657 fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
658 assert len(fingerprint) == 32
659 if fingerprint != self._node._fingerprint:
660 raise CorruptShareError(peerid, shnum,
661 "pubkey doesn't match fingerprint")
662 self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
664 if self._need_privkey:
665 self._try_to_extract_privkey(data, peerid, shnum, lp)
667 (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
668 ig_segsize, ig_datalen, offsets) = unpack_header(data)
669 offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
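# (a tuple rather than the dict itself, so the verinfo built below stays
#  hashable and can be used as a dict key / set member)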
671 verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
674 if verinfo not in self._valid_versions:
675 # it's a new (seqnum, root_hash) pair. Verify the signature.
676 valid = self._node._pubkey.verify(prefix, signature)
678 raise CorruptShareError(peerid, shnum, "signature is invalid")
680 # ok, it's a valid verinfo. Add it to the list of validated
682 self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
683 % (seqnum, base32.b2a(root_hash)[:4],
684 idlib.shortnodeid_b2a(peerid), shnum,
685 k, N, segsize, datalength),
687 self._valid_versions.add(verinfo)
688 # We now know that this is a valid candidate verinfo.
690 if (peerid, shnum) in self._servermap.bad_shares:
691 # we've been told that the rest of the data in this share is
692 # unusable, so don't add it to the servermap.
693 self.log("but we've been told this is a bad share",
694 parent=lp, level=log.UNUSUAL)
697 # Add the info to our servermap.
698 timestamp = time.time()
699 self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
701 self.versionmap.add(verinfo, (shnum, peerid, timestamp))
704 def _deserialize_pubkey(self, pubkey_s):
705 verifier = rsa.create_verifying_key_from_string(pubkey_s)
708 def _try_to_extract_privkey(self, data, peerid, shnum, lp):
710 r = unpack_share(data)
711 except NeedMoreDataError, e:
712 # this share won't help us. oh well.
713 offset = e.encprivkey_offset
714 length = e.encprivkey_length
715 self.log("shnum %d on peerid %s: share was too short (%dB) "
716 "to get the encprivkey; [%d:%d] ought to hold it" %
717 (shnum, idlib.shortnodeid_b2a(peerid), len(data),
718 offset, offset+length),
720 # NOTE: if uncoordinated writes are taking place, someone might
721 # change the share (and most probably move the encprivkey) before
722 # we get a chance to do one of these reads and fetch it. This
723 # will cause us to see a NotEnoughSharesError(unable to fetch
724 # privkey) instead of an UncoordinatedWriteError. This is a
725 # nuisance, but it will go away when we move to DSA-based mutable
726 # files (since the privkey will be small enough to fit in the
731 (seqnum, root_hash, IV, k, N, segsize, datalen,
732 pubkey, signature, share_hash_chain, block_hash_tree,
733 share_data, enc_privkey) = r
735 return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
737 def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
739 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
740 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
741 if alleged_writekey != self._node.get_writekey():
742 self.log("invalid privkey from %s shnum %d" %
743 (idlib.nodeid_b2a(peerid)[:8], shnum),
744 parent=lp, level=log.WEIRD, umid="aJVccw")
748 self.log("got valid privkey from shnum %d on peerid %s" %
749 (shnum, idlib.shortnodeid_b2a(peerid)),
751 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
752 self._node._populate_encprivkey(enc_privkey)
753 self._node._populate_privkey(privkey)
754 self._need_privkey = False
755 self._status.set_privkey_from(peerid)
758 def _query_failed(self, f, peerid):
759 if not self._running:
762 if f.check(DeadReferenceError):
764 self.log(format="error during query: %(f_value)s",
765 f_value=str(f.value), failure=f,
766 level=level, umid="IHXuQg")
767 self._must_query.discard(peerid)
768 self._queries_outstanding.discard(peerid)
769 self._bad_peers.add(peerid)
770 self._servermap.problems.append(f)
771 # a peerid could be in both ServerMap.reachable_peers and
772 # .unreachable_peers if they responded to our query, but then an
773 # exception was raised in _got_results.
774 self._servermap.unreachable_peers.add(peerid)
775 self._queries_completed += 1
776 self._last_failure = f
778 def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
780 elapsed = now - started
781 self._status.add_per_server_time(peerid, "privkey", started, elapsed)
782 self._queries_outstanding.discard(peerid)
783 if not self._need_privkey:
785 if shnum not in datavs:
786 self.log("privkey wasn't there when we asked it",
787 level=log.WEIRD, umid="VA9uDQ")
789 datav = datavs[shnum]
790 enc_privkey = datav[0]
791 self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
793 def _privkey_query_failed(self, f, peerid, shnum, lp):
794 self._queries_outstanding.discard(peerid)
795 if not self._running:
798 if f.check(DeadReferenceError):
800 self.log(format="error during privkey query: %(f_value)s",
801 f_value=str(f.value), failure=f,
802 parent=lp, level=level, umid="McoJ5w")
803 self._servermap.problems.append(f)
804 self._last_failure = f
806 def _check_for_done(self, res):
808 # return self._send_more_queries(outstanding) : send some more queries
809 # return self._done() : all done
810 # return : keep waiting, no new queries
812 lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
813 "%(outstanding)d queries outstanding, "
814 "%(extra)d extra peers available, "
815 "%(must)d 'must query' peers left, "
816 "need_privkey=%(need_privkey)s"
819 outstanding=len(self._queries_outstanding),
820 extra=len(self.extra_peers),
821 must=len(self._must_query),
822 need_privkey=self._need_privkey,
826 if not self._running:
827 self.log("but we're not running", parent=lp, level=log.NOISY)
831 # we are still waiting for responses from peers that used to have
832 # a share, so we must continue to wait. No additional queries are
833 # required at this time.
834 self.log("%d 'must query' peers left" % len(self._must_query),
835 level=log.NOISY, parent=lp)
838 if (not self._queries_outstanding and not self.extra_peers):
839 # all queries have retired, and we have no peers left to ask. No
840 # more progress can be made, therefore we are done.
841 self.log("all queries are retired, no extra peers: done",
845 recoverable_versions = self._servermap.recoverable_versions()
846 unrecoverable_versions = self._servermap.unrecoverable_versions()
848 # what is our completion policy? how hard should we work?
850 if self.mode == MODE_ANYTHING:
851 if recoverable_versions:
852 self.log("%d recoverable versions: done"
853 % len(recoverable_versions),
857 if self.mode == MODE_CHECK:
858 # we used self._must_query, and we know there aren't any
859 # responses still waiting, so that means we must be done
860 self.log("done", parent=lp)
864 if self.mode == MODE_READ:
865 # if we've queried k+epsilon servers, and we see a recoverable
866 # version, and we haven't seen any unrecoverable higher-seqnum'ed
867 # versions, then we're done.
869 if self._queries_completed < self.num_peers_to_query:
870 self.log(format="%(completed)d completed, %(query)d to query: need more",
871 completed=self._queries_completed,
872 query=self.num_peers_to_query,
873 level=log.NOISY, parent=lp)
874 return self._send_more_queries(MAX_IN_FLIGHT)
875 if not recoverable_versions:
876 self.log("no recoverable versions: need more",
877 level=log.NOISY, parent=lp)
878 return self._send_more_queries(MAX_IN_FLIGHT)
879 highest_recoverable = max(recoverable_versions)
880 highest_recoverable_seqnum = highest_recoverable[0]
881 for unrec_verinfo in unrecoverable_versions:
882 if unrec_verinfo[0] > highest_recoverable_seqnum:
883 # there is evidence of a higher-seqnum version, but we
884 # don't yet see enough shares to recover it. Try harder.
885 # TODO: consider sending more queries.
886 # TODO: consider limiting the search distance
887 self.log("evidence of higher seqnum: need more",
888 level=log.UNUSUAL, parent=lp)
889 return self._send_more_queries(MAX_IN_FLIGHT)
890 # all the unrecoverable versions were old or concurrent with a
891 # recoverable version. Good enough.
892 self.log("no higher-seqnum: done", parent=lp)
895 if self.mode == MODE_WRITE:
896 # we want to keep querying until we've seen a few that don't have
897 # any shares, to be sufficiently confident that we've seen all
898 # the shares. This is still less work than MODE_CHECK, which asks
899 # every server in the world.
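# Roughly: walk the permuted peer list below; once we have seen self.EPSILON
# share-less peers past the last peer that held a share, assume we have
# walked off the end of the share distribution and stop searching.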
901 if not recoverable_versions:
902 self.log("no recoverable versions: need more", parent=lp,
904 return self._send_more_queries(MAX_IN_FLIGHT)
907 last_not_responded = -1
908 num_not_responded = 0
911 found_boundary = False
913 for i,(peerid,ss) in enumerate(self.full_peerlist):
914 if peerid in self._bad_peers:
917 #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
918 elif peerid in self._empty_peers:
921 #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
924 if num_not_found >= self.EPSILON:
925 self.log("found our boundary, %s" %
927 parent=lp, level=log.NOISY)
928 found_boundary = True
931 elif peerid in self._good_peers:
934 #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
940 #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
941 last_not_responded = i
942 num_not_responded += 1
945 # we need to know that we've gotten answers from
946 # everybody to the left of here
947 if last_not_responded == -1:
949 self.log("have all our answers",
950 parent=lp, level=log.NOISY)
951 # .. unless we're still waiting on the privkey
952 if self._need_privkey:
953 self.log("but we're still waiting for the privkey",
954 parent=lp, level=log.NOISY)
955 # if we found the boundary but we haven't yet found
956 # the privkey, we may need to look further. If
957 # somehow all the privkeys were corrupted (but the
958 # shares were readable), then this is likely to do an
960 return self._send_more_queries(MAX_IN_FLIGHT)
962 # still waiting for somebody
963 return self._send_more_queries(num_not_responded)
965 # if we hit here, we didn't find our boundary, so we're still
967 self.log("no boundary yet, %s" % "".join(states), parent=lp,
969 return self._send_more_queries(MAX_IN_FLIGHT)
971 # otherwise, keep up to 5 queries in flight. TODO: this is pretty
972 # arbitrary, really I want this to be something like k -
973 # max(known_version_sharecounts) + some extra
974 self.log("catchall: need more", parent=lp, level=log.NOISY)
975 return self._send_more_queries(MAX_IN_FLIGHT)
977 def _send_more_queries(self, num_outstanding):
981 self.log(format=" there are %(outstanding)d queries outstanding",
982 outstanding=len(self._queries_outstanding),
984 active_queries = len(self._queries_outstanding) + len(more_queries)
985 if active_queries >= num_outstanding:
987 if not self.extra_peers:
989 more_queries.append(self.extra_peers.pop(0))
991 self.log(format="sending %(more)d more queries: %(who)s",
992 more=len(more_queries),
993 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
994 for (peerid,ss) in more_queries]),
997 for (peerid, ss) in more_queries:
998 self._do_query(ss, peerid, self._storage_index, self._read_size)
999 # we'll retrigger when those queries come back
1002 if not self._running:
1004 self._running = False
1006 elapsed = now - self._started
1007 self._status.set_finished(now)
1008 self._status.timings["total"] = elapsed
1009 self._status.set_progress(1.0)
1010 self._status.set_status("Done")
1011 self._status.set_active(False)
1013 self._servermap.last_update_mode = self.mode
1014 self._servermap.last_update_time = self._started
1015 # the servermap will not be touched after this
1016 self.log("servermap: %s" % self._servermap.summarize_versions())
1017 eventually(self._done_deferred.callback, self._servermap)
1019 def _fatal_error(self, f):
1020 self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
1021 self._done_deferred.errback(f)