3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
9 from allmydata.util import base32, hashutil, idlib, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
20 implements(IServermapUpdaterStatus)
21 statusid_counter = count(0)
24 self.timings["per_server"] = {}
25 self.timings["cumulative_verify"] = 0.0
26 self.privkey_from = None
29 self.storage_index = None
31 self.status = "Not started"
33 self.counter = self.statusid_counter.next()
34 self.started = time.time()
def add_per_server_time(self, peerid, op, sent, elapsed):
    """Append one (op, sent, elapsed) timing record for `peerid`.

    `op` must be one of "query", "late" or "privkey". Records accumulate
    in a per-peer list stored under self.timings["per_server"].
    """
    assert op in ("query", "late", "privkey")
    per_server = self.timings["per_server"]
    # Create the peer's list on first use, then add the new record to it.
    per_server.setdefault(peerid, []).append((op, sent, elapsed))
43 def get_started(self):
45 def get_finished(self):
def get_storage_index(self):
    """Return the storage index held in self.storage_index."""
    current = self.storage_index
    return current
51 def get_servermap(self):
def get_privkey_from(self):
    """Return the peerid held in self.privkey_from."""
    source_peer = self.privkey_from
    return source_peer
55 def using_helper(self):
61 def get_progress(self):
65 def get_counter(self):
def set_storage_index(self, si):
    """Store `si` as this status object's storage index."""
    setattr(self, "storage_index", si)
70 def set_mode(self, mode):
def set_privkey_from(self, peerid):
    """Store `peerid` in self.privkey_from."""
    setattr(self, "privkey_from", peerid)
