3 from zope.interface import implements
4 from itertools import count
5 from twisted.internet import defer
6 from twisted.python import failure
7 from foolscap.api import DeadReferenceError, RemoteException, eventually, \
9 from allmydata.util import base32, hashutil, idlib, log, deferredutil
10 from allmydata.util.dictutil import DictOfSets
11 from allmydata.storage.server import si_b2a
12 from allmydata.interfaces import IServermapUpdaterStatus
13 from pycryptopp.publickey import rsa
15 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
17 from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
20 implements(IServermapUpdaterStatus)
21 statusid_counter = count(0)
24 self.timings["per_server"] = {}
25 self.timings["cumulative_verify"] = 0.0
26 self.privkey_from = None
29 self.storage_index = None
31 self.status = "Not started"
33 self.counter = self.statusid_counter.next()
34 self.started = time.time()
def add_per_server_time(self, peerid, op, sent, elapsed):
    """Record one timing sample for a request made to a single server.

    peerid  -- key under which the sample is filed in
               timings["per_server"]
    op      -- kind of operation; must be "query", "late" or "privkey"
    sent    -- stored as given (callers pass the time the request began)
    elapsed -- stored as given (callers pass how long the request took)

    Samples for the same peerid accumulate, in call order, as
    (op, sent, elapsed) tuples.
    """
    assert op in ("query", "late", "privkey")
    # setdefault creates the per-server list on first use and hands back
    # the existing one afterwards, so one lookup covers both cases.
    per_server = self.timings["per_server"]
    per_server.setdefault(peerid, []).append((op, sent, elapsed))
43 def get_started(self):
45 def get_finished(self):
def get_storage_index(self):
    """Return the storage index currently recorded on this status object."""
    return self.storage_index
51 def get_servermap(self):
def get_privkey_from(self):
    """Return the value last stored by set_privkey_from (a peerid), or
    whatever privkey_from was initialised to if it was never set."""
    return self.privkey_from
55 def using_helper(self):
61 def get_progress(self):
65 def get_counter(self):
def set_storage_index(self, si):
    """Remember `si` as the storage index this status object reports."""
    self.storage_index = si
70 def set_mode(self, mode):
def set_privkey_from(self, peerid):
    """Remember `peerid` as the server the private key was obtained from."""
    self.privkey_from = peerid
