import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually
from allmydata.util import base32, hashutil, idlib, log
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     DictOfSets, CorruptShareError, NeedMoreDataError
from layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
     SIGNED_PREFIX_LENGTH

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, peerid, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if peerid not in self.timings["per_server"]:
            self.timings["per_server"][peerid] = []
        self.timings["per_server"][peerid].append((op, sent, elapsed))
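
    # A hypothetical timings["per_server"] entry, one (op, sent, elapsed)
    # triple per response received from a peer:
    #   {peerid: [("query", 1234567890.1, 0.25),
    #             ("privkey", 1234567890.4, 0.08)]}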

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, peerid):
        self.privkey_from = peerid
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when
83 """I record the placement of mutable shares.
85 This object records which shares (of various versions) are located on
88 One purpose I serve is to inform callers about which versions of the
89 mutable file are recoverable and 'current'.
91 A second purpose is to serve as a state marker for test-and-set
92 operations. I am passed out of retrieval operations and back into publish
93 operations, which means 'publish this new version, but only if nothing
94 has changed since I last retrieved this data'. This reduces the chances
95 of clobbering a simultaneous (uncoordinated) write.
97 @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
98 (versionid, timestamp) tuple. Each 'versionid' is a
99 tuple of (seqnum, root_hash, IV, segsize, datalength,
100 k, N, signed_prefix, offsets)
102 @ivar connections: maps peerid to a RemoteReference
104 @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
105 shares that I should ignore (because a previous user of
106 the servermap determined that they were invalid). The
107 updater only locates a certain number of shares: if
108 some of these turn out to have integrity problems and
109 are unusable, the caller will need to mark those shares
110 as bad, then re-update the servermap, then try again.
111 The dict maps (peerid, shnum) tuple to old checkstring.
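
    A hypothetical round trip (the caller names are illustrative only, not
    part of this module) looks like:

        servermap = ServerMap()
        # ... filled in by ServermapUpdater.update() ...
        best = servermap.best_recoverable_version()
        # the same servermap is then handed back into a publish operation,
        # which can refuse to overwrite anything that changed in the interim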
    """

    def __init__(self):
        self.servermap = {} # (peerid,shnum) -> (versionid,timestamp)
        self.connections = {} # peerid -> RemoteReference
        self.unreachable_peers = set() # peerids that didn't respond to queries
        self.reachable_peers = set() # peerids that did respond to queries
        self.problems = [] # mostly for debugging
        self.bad_shares = {} # maps (peerid,shnum) to old checkstring
        self.last_update_mode = None
        self.last_update_time = 0

    def copy(self):
        s = ServerMap()
        s.servermap = self.servermap.copy() # tuple->tuple
        s.connections = self.connections.copy() # str->RemoteReference
        s.unreachable_peers = set(self.unreachable_peers)
        s.reachable_peers = set(self.reachable_peers)
        s.problems = self.problems[:]
        s.bad_shares = self.bad_shares.copy() # tuple->str
        s.last_update_mode = self.last_update_mode
        s.last_update_time = self.last_update_time
        return s

    def mark_bad_share(self, peerid, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (peerid, shnum) # record checkstring
        self.bad_shares[key] = checkstring
        self.servermap.pop(key, None)

    def add_new_share(self, peerid, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (peerid, shnum)
        self.bad_shares.pop(key, None)
        self.servermap[key] = (verinfo, timestamp)
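
    # The shape of a single (hypothetical) servermap entry, with the
    # verinfo tuple abbreviated:
    #   servermap[(peerid, 3)] == ((seqnum, root_hash, IV, segsize,
    #                               datalength, k, N, signed_prefix,
    #                               offsets_tuple), timestamp)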

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (idlib.shortnodeid_b2a(peerid), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self.problems:
            print >>out, "%d PROBLEMS" % len(self.problems)
            for f in self.problems:
                print >>out, str(f)
        return out

    def all_peers(self):
        return set([peerid
                    for (peerid, shnum)
                    in self.servermap])

    def all_peers_for_version(self, verinfo):
        """Return a set of peerids that hold shares for the given version."""
        return set([peerid
                    for ( (peerid, shnum), (verinfo2, timestamp) )
                    in self.servermap.items()
                    if verinfo == verinfo2])

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of peerids that hold it."""
        sharemap = DictOfSets()
        for (peerid, shnum) in self.servermap:
            sharemap.add(shnum, peerid)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, peerid,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
            versionmap.add(verinfo, (shnum, peerid, timestamp))
        return versionmap

    def shares_on_peer(self, peerid):
        return set([shnum
                    for (s_peerid, shnum)
                    in self.servermap
                    if s_peerid == peerid])

    def version_on_peer(self, peerid, shnum):
        key = (peerid, shnum)
        if key in self.servermap:
            (verinfo, timestamp) = self.servermap[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, peerid, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()

        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions
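
    # Recoverability is purely a share-count test: with k=3, a version seen
    # as shares {0, 1, 4} is recoverable, while one seen as {0, 1} is not.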

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. The sequence number is the primary sort criterion, followed
        by the root hash. Returns None if there are no recoverable
        versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found, k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False
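
    # This situation typically arises after uncoordinated writes: two
    # writers can each publish a different recoverable version under the
    # same seqnum, and only a merge-and-republish preserves both changes.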

class ServermapUpdater:
    def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
                 add_lease=False):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)
        self._servers_responded = set()

        # how much data should we read?
        # * if we only need the checkstring, then [0:75]
        # * if we need to validate the checkstring sig, then [543ish:799ish]
        # * if we need the verification key, then [107:436ish]
        # * the offset table at [75:107] tells us about the 'ish'
        # * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # A future version of the SDMF slot format should consider using
        # fixed-size slots so we can retrieve less data. For now, we'll just
        # read 4000 bytes, which also happens to read enough actual data to
        # pre-fetch a 9-entry dirnode.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False
        if mode == MODE_WRITE and not self._node._privkey:
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)
402 """Update the servermap to reflect current conditions. Returns a
403 Deferred that fires with the servermap once the update has finished."""
404 self._started = time.time()
405 self._status.set_active(True)
407 # self._valid_versions is a set of validated verinfo tuples. We just
408 # use it to remember which versions had valid signatures, so we can
409 # avoid re-checking the signatures for each share.
410 self._valid_versions = set()
412 # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
413 # timestamp) tuples. This is used to figure out which versions might
414 # be retrievable, and to make the eventual data download faster.
415 self.versionmap = DictOfSets()
417 self._done_deferred = defer.Deferred()
419 # first, which peers should be talk to? Any that were in our old
420 # servermap, plus "enough" others.
422 self._queries_completed = 0
424 client = self._node._client
425 full_peerlist = client.get_permuted_peers("storage",
426 self._node._storage_index)
427 self.full_peerlist = full_peerlist # for use later, immutable
428 self.extra_peers = full_peerlist[:] # peers are removed as we use them
429 self._good_peers = set() # peers who had some shares
430 self._empty_peers = set() # peers who don't have any shares
431 self._bad_peers = set() # peers to whom our queries failed

        k = self._node.get_required_shares()
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many peers (although we
        # might not wait for all of their answers to come back)
        self.num_peers_to_query = k + self.EPSILON

        if self.mode == MODE_CHECK:
            initial_peers_to_query = dict(full_peerlist)
            must_query = set(initial_peers_to_query.keys())
            self.extra_peers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon peers that don't have a share.
            self.num_peers_to_query = N + self.EPSILON
            initial_peers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_peers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else:
            initial_peers_to_query, must_query = self._build_initial_querylist()

        # this is a set of peers that we are required to get responses from:
        # they are peers who used to have a share, so we need to know where
        # they currently stand, even if that means we have to wait for a
        # silently-lost TCP connection to time out. We remove peers from this
        # set as we get responses.
        self._must_query = must_query

        # now initial_peers_to_query contains the peers that we should ask,
        # self.must_query contains the peers that we must have heard from
        # before we can consider ourselves finished, and self.extra_peers
        # contains the overflow (peers that we should tap if we don't get
        # enough responses)

        self._send_initial_requests(initial_peers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
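
    # A minimal usage sketch (hypothetical caller; `node` and `monitor` are
    # assumed to come from the surrounding mutable-file machinery):
    #
    #   updater = ServermapUpdater(node, monitor, ServerMap(), MODE_READ)
    #   d = updater.update()
    #   d.addCallback(lambda smap: smap.best_recoverable_version())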

    def _build_initial_querylist(self):
        initial_peers_to_query = {}
        must_query = set()
        for peerid in self._servermap.all_peers():
            ss = self._servermap.connections[peerid]
            # we send queries to everyone who was already in the sharemap
            initial_peers_to_query[peerid] = ss
            # and we must wait for responses from them
            must_query.add(peerid)

        while ((self.num_peers_to_query > len(initial_peers_to_query))
               and self.extra_peers):
            (peerid, ss) = self.extra_peers.pop(0)
            initial_peers_to_query[peerid] = ss

        return initial_peers_to_query, must_query

    def _send_initial_requests(self, peerlist):
        self._status.set_status("Sending %d initial queries" % len(peerlist))
        self._queries_outstanding = set()
        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
        for (peerid, ss) in peerlist.items():
            self._queries_outstanding.add(peerid)
            self._do_query(ss, peerid, self._storage_index, self._read_size)

        if not peerlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, ss, peerid, storage_index, readsize):
        self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 readsize=readsize,
                 level=log.NOISY)
        self._servermap.connections[peerid] = ss
        started = time.time()
        self._queries_outstanding.add(peerid)
        d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
                      started)
        d.addErrback(self._query_failed, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addBoth(self._check_for_done)
        d.addErrback(self._fatal_error)
        return d

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        if self._add_lease:
            renew_secret = self._node.get_renewal_secret(peerid)
            cancel_secret = self._node.get_cancel_secret(peerid)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            def _done(res):
                [(readv_success, readv_result),
                 (addlease_success, addlease_result)] = res
                # ignore remote IndexError on the add_lease call. Propagate
                # local errors and remote non-IndexErrors.
                if addlease_success:
                    return readv_result
                if not addlease_result.check(RemoteException):
                    # propagate local errors
                    return addlease_result
                if addlease_result.value.failure.check(IndexError):
                    # tahoe=1.3.0 raised IndexError on non-existent
                    # buckets, which we ignore
                    return readv_result
                # propagate remote errors that aren't IndexError, including
                # the unfortunate internal KeyError bug that <1.3.0 had.
                return addlease_result
            dl.addCallback(_done)
            return dl
        return d
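
    # A sketch of the wire shapes the code above assumes: `readv` is a list
    # of (offset, length) pairs, e.g. [(0, 4000)], and the slot_readv answer
    # maps shnum -> list of data strings (one per requested range), e.g.
    # {0: ["..."], 3: ["..."]}; an empty dict means no shares on this peer.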

    def _got_results(self, datavs, peerid, readsize, stuff, started):
        lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      numshares=len(datavs),
                      level=log.NOISY)
        now = time.time()
        elapsed = now - started
        self._queries_outstanding.discard(peerid)
        self._servermap.reachable_peers.add(peerid)
        self._must_query.discard(peerid)
        self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp,
                     level=log.NOISY)
            self._status.add_per_server_time(peerid, "late", started, elapsed)
            return
        self._status.add_per_server_time(peerid, "query", started, elapsed)

        if datavs:
            self._good_peers.add(peerid)
        else:
            self._empty_peers.add(peerid)

        last_verinfo = None
        last_shnum = None
        for shnum, datav in datavs.items():
            data = datav[0]
            try:
                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
                last_verinfo = verinfo
                last_shnum = shnum
                self._node._cache.add(verinfo, shnum, 0, data, now)
            except CorruptShareError, e:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
                self.notify_server_corruption(peerid, shnum, str(e))
                self._bad_peers.add(peerid)
                self._last_failure = f
                checkstring = data[:SIGNED_PREFIX_LENGTH]
                self._servermap.mark_bad_share(peerid, shnum, checkstring)
                self._servermap.problems.append(f)

        self._status.timings["cumulative_verify"] += (time.time() - now)

        if self._need_privkey and last_verinfo:
            # send them a request for the privkey. We send one request per
            # server.
            lp2 = self.log("sending privkey request",
                           parent=lp, level=log.NOISY)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = last_verinfo
            o = dict(offsets_tuple)

            self._queries_outstanding.add(peerid)
            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
            ss = self._servermap.connections[peerid]
            privkey_started = time.time()
            d = self._do_read(ss, peerid, self._storage_index,
                              [last_shnum], readv)
            d.addCallback(self._got_privkey_results, peerid, last_shnum,
                          privkey_started, lp2)
            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
            d.addErrback(log.err)
            d.addCallback(self._check_for_done)
            d.addErrback(self._fatal_error)

        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self._servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _got_results_one_share(self, shnum, data, peerid, lp):
        self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 parent=lp)

        # this might raise NeedMoreDataError, if the pubkey and signature
        # live at some weird offset. That shouldn't happen, so I'm going to
        # treat it as a bad share.
        (seqnum, root_hash, IV, k, N, segsize, datalength,
         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)

        if not self._node.get_pubkey():
            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
            assert len(fingerprint) == 32
            if fingerprint != self._node._fingerprint:
                raise CorruptShareError(peerid, shnum,
                                        "pubkey doesn't match fingerprint")
            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))

        if self._need_privkey:
            self._try_to_extract_privkey(data, peerid, shnum, lp)

        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
         ig_segsize, ig_datalen, offsets) = unpack_header(data)
        offsets_tuple = tuple( [(key, value) for key, value in offsets.items()] )

        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
                   offsets_tuple)

        if verinfo not in self._valid_versions:
            # it's a new pair. Verify the signature.
            valid = self._node._pubkey.verify(prefix, signature)
            if not valid:
                raise CorruptShareError(peerid, shnum, "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        idlib.shortnodeid_b2a(peerid), shnum,
                        k, N, segsize, datalength),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo.

        if (peerid, shnum) in self._servermap.bad_shares:
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
        # and the versionmap
        self.versionmap.add(verinfo, (shnum, peerid, timestamp))
        return verinfo

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
        try:
            r = unpack_share(data)
        except NeedMoreDataError, e:
            # this share won't help us. oh well.
            offset = e.encprivkey_offset
            length = e.encprivkey_length
            self.log("shnum %d on peerid %s: share was too short (%dB) "
                     "to get the encprivkey; [%d:%d] ought to hold it" %
                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
                      offset, offset+length),
                     parent=lp)
            # NOTE: if uncoordinated writes are taking place, someone might
            # change the share (and most probably move the encprivkey) before
            # we get a chance to do one of these reads and fetch it. This
            # will cause us to see a NotEnoughSharesError(unable to fetch
            # privkey) instead of an UncoordinatedWriteError . This is a
            # nuisance, but it will go away when we move to DSA-based mutable
            # files (since the privkey will be small enough to fit in the
            # write cap).
            return

        (seqnum, root_hash, IV, k, N, segsize, datalen,
         pubkey, signature, share_hash_chain, block_hash_tree,
         share_data, enc_privkey) = r

        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (idlib.nodeid_b2a(peerid)[:8], shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on peerid %s" %
                 (shnum, idlib.shortnodeid_b2a(peerid)),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(peerid)

    def _query_failed(self, f, peerid):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(peerid)
        self._queries_outstanding.discard(peerid)
        self._bad_peers.add(peerid)
        self._servermap.problems.append(f)
        # a peerid could be in both ServerMap.reachable_peers and
        # .unreachable_peers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.unreachable_peers.add(peerid)
        self._queries_completed += 1
        self._last_failure = f

    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
        now = time.time()
        elapsed = now - started
        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
        self._queries_outstanding.discard(peerid)
        if not self._need_privkey:
            return
        if shnum not in datavs:
            self.log("privkey wasn't there when we asked for it",
                     level=log.WEIRD, umid="VA9uDQ")
            return
        datav = datavs[shnum]
        enc_privkey = datav[0]
        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)

    def _privkey_query_failed(self, f, peerid, shnum, lp):
        self._queries_outstanding.discard(peerid)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.problems.append(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries

        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra peers available, "
                              "%(must)d 'must query' peers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_peers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from peers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' peers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_peers):
            # all queries have retired, and we have no peers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra peers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?
        MAX_IN_FLIGHT = 5

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_peers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_peers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i, (peerid, ss) in enumerate(self.full_peerlist):
                if peerid in self._bad_peers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
                elif peerid in self._empty_peers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif peerid in self._good_peers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
                    last_not_responded = i
                    num_not_responded += 1
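
            # `states` transcribes the permuted peerlist so far: "1" shares
            # found, "0" empty, "x" query failed, "?" no answer yet. With
            # EPSILON=3, a run like "111000" ends the search: three empty
            # peers past the last share form our boundary.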

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for peers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_peers:
                break
            more_queries.append(self.extra_peers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
                               for (peerid, ss) in more_queries]),
                 level=log.NOISY)

        for (peerid, ss) in more_queries:
            self._do_query(ss, peerid, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Done")
        self._status.set_active(False)

        self._servermap.last_update_mode = self.mode
        self._servermap.last_update_time = self._started
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())
        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)