import sys, time, copy

from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
                         fireEventually
from allmydata.util import base32, hashutil, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
     MODE_READ, MODE_REPAIR, CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None # the server we got the privkey from
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if server not in self.timings["per_server"]:
            self.timings["per_server"][server] = []
        self.timings["per_server"][server].append((op, sent, elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_progress(self):
        return self.progress
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

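    # Illustrative sketch (not part of the original module; values are
    # hypothetical): how an updater records per-server timing data.
    #
    #   status = UpdateStatus()
    #   status.set_storage_index(si)
    #   status.add_per_server_time(server, "query", 1300000000.0, 0.04)
    #   status.timings["per_server"][server]
    #   # => [("query", 1300000000.0, 0.04)]
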
84 """I record the placement of mutable shares.
86 This object records which shares (of various versions) are located on
89 One purpose I serve is to inform callers about which versions of the
90 mutable file are recoverable and 'current'.
92 A second purpose is to serve as a state marker for test-and-set
93 operations. I am passed out of retrieval operations and back into publish
94 operations, which means 'publish this new version, but only if nothing
95 has changed since I last retrieved this data'. This reduces the chances
96 of clobbering a simultaneous (uncoordinated) write.
98 @var _known_shares: a dictionary, mapping a (server, shnum) tuple to a
99 (versionid, timestamp) tuple. Each 'versionid' is a
100 tuple of (seqnum, root_hash, IV, segsize, datalength,
101 k, N, signed_prefix, offsets)
103 @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
104 shares that I should ignore (because a previous user
105 of the servermap determined that they were invalid).
106 The updater only locates a certain number of shares:
107 if some of these turn out to have integrity problems
108 and are unusable, the caller will need to mark those
109 shares as bad, then re-update the servermap, then try
110 again. The dict maps (server, shnum) tuple to old
115 self._known_shares = {}
116 self.unreachable_servers = set() # servers that didn't respond to queries
117 self.reachable_servers = set() # servers that did respond to queries
118 self._problems = [] # mostly for debugging
119 self._bad_shares = {} # maps (server,shnum) to old checkstring
120 self._last_update_mode = None
121 self._last_update_time = 0
123 self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
124 # where blockhashes is a list of bytestrings (the result of
125 # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
126 # (block,salt) tuple-of-bytestrings from get_block_and_salt()
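    # Illustrative sketch (hypothetical values, not part of the original
    # module): one entry in _known_shares for share #2 of a 3-of-10 file at
    # sequence number 5 would look roughly like:
    #
    #   verinfo = (5, root_hash, IV, 131073, 1000000, 3, 10,
    #              signed_prefix, offsets_tuple)
    #   self._known_shares[(server, 2)] = (verinfo, time.time())
    #
    # A publish that wants test-and-set semantics compares the seqnum and
    # root_hash in this verinfo against what is currently on the server
    # before overwriting.
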
    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        s.update_data = copy.deepcopy(self.update_data)
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (server.get_name(), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self._problems:
            print >>out, "%d PROBLEMS" % len(self._problems)
            for f in self._problems:
                print >>out, str(f)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

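    # Illustrative sketch (hypothetical values): for a file whose share #0
    # lives on two servers and share #1 on one, make_sharemap() returns a
    # DictOfSets shaped like:
    #
    #   {0: set([serverA, serverB]),
    #    1: set([serverC])}
    #
    # make_versionmap() below inverts along the other axis, keyed by verinfo.
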
    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

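    # Illustrative sketch (hypothetical values): a 3-of-10 version with four
    # distinct shares located so far would appear as:
    #
    #   {verinfo: (4, 3, 10)}
    #
    # and is recoverable because 4 >= k. A version with only two shares
    # found, (2, 3, 10), is not.
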
    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

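    # Illustrative sketch (hypothetical values): a grid holding six shares
    # of seq4 and one stray share of seq5 would summarize as:
    #
    #   "6*seq4-kvjd/1*seq5-8f2e"
    #
    # where the four base32 characters are the root_hash prefix.
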
    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

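    # Note on the sort: verinfo tuples begin with (seqnum, root_hash, ...),
    # so plain tuple comparison already orders first by seqnum and then by
    # root hash. Illustrative sketch (hypothetical values):
    #
    #   sorted([(4, "rB.."), (5, "rA.."), (4, "rA..")])[-1]
    #   # => (5, "rA..")
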
    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

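    # Illustrative sketch (hypothetical values): with seq4 recoverable and
    # only two shares of a 3-of-10 seq5 visible, this returns
    #
    #   {seq5_verinfo: (2, 3)}
    #
    # telling the caller that publishing over seq4 would discard whatever
    # seq5 contained.
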
    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False

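    # Illustrative sketch (hypothetical values): two uncoordinated writers
    # can both publish seq5 with different root hashes. If both versions end
    # up recoverable, recoverable_seqnums is e.g. [5, 5], and needs_merge()
    # returns True.
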
    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum and version.
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum

    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the update data (block hash tree and boundary segments)
        for the given shnum.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))

# cap on the number of concurrent queries; the "keep up to 5 queries in
# flight" note in _check_for_done documents the intended value
MAX_IN_FLIGHT = 5

class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #  * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #  * but we can't read from negative offsets
        #  * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

463 """Update the servermap to reflect current conditions. Returns a
464 Deferred that fires with the servermap once the update has finished."""
465 self._started = time.time()
466 self._status.set_active(True)
468 # self._valid_versions is a set of validated verinfo tuples. We just
469 # use it to remember which versions had valid signatures, so we can
470 # avoid re-checking the signatures for each share.
471 self._valid_versions = set()
473 self._done_deferred = defer.Deferred()
475 # first, which servers should be talk to? Any that were in our old
476 # servermap, plus "enough" others.
478 self._queries_completed = 0
480 sb = self._storage_broker
481 # All of the servers, permuted by the storage index, as usual.
482 full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
483 self.full_serverlist = full_serverlist # for use later, immutable
484 self.extra_servers = full_serverlist[:] # servers are removed as we use them
485 self._good_servers = set() # servers who had some shares
486 self._servers_with_shares = set() #servers that we know have shares now
487 self._empty_servers = set() # servers who don't have any shares
488 self._bad_servers = set() # servers to whom our queries failed
490 k = self._node.get_required_shares()
491 # For what cases can these conditions work?
495 N = self._node.get_total_shares()
499 # we want to send queries to at least this many servers (although we
500 # might not wait for all of their answers to come back)
501 self.num_servers_to_query = k + self.EPSILON
503 if self.mode in (MODE_CHECK, MODE_REPAIR):
504 # We want to query all of the servers.
505 initial_servers_to_query = list(full_serverlist)
506 must_query = set(initial_servers_to_query)
507 self.extra_servers = []
508 elif self.mode == MODE_WRITE:
509 # we're planning to replace all the shares, so we want a good
510 # chance of finding them all. We will keep searching until we've
511 # seen epsilon that don't have a share.
512 # We don't query all of the servers because that could take a while.
513 self.num_servers_to_query = N + self.EPSILON
514 initial_servers_to_query, must_query = self._build_initial_querylist()
515 self.required_num_empty_servers = self.EPSILON
517 # TODO: arrange to read lots of data from k-ish servers, to avoid
518 # the extra round trip required to read large directories. This
519 # might also avoid the round trip required to read the encrypted
522 else: # MODE_READ, MODE_ANYTHING
523 # 2*k servers is good enough.
524 initial_servers_to_query, must_query = self._build_initial_querylist()
526 # this is a set of servers that we are required to get responses
527 # from: they are servers who used to have a share, so we need to know
528 # where they currently stand, even if that means we have to wait for
529 # a silently-lost TCP connection to time out. We remove servers from
530 # this set as we get responses.
531 self._must_query = set(must_query)
533 # now initial_servers_to_query contains the servers that we should
534 # ask, self.must_query contains the servers that we must have heard
535 # from before we can consider ourselves finished, and
536 # self.extra_servers contains the overflow (servers that we should
537 # tap if we don't get enough responses)
538 # I guess that self._must_query is a subset of
539 # initial_servers_to_query?
540 assert must_query.issubset(initial_servers_to_query)
542 self._send_initial_requests(initial_servers_to_query)
543 self._status.timings["initial_queries"] = time.time() - self._started
544 return self._done_deferred
    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query

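    # Illustrative sketch (hypothetical values): with k=3, EPSILON=3, and
    # two servers remembered from the previous servermap, the querylist is
    # those two (which also form the must_query set), topped up from the
    # permuted ring until it holds num_servers_to_query = 6 servers.
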
    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore the result of add_lease
            d2.addErrback(self._add_lease_failed, server, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

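    # A note on the wire format (a sketch, inferred from the slot_readv
    # usage above): readv is a list of (offset, length) pairs, and an empty
    # shnums list means "all shares in this slot". The initial query issued
    # by _do_query is therefore roughly:
    #
    #   ss.callRemote("slot_readv", storage_index, [], [(0, 4000)])
    #
    # which returns a dict mapping each shnum found to a list of data
    # strings, one per requested range.
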
    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)

    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_rref()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)
            _done_processing()
            return
        ds = []

        for shnum, datav in datavs.items():
            data = datav[0]
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data,
                                       data_is_everything=(len(data) < readsize))

            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches returns anything of
            # consequence, so the first entry in our deferredlist will
            # be None.

            #   - Next, we need the version information. We almost
            #     certainly got this by reading the first thousand or so
            #     bytes of the share on the storage server, so we
            #     shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            #   - Next, we need the signature. For an SDMF share, it is
            #     likely that we fetched this when doing our initial fetch
            #     to get the version information. In MDMF, this lives at
            #     the end of the share, so unless the file is quite small,
            #     we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # Once we have all three of these responses, we can move on
            # to validating the signature.

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)

            if self.fetch_update_data:
                # fetch the block hash tree and first + last segments, as
                # configured earlier, then record them in the servermap
                # via _got_update_results_one_share.
                update_ds = []
                # XXX: We do this above, too. Is there a good way to
                # make the two routines share the value without
                # introducing more roundtrips?
                update_ds.append(reader.get_verinfo())
                update_ds.append(reader.get_blockhashes())
                update_ds.append(reader.get_block_and_salt(self.start_segment))
                update_ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(update_ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            def _append_proxy(passthrough, shnum=shnum, reader=reader):
                # Store the proxy (with its cache) keyed by serverid and
                # version.
                _, (_, verinfo), _, _, _ = passthrough
                verinfo = self._make_verinfo_hashable(verinfo)
                self._servermap.proxies[(verinfo,
                                         server.get_serverid(),
                                         storage_index, shnum)] = reader
                return passthrough
            dl.addCallback(_append_proxy)
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            ds.append(dl)
        # ds now holds one deferred list per share; this outer DeferredList
        # will fire when all of the shares that we found on this server are
        # done processing. When it fires, we know that processing is done,
        # so we can decrement the semaphore-like thing that we incremented
        # earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # haven't received any queries that are still processing. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl

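    # A sketch of the per-share deferred chain built above (the five-element
    # DeferredList order is the one _got_signature_one_share relies on when
    # it unpacks results[1] and results[2]):
    #
    #   d  : pubkey fetch/check (or an immediate None)
    #   d2 : get_verinfo()
    #   d3 : get_signature()
    #   d4 : encprivkey fetch/check (or an immediate None)
    #   d5 : optional update-range data (or an immediate None)
    #
    #   DeferredList([d, d2, d3, d4, d5]) -> _append_proxy -> _turn_barrier
    #     -> _got_signature_one_share, with _got_corrupt_share as errback
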
    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        that come up when a long chain of callbacks fires synchronously,
        by pushing the rest of the processing onto a later reactor turn.
        """
        return fireEventually(result)

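    # fireEventually(result) returns a Deferred that has already been
    # scheduled to fire with `result` on a subsequent reactor turn, so
    # inserting it with addBoth() breaks what would otherwise be one deep
    # synchronous callback chain into separate event-loop turns.
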
    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()

    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)

    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        verinfo = self._make_verinfo_hashable(verinfo[1])

        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.
        (seqnum, root_hash, saltish, segsize, datalen, k, n, prefix,
         offsets_tuple) = verinfo

        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        server.get_name(), shnum,
                        k, n, segsize, datalen),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)
        self._servers_with_shares.add(server)

        return verinfo

    def _make_verinfo_hashable(self, verinfo):
        # verinfo arrives with its offsets as a dict, which can't be part
        # of a dict key or set member; replace it with a tuple of its items
        # so the whole verinfo is hashable.
        (seqnum, root_hash, saltish, segsize, datalen, k, n, prefix,
         offsets) = verinfo

        offsets_tuple = tuple( [(key, value) for key, value in offsets.items()] )

        return (seqnum, root_hash, saltish, segsize, datalen, k, n, prefix,
                offsets_tuple)

    def _got_update_results_one_share(self, results, share):
        """
        I record the update data fetched for the given share in the
        servermap.
        """
        assert len(results) == 4
        verinfo, blockhashes, start, end = results
        verinfo = self._make_verinfo_hashable(verinfo)
        update_data = (blockhashes, start, end)
        self._servermap.set_update_data_for_share_and_verinfo(share,
                                                              verinfo,
                                                              update_data)

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
        """
        Given an encrypted private key from a remote server, I derive its
        writekey and validate that against the writekey stored in my node.
        If it is valid, then I set the privkey and encprivkey properties
        of the node.
        """
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (server.get_name(), shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on serverid %s" %
                 (shnum, server.get_name()),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(server)

    def _add_lease_failed(self, f, server, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server.get_name(),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server.get_name(),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, server):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(server)
        self._queries_outstanding.discard(server)
        self._bad_servers.add(server)
        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if it responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f

    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()
        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i, server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._servers_with_shares:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # no response yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

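    # Illustrative sketch of the MODE_WRITE boundary scan above (a
    # hypothetical grid): walking the permuted serverlist might build up
    #
    #   states = ["1", "1", "x", "1", "0", "0", "0"]
    #
    # i.e. three servers with shares, one failed query, then EPSILON=3
    # consecutive empty servers after the last share. That run of "0"s is
    # the boundary, and with no "?" entries to its left the update can stop.
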
    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)