74 def set_status(self, status):
76 def set_progress(self, value):
78 def set_active(self, value):
80 def set_finished(self, when):
84 """I record the placement of mutable shares.
86 This object records which shares (of various versions) are located on
89 One purpose I serve is to inform callers about which versions of the
90 mutable file are recoverable and 'current'.
92 A second purpose is to serve as a state marker for test-and-set
93 operations. I am passed out of retrieval operations and back into publish
94 operations, which means 'publish this new version, but only if nothing
95 has changed since I last retrieved this data'. This reduces the chances
96 of clobbering a simultaneous (uncoordinated) write.
98 @ivar servermap: a dictionary, mapping a (peerid, shnum) tuple to a
99 (versionid, timestamp) tuple. Each 'versionid' is a
100 tuple of (seqnum, root_hash, IV, segsize, datalength,
101 k, N, signed_prefix, offsets)
103 @ivar connections: maps peerid to a RemoteReference
105 @ivar bad_shares: dict with keys of (peerid, shnum) tuples, describing
106 shares that I should ignore (because a previous user of
107 the servermap determined that they were invalid). The
108 updater only locates a certain number of shares: if
109 some of these turn out to have integrity problems and
110 are unusable, the caller will need to mark those shares
111 as bad, then re-update the servermap, then try again.
112 The dict maps (peerid, shnum) tuple to old checkstring.
117 self.connections = {}
118 self.unreachable_peers = set() # peerids that didn't respond to queries
119 self.reachable_peers = set() # peerids that did respond to queries
120 self.problems = [] # mostly for debugging
121 self.bad_shares = {} # maps (peerid,shnum) to old checkstring
122 self.last_update_mode = None
123 self.last_update_time = 0
124 self.update_data = {} # shnum => list of (verinfo, data) tuples
128 s.servermap = self.servermap.copy() # tuple->tuple
129 s.connections = self.connections.copy() # str->RemoteReference
130 s.unreachable_peers = set(self.unreachable_peers)
131 s.reachable_peers = set(self.reachable_peers)
132 s.problems = self.problems[:]
133 s.bad_shares = self.bad_shares.copy() # tuple->str
134 s.last_update_mode = self.last_update_mode
135 s.last_update_time = self.last_update_time
def mark_bad_share(self, peerid, shnum, checkstring):
    """Record that share `shnum` held by `peerid` is bad.

    The share may have failed its checkstring or signature check
    (detected during mapupdate) or turned out to be damaged deeper inside
    (detected at retrieve time). It is removed from the map of useful
    shares and remembered in bad_shares so that it is not added back
    later. The share's old checkstring, which may itself be corrupted or
    badly signed, is kept so that a repair operation can use it as the
    reference for its test-and-set.
    """
    bad_key = (peerid, shnum)
    # Dropping the entry is a no-op when the share was never listed.
    self.servermap.pop(bad_key, None)
    self.bad_shares[bad_key] = checkstring
def add_new_share(self, peerid, shnum, verinfo, timestamp):
    """Record a share we have just written out, replacing any that was
    there before.

    Any bad_shares entry for (peerid, shnum) is cleared, and the servermap
    now maps that slot to (verinfo, timestamp).
    """
    slot = (peerid, shnum)
    self.servermap[slot] = (verinfo, timestamp)
    self.bad_shares.pop(slot, None)
158 def dump(self, out=sys.stdout):
159 print >>out, "servermap:"
161 for ( (peerid, shnum), (verinfo, timestamp) ) in self.servermap.items():
162 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
163 offsets_tuple) = verinfo
164 print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
165 (idlib.shortnodeid_b2a(peerid), shnum,
166 seqnum, base32.b2a(root_hash)[:4], k, N,
169 print >>out, "%d PROBLEMS" % len(self.problems)
170 for f in self.problems:
def all_peers_for_version(self, verinfo):
    """Return a set of peerids that hold shares for the given version.

    A peer appears once no matter how many shares of that version it
    holds; the set is empty when no share of `verinfo` is known.
    """
    return set(peerid
               for ((peerid, shnum), (seen_verinfo, timestamp))
               in self.servermap.items()
               if seen_verinfo == verinfo)
def make_sharemap(self):
    """Return a dict that maps shnum to a set of peerids that hold it."""
    holders_by_shnum = DictOfSets()
    # servermap keys are (peerid, shnum) tuples; the values are not needed.
    for (peerid, shnum) in self.servermap:
        holders_by_shnum.add(shnum, peerid)
    return holders_by_shnum
def make_versionmap(self):
    """Return a dict that maps versionid to sets of (shnum, peerid,
    timestamp) tuples."""
    shares_by_version = DictOfSets()
    for (share_key, share_info) in self.servermap.items():
        (peerid, shnum) = share_key
        (verinfo, timestamp) = share_info
        shares_by_version.add(verinfo, (shnum, peerid, timestamp))
    return shares_by_version
201 def shares_on_peer(self, peerid):
203 for (s_peerid, shnum)
205 if s_peerid == peerid])
207 def version_on_peer(self, peerid, shnum):
208 key = (peerid, shnum)
209 if key in self.servermap:
210 (verinfo, timestamp) = self.servermap[key]
214 def shares_available(self):
215 """Return a dict that maps verinfo to tuples of
216 (num_distinct_shares, k, N) tuples."""
217 versionmap = self.make_versionmap()
219 for verinfo, shares in versionmap.items():
221 for (shnum, peerid, timestamp) in shares:
223 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
224 offsets_tuple) = verinfo
225 all_shares[verinfo] = (len(s), k, N)
228 def highest_seqnum(self):
229 available = self.shares_available()
230 seqnums = [verinfo[0]
231 for verinfo in available.keys()]
def summarize_version(self, verinfo):
    """Take a versionid, return a string that describes it.

    The string is "seq<seqnum>-<hash>", where <hash> is the first four
    characters of the base32-encoded root hash.
    """
    # Unpack every field so a malformed versionid fails right here.
    (seqnum, root_hash, _iv, _segsize, _datalength, _k, _n, _prefix,
     _offsets_tuple) = verinfo
    short_hash = base32.b2a(root_hash)[:4]
    return "seq%d-%s" % (seqnum, short_hash)
def summarize_versions(self):
    """Return a string describing which versions we know about.

    Each known version contributes "<count>*<summary>", where <count> is
    the number of distinct share numbers seen for it and <summary> comes
    from summarize_version(). Entries are joined with "/"; the result is
    the empty string when no versions are known.
    """
    pieces = []
    for (verinfo, shares) in self.make_versionmap().items():
        label = self.summarize_version(verinfo)
        # Several servers may hold the same shnum; count each only once.
        distinct_shnums = set(shnum for (shnum, peerid, timestamp) in shares)
        pieces.append("%d*%s" % (len(distinct_shnums), label))
    return "/".join(pieces)
251 def recoverable_versions(self):
252 """Return a set of versionids, one for each version that is currently
254 versionmap = self.make_versionmap()
255 recoverable_versions = set()
256 for (verinfo, shares) in versionmap.items():
257 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
258 offsets_tuple) = verinfo
259 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
261 # this one is recoverable
262 recoverable_versions.add(verinfo)
264 return recoverable_versions
266 def unrecoverable_versions(self):
267 """Return a set of versionids, one for each version that is currently
269 versionmap = self.make_versionmap()
271 unrecoverable_versions = set()
272 for (verinfo, shares) in versionmap.items():
273 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
274 offsets_tuple) = verinfo
275 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
277 unrecoverable_versions.add(verinfo)
279 return unrecoverable_versions
281 def best_recoverable_version(self):
282 """Return a single versionid, for the so-called 'best' recoverable
283 version. Sequence number is the primary sort criteria, followed by
284 root hash. Returns None if there are no recoverable versions."""
285 recoverable = list(self.recoverable_versions())
288 return recoverable[-1]
def size_of_version(self, verinfo):
    """Given a versionid (perhaps returned by best_recoverable_version),
    return the size of the file in bytes.

    The size is the `datalength` field of the versionid tuple.
    """
    # Unpack every field so a malformed versionid raises instead of
    # silently yielding the wrong element.
    (_seqnum, _root_hash, _iv, _segsize, datalength, _k, _n, _prefix,
     _offsets_tuple) = verinfo
    return datalength
298 def unrecoverable_newer_versions(self):
299 # Return a dict of versionid -> health, for versions that are
300 # unrecoverable and have later seqnums than any recoverable versions.
301 # These indicate that a write will lose data.
302 versionmap = self.make_versionmap()
303 healths = {} # maps verinfo to (found,k)
304 unrecoverable = set()
305 highest_recoverable_seqnum = -1
306 for (verinfo, shares) in versionmap.items():
307 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
308 offsets_tuple) = verinfo
309 shnums = set([shnum for (shnum, peerid, timestamp) in shares])
310 healths[verinfo] = (len(shnums),k)
312 unrecoverable.add(verinfo)
314 highest_recoverable_seqnum = max(seqnum,
315 highest_recoverable_seqnum)
318 for verinfo in unrecoverable:
319 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
320 offsets_tuple) = verinfo
321 if seqnum > highest_recoverable_seqnum:
322 newversions[verinfo] = healths[verinfo]
def needs_merge(self):
    """Return True if several recoverable versions share one seqnum.

    When that happens MutableFileNode.read_best_version is not giving you
    the whole story, and using its data to do a subsequent publish will
    lose information. Returns False otherwise, including when there are
    no recoverable versions at all.
    """
    recoverable_seqnums = [verinfo[0]
                           for verinfo in self.recoverable_versions()]
    # Duplicates collapse inside a set, so a size difference means at
    # least one seqnum occurs more than once. This avoids rescanning the
    # whole list for every element.
    return len(set(recoverable_seqnums)) != len(recoverable_seqnums)
def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
    """
    I return the update data recorded for the given shnum and verinfo.

    If several entries were recorded for the same pair, the earliest one
    is returned. Raises KeyError when nothing was recorded for shnum, and
    IndexError when shnum has entries but none of them is for verinfo.
    """
    recorded = self.update_data[shnum]
    matching = [entry[1] for entry in recorded if entry[0] == verinfo]
    return matching[0]
def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
    """
    I record update data for the given shnum and verinfo.

    Entries accumulate per shnum as (verinfo, data) tuples, in the order
    they were recorded; earlier entries are never replaced.
    """
    entries = self.update_data.setdefault(shnum, [])
    entries.append((verinfo, data))
356 class ServermapUpdater:
357 def __init__(self, filenode, storage_broker, monitor, servermap,
358 mode=MODE_READ, add_lease=False, update_range=None):
359 """I update a servermap, locating a sufficient number of useful
360 shares and remembering where they are located.
364 self._node = filenode
365 self._storage_broker = storage_broker
366 self._monitor = monitor
367 self._servermap = servermap
369 self._add_lease = add_lease
372 self._storage_index = filenode.get_storage_index()
373 self._last_failure = None
375 self._status = UpdateStatus()
376 self._status.set_storage_index(self._storage_index)
377 self._status.set_progress(0.0)
378 self._status.set_mode(mode)
380 self._servers_responded = set()
382 # how much data should we read?
384 # * if we only need the checkstring, then [0:75]
385 # * if we need to validate the checkstring sig, then [543ish:799ish]
386 # * if we need the verification key, then [107:436ish]
387 # * the offset table at [75:107] tells us about the 'ish'
388 # * if we need the encrypted private key, we want [-1216ish:]
389 # * but we can't read from negative offsets
390 # * the offset table tells us the 'ish', also the positive offset
392 # * Checkstring? [0:72]
393 # * If we want to validate the checkstring, then [0:72], [143:?] --
394 # the offset table will tell us for sure.
395 # * If we need the verification key, we have to consult the offset
397 # At this point, we don't know which we are. Our filenode can
398 # tell us, but it might be lying -- in some cases, we're
399 # responsible for telling it which kind of file it is.
400 self._read_size = 4000
401 if mode == MODE_CHECK:
402 # we use unpack_prefix_and_signature, so we need 1k
403 self._read_size = 1000
404 self._need_privkey = False
406 if mode == MODE_WRITE and not self._node.get_privkey():
407 self._need_privkey = True
408 # check+repair: repair requires the privkey, so if we didn't happen
409 # to ask for it during the check, we'll have problems doing the
412 self.fetch_update_data = False
413 if mode == MODE_WRITE and update_range:
414 # We're updating the servermap in preparation for an
415 # in-place file update, so we need to fetch some additional
416 # data from each share that we find.
417 assert len(update_range) == 2
419 self.start_segment = update_range[0]
420 self.end_segment = update_range[1]
421 self.fetch_update_data = True
423 prefix = si_b2a(self._storage_index)[:5]
424 self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
425 si=prefix, mode=mode)
427 def get_status(self):
def log(self, *args, **kwargs):
    """Forward a message to log.msg with this updater's defaults filled in.

    Unless the caller supplies them, "parent" defaults to the log number
    stored in self._log_number and "facility" defaults to
    "tahoe.mutable.mapupdate". Returns whatever log.msg returns.
    """
    # setdefault leaves caller-supplied values untouched.
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.mutable.mapupdate")
    return log.msg(*args, **kwargs)