74 def set_status(self, status):
76 def set_progress(self, value):
78 def set_active(self, value):
80 def set_finished(self, when):
84 """I record the placement of mutable shares.
86 This object records which shares (of various versions) are located on
89 One purpose I serve is to inform callers about which versions of the
90 mutable file are recoverable and 'current'.
92 A second purpose is to serve as a state marker for test-and-set
93 operations. I am passed out of retrieval operations and back into publish
94 operations, which means 'publish this new version, but only if nothing
95 has changed since I last retrieved this data'. This reduces the chances
96 of clobbering a simultaneous (uncoordinated) write.
98 @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
99 (versionid, timestamp) tuple. Each 'versionid' is a
100 tuple of (seqnum, root_hash, IV, segsize, datalength,
101 k, N, signed_prefix, offsets)
103 @ivar connections: maps peerid to a RemoteReference
105 @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
106 shares that I should ignore (because a previous user of
107 the servermap determined that they were invalid). The
108 updater only locates a certain number of shares: if
109 some of these turn out to have integrity problems and
110 are unusable, the caller will need to mark those shares
111 as bad, then re-update the servermap, then try again.
112 The dict maps (peerid, shnum) tuple to old checkstring.
117 self.connections = {}
118 self.unreachable_peers = set() # peerids that didn't respond to queries
119 self.reachable_peers = set() # peerids that did respond to queries
120 self.problems = [] # mostly for debugging
121 self.bad_shares = {} # maps (peerid,shnum) to old checkstring
122 self.last_update_mode = None
123 self.last_update_time = 0
124 self.update_data = {} # (verinfo,shnum) => data
128 s.servermap = self.servermap.copy() # tuple->tuple
129 s.connections = self.connections.copy() # str->RemoteReference
130 s.unreachable_peers = set(self.unreachable_peers)
131 s.reachable_peers = set(self.reachable_peers)
132 s.problems = self.problems[:]
133 s.bad_shares = self.bad_shares.copy() # tuple->str
134 s.last_update_mode = self.last_update_mode
135 s.last_update_time = self.last_update_time
138 def mark_bad_share(self, peerid, shnum, checkstring):
139 """This share was found to be bad, either in the checkstring or
140 signature (detected during mapupdate), or deeper in the share
141 (detected at retrieve time). Remove it from our list of useful
142 shares, and remember that it is bad so we don't add it back again
143 later. We record the share's old checkstring (which might be
144 corrupted or badly signed) so that a repair operation can do the
145 test-and-set using it as a reference.
147 key = (peerid, shnum) # record checkstring
148 self.bad_shares[key] = checkstring
149 self.servermap.pop(key, None)
151 def add_new_share(self, peerid, shnum, verinfo, timestamp):
152 """We've written a new share out, replacing any that was there
154 key = (peerid, shnum)
155 self.bad_shares.pop(key, None)
156 self.servermap[key] = (verinfo, timestamp)
158 def dump(self, out=sys.stdout):
159 print >>out, "servermap:"
161 for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
162 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
163 offsets_tuple) = verinfo
164 print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
165 (idlib.shortnodeid_b2a(peerid), shnum,
166 seqnum, base32.b2a(root_hash)[:4], k, N,
169 print >>out, "%d PROBLEMS" % len(self.problems)
170 for f in self.problems:
179 def all_peers_for_version(self, verinfo):
180 """Return a set of peerids that hold shares for the given version."""
182 for ( (peerid, shnum), (verinfo2, timestamp) )
183 in self.servermap.items()
184 if verinfo == verinfo2])
186 def make_sharemap(self):
187 """Return a dict that maps shnum to a set of peerids that hold it."""
188 sharemap = DictOfSets()
189 for (peerid, shnum) in self.servermap:
190 sharemap.add(shnum, peerid)
193 def make_versionmap(self):
194 """Return a dict that maps versionid to sets of (shnum, peerid,
195 timestamp) tuples."""
196 versionmap = DictOfSets()
197 for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
198 versionmap.add(verinfo, (shnum, peerid, timestamp))
201 def shares_on_peer(self, peerid):
203 for (s_peerid, shnum)
205 if s_peerid == peerid])
207 def version_on_peer(self, peerid, shnum):
208 key = (peerid, shnum)
209 if key in self.servermap:
210 (verinfo, timestamp) = self.servermap[key]
214 def shares_available(self):
215 """Return a dict that maps verinfo to
216 (num_distinct_shares, k, N) tuples."""
217 versionmap = self.make_versionmap()
219 for verinfo, shares in versionmap.items():
221 for (shnum, peerid, timestamp) in shares:
223 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
224 offsets_tuple) = verinfo
225 all_shares[verinfo] = (len(s), k, N)
228 def highest_seqnum(self):
229 available = self.shares_available()
230 seqnums = [verinfo[0]
231 for verinfo in available.keys()]
def summarize_version(self, verinfo):
    """Describe a versionid as a short "seq<seqnum>-<hash>" string.

    `verinfo` is unpacked as a nine-element tuple. Only the sequence
    number and the first four characters of the base32-encoded root hash
    appear in the result.
    """
    # Unpack every field so that a verinfo of the wrong length fails here.
    (seqnum, root_hash, _iv, _segsize, _datalength, _k, _n, _prefix,
     _offsets) = verinfo
    short_hash = base32.b2a(root_hash)[:4]
    return "seq%d-%s" % (seqnum, short_hash)
241 def summarize_versions(self):
242 """Return a string describing which versions we know about."""
243 versionmap = self.make_versionmap()
245 for (verinfo, shares) in versionmap.items():
246 vstr = self.summarize_version(verinfo)
247 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
248 bits.append("%d*%s" % (len(shnums), vstr))
249 return "/".join(bits)
251 def recoverable_versions(self):
252 """Return a set of versionids, one for each version that is currently
254 versionmap = self.make_versionmap()
255 recoverable_versions = set()
256 for (verinfo, shares) in versionmap.items():
257 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
258 offsets_tuple) = verinfo
259 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
261 # this one is recoverable
262 recoverable_versions.add(verinfo)
264 return recoverable_versions
266 def unrecoverable_versions(self):
267 """Return a set of versionids, one for each version that is currently
269 versionmap = self.make_versionmap()
271 unrecoverable_versions = set()
272 for (verinfo, shares) in versionmap.items():
273 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
274 offsets_tuple) = verinfo
275 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
277 unrecoverable_versions.add(verinfo)
279 return unrecoverable_versions
281 def best_recoverable_version(self):
282 """Return a single versionid, for the so-called 'best' recoverable
283 version. Sequence number is the primary sort criterion, followed by
284 root hash. Returns None if there are no recoverable versions."""
285 recoverable = list(self.recoverable_versions())
288 return recoverable[-1]
291 def size_of_version(self, verinfo):
292 """Given a versionid (perhaps returned by best_recoverable_version),
293 return the size of the file in bytes."""
294 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
295 offsets_tuple) = verinfo
298 def unrecoverable_newer_versions(self):
299 # Return a dict of versionid -> health, for versions that are
300 # unrecoverable and have later seqnums than any recoverable versions.
301 # These indicate that a write will lose data.
302 versionmap = self.make_versionmap()
303 healths = {} # maps verinfo to (found,k)
304 unrecoverable = set()
305 highest_recoverable_seqnum = -1
306 for (verinfo, shares) in versionmap.items():
307 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
308 offsets_tuple) = verinfo
309 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
310 healths[verinfo] = (len(shnums),k)
312 unrecoverable.add(verinfo)
314 highest_recoverable_seqnum = max(seqnum,
315 highest_recoverable_seqnum)
318 for verinfo in unrecoverable:
319 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
320 offsets_tuple) = verinfo
321 if seqnum > highest_recoverable_seqnum:
322 newversions[verinfo] = healths[verinfo]
327 def needs_merge(self):
328 # return True if there are multiple recoverable versions with the
329 # same seqnum, meaning that MutableFileNode.read_best_version is not
330 # giving you the whole story, and that using its data to do a
331 # subsequent publish will lose information.
332 recoverable_seqnums = [verinfo[0]
333 for verinfo in self.recoverable_versions()]
334 for seqnum in recoverable_seqnums:
335 if recoverable_seqnums.count(seqnum) > 1:
340 def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
342 I return the update data for the given shnum
344 update_data = self.update_data[shnum]
345 update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
349 def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
351 I record the block hash tree for the given shnum.
353 self.update_data.setdefault(shnum , []).append((verinfo, data))
356 class ServermapUpdater:
357 def __init__(self, filenode, storage_broker, monitor, servermap,
358 mode=MODE_READ, add_lease=False, update_range=None):
359 """I update a servermap, locating a sufficient number of useful
360 shares and remembering where they are located.
364 self._node = filenode
365 self._storage_broker = storage_broker
366 self._monitor = monitor
367 self._servermap = servermap
369 self._add_lease = add_lease
372 self._storage_index = filenode.get_storage_index()
373 self._last_failure = None
375 self._status = UpdateStatus()
376 self._status.set_storage_index(self._storage_index)
377 self._status.set_progress(0.0)
378 self._status.set_mode(mode)
380 self._servers_responded = set()
382 # how much data should we read?
384 # * if we only need the checkstring, then [0:75]
385 # * if we need to validate the checkstring sig, then [543ish:799ish]
386 # * if we need the verification key, then [107:436ish]
387 # * the offset table at [75:107] tells us about the 'ish'
388 # * if we need the encrypted private key, we want [-1216ish:]
389 # * but we can't read from negative offsets
390 # * the offset table tells us the 'ish', also the positive offset
392 # * Checkstring? [0:72]
393 # * If we want to validate the checkstring, then [0:72], [143:?] --
394 # the offset table will tell us for sure.
395 # * If we need the verification key, we have to consult the offset
397 # At this point, we don't know which we are. Our filenode can
398 # tell us, but it might be lying -- in some cases, we're
399 # responsible for telling it which kind of file it is.
400 self._read_size = 4000
401 if mode == MODE_CHECK:
402 # we use unpack_prefix_and_signature, so we need 1k
403 self._read_size = 1000
404 self._need_privkey = False
406 if mode == MODE_WRITE and not self._node.get_privkey():
407 self._need_privkey = True
408 # check+repair: repair requires the privkey, so if we didn't happen
409 # to ask for it during the check, we'll have problems doing the
412 self.fetch_update_data = False
413 if mode == MODE_WRITE and update_range:
414 # We're updating the servermap in preparation for an
415 # in-place file update, so we need to fetch some additional
416 # data from each share that we find.
417 assert len(update_range) == 2
419 self.start_segment = update_range[0]
420 self.end_segment = update_range[1]
421 self.fetch_update_data = True
423 prefix = si_b2a(self._storage_index)[:5]
424 self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
425 si=prefix, mode=mode)
427 def get_status(self):
def log(self, *args, **kwargs):
    """Forward a message to log.msg with this updater's defaults filled in.

    Unless the caller supplies them, `parent` is set to self._log_number
    and `facility` to "tahoe.mutable.mapupdate". Returns whatever log.msg
    returns.
    """
    if "parent" not in kwargs:
        # Read self._log_number only when the caller gave no parent.
        kwargs["parent"] = self._log_number
    kwargs.setdefault("facility", "tahoe.mutable.mapupdate")
    return log.msg(*args, **kwargs)
