import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
                         fireEventually
from allmydata.util import base32, hashutil, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
     CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy

# cap on the number of queries we keep in flight at once; see the
# "catchall" note at the end of _check_for_done
MAX_IN_FLIGHT = 5

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        serverid = server.get_serverid()
        assert op in ("query", "late", "privkey")
        if serverid not in self.timings["per_server"]:
            self.timings["per_server"][serverid] = []
        self.timings["per_server"][serverid].append((op, sent, elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server.get_serverid()
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

85 """I record the placement of mutable shares.
87 This object records which shares (of various versions) are located on
90 One purpose I serve is to inform callers about which versions of the
91 mutable file are recoverable and 'current'.
93 A second purpose is to serve as a state marker for test-and-set
94 operations. I am passed out of retrieval operations and back into publish
95 operations, which means 'publish this new version, but only if nothing
96 has changed since I last retrieved this data'. This reduces the chances
97 of clobbering a simultaneous (uncoordinated) write.
99 @var _known_shares: a dictionary, mapping a (server, shnum) tuple to a
100 (versionid, timestamp) tuple. Each 'versionid' is a
101 tuple of (seqnum, root_hash, IV, segsize, datalength,
102 k, N, signed_prefix, offsets)
104 @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
105 shares that I should ignore (because a previous user
106 of the servermap determined that they were invalid).
107 The updater only locates a certain number of shares:
108 if some of these turn out to have integrity problems
109 and are unusable, the caller will need to mark those
110 shares as bad, then re-update the servermap, then try
111 again. The dict maps (server, shnum) tuple to old
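
    # A rough sketch of the test-and-set flow described above, assuming a
    # mutable filenode 'node' and new contents 'data' (illustrative names;
    # the exact caller-side API lives in mutable/filenode.py):
    #
    #   d = node.get_servermap(MODE_WRITE)      # runs a ServermapUpdater
    #   d.addCallback(lambda smap: node.upload(data, smap))
    #
    # The publish step uses the (verinfo, timestamp) entries recorded here
    # as its test-and-set reference, and refuses to overwrite a share whose
    # checkstring changed in the meantime.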

    def __init__(self):
        self._known_shares = {}
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        self.update_data = {} # shnum => [(verinfo, data)]; logically (verinfo,shnum) => data

    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (server.get_name(), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self._problems:
            print >>out, "%d PROBLEMS" % len(self._problems)
            for f in self._problems:
                print >>out, str(f)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

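    # Illustrative shape of the result (invented values): with two versions
    # of one file on a small grid, make_versionmap() might return
    #   { verinfo_A: set([(0, server1, t0), (1, server2, t1)]),
    #     verinfo_B: set([(0, server3, t2)]) }
    # where each verinfo_* is the 9-tuple described in the class docstring.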

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

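    # Worked example (invented numbers): with k=3, N=10, and ten distinct
    # shnums seen for a version, shares_available() reports (10, 3, 10) for
    # that verinfo, and recoverable_versions() below will include it because
    # 10 >= k. With only two distinct shnums it would report (2, 3, 10) and
    # the version would be unrecoverable.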

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)


    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is
        currently recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is
        currently unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

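    # 'Best' falls out of plain tuple comparison: each verinfo starts with
    # (seqnum, root_hash, ...), so sorting orders first by seqnum and then
    # by root_hash, and [-1] picks the highest. For example (invented
    # values), sorted([(2, "r9", ...), (3, "r0", ...)])[-1] is the seq3
    # version even though its root hash sorts lower.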

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False

    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum and verinfo.
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum

    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the update data (block hash tree and segments) for the
        given shnum and verinfo.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))

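    # Illustrative layout of self.update_data after two recordings for
    # share 3 (invented values):
    #   { 3: [(verinfo_A, (blockhashes, start, end)),
    #         (verinfo_B, (blockhashes, start, end))] }
    # get_update_data_for_share_and_verinfo(3, verinfo_A) then returns the
    # datum whose recorded verinfo matches.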


class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode == MODE_WRITE and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

459 """Update the servermap to reflect current conditions. Returns a
460 Deferred that fires with the servermap once the update has finished."""
461 self._started = time.time()
462 self._status.set_active(True)
464 # self._valid_versions is a set of validated verinfo tuples. We just
465 # use it to remember which versions had valid signatures, so we can
466 # avoid re-checking the signatures for each share.
467 self._valid_versions = set()
469 self._done_deferred = defer.Deferred()
471 # first, which servers should be talk to? Any that were in our old
472 # servermap, plus "enough" others.
474 self._queries_completed = 0
        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # k and N will be None if the filenode doesn't yet know its own
        # encoding parameters; fall back to the defaults.
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON
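
        # Worked example (using the default encoding): with k=3 and
        # EPSILON=k=3, MODE_READ sends at least k+EPSILON=6 initial
        # queries, while MODE_WRITE below raises the target to
        # N+EPSILON=13 so that we are likely to see every share before
        # replacing them.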

        if self.mode == MODE_CHECK:
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # self._must_query is always a subset of initial_servers_to_query,
        # since we only insist on hearing from servers we actually queried.
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query

    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore success
            d2.addErrback(self._add_lease_failed, server, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

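    # A sketch of the wire interaction (invented share numbers): with
    # shnums=[] the server reports on every share it holds in the slot, and
    # readv=[(0, 4000)] asks for the first 4000 bytes of each one, so the
    # response is a dict like
    #   { 0: ["...first 4000 bytes of sh0..."],
    #     3: ["...first 4000 bytes of sh3..."] }
    # mapping each held shnum to a list with one string per requested range.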

    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)

    def _cache_good_sharedata(self, verinfo, shnum, now, data):
        """
        If one of my queries returns successfully (which means that we
        were able to validate the signature), I cache the data that we
        initially fetched from the storage server. This will help reduce
        the number of roundtrips that need to occur when the file is
        downloaded, or when the file is updated.
        """
        self._node._add_to_cache(verinfo, shnum, 0, data)

    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_rref()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)

        ds = [] # one DeferredList per share found on this server

        for shnum, datav in datavs.items():
            data = datav[0]
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data)
            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches returns anything of
            # consequence, so the first entry in our DeferredList will
            # be None.

            # - Next, we need the version information. We almost
            #   certainly got this by reading the first thousand or so
            #   bytes of the share on the storage server, so we
            #   shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # - Next, we need the signature. For an SDMF share, it is
            #   likely that we fetched this when doing our initial fetch
            #   to get the version information. In MDMF, this lives at
            #   the end of the share, so unless the file is quite small,
            #   we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # Once we have all three of these responses, we can move on
            # to validating the signature.

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)

            if self.fetch_update_data:
                # fetch the block hash tree and first + last segment, as
                # configured earlier.
                # Then set them in wherever we happen to want to set
                # them.
                # (use a separate list here: reusing 'ds' would clobber the
                # per-share accumulator defined above)
                update_ds = []
                # XXX: We do this above, too. Is there a good way to
                # make the two routines share the value without
                # introducing more roundtrips?
                update_ds.append(reader.get_verinfo())
                update_ds.append(reader.get_blockhashes())
                update_ds.append(reader.get_block_and_salt(self.start_segment))
                update_ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(update_ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            dl.addCallback(lambda verinfo, shnum=shnum, data=data:
                           self._cache_good_sharedata(verinfo, shnum, now, data))
            ds.append(dl)

        # dl is a deferred list that will fire when all of the shares
        # that we found on this server are done processing. When dl fires,
        # we know that processing is done, so we can decrement the
        # semaphore-like thing that we incremented earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # haven't received any queries that are still processing. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl

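    # The per-share pipeline above, in miniature: each share contributes
    #   DeferredList([d, d2, d3, d4, d5])
    #     = [pubkey, verinfo, signature, privkey, update data]
    # which feeds _got_signature_one_share and then _cache_good_sharedata;
    # the outer DeferredList over all shares then gates _done_processing
    # and _check_for_done.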

    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        that long synchronous callback chains can trigger: fireEventually
        re-queues the result in a later reactor turn instead of recursing.
        """
        return fireEventually(result)

    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()

    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)

    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 level=log.NOISY,
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo[1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        # XXX: The offsets dict should be converted into a hashable
        # tuple inside get_verinfo itself; until then, we normalize the
        # verinfo here.
        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.

        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        server.get_name(), shnum,
                        k, n, segsize, datalen),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)

        return verinfo

    def _got_update_results_one_share(self, results, share):
        """
        I record the update data fetched for a single share.
        """
        assert len(results) == 4
        verinfo, blockhashes, start, end = results
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        # XXX: The offsets dict should be converted into a hashable
        # tuple inside get_verinfo itself; until then, we normalize the
        # verinfo here.
        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)

        update_data = (blockhashes, start, end)
        self._servermap.set_update_data_for_share_and_verinfo(share,
                                                              verinfo,
                                                              update_data)

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
        """
        Given an encrypted privkey from a remote server, I decrypt it,
        derive the writekey from it, and validate that against the
        writekey stored in my node. If they match, I set the privkey and
        encprivkey properties of the node.
        """
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (server.get_name(), shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on serverid %s" %
                 (shnum, server.get_name()),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(server)

    def _add_lease_failed(self, f, server, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server.get_name(),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server.get_name(),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, server):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(server)
        self._queries_outstanding.discard(server)
        self._bad_servers.add(server)
        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f

    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to
            # have a share, so we must continue to wait. No additional queries
            # are required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode == MODE_CHECK:
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i,server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._good_servers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)
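
            # Boundary search in miniature (invented trace): with EPSILON=3,
            # a permuted-server states string of "110x00" finds the boundary,
            # because the three '0's after the last '1' count num_not_found=3
            # and the 'x' neither counts nor resets the run. With "110?00"
            # the boundary is also found, but last_not_responded != -1, so we
            # keep waiting for the '?' server instead of declaring success.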

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)