438 """Update the servermap to reflect current conditions. Returns a
439 Deferred that fires with the servermap once the update has finished."""
440 self._started = time.time()
441 self._status.set_active(True)
443 # self._valid_versions is a set of validated verinfo tuples. We just
444 # use it to remember which versions had valid signatures, so we can
445 # avoid re-checking the signatures for each share.
446 self._valid_versions = set()
448 # self.versionmap maps verinfo tuples to sets of (shnum, peerid,
449 # timestamp) tuples. This is used to figure out which versions might
450 # be retrievable, and to make the eventual data download faster.
451 self.versionmap = DictOfSets()
453 self._done_deferred = defer.Deferred()
455 # first, which peers should we talk to? Any that were in our old
456 # servermap, plus "enough" others.
458 self._queries_completed = 0
460 sb = self._storage_broker
461 # All of the peers, permuted by the storage index, as usual.
462 full_peerlist = [(s.get_serverid(), s.get_rref())
463 for s in sb.get_servers_for_psi(self._storage_index)]
464 self.full_peerlist = full_peerlist # for use later, immutable
465 self.extra_peers = full_peerlist[:] # peers are removed as we use them
466 self._good_peers = set() # peers who had some shares
467 self._empty_peers = set() # peers who don't have any shares
468 self._bad_peers = set() # peers to whom our queries failed
469 self._readers = {} # peerid -> dict(sharewriters), filled in
470 # after responses come in.
472 k = self._node.get_required_shares()
473 # For what cases can these conditions work?
477 N = self._node.get_total_shares()
481 # we want to send queries to at least this many peers (although we
482 # might not wait for all of their answers to come back)
483 self.num_peers_to_query = k + self.EPSILON
485 if self.mode == MODE_CHECK:
486 # We want to query all of the peers.
487 initial_peers_to_query = dict(full_peerlist)
488 must_query = set(initial_peers_to_query.keys())
489 self.extra_peers = []
490 elif self.mode == MODE_WRITE:
491 # we're planning to replace all the shares, so we want a good
492 # chance of finding them all. We will keep searching until we've
493 # seen epsilon that don't have a share.
494 # We don't query all of the peers because that could take a while.
495 self.num_peers_to_query = N + self.EPSILON
496 initial_peers_to_query, must_query = self._build_initial_querylist()
497 self.required_num_empty_peers = self.EPSILON
499 # TODO: arrange to read lots of data from k-ish servers, to avoid
500 # the extra round trip required to read large directories. This
501 # might also avoid the round trip required to read the encrypted
504 else: # MODE_READ, MODE_ANYTHING
505 # 2k peers is good enough.
506 initial_peers_to_query, must_query = self._build_initial_querylist()
508 # this is a set of peers that we are required to get responses from:
509 # they are peers who used to have a share, so we need to know where
510 # they currently stand, even if that means we have to wait for a
511 # silently-lost TCP connection to time out. We remove peers from this
512 # set as we get responses.
513 self._must_query = must_query
515 # now initial_peers_to_query contains the peers that we should ask,
516 # self._must_query contains the peers that we must have heard from
517 # before we can consider ourselves finished, and self.extra_peers
518 # contains the overflow (peers that we should tap if we don't get
520 # I guess that self._must_query is a subset of
521 # initial_peers_to_query?
522 assert set(must_query).issubset(set(initial_peers_to_query))
524 self._send_initial_requests(initial_peers_to_query)
525 self._status.timings["initial_queries"] = time.time() - self._started
526 return self._done_deferred
528 def _build_initial_querylist(self):
529 initial_peers_to_query = {}
531 for peerid in self._servermap.all_peers():
532 ss = self._servermap.connections[peerid]
533 # we send queries to everyone who was already in the sharemap
534 initial_peers_to_query[peerid] = ss
535 # and we must wait for responses from them
536 must_query.add(peerid)
538 while ((self.num_peers_to_query > len(initial_peers_to_query))
539 and self.extra_peers):
540 (peerid, ss) = self.extra_peers.pop(0)
541 initial_peers_to_query[peerid] = ss
543 return initial_peers_to_query, must_query
545 def _send_initial_requests(self, peerlist):
546 self._status.set_status("Sending %d initial queries" % len(peerlist))
547 self._queries_outstanding = set()
548 self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
549 for (peerid, ss) in peerlist.items():
550 self._queries_outstanding.add(peerid)
551 self._do_query(ss, peerid, self._storage_index, self._read_size)
554 # there is nobody to ask, so we need to short-circuit the state
556 d = defer.maybeDeferred(self._check_for_done, None)
557 d.addErrback(self._fatal_error)
559 # control flow beyond this point: state machine. Receiving responses
560 # from queries is the input. We might send out more queries, or we
561 # might produce a result.
564 def _do_query(self, ss, peerid, storage_index, readsize):
565 self.log(format="sending query to [%(peerid)s], readsize=%(readsize)d",
566 peerid=idlib.shortnodeid_b2a(peerid),
569 self._servermap.connections[peerid] = ss
570 started = time.time()
571 self._queries_outstanding.add(peerid)
572 d = self._do_read(ss, peerid, storage_index, [], [(0, readsize)])
573 d.addCallback(self._got_results, peerid, readsize, (ss, storage_index),
575 d.addErrback(self._query_failed, peerid)
576 # errors that aren't handled by _query_failed (and errors caused by
577 # _query_failed) get logged, but we still want to check for doneness.
578 d.addErrback(log.err)
579 d.addErrback(self._fatal_error)
580 d.addCallback(self._check_for_done)
583 def _do_read(self, ss, peerid, storage_index, shnums, readv):
585 # send an add-lease message in parallel. The results are handled
586 # separately. This is sent before the slot_readv() so that we can
587 # be sure the add_lease is retired by the time slot_readv comes
588 # back (this relies upon our knowledge that the server code for
589 # add_lease is synchronous).
590 renew_secret = self._node.get_renewal_secret(peerid)
591 cancel_secret = self._node.get_cancel_secret(peerid)
592 d2 = ss.callRemote("add_lease", storage_index,
593 renew_secret, cancel_secret)
595 d2.addErrback(self._add_lease_failed, peerid, storage_index)
596 d = ss.callRemote("slot_readv", storage_index, shnums, readv)
def _got_corrupt_share(self, e, shnum, peerid, data, lp):
    """
    I am called when a remote server returns a corrupt share in
    response to one of our queries. By corrupt, I mean a share
    without a valid signature. I then record the failure, notify the
    server of the corruption, and record the share as bad.

    e      -- the error that describes the corruption
    shnum  -- number of the corrupt share
    peerid -- id of the server that returned it
    data   -- the bytes we fetched for the share; its first
              SIGNED_PREFIX_LENGTH bytes are kept as the old checkstring
    lp     -- parent log entry for the message emitted here
    """
    f = failure.Failure(e)
    self.log(format="bad share: %(f_value)s", f_value=str(f),
             failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")

    # Tell the server that the share it gave us is corrupt.
    self.notify_server_corruption(peerid, shnum, str(e))

    # Flagging the whole peer as bad means none of its other shares are
    # counted as valid. A valid version string found amongst them is
    # still remembered, so its signature need not be validated again.
    self._bad_peers.add(peerid)
    self._last_failure = f

    # XXX: Use the reader for this?
    old_checkstring = data[:SIGNED_PREFIX_LENGTH]
    self._servermap.mark_bad_share(peerid, shnum, old_checkstring)
    self._servermap.problems.append(f)
625 def _cache_good_sharedata(self, verinfo, shnum, now, data):
627 If one of my queries returns successfully (which means that we
628 were able to and successfully did validate the signature), I
629 cache the data that we initially fetched from the storage
630 server. This will help reduce the number of roundtrips that need
631 to occur when the file is downloaded, or when the file is
635 self._node._add_to_cache(verinfo, shnum, 0, data)
638 def _got_results(self, datavs, peerid, readsize, stuff, started):
639 lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
640 peerid=idlib.shortnodeid_b2a(peerid),
641 numshares=len(datavs))
643 elapsed = now - started
644 def _done_processing(ignored=None):
645 self._queries_outstanding.discard(peerid)
646 self._servermap.reachable_peers.add(peerid)
647 self._must_query.discard(peerid)
648 self._queries_completed += 1
649 if not self._running:
650 self.log("but we're not running, so we'll ignore it", parent=lp)
652 self._status.add_per_server_time(peerid, "late", started, elapsed)
654 self._status.add_per_server_time(peerid, "query", started, elapsed)
657 self._good_peers.add(peerid)
659 self._empty_peers.add(peerid)
661 ss, storage_index = stuff
664 for shnum,datav in datavs.items():
666 reader = MDMFSlotReadProxy(ss,
670 self._readers.setdefault(peerid, dict())[shnum] = reader
671 # our goal, with each response, is to validate the version
672 # information and share data as best we can at this point --
673 # we do this by validating the signature. To do this, we
674 # need to do the following:
675 # - If we don't already have the public key, fetch the
676 # public key. We use this to validate the signature.
677 if not self._node.get_pubkey():
678 # fetch and set the public key.
679 d = reader.get_verification_key()
680 d.addCallback(lambda results, shnum=shnum, peerid=peerid:
681 self._try_to_set_pubkey(results, peerid, shnum, lp))
682 # XXX: Make self._pubkey_query_failed?
683 d.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
684 self._got_corrupt_share(error, shnum, peerid, data, lp))
686 # we already have the public key.
687 d = defer.succeed(None)
689 # Neither of these two branches return anything of
690 # consequence, so the first entry in our deferredlist will
693 # - Next, we need the version information. We almost
694 # certainly got this by reading the first thousand or so
695 # bytes of the share on the storage server, so we
696 # shouldn't need to fetch anything at this step.
697 d2 = reader.get_verinfo()
698 d2.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
699 self._got_corrupt_share(error, shnum, peerid, data, lp))
700 # - Next, we need the signature. For an SDMF share, it is
701 # likely that we fetched this when doing our initial fetch
702 # to get the version information. In MDMF, this lives at
703 # the end of the share, so unless the file is quite small,
704 # we'll need to do a remote fetch to get it.
705 d3 = reader.get_signature()
706 d3.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
707 self._got_corrupt_share(error, shnum, peerid, data, lp))
708 # Once we have all three of these responses, we can move on
709 # to validating the signature
711 # Does the node already have a privkey? If not, we'll try to
713 if self._need_privkey:
714 d4 = reader.get_encprivkey()
715 d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
716 self._try_to_validate_privkey(results, peerid, shnum, lp))
717 d4.addErrback(lambda error, shnum=shnum, peerid=peerid, data=data:
718 self._privkey_query_failed(error, shnum, data, lp))
720 d4 = defer.succeed(None)
723 if self.fetch_update_data:
724 # fetch the block hash tree and first + last segment, as
725 # configured earlier.
726 # Then set them in wherever we happen to want to set
729 # XXX: We do this above, too. Is there a good way to
730 # make the two routines share the value without
731 # introducing more roundtrips?
732 ds.append(reader.get_verinfo())
733 ds.append(reader.get_blockhashes())
734 ds.append(reader.get_block_and_salt(self.start_segment))
735 ds.append(reader.get_block_and_salt(self.end_segment))
736 d5 = deferredutil.gatherResults(ds)
737 d5.addCallback(self._got_update_results_one_share, shnum)
739 d5 = defer.succeed(None)
741 dl = defer.DeferredList([d, d2, d3, d4, d5])
742 dl.addBoth(self._turn_barrier)
743 dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
744 self._got_signature_one_share(results, shnum, peerid, lp))
745 dl.addErrback(lambda error, shnum=shnum, data=data:
746 self._got_corrupt_share(error, shnum, peerid, data, lp))
747 dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
748 self._cache_good_sharedata(verinfo, shnum, now, data))
750 # dl is a deferred list that will fire when all of the shares
751 # that we found on this peer are done processing. When dl fires,
752 # we know that processing is done, so we can decrement the
753 # semaphore-like thing that we incremented earlier.
754 dl = defer.DeferredList(ds, fireOnOneErrback=True)
755 # Are we done? Done means that there are no more queries to
756 # send, that there are no outstanding queries, and that we
757 # haven't received any queries that are still processing. If we
758 # are done, self._check_for_done will cause the done deferred
759 # that we returned to our caller to fire, which tells them that
760 # they have a complete servermap, and that we won't be touching
761 # the servermap anymore.
762 dl.addCallback(_done_processing)
763 dl.addCallback(self._check_for_done)
764 dl.addErrback(self._fatal_error)
766 self.log("_got_results done", parent=lp, level=log.NOISY)
def _turn_barrier(self, result):
    """
    I help the servermap updater avoid recursion limit issues: I hand
    `result` to fireEventually and return what it gives back, so callers
    can chain on that instead of on `result` directly.
    """
    return fireEventually(result)
