import sys, time
from zope.interface import implements
from itertools import count
from twisted.internet import defer
from twisted.python import failure
from foolscap.api import DeadReferenceError, RemoteException, eventually, \
                         fireEventually
from allmydata.util import base32, hashutil, log, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata.storage.server import si_b2a
from allmydata.interfaces import IServermapUpdaterStatus
from pycryptopp.publickey import rsa

from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
     MODE_READ, MODE_REPAIR, CorruptShareError
from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy

class UpdateStatus:
    implements(IServermapUpdaterStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        if server not in self.timings["per_server"]:
            self.timings["per_server"][server] = []
        self.timings["per_server"][server].append((op,sent,elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when
84 """I record the placement of mutable shares.
86 This object records which shares (of various versions) are located on
89 One purpose I serve is to inform callers about which versions of the
90 mutable file are recoverable and 'current'.
92 A second purpose is to serve as a state marker for test-and-set
93 operations. I am passed out of retrieval operations and back into publish
94 operations, which means 'publish this new version, but only if nothing
95 has changed since I last retrieved this data'. This reduces the chances
96 of clobbering a simultaneous (uncoordinated) write.
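
    For example (a rough sketch only; the authoritative entry points live on
    the mutable file node, and their exact signatures may differ):

      d = node.get_servermap(MODE_WRITE)
      d.addCallback(lambda smap: node.upload(new_contents, smap))

    If another writer changed the shares between the map update and the
    upload, the upload can use this map to detect the collision and refuse,
    rather than silently clobbering the other version.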

    @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
                         (versionid, timestamp) tuple. Each 'versionid' is a
                         tuple of (seqnum, root_hash, IV, segsize, datalength,
                         k, N, signed_prefix, offsets)

    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
                       shares that I should ignore (because a previous user
                       of the servermap determined that they were invalid).
                       The updater only locates a certain number of shares:
                       if some of these turn out to have integrity problems
                       and are unusable, the caller will need to mark those
                       shares as bad, then re-update the servermap, then try
                       again. The dict maps (server, shnum) tuple to old
                       checkstring.
    """

    def __init__(self):
        self._known_shares = {}
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        self.proxies = {} # maps (verinfo, serverid, storage_index, shnum) to reader
        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
        # where blockhashes is a list of bytestrings (the result of
        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
        # (block,salt) tuple-of-bytestrings from get_block_and_salt()

    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print >>out, "servermap:"

        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print >>out, ("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                          (server.get_name(), shnum,
                           seqnum, base32.b2a(root_hash)[:4], k, N,
                           datalength))
        if self._problems:
            print >>out, "%d PROBLEMS" % len(self._problems)
            for f in self._problems:
                print >>out, str(f)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in self._known_shares.items():
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in versionmap.items():
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in versionmap.items():
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()
        unrecoverable_versions = set()
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None
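
    # (Plain tuple comparison gives the documented order: seqnum is element 0
    # of each versionid and root_hash is element 1, so sorting orders by
    # seqnum first and breaks ties with the root hash.)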

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in versionmap.items():
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums),k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions

    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True

        return False

    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum and verinfo.
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum

    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the given update data (block hash tree plus boundary
        blocks) for the given shnum and verinfo.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))

class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """
        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #    * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #    * but we can't read from negative offsets
        #    * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
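        # (4000 bytes is a compromise: judging by the offset sketches above,
        # it covers the checkstring, offset table, verification key, and
        # signature of either format in one round trip, without pulling down
        # entire large shares.)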
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # repair.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True
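            # (the extra data -- each share's block hash tree plus the blocks
            # at the two boundary segments -- is what _got_results fetches
            # below, presumably so the in-place updater can rebuild hashes
            # around the modified range)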

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        self._done_deferred = defer.Deferred()

        # first, which servers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # For what cases can these conditions work?
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON
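        # (with EPSILON = k this works out to 2*k servers, matching the
        # "2*k servers is good enough" policy in the MODE_READ branch below)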

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query

    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize)
        started = time.time()
        self._queries_outstanding.add(server)
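        # an empty shnums list asks the server about every share it holds for
        # this storage index; the single (0, readsize) read vector fetches
        # the head of each of those shares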
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore the result of add_lease; errors are handled below
            d2.addErrback(self._add_lease_failed, server, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)

    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_rref()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)

        ds = [] # one deferred per share found on this server

        for shnum,datav in datavs.items():
            data = datav[0]
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data,
                                       data_is_everything=(len(data) < readsize))
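            # (the proxy parses the header we already fetched and serves
            # later reads from that cache when it can; data_is_everything
            # tells it that a short response means we already hold the
            # entire share)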

            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches return anything of
            # consequence, so the first entry in our deferredlist will
            # be None.

            # - Next, we need the version information. We almost
            #   certainly got this by reading the first thousand or so
            #   bytes of the share on the storage server, so we
            #   shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # - Next, we need the signature. For an SDMF share, it is
            #   likely that we fetched this when doing our initial fetch
            #   to get the version information. In MDMF, this lives at
            #   the end of the share, so unless the file is quite small,
            #   we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # Once we have all three of these responses, we can move on
            # to validating the signature

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)

            if self.fetch_update_data:
                # fetch the block hash tree and first + last segment, as
                # configured earlier.
                # Then set them in wherever we happen to want to set
                # them.
                update_ds = [] # a separate list, so we don't clobber ds
                # XXX: We do this above, too. Is there a good way to
                # make the two routines share the value without
                # introducing more roundtrips?
                update_ds.append(reader.get_verinfo())
                update_ds.append(reader.get_blockhashes())
                update_ds.append(reader.get_block_and_salt(self.start_segment))
                update_ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(update_ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            def _append_proxy(passthrough, shnum=shnum, reader=reader):
                # Store the proxy (with its cache) keyed by serverid and
                # version.
                _, (_,verinfo), _, _, _ = passthrough
                verinfo = self._make_verinfo_hashable(verinfo)
                self._servermap.proxies[(verinfo,
                                         server.get_serverid(),
                                         storage_index, shnum)] = reader
                return passthrough
            dl.addCallback(_append_proxy)
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            ds.append(dl)

        # dl is a deferred list that will fire when all of the shares
        # that we found on this server are done processing. When dl fires,
        # we know that processing is done, so we can decrement the
        # semaphore-like thing that we incremented earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # haven't received any queries that are still processing. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)
        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl

    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        that a long chain of synchronous callbacks can trigger.
        """
        return fireEventually(result)
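        # (fireEventually() hands the result back on a later reactor turn,
        # so processing many already-fired Deferreds in a row doesn't keep
        # deepening the Python stack)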

    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()

    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)

    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        verinfo = self._make_verinfo_hashable(verinfo[1])

        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.
        (seqnum, root_hash, saltish, segsize, datalen, k, n, prefix,
         offsets_tuple) = verinfo

        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            valid = self._node.get_pubkey().verify(prefix, signature[1])
            if not valid:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # verinfos.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, base32.b2a(root_hash)[:4],
                        server.get_name(), shnum,
                        k, n, segsize, datalen),
                     parent=lp)
            self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return None

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)

        return verinfo

    def _make_verinfo_hashable(self, verinfo):
        # verinfo is used as a dict key and as a set member, so the offsets
        # dict inside it must be flattened into a hashable tuple of pairs.
        (seqnum, root_hash, saltish, segsize, datalen, k, n, prefix,
         offsets) = verinfo
        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
        verinfo = (seqnum, root_hash, saltish, segsize, datalen, k, n, prefix,
                   offsets_tuple)
        return verinfo

    def _got_update_results_one_share(self, results, share):
        """
        I record the update data contained in results.
        """
        assert len(results) == 4
        verinfo, blockhashes, start, end = results
        verinfo = self._make_verinfo_hashable(verinfo)
        update_data = (blockhashes, start, end)
        self._servermap.set_update_data_for_share_and_verinfo(share,
                                                              verinfo,
                                                              update_data)

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
        """
        Given a writekey from a remote server, I validate it against the
        writekey stored in my node. If it is valid, then I set the
        privkey and encprivkey properties of the node.
        """
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (server.get_name(), shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on serverid %s" %
                 (shnum, server.get_name()),
                 parent=lp)
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(server)

    def _add_lease_failed(self, f, server, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # errors.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server.get_name(),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server.get_name(),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, server):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(server)
        self._queries_outstanding.discard(server)
        self._bad_servers.add(server)
        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f

    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f

    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY)

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?
        MAX_IN_FLIGHT = 5

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False
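
            # Walk the full (permuted) server list and build a state string:
            # "1" = has shares, "0" = responded but empty, "x" = query
            # failed, "?" = no response yet. Seeing EPSILON empty servers
            # past the last server that held a share means we've found the
            # boundary of the share distribution.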
            for i,server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._good_servers:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # no response yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%s]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)