438 """Update the servermap to reflect current conditions. Returns a
439 Deferred that fires with the servermap once the update has finished."""
440 self._started = time.time()
441 self._status.set_active(True)
443 # self._valid_versions is a set of validated verinfo tuples. We just
444 # use it to remember which versions had valid signatures, so we can
445 # avoid re-checking the signatures for each share.
446 self._valid_versions = set()
448 # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
449 # timestamp) tuples. This is used to figure out which versions might
450 # be retrievable, and to make the eventual data download faster.
451 self.versionmap = DictOfSets()
453 self._done_deferred = defer.Deferred()
455 # first, which peers should we talk to? Any that were in our old
456 # servermap, plus "enough" others.
458 self._queries_completed = 0
460 sb = self._storage_broker
461 # All of the peers, permuted by the storage index, as usual.
462 full_peerlist = [(s.get_serverid(), s.get_rref())
463 for s in sb.get_servers_for_psi(self._storage_index)]
464 self.full_peerlist = full_peerlist # for use later, immutable
465 self.extra_peers = full_peerlist[:] # peers are removed as we use them
466 self._good_peers = set() # peers who had some shares
467 self._empty_peers = set() # peers who don't have any shares
468 self._bad_peers = set() # peers to whom our queries failed
469 self._readers = {} # peerid -> dict(sharewriters), filled in
470 # after responses come in.
472 k = self._node.get_required_shares()
473 # For what cases can these conditions work?
477 N = self._node.get_total_shares()
481 # we want to send queries to at least this many peers (although we
482 # might not wait for all of their answers to come back)
483 self.num_peers_to_query = k + self.EPSILON
485 if self.mode == MODE_CHECK:
486 # We want to query all of the peers.
487 initial_peers_to_query = dict(full_peerlist)
488 must_query = set(initial_peers_to_query.keys())
489 self.extra_peers = []
490 elif self.mode == MODE_WRITE:
491 # we're planning to replace all the shares, so we want a good
492 # chance of finding them all. We will keep searching until we've
493 # seen epsilon that don't have a share.
494 # We don't query all of the peers because that could take a while.
495 self.num_peers_to_query = N + self.EPSILON
496 initial_peers_to_query, must_query = self._build_initial_querylist()
497 self.required_num_empty_peers = self.EPSILON
499 # TODO: arrange to read lots of data from k-ish servers, to avoid
500 # the extra round trip required to read large directories. This
501 # might also avoid the round trip required to read the encrypted
504 else: # MODE_READ, MODE_ANYTHING
505 # 2k peers is good enough.
506 initial_peers_to_query, must_query = self._build_initial_querylist()
508 # this is a set of peers that we are required to get responses from:
509 # they are peers who used to have a share, so we need to know where
510 # they currently stand, even if that means we have to wait for a
511 # silently-lost TCP connection to time out. We remove peers from this
512 # set as we get responses.
513 self._must_query = must_query
515 # now initial_peers_to_query contains the peers that we should ask,
516 # self._must_query contains the peers that we must have heard from
517 # before we can consider ourselves finished, and self.extra_peers
518 # contains the overflow (peers that we should tap if we don't get
520 # I guess that self._must_query is a subset of
521 # initial_peers_to_query?
522 assert set(must_query).issubset(set(initial_peers_to_query))
524 self._send_initial_requests(initial_peers_to_query)
525 self._status.timings["initial_queries"] = time.time() - self._started
526 return self._done_deferred
528 def _build_initial_querylist(self):
529 initial_peers_to_query = {}
531 for peerid in self._servermap.all_peers():
532 ss = self._servermap.connections[peerid]
533 # we send queries to everyone who was already in the sharemap
534 initial_peers_to_query[peerid] = ss
535 # and we must wait for responses from them
536 must_query.add(peerid)
538 while ((self.num_peers_to_query > len(initial_peers_to_query))
539 and self.extra_peers):
540 (peerid, ss) = self.extra_peers.pop(0)
541 initial_peers_to_query[peerid] = ss
543 return initial_peers_to_query, must_query
545 def _send_initial_requests(self, peerlist):
546 self._status.set_status("Sending %d initial queries" % len(peerlist))
547 self._queries_outstanding = set()
548 self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
549 for (peerid, ss) in peerlist.items():
550 self._queries_outstanding.add(peerid)
551 self._do_query(ss, peerid, self._storage_index, self._read_size)
554 # there is nobody to ask, so we need to short-circuit the state
556 d = defer.maybeDeferred(self._check_for_done, None)
557 d.addErrback(self._fatal_error)
559 # control flow beyond this point: state machine. Receiving responses
560 # from queries is the input. We might send out more queries, or we
561 # might produce a result.
564 def _do_query(self, ss, peerid, storage_index, readsize):
565 self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
566 peerid=idlib.shortnodeid_b2a(peerid),
569 self._servermap.connections[peerid] = ss
570 started = time.time()
571 self._queries_outstanding.add(peerid)
572 d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
573 d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
575 d.addErrback(self._query_failed, peerid)
576 # errors that aren't handled by _query_failed (and errors caused by
577 # _query_failed) get logged, but we still want to check for doneness.
578 d.addErrback(log.err)
579 d.addErrback(self._fatal_error)
580 d.addCallback(self._check_for_done)
583 def _do_read(self, ss, peerid, storage_index, shnums, readv):
585 # send an add-lease message in parallel. The results are handled
586 # separately. This is sent before the slot_readv() so that we can
587 # be sure the add_lease is retired by the time slot_readv comes
588 # back (this relies upon our knowledge that the server code for
589 # add_lease is synchronous).
590 renew_secret = self._node.get_renewal_secret(peerid)
591 cancel_secret = self._node.get_cancel_secret(peerid)
592 d2 = ss.callRemote("add_lease", storage_index,
593 renew_secret, cancel_secret)
595 d2.addErrback(self._add_lease_failed, peerid, storage_index)
596 d = ss.callRemote("slot_readv", storage_index, shnums, readv)
600 def _got_corrupt_share(self, e, shnum, peerid, data, lp):
602 I am called when a remote server returns a corrupt share in
603 response to one of our queries. By corrupt, I mean a share
604 without a valid signature. I then record the failure, notify the
605 server of the corruption, and record the share as bad.
607 f = failure.Failure(e)
608 self.log(format="bad share: %(f_value)s", f_value=str(f),
609 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
610 # Notify the server that its share is corrupt.
611 self.notify_server_corruption(peerid, shnum, str(e))
612 # By flagging this as a bad peer, we won't count any of
613 # the other shares on that peer as valid, though if we
614 # happen to find a valid version string amongst those
615 # shares, we'll keep track of it so that we don't need
616 # to validate the signature on those again.
617 self._bad_peers.add(peerid)
618 self._last_failure = f
619 # XXX: Use the reader for this?
620 checkstring = data[:SIGNED_PREFIX_LENGTH]
621 self._servermap.mark_bad_share(peerid, shnum, checkstring)
622 self._servermap.problems.append(f)
625 def _cache_good_sharedata(self, verinfo, shnum, now, data):
627 If one of my queries returns successfully (which means that we
628 were able to and successfully did validate the signature), I
629 cache the data that we initially fetched from the storage
630 server. This will help reduce the number of roundtrips that need
631 to occur when the file is downloaded, or when the file is
635 self._node._add_to_cache(verinfo, shnum, 0, data)
638 def _got_results(self, datavs, peerid, readsize, stuff, started):
639 lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
640 peerid=idlib.shortnodeid_b2a(peerid),
641 numshares=len(datavs))
643 elapsed = now - started
644 def _done_processing(ignored=None):
645 self._queries_outstanding.discard(peerid)
646 self._servermap.reachable_peers.add(peerid)
647 self._must_query.discard(peerid)
648 self._queries_completed += 1
649 if not self._running:
650 self.log("but we're not running, so we'll ignore it", parent=lp)
652 self._status.add_per_server_time(peerid, "late", started, elapsed)
654 self._status.add_per_server_time(peerid, "query", started, elapsed)
657 self._good_peers.add(peerid)
659 self._empty_peers.add(peerid)
661 ss, storage_index = stuff
664 for shnum,datav in datavs.items():
666 reader = MDMFSlotReadProxy(ss,
670 self._readers.setdefault(peerid, dict())[shnum] = reader
671 # our goal, with each response, is to validate the version
672 # information and share data as best we can at this point --
673 # we do this by validating the signature. To do this, we
674 # need to do the following:
675 # - If we don't already have the public key, fetch the
676 # public key. We use this to validate the signature.
677 if not self._node.get_pubkey():
678 # fetch and set the public key.
679 d = reader.get_verification_key(queue=True)
680 d.addCallback(lambda results, shnum=shnum, peerid=peerid:
681 self._try_to_set_pubkey(results, peerid, shnum, lp))
682 # XXX: Make self._pubkey_query_failed?
683 d.addErrback(lambda error, shnum=shnum, peerid=peerid:
684 self._got_corrupt_share(error, shnum, peerid, data, lp))
686 # we already have the public key.
687 d = defer.succeed(None)
689 # Neither of these two branches return anything of
690 # consequence, so the first entry in our deferredlist will
693 # - Next, we need the version information. We almost
694 # certainly got this by reading the first thousand or so
695 # bytes of the share on the storage server, so we
696 # shouldn't need to fetch anything at this step.
697 d2 = reader.get_verinfo()
698 d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
699 self._got_corrupt_share(error, shnum, peerid, data, lp))
700 # - Next, we need the signature. For an SDMF share, it is
701 # likely that we fetched this when doing our initial fetch
702 # to get the version information. In MDMF, this lives at
703 # the end of the share, so unless the file is quite small,
704 # we'll need to do a remote fetch to get it.
705 d3 = reader.get_signature(queue=True)
706 d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
707 self._got_corrupt_share(error, shnum, peerid, data, lp))
708 # Once we have all three of these responses, we can move on
709 # to validating the signature
711 # Does the node already have a privkey? If not, we'll try to
713 if self._need_privkey:
714 d4 = reader.get_encprivkey(queue=True)
715 d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
716 self._try_to_validate_privkey(results, peerid, shnum, lp))
717 d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
718 self._privkey_query_failed(error, shnum, data, lp))
720 d4 = defer.succeed(None)
723 if self.fetch_update_data:
724 # fetch the block hash tree and first + last segment, as
725 # configured earlier.
726 # Then set them in wherever we happen to want to set
729 # XXX: We do this above, too. Is there a good way to
730 # make the two routines share the value without
731 # introducing more roundtrips?
732 ds.append(reader.get_verinfo())
733 ds.append(reader.get_blockhashes(queue=True))
734 ds.append(reader.get_block_and_salt(self.start_segment,
736 ds.append(reader.get_block_and_salt(self.end_segment,
738 d5 = deferredutil.gatherResults(ds)
739 d5.addCallback(self._got_update_results_one_share, shnum)
741 d5 = defer.succeed(None)
743 dl = defer.DeferredList([d, d2, d3, d4, d5])
744 dl.addBoth(self._turn_barrier)
746 dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
747 self._got_signature_one_share(results, shnum, peerid, lp))
748 dl.addErrback(lambda error, shnum=shnum, data=data:
749 self._got_corrupt_share(error, shnum, peerid, data, lp))
750 dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
751 self._cache_good_sharedata(verinfo, shnum, now, data))
753 # dl is a deferred list that will fire when all of the shares
754 # that we found on this peer are done processing. When dl fires,
755 # we know that processing is done, so we can decrement the
756 # semaphore-like thing that we incremented earlier.
757 dl = defer.DeferredList(ds, fireOnOneErrback=True)
758 # Are we done? Done means that there are no more queries to
759 # send, that there are no outstanding queries, and that we
760 # haven't received any queries that are still processing. If we
761 # are done, self._check_for_done will cause the done deferred
762 # that we returned to our caller to fire, which tells them that
763 # they have a complete servermap, and that we won't be touching
764 # the servermap anymore.
765 dl.addCallback(_done_processing)
766 dl.addCallback(self._check_for_done)
767 dl.addErrback(self._fatal_error)
769 self.log("_got_results done", parent=lp, level=log.NOISY)
773 def _turn_barrier(self, result):
775 I help the servermap updater avoid the recursion limit issues
778 return fireEventually(result)
781 def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
782 if self._node.get_pubkey():
783 return # don't go through this again if we don't have to
784 fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
785 assert len(fingerprint) == 32
786 if fingerprint != self._node.get_fingerprint():
787 raise CorruptShareError(peerid, shnum,
788 "pubkey doesn't match fingerprint")
789 self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
790 assert self._node.get_pubkey()
def notify_server_corruption(self, peerid, shnum, reason):
    """Send an "advise_corrupt_share" message about share `shnum` to `peerid`.

    The connection is looked up in self._servermap.connections. The
    message is sent with callRemoteOnly and its result is not used.
    """
    rref = self._servermap.connections[peerid]
    message_args = ("mutable", self._storage_index, shnum, reason)
    rref.callRemoteOnly("advise_corrupt_share", *message_args)
799 def _got_signature_one_share(self, results, shnum, peerid, lp):
800 # It is our job to give versioninfo to our caller. We need to
801 # raise CorruptShareError if the share is corrupt for any
802 # reason, something that our caller will handle.
803 self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
805 peerid=idlib.shortnodeid_b2a(peerid),
808 if not self._running:
809 # We can't process the results, since we can't touch the
811 self.log("but we're not running anymore.")
814 _, verinfo, signature, __, ___ = results
823 offsets) = verinfo[1]
824 offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
826 # XXX: This should be done for us in the method, so
827 # presumably you can go in there and fix it.
837 # This tuple uniquely identifies a share on the grid; we use it
838 # to keep track of the ones that we've already seen.
840 if verinfo not in self._valid_versions:
841 # This is a new version tuple, and we need to validate it
842 # against the public key before keeping track of it.
843 assert self._node.get_pubkey()
844 valid = self._node.get_pubkey().verify(prefix, signature[1])
846 raise CorruptShareError(peerid, shnum,
847 "signature is invalid")
849 # ok, it's a valid verinfo. Add it to the list of validated
851 self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
852 % (seqnum, base32.b2a(root_hash)[:4],
853 idlib.shortnodeid_b2a(peerid), shnum,
854 k, n, segsize, datalen),
856 self._valid_versions.add(verinfo)
857 # We now know that this is a valid candidate verinfo. Whether or
858 # not this instance of it is valid is a matter for the next
859 # statement; at this point, we just know that if we see this
860 # version info again, that its signature checks out and that
861 # we're okay to skip the signature-checking step.
863 # (peerid, shnum) are bound in the method invocation.
864 if (peerid, shnum) in self._servermap.bad_shares:
865 # we've been told that the rest of the data in this share is
866 # unusable, so don't add it to the servermap.
867 self.log("but we've been told this is a bad share",
868 parent=lp, level=log.UNUSUAL)
871 # Add the info to our servermap.
872 timestamp = time.time()
873 self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
875 self.versionmap.add(verinfo, (shnum, peerid, timestamp))
880 def _got_update_results_one_share(self, results, share):
882 I record the update results in results.
884 assert len(results) == 4
885 verinfo, blockhashes, start, end = results
895 offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
897 # XXX: This should be done for us in the method, so
898 # presumably you can go in there and fix it.
909 update_data = (blockhashes, start, end)
910 self._servermap.set_update_data_for_share_and_verinfo(share,
915 def _deserialize_pubkey(self, pubkey_s):
916 verifier = rsa.create_verifying_key_from_string(pubkey_s)
920 def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
922 Given a writekey from a remote server, I validate it against the
923 writekey stored in my node. If it is valid, then I set the
924 privkey and encprivkey properties of the node.
926 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
927 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
928 if alleged_writekey != self._node.get_writekey():
929 self.log("invalid privkey from %s shnum %d" %
930 (idlib.nodeid_b2a(peerid)[:8], shnum),
931 parent=lp, level=log.WEIRD, umid="aJVccw")
935 self.log("got valid privkey from shnum %d on peerid %s" %
936 (shnum, idlib.shortnodeid_b2a(peerid)),
938 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
939 self._node._populate_encprivkey(enc_privkey)
940 self._node._populate_privkey(privkey)
941 self._need_privkey = False
942 self._status.set_privkey_from(peerid)
945 def _add_lease_failed(self, f, peerid, storage_index):
# Handle a failed add-lease request: inspect the failure f and decide
# whether it deserves a log entry.
#   f             -- the failure object; f.check(...) is used to classify it
#   peerid        -- id of the server involved, shortened for the log text
#   storage_index -- not referenced in any line visible in this listing
# NOTE(review): several lines of this method (apparently the bare returns
# and the start of the final logging call) are not visible here; the
# comments below describe only what the visible lines establish.
946 # Older versions of Tahoe didn't handle the add-lease message very
947 # well: <=1.1.0 throws a NameError because it doesn't implement
948 # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
949 # (which is most of them, since we send add-lease to everybody,
950 # before we know whether or not they have any shares for us), and
951 # 1.2.0 throws KeyError even on known buckets due to an internal bug
952 # in the latency-measuring code.
954 # we want to ignore the known-harmless errors and log the others. In
955 # particular we want to log any local errors caused by coding
# A DeadReferenceError is tested for first; its handling is not visible
# (presumably it is silently ignored -- confirm).
958 if f.check(DeadReferenceError):
# For a RemoteException the wrapped remote failure (f.value.failure) is
# examined: the three exception types named in the header comment are the
# known-harmless ones, anything else is logged at WEIRD level.
960 if f.check(RemoteException):
961 if f.value.failure.check(KeyError, IndexError, NameError):
962 # this may ignore a bit too much, but that only hurts us
965 self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
966 peerid=idlib.shortnodeid_b2a(peerid),
967 f_value=str(f.value),
969 level=log.WEIRD, umid="iqg3mw")
# Everything that reaches this point is treated as a local error.
# NOTE(review): the opening line of this logging call is not visible, so
# which logging function receives these keyword arguments cannot be told
# from here.
971 # local errors are cause for alarm
973 format="local error in add_lease to [%(peerid)s]: %(f_value)s",
974 peerid=idlib.shortnodeid_b2a(peerid),
975 f_value=str(f.value),
976 level=log.WEIRD, umid="ZWh6HA")
978 def _query_failed(self, f, peerid):
# Handle a failed query to peerid: log the failure f, then update the
# bookkeeping so that the peer is no longer waited on, is counted as bad
# and unreachable, and the query is counted as completed.
# NOTE(review): the statement under the _running check is not visible in
# this listing -- presumably an early return once the update has stopped;
# confirm.
979 if not self._running:
# NOTE(review): `level` is assigned on lines not visible here; the visible
# DeadReferenceError test presumably selects a different log level for
# lost connections -- confirm the actual values in the full file.
982 if f.check(DeadReferenceError):
984 self.log(format="error during query: %(f_value)s",
985 f_value=str(f.value), failure=f,
986 level=level, umid="IHXuQg")
# Stop waiting for this peer, mark it bad, and keep the failure in the
# servermap's list of problems.
987 self._must_query.discard(peerid)
988 self._queries_outstanding.discard(peerid)
989 self._bad_peers.add(peerid)
990 self._servermap.problems.append(f)
991 # a peerid could be in both ServerMap.reachable_peers and
992 # .unreachable_peers if they responded to our query, but then an
993 # exception was raised in _got_results.
994 self._servermap.unreachable_peers.add(peerid)
# A failed query still counts towards the completed total; the failure is
# also kept as the most recent one seen.
995 self._queries_completed += 1
996 self._last_failure = f
999 def _privkey_query_failed(self, f, peerid, shnum, lp):
# Handle a failed private-key query to peerid: log the failure f under the
# parent log entry lp and record it as a servermap problem.
# shnum is not referenced in any line visible in this listing.
#
# The peer is removed from the outstanding set before the _running check,
# so that happens even when the update has already stopped.
1000 self._queries_outstanding.discard(peerid)
# NOTE(review): the statement under this check is not visible here --
# presumably an early return; confirm.
1001 if not self._running:
# NOTE(review): `level` is assigned on lines not visible in this listing;
# the DeadReferenceError test presumably picks a different log level for
# lost connections -- confirm the actual values in the full file.
1004 if f.check(DeadReferenceError):
1006 self.log(format="error during privkey query: %(f_value)s",
1007 f_value=str(f.value), failure=f,
1008 parent=lp, level=level, umid="McoJ5w")
# Keep the failure in the servermap's problem list and as the most recent
# failure seen.
1009 self._servermap.problems.append(f)
1010 self._last_failure = f
1013 def _check_for_done(self, res):
# Decide what happens next in the servermap update: finish, send more
# queries, or keep waiting. The decision depends on self.mode and on the
# peer bookkeeping sets maintained elsewhere in this class.
# NOTE(review): res is not referenced in any visible line -- presumably
# this is used as a Deferred callback and the result is ignored; confirm.
# NOTE(review): a number of lines of this method are not visible in this
# listing (among them the `states` bookkeeping and several returns), so
# the added comments describe only what the visible lines establish.
1015 # return self._send_more_queries(outstanding) : send some more queries
1016 # return self._done() : all done
1017 # return : keep waiting, no new queries
# Log a summary of the current state; lp becomes the parent of every log
# message emitted further down.
1018 lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
1019 "%(outstanding)d queries outstanding, "
1020 "%(extra)d extra peers available, "
1021 "%(must)d 'must query' peers left, "
1022 "need_privkey=%(need_privkey)s"
1025 outstanding=len(self._queries_outstanding),
1026 extra=len(self.extra_peers),
1027 must=len(self._must_query),
1028 need_privkey=self._need_privkey,
# The update has already stopped: only a NOISY note is logged here (what
# follows the log call is not visible).
1032 if not self._running:
1033 self.log("but we're not running", parent=lp, level=log.NOISY)
1036 if self._must_query:
1037 # we are still waiting for responses from peers that used to have
1038 # a share, so we must continue to wait. No additional queries are
1039 # required at this time.
1040 self.log("%d 'must query' peers left" % len(self._must_query),
1041 level=log.NOISY, parent=lp)
1044 if (not self._queries_outstanding and not self.extra_peers):
1045 # all queries have retired, and we have no peers left to ask. No
1046 # more progress can be made, therefore we are done.
1047 self.log("all queries are retired, no extra peers: done",
# Ask the servermap which versions it currently considers recoverable and
# unrecoverable; the mode-specific policies below consult both.
1051 recoverable_versions = self._servermap.recoverable_versions()
1052 unrecoverable_versions = self._servermap.unrecoverable_versions()
1054 # what is our completion policy? how hard should we work?
# MODE_ANYTHING: the presence of any recoverable version is logged as
# "done".
1056 if self.mode == MODE_ANYTHING:
1057 if recoverable_versions:
1058 self.log("%d recoverable versions: done"
1059 % len(recoverable_versions),
1063 if self.mode == MODE_CHECK:
1064 # we used self._must_query, and we know there aren't any
1065 # responses still waiting, so that means we must be done
1066 self.log("done", parent=lp)
1070 if self.mode == MODE_READ:
1071 # if we've queried k+epsilon servers, and we see a recoverable
1072 # version, and we haven't seen any unrecoverable higher-seqnum'ed
1073 # versions, then we're done.
# Not enough queries completed yet: top up to MAX_IN_FLIGHT.
1075 if self._queries_completed < self.num_peers_to_query:
1076 self.log(format="%(completed)d completed, %(query)d to query: need more",
1077 completed=self._queries_completed,
1078 query=self.num_peers_to_query,
1079 level=log.NOISY, parent=lp)
1080 return self._send_more_queries(MAX_IN_FLIGHT)
1081 if not recoverable_versions:
1082 self.log("no recoverable versions: need more",
1083 level=log.NOISY, parent=lp)
1084 return self._send_more_queries(MAX_IN_FLIGHT)
# Take the largest recoverable verinfo; its element [0] is used as the
# sequence number and compared against element [0] of every unrecoverable
# verinfo.
1085 highest_recoverable = max(recoverable_versions)
1086 highest_recoverable_seqnum = highest_recoverable[0]
1087 for unrec_verinfo in unrecoverable_versions:
1088 if unrec_verinfo[0] > highest_recoverable_seqnum:
1089 # there is evidence of a higher-seqnum version, but we
1090 # don't yet see enough shares to recover it. Try harder.
1091 # TODO: consider sending more queries.
1092 # TODO: consider limiting the search distance
1093 self.log("evidence of higher seqnum: need more",
1094 level=log.UNUSUAL, parent=lp)
1095 return self._send_more_queries(MAX_IN_FLIGHT)
1096 # all the unrecoverable versions were old or concurrent with a
1097 # recoverable version. Good enough.
1098 self.log("no higher-seqnum: done", parent=lp)
1101 if self.mode == MODE_WRITE:
1102 # we want to keep querying until we've seen a few that don't have
1103 # any shares, to be sufficiently confident that we've seen all
1104 # the shares. This is still less work than MODE_CHECK, which asks
1105 # every server in the world.
1107 if not recoverable_versions:
1108 self.log("no recoverable versions: need more", parent=lp,
1110 return self._send_more_queries(MAX_IN_FLIGHT)
# Walk full_peerlist in order and classify each peer as bad, empty, good,
# or (the remaining case) not yet responded.
# NOTE(review): the updates to last_found, num_not_found and states are
# not visible in this listing; from the visible tests the "boundary"
# appears to be reached once at least EPSILON empty peers have been seen
# after a peer that had shares -- confirm against the full file.
1113 last_not_responded = -1
1114 num_not_responded = 0
1117 found_boundary = False
1119 for i,(peerid,ss) in enumerate(self.full_peerlist):
1120 if peerid in self._bad_peers:
1123 #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
1124 elif peerid in self._empty_peers:
1127 #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
1128 if last_found != -1:
1130 if num_not_found >= self.EPSILON:
1131 self.log("found our boundary, %s" %
1133 parent=lp, level=log.NOISY)
1134 found_boundary = True
1137 elif peerid in self._good_peers:
1140 #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
# Remaining case: this peer has not answered yet, so remember its position
# and count it.
1146 #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
1147 last_not_responded = i
1148 num_not_responded += 1
1151 # we need to know that we've gotten answers from
1152 # everybody to the left of here
1153 if last_not_responded == -1:
1155 self.log("have all our answers",
1156 parent=lp, level=log.NOISY)
1157 # .. unless we're still waiting on the privkey
1158 if self._need_privkey:
1159 self.log("but we're still waiting for the privkey",
1160 parent=lp, level=log.NOISY)
1161 # if we found the boundary but we haven't yet found
1162 # the privkey, we may need to look further. If
1163 # somehow all the privkeys were corrupted (but the
1164 # shares were readable), then this is likely to do an
1165 # exhaustive search.
1166 return self._send_more_queries(MAX_IN_FLIGHT)
# Here the in-flight target is the number of peers that have not responded
# yet, rather than MAX_IN_FLIGHT.
1168 # still waiting for somebody
1169 return self._send_more_queries(num_not_responded)
1171 # if we hit here, we didn't find our boundary, so we're still
1173 self.log("no boundary yet, %s" % "".join(states), parent=lp,
1175 return self._send_more_queries(MAX_IN_FLIGHT)
1177 # otherwise, keep up to 5 queries in flight. TODO: this is pretty
1178 # arbitrary, really I want this to be something like k -
1179 # max(known_version_sharecounts) + some extra
1180 self.log("catchall: need more", parent=lp, level=log.NOISY)
1181 return self._send_more_queries(MAX_IN_FLIGHT)
1183 def _send_more_queries(self, num_outstanding):
# Start additional queries. Peers are taken from the front of
# self.extra_peers; the running total of queries already outstanding plus
# those newly chosen is compared against num_outstanding.
# NOTE(review): the initialisation of more_queries, the loop header and
# the statements under the two `if` tests below are not visible in this
# listing; presumably each test ends the selection loop (target reached /
# no peers left) -- confirm against the full file.
1187 self.log(format=" there are %(outstanding)d queries outstanding",
1188 outstanding=len(self._queries_outstanding),
1190 active_queries = len(self._queries_outstanding) + len(more_queries)
1191 if active_queries >= num_outstanding:
1193 if not self.extra_peers:
1195 more_queries.append(self.extra_peers.pop(0))
# Log which peers were chosen, then issue one query per chosen
# (peerid, ss) pair using the updater's storage index and read size.
1197 self.log(format="sending %(more)d more queries: %(who)s",
1198 more=len(more_queries),
1199 who=" ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
1200 for (peerid,ss) in more_queries]),
1203 for (peerid, ss) in more_queries:
1204 self._do_query(ss, peerid, self._storage_index, self._read_size)
1205 # we'll retrigger when those queries come back
# NOTE(review): the `def` line of this method is not visible in this
# listing. The header comment of _check_for_done mentions self._done() as
# the "all done" action, and this body appears to be that method --
# confirm against the full file.
#
# If the update is no longer running, that fact is logged (the statement
# after the log call is not visible, presumably an early return); otherwise
# the running flag is cleared.
1208 if not self._running:
1209 self.log("not running; we're already done")
1211 self._running = False
# Record completion on the status object: finish time, total elapsed time,
# full progress, the "Finished" label, and inactive.
# NOTE(review): `now` is assigned on a line not visible here -- presumably
# the current time; confirm.
1213 elapsed = now - self._started
1214 self._status.set_finished(now)
1215 self._status.timings["total"] = elapsed
1216 self._status.set_progress(1.0)
1217 self._status.set_status("Finished")
1218 self._status.set_active(False)
# Stamp the servermap with this update's mode and with the time the update
# started (self._started, not the finish time).
1220 self._servermap.last_update_mode = self.mode
1221 self._servermap.last_update_time = self._started
1222 # the servermap will not be touched after this
1223 self.log("servermap: %s" % self._servermap.summarize_versions())
# Hand the servermap to the done-deferred through foolscap's eventually(),
# i.e. the callback is scheduled rather than invoked directly from here.
1225 eventually(self._done_deferred.callback, self._servermap)
def _fatal_error(self, f):
    """Report failure *f* as fatal and fail the done-deferred with it.

    The failure is logged at WEIRD level, then passed unchanged to
    ``self._done_deferred.errback``.
    """
    log_details = dict(failure=f, level=log.WEIRD, umid="1cNvlw")
    self.log("fatal error", **log_details)
    done_d = self._done_deferred
    done_d.errback(f)