778 def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
779 if self._node.get_pubkey():
780 return # don't go through this again if we don't have to
781 fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
782 assert len(fingerprint) == 32
783 if fingerprint != self._node.get_fingerprint():
784 raise CorruptShareError(peerid, shnum,
785 "pubkey doesn't match fingerprint")
786 self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
787 assert self._node.get_pubkey()
def notify_server_corruption(self, peerid, shnum, reason):
    """Send "advise_corrupt_share" for mutable share `shnum` to `peerid`.

    The message goes over the connection recorded for that peer in the
    servermap, via callRemoteOnly, and carries our storage index and
    `reason`. Nothing is returned.
    """
    server = self._servermap.connections[peerid]
    server.callRemoteOnly("advise_corrupt_share", "mutable",
                          self._storage_index, shnum, reason)
796 def _got_signature_one_share(self, results, shnum, peerid, lp):
797 # It is our job to give versioninfo to our caller. We need to
798 # raise CorruptShareError if the share is corrupt for any
799 # reason, something that our caller will handle.
800 self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
802 peerid=idlib.shortnodeid_b2a(peerid),
805 if not self._running:
806 # We can't process the results, since we can't touch the
808 self.log("but we're not running anymore.")
811 _, verinfo, signature, __, ___ = results
820 offsets) = verinfo[1]
821 offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
823 # XXX: This should be done for us in the method, so
824 # presumably you can go in there and fix it.
834 # This tuple uniquely identifies a share on the grid; we use it
835 # to keep track of the ones that we've already seen.
837 if verinfo not in self._valid_versions:
838 # This is a new version tuple, and we need to validate it
839 # against the public key before keeping track of it.
840 assert self._node.get_pubkey()
841 valid = self._node.get_pubkey().verify(prefix, signature[1])
843 raise CorruptShareError(peerid, shnum,
844 "signature is invalid")
846 # ok, it's a valid verinfo. Add it to the list of validated
848 self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
849 % (seqnum, base32.b2a(root_hash)[:4],
850 idlib.shortnodeid_b2a(peerid), shnum,
851 k, n, segsize, datalen),
853 self._valid_versions.add(verinfo)
854 # We now know that this is a valid candidate verinfo. Whether or
855 # not this instance of it is valid is a matter for the next
856 # statement; at this point, we just know that if we see this
857 # version info again, that its signature checks out and that
858 # we're okay to skip the signature-checking step.
860 # (peerid, shnum) are bound in the method invocation.
861 if (peerid, shnum) in self._servermap.bad_shares:
862 # we've been told that the rest of the data in this share is
863 # unusable, so don't add it to the servermap.
864 self.log("but we've been told this is a bad share",
865 parent=lp, level=log.UNUSUAL)
868 # Add the info to our servermap.
869 timestamp = time.time()
870 self._servermap.add_new_share(peerid, shnum, verinfo, timestamp)
872 self.versionmap.add(verinfo, (shnum, peerid, timestamp))
877 def _got_update_results_one_share(self, results, share):
879 I record the update results in results.
881 assert len(results) == 4
882 verinfo, blockhashes, start, end = results
892 offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
894 # XXX: This should be done for us in the method, so
895 # presumably you can go in there and fix it.
906 update_data = (blockhashes, start, end)
907 self._servermap.set_update_data_for_share_and_verinfo(share,
def _deserialize_pubkey(self, pubkey_s):
    """Return the RSA verifying key built from the serialized `pubkey_s`."""
    return rsa.create_verifying_key_from_string(pubkey_s)
917 def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
919 Given a writekey from a remote server, I validate it against the
920 writekey stored in my node. If it is valid, then I set the
921 privkey and encprivkey properties of the node.
923 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
924 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
925 if alleged_writekey != self._node.get_writekey():
926 self.log("invalid privkey from %s shnum %d" %
927 (idlib.nodeid_b2a(peerid)[:8], shnum),
928 parent=lp, level=log.WEIRD, umid="aJVccw")
932 self.log("got valid privkey from shnum %d on peerid %s" %
933 (shnum, idlib.shortnodeid_b2a(peerid)),
935 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
936 self._node._populate_encprivkey(enc_privkey)
937 self._node._populate_privkey(privkey)
938 self._need_privkey = False
939 self._status.set_privkey_from(peerid)
def _add_lease_failed(self, f, peerid, storage_index):
    """
    I handle the failure f of an add-lease request sent to peerid. I
    ignore the known-harmless failures, log other remote failures, and
    report local failures more loudly. storage_index is accepted but not
    used here.
    """
    # Older versions of Tahoe didn't handle the add-lease message very
    # well: <=1.1.0 throws a NameError because it doesn't implement
    # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
    # (which is most of them, since we send add-lease to everybody,
    # before we know whether or not they have any shares for us), and
    # 1.2.0 throws KeyError even on known buckets due to an internal bug
    # in the latency-measuring code.

    # we want to ignore the known-harmless errors and log the others. In
    # particular we want to log any local errors caused by coding
    # problems.

    if f.check(DeadReferenceError):
        # the connection went away: nothing worth reporting
        return

    if not f.check(RemoteException):
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(peerid)s]: %(f_value)s",
                peerid=idlib.shortnodeid_b2a(peerid),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")
        return

    # from here on, the error was raised on the remote side
    if f.value.failure.check(KeyError, IndexError, NameError):
        # this may ignore a bit too much, but that only hurts us
        # during debugging
        return
    self.log(format="error in add_lease from [%(peerid)s]: %(f_value)s",
             peerid=idlib.shortnodeid_b2a(peerid),
             f_value=str(f.value),
             failure=f,
             level=log.WEIRD, umid="iqg3mw")
def _query_failed(self, f, peerid):
    """
    I record that the query sent to peerid failed with failure f. If the
    update is no longer running I do nothing. Otherwise I log the
    failure, stop waiting for the peer, mark it as bad and unreachable,
    remember the failure as a servermap problem and as my most recent
    failure, and count the query as completed.
    """
    if not self._running:
        return

    # a dead connection is logged less loudly than any other failure
    level = log.UNUSUAL if f.check(DeadReferenceError) else log.WEIRD
    self.log(format="error during query: %(f_value)s",
             f_value=str(f.value), failure=f,
             level=level, umid="IHXuQg")

    # we are no longer waiting on this peer
    self._must_query.discard(peerid)
    self._queries_outstanding.discard(peerid)

    self._bad_peers.add(peerid)
    servermap = self._servermap
    servermap.problems.append(f)
    # a peerid could be in both ServerMap.reachable_peers and
    # .unreachable_peers if they responded to our query, but then an
    # exception was raised in _got_results.
    servermap.unreachable_peers.add(peerid)

    self._queries_completed += 1
    self._last_failure = f
def _privkey_query_failed(self, f, peerid, shnum, lp):
    """
    I record that the privkey query sent to peerid failed with failure
    f. The peer is always removed from the set of outstanding queries;
    if the update is still running I also log the failure under the
    parent log entry lp and remember it as a servermap problem and as my
    most recent failure. shnum is accepted but not used here.
    """
    self._queries_outstanding.discard(peerid)
    if not self._running:
        return

    # a dead connection is logged less loudly than any other failure
    level = log.UNUSUAL if f.check(DeadReferenceError) else log.WEIRD
    self.log(format="error during privkey query: %(f_value)s",
             f_value=str(f.value), failure=f,
             parent=lp, level=level, umid="McoJ5w")

    self._servermap.problems.append(f)
    self._last_failure = f
def _check_for_done(self, res):
    """
    I decide whether the servermap update is finished, needs more
    queries, or should simply keep waiting. res is not used.

    exit paths:
     return self._send_more_queries(outstanding) : send some more queries
     return self._done() : all done
     return : keep waiting, no new queries

    The policies for MODE_READ and MODE_WRITE live in
    _check_for_done_mode_read and _check_for_done_mode_write.
    """
    lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                          "%(outstanding)d queries outstanding, "
                          "%(extra)d extra peers available, "
                          "%(must)d 'must query' peers left, "
                          "need_privkey=%(need_privkey)s"
                          ),
                  mode=self.mode,
                  outstanding=len(self._queries_outstanding),
                  extra=len(self.extra_peers),
                  must=len(self._must_query),
                  need_privkey=self._need_privkey,
                  level=log.NOISY,
                  )

    if not self._running:
        self.log("but we're not running", parent=lp, level=log.NOISY)
        return

    if self._must_query:
        # we are still waiting for responses from peers that used to have
        # a share, so we must continue to wait. No additional queries are
        # required at this time.
        self.log("%d 'must query' peers left" % len(self._must_query),
                 level=log.NOISY, parent=lp)
        return

    if (not self._queries_outstanding and not self.extra_peers):
        # all queries have retired, and we have no peers left to ask. No
        # more progress can be made, therefore we are done.
        self.log("all queries are retired, no extra peers: done",
                 parent=lp)
        return self._done()

    recoverable_versions = self._servermap.recoverable_versions()
    unrecoverable_versions = self._servermap.unrecoverable_versions()

    # what is our completion policy? how hard should we work?

    if self.mode == MODE_ANYTHING:
        if recoverable_versions:
            self.log("%d recoverable versions: done"
                     % len(recoverable_versions),
                     parent=lp)
            return self._done()

    if self.mode == MODE_CHECK:
        # we used self._must_query, and we know there aren't any
        # responses still waiting, so that means we must be done
        self.log("done", parent=lp)
        return self._done()

    MAX_IN_FLIGHT = 5
    if self.mode == MODE_READ:
        return self._check_for_done_mode_read(lp, recoverable_versions,
                                              unrecoverable_versions,
                                              MAX_IN_FLIGHT)

    if self.mode == MODE_WRITE:
        return self._check_for_done_mode_write(lp, recoverable_versions,
                                               MAX_IN_FLIGHT)

    # otherwise, keep up to 5 queries in flight. TODO: this is pretty
    # arbitrary, really I want this to be something like k -
    # max(known_version_sharecounts) + some extra
    self.log("catchall: need more", parent=lp, level=log.NOISY)
    return self._send_more_queries(MAX_IN_FLIGHT)

