import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
                         fireEventually
from allmydata.util import base32, hashutil, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
     MODE_READ, MODE_REPAIR, CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy

# upper bound on queries kept in flight at once; the value 5 is assumed
# here from the "keep up to 5 queries in flight" note in _check_for_done.
MAX_IN_FLIGHT = 5

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = []
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if server not in self.timings["per_server"]:
            self.timings["per_server"][server] = []
        self.timings["per_server"][server].append((op, sent, elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        raise NotImplementedError # this should never be called
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when
84 """I record the placement of mutable shares.
86 This object records which shares (of various versions) are located on
89 One purpose I serve is to inform callers about which versions of the
90 mutable file are recoverable and 'current'.
92 A second purpose is to serve as a state marker for test-and-set
93 operations. I am passed out of retrieval operations and back into publish
94 operations, which means 'publish this new version, but only if nothing
95 has changed since I last retrieved this data'. This reduces the chances
96 of clobbering a simultaneous (uncoordinated) write.
98 @var _known_shares: a dictionary, mapping a (server, shnum) tuple to a
99 (versionid, timestamp) tuple. Each 'versionid' is a
100 tuple of (seqnum, root_hash, IV, segsize, datalength,
101 k, N, signed_prefix, offsets)
103 @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
104 shares that I should ignore (because a previous user
105 of the servermap determined that they were invalid).
106 The updater only locates a certain number of shares:
107 if some of these turn out to have integrity problems
108 and are unusable, the caller will need to mark those
109 shares as bad, then re-update the servermap, then try
110 again. The dict maps (server, shnum) tuple to old

    def __init__(self):
        self._known_shares = {}
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
        # where blockhashes is a list of bytestrings (the result of
        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
        # (block,salt) tuple-of-bytestrings from get_block_and_salt()

    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (server.get_name(), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self._problems:
            print >>out, "%d PROBLEMS" % len(self._problems)
            for f in self._problems:
                print >>out, str(f)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap
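
    # For example (illustrative values): if version v has share 0 on server A
    # and share 1 on server B, make_sharemap() yields {0: set([A]), 1: set([B])}
    # while make_versionmap() yields {v: set([(0, A, t0), (1, B, t1)])} --
    # the same facts, keyed two different ways.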

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions
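
    # For example, with 3-of-10 encoding (k=3), a version with distinct
    # shares {0, 4, 7} on any combination of servers is recoverable, while
    # a version for which only two distinct shnums can be found is not,
    # no matter how many copies of those two shares exist.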

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        # seqnum is the first element of each verinfo tuple, so a plain
        # tuple sort orders versions by seqnum, then root_hash.
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False

    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum.
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum

    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the block hash tree for the given shnum.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))
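
# A typical round trip (sketch): a retrieve operation builds a ServerMap via
# ServermapUpdater, the caller modifies the retrieved contents, and a later
# publish is handed the same map so that its test-and-set writes succeed only
# if the shares still match what the map recorded.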

class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #    * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #    * but we can't read from negative offsets
        #    * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)
461 """Update the servermap to reflect current conditions. Returns a
462 Deferred that fires with the servermap once the update has finished."""
463 self._started = time.time()
464 self._status.set_active(True)
466 # self._valid_versions is a set of validated verinfo tuples. We just
467 # use it to remember which versions had valid signatures, so we can
468 # avoid re-checking the signatures for each share.
469 self._valid_versions = set()
471 self._done_deferred = defer.Deferred()
473 # first, which servers should be talk to? Any that were in our old
474 # servermap, plus "enough" others.
476 self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # For what cases can these conditions work?
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query
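
    # For example (illustrative numbers): a MODE_READ update of a 3-of-10
    # file against an empty servermap has num_servers_to_query = k + EPSILON
    # = 6 (EPSILON is set to k above), so the loop just takes the first six
    # servers from the permuted list; MODE_WRITE raises the target to
    # N + EPSILON = 13 before calling this method.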

    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore the success result; failures are handled below.
            d2.addErrback(self._add_lease_failed, server, storage_index)
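        # slot_readv takes a list of (offset, length) read vectors and
        # returns a dict mapping shnum to a list of read results, one per
        # vector; with our single (0, readsize) vector, datav[0] in
        # _got_results holds the first readsize bytes of each share.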
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)

    def _cache_good_sharedata(self, verinfo, shnum, now, data):
        """
        If one of my queries returns successfully (which means that we
        were able to and successfully did validate the signature), I
        cache the data that we initially fetched from the storage
        server. This will help reduce the number of roundtrips that need
        to occur when the file is downloaded, or when the file is
        checked and repaired.
        """
        self._node._add_to_cache(verinfo, shnum, 0, data)

    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_rref()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)

        ds = []

        for shnum, datav in datavs.items():
            data = datav[0]
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data)
            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches return anything of
            # consequence, so the first entry in our deferredlist will
            # be None.

            # - Next, we need the version information. We almost
            #   certainly got this by reading the first thousand or so
            #   bytes of the share on the storage server, so we
            #   shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # - Next, we need the signature. For an SDMF share, it is
            #   likely that we fetched this when doing our initial fetch
            #   to get the version information. In MDMF, this lives at
            #   the end of the share, so unless the file is quite small,
            #   we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # Once we have all three of these responses, we can move on
            # to validating the signature.

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)

            if self.fetch_update_data:
                # fetch the block hash tree and first + last segment, as
                # configured earlier.
                # Then set them in wherever we happen to want to set
                # them.
                # (a fresh list is used here: reusing 'ds' would clobber
                # the per-share deferreds accumulated by the outer loop)
                update_ds = []
                # XXX: We do this above, too. Is there a good way to
                # make the two routines share the value without
                # introducing more roundtrips?
                update_ds.append(reader.get_verinfo())
                update_ds.append(reader.get_blockhashes())
                update_ds.append(reader.get_block_and_salt(self.start_segment))
                update_ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(update_ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            dl.addCallback(lambda verinfo, shnum=shnum, data=data:
                           self._cache_good_sharedata(verinfo, shnum, now, data))
            ds.append(dl)

        # dl is a deferred list that will fire when all of the shares
        # that we found on this server are done processing. When dl fires,
        # we know that processing is done, so we can decrement the
        # semaphore-like thing that we incremented earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # haven't received any queries that are still processing. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)

        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl

    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        that long chains of synchronous callbacks can hit.
        """
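        # fireEventually returns a Deferred that fires on a later reactor
        # turn, so already-fired Deferred chains are broken up instead of
        # recursing through all of their callbacks synchronously.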
        return fireEventually(result)

    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()

    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)

    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo[1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        # XXX: This should be done for us in the method, so
        # presumably you can go in there and fix it.
        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.

        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        server.get_name(), shnum,
                        k, n, segsize, datalen),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)

        return verinfo

    def _got_update_results_one_share(self, results, share):
        """
        I record the update results in results.
        """
        assert len(results) == 4
        verinfo, blockhashes, start, end = results
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo[1]
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )

        # XXX: This should be done for us in the method, so
        # presumably you can go in there and fix it.
        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)

        update_data = (blockhashes, start, end)
        self._servermap.set_update_data_for_share_and_verinfo(share,
                                                              verinfo,
                                                              update_data)

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
        """
        Given a writekey from a remote server, I validate it against the
        writekey stored in my node. If it is valid, then I set the
        privkey and encprivkey properties of the node.
        """
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (server.get_name(), shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on serverid %s" %
                 (shnum, server.get_name()),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(server)

    def _add_lease_failed(self, f, server, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server.get_name(),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server.get_name(),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, server):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(server)
        self._queries_outstanding.discard(server)
        self._bad_servers.add(server)
        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f

    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i, server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._good_servers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)
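
        # To illustrate the boundary search above (made-up pattern): with
        # EPSILON=3, states joining to "1110x00" finds the boundary at the
        # third empty ("0") server after the last share-holder ("1"), since
        # failed queries ("x") don't reset the count; a pattern like
        # "1001..." resets the empty count when the later share turns up,
        # so the search keeps going.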

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)
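        # eventually() runs the callback on a later reactor turn, so the
        # caller sees a fully settled servermap and we never re-enter their
        # code from the middle of our own call stack.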

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)