def _check_for_done_mode_read(self, lp, recoverable_versions,
                              unrecoverable_versions, max_in_flight):
    """
    I am the MODE_READ completion policy for _check_for_done: I return
    whatever self._done() or self._send_more_queries() returns.
    """
    # if we've queried k+epsilon servers, and we see a recoverable
    # version, and we haven't seen any unrecoverable higher-seqnum'ed
    # versions, then we're done.

    if self._queries_completed < self.num_peers_to_query:
        self.log(format="%(completed)d completed, %(query)d to query: need more",
                 completed=self._queries_completed,
                 query=self.num_peers_to_query,
                 level=log.NOISY, parent=lp)
        return self._send_more_queries(max_in_flight)
    if not recoverable_versions:
        self.log("no recoverable versions: need more",
                 level=log.NOISY, parent=lp)
        return self._send_more_queries(max_in_flight)
    highest_recoverable = max(recoverable_versions)
    highest_recoverable_seqnum = highest_recoverable[0]
    for unrec_verinfo in unrecoverable_versions:
        if unrec_verinfo[0] > highest_recoverable_seqnum:
            # there is evidence of a higher-seqnum version, but we
            # don't yet see enough shares to recover it. Try harder.
            # TODO: consider sending more queries.
            # TODO: consider limiting the search distance
            self.log("evidence of higher seqnum: need more",
                     level=log.UNUSUAL, parent=lp)
            return self._send_more_queries(max_in_flight)
    # all the unrecoverable versions were old or concurrent with a
    # recoverable version. Good enough.
    self.log("no higher-seqnum: done", parent=lp)
    return self._done()

def _check_for_done_mode_write(self, lp, recoverable_versions,
                               max_in_flight):
    """
    I am the MODE_WRITE completion policy for _check_for_done: I return
    whatever self._done() or self._send_more_queries() returns.
    """
    # we want to keep querying until we've seen a few that don't have
    # any shares, to be sufficiently confident that we've seen all
    # the shares. This is still less work than MODE_CHECK, which asks
    # every server in the world.

    if not recoverable_versions:
        self.log("no recoverable versions: need more", parent=lp,
                 level=log.NOISY)
        return self._send_more_queries(max_in_flight)

    last_found = -1
    last_not_responded = -1
    num_not_responded = 0
    num_not_found = 0
    # one character per peer walked so far, used only in log messages
    states = []
    found_boundary = False

    for i,(peerid,ss) in enumerate(self.full_peerlist):
        if peerid in self._bad_peers:
            # query failed
            states.append("x")
            #self.log("loop [%s]: x" % idlib.shortnodeid_b2a(peerid))
        elif peerid in self._empty_peers:
            # no shares
            states.append("0")
            #self.log("loop [%s]: 0" % idlib.shortnodeid_b2a(peerid))
            if last_found != -1:
                # only empty peers seen after a share-holding peer count
                # towards the boundary
                num_not_found += 1
                if num_not_found >= self.EPSILON:
                    self.log("found our boundary, %s" %
                             "".join(states),
                             parent=lp, level=log.NOISY)
                    found_boundary = True
                    break

        elif peerid in self._good_peers:
            # yes shares
            states.append("1")
            #self.log("loop [%s]: 1" % idlib.shortnodeid_b2a(peerid))
            last_found = i
            num_not_found = 0
        else:
            # not responded yet
            states.append("?")
            #self.log("loop [%s]: ?" % idlib.shortnodeid_b2a(peerid))
            last_not_responded = i
            num_not_responded += 1

    if found_boundary:
        # we need to know that we've gotten answers from
        # everybody to the left of here
        if last_not_responded == -1:
            # we're done
            self.log("have all our answers",
                     parent=lp, level=log.NOISY)
            # .. unless we're still waiting on the privkey
            if self._need_privkey:
                self.log("but we're still waiting for the privkey",
                         parent=lp, level=log.NOISY)
                # if we found the boundary but we haven't yet found
                # the privkey, we may need to look further. If
                # somehow all the privkeys were corrupted (but the
                # shares were readable), then this is likely to do an
                # exhaustive search.
                return self._send_more_queries(max_in_flight)
            return self._done()
        # still waiting for somebody
        return self._send_more_queries(num_not_responded)

    # if we hit here, we didn't find our boundary, so we're still
    # waiting for peers
    self.log("no boundary yet, %s" % "".join(states), parent=lp,
             level=log.NOISY)
    return self._send_more_queries(max_in_flight)
def _send_more_queries(self, num_outstanding):
    """
    I take peers off the front of self.extra_peers until the number of
    queries in flight (those already outstanding plus the ones I am
    about to send) reaches num_outstanding, or until no extra peers
    remain. I then send a query to each peer I took.
    """
    batch = []

    while True:
        self.log(format=" there are %(outstanding)d queries outstanding",
                 outstanding=len(self._queries_outstanding),
                 level=log.NOISY)
        in_flight = len(self._queries_outstanding) + len(batch)
        if in_flight >= num_outstanding or not self.extra_peers:
            break
        batch.append(self.extra_peers.pop(0))

    recipients = ["[%s]" % idlib.shortnodeid_b2a(peerid)
                  for (peerid,ss) in batch]
    self.log(format="sending %(more)d more queries: %(who)s",
             more=len(batch),
             who=" ".join(recipients),
             level=log.NOISY)

    for (peerid, ss) in batch:
        self._do_query(ss, peerid, self._storage_index, self._read_size)
        # we'll retrigger when those queries come back
def _done(self):
    """
    I finish the update. If it has already finished I only log that
    fact. Otherwise I mark myself as not running, fill in the final
    status fields (finish time, total elapsed time, progress, status
    text, inactive), stamp the servermap with the mode and start time of
    this update, and arrange for self._done_deferred to be fired with
    the servermap via eventually().
    """
    if not self._running:
        self.log("not running; we're already done")
        return
    self._running = False

    finished_at = time.time()
    status = self._status
    status.set_finished(finished_at)
    status.timings["total"] = finished_at - self._started
    status.set_progress(1.0)
    status.set_status("Finished")
    status.set_active(False)

    servermap = self._servermap
    servermap.last_update_mode = self.mode
    servermap.last_update_time = self._started
    # the servermap will not be touched after this
    self.log("servermap: %s" % servermap.summarize_versions())

    eventually(self._done_deferred.callback, servermap)
def _fatal_error(self, f):
    """
    I log the failure f as a fatal error and then pass it to the errback
    of self._done_deferred.
    """
    self.log("fatal error",
             failure=f,
             level=log.WEIRD,
             umid="1cNvlw")
    self._done_deferred.errback(f)