3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually, DeadReferenceError, \
10 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
11 DownloadStopped, MDMF_VERSION, SDMF_VERSION
12 from allmydata.util import hashutil, log, mathutil, deferredutil
13 from allmydata.util.dictutil import DictOfSets
14 from allmydata import hashtree, codec
15 from allmydata.storage.server import si_b2a
16 from pycryptopp.cipher.aes import AES
17 from pycryptopp.publickey import rsa
19 from allmydata.mutable.common import CorruptShareError, BadShareError, \
20 UncoordinatedWriteError
21 from allmydata.mutable.layout import MDMFSlotReadProxy
24 implements(IRetrieveStatus)
25 statusid_counter = count(0)
28 self.timings["fetch_per_server"] = {}
29 self.timings["decode"] = 0.0
30 self.timings["decrypt"] = 0.0
31 self.timings["cumulative_verify"] = 0.0
34 self.storage_index = None
36 self.encoding = ("?","?")
38 self.status = "Not started"
40 self.counter = self.statusid_counter.next()
41 self.started = time.time()
43 def get_started(self):
def get_storage_index(self):
    """Return the storage index of the file this status describes."""
    return self.storage_index
47 def get_encoding(self):
49 def using_helper(self):
55 def get_progress(self):
59 def get_counter(self):
61 def get_problems(self):
def add_fetch_timing(self, server, elapsed):
    """Record the elapsed time of one share fetch from *server*."""
    # One list of elapsed times per server; create it on first use.
    per_server = self.timings["fetch_per_server"]
    per_server.setdefault(server, []).append(elapsed)
def accumulate_decode_time(self, elapsed):
    """Add *elapsed* seconds to the running decode-time total."""
    self.timings["decode"] = self.timings["decode"] + elapsed
def accumulate_decrypt_time(self, elapsed):
    """Add *elapsed* seconds to the running decrypt-time total."""
    self.timings["decrypt"] = self.timings["decrypt"] + elapsed
def set_storage_index(self, si):
    """Record the storage index of the file being retrieved."""
    self.storage_index = si
74 def set_helper(self, helper):
def set_encoding(self, k, n):
    """Record the (k, n) erasure-coding parameters for this download."""
    self.encoding = (k, n)
78 def set_size(self, size):
80 def set_status(self, status):
82 def set_progress(self, value):
84 def set_active(self, value):
def add_problem(self, server, f):
    """Associate failure *f* with *server*, keyed by its server id."""
    self._problems[server.get_serverid()] = f
94 # this class is currently single-use. Eventually (in MDMF) we will make
95 # it multi-use, in which case you can call download(range) multiple
96 # times, and each will have a separate response chain. However the
97 # Retrieve object will remain tied to a specific version of the file, and
98 # will use a single ServerMap instance.
99 implements(IPushProducer)
101 def __init__(self, filenode, storage_broker, servermap, verinfo,
102 fetch_privkey=False, verify=False):
103 self._node = filenode
104 assert self._node.get_pubkey()
105 self._storage_broker = storage_broker
106 self._storage_index = filenode.get_storage_index()
107 assert self._node.get_readkey()
108 self._last_failure = None
109 prefix = si_b2a(self._storage_index)[:5]
110 self._log_number = log.msg("Retrieve(%s): starting" % prefix)
112 self._decoding = False
113 self._bad_shares = set()
115 self.servermap = servermap
116 assert self._node.get_pubkey()
117 self.verinfo = verinfo
118 # during repair, we may be called upon to grab the private key, since
119 # it wasn't picked up during a verify=False checker run, and we'll
120 # need it for repair to generate a new version.
121 self._need_privkey = verify or (fetch_privkey
122 and not self._node.get_privkey())
124 if self._need_privkey:
125 # TODO: Evaluate the need for this. We'll use it if we want
126 # to limit how many queries are on the wire for the privkey
128 self._privkey_query_markers = [] # one Marker for each time we've
129 # tried to get the privkey.
131 # verify means that we are using the downloader logic to verify all
132 # of our shares. This tells the downloader a few things.
134 # 1. We need to download all of the shares.
135 # 2. We don't need to decode or decrypt the shares, since our
136 # caller doesn't care about the plaintext, only the
137 # information about which shares are or are not valid.
138 # 3. When we are validating readers, we need to validate the
139 # signature on the prefix. Do we? We already do this in the
141 self._verify = verify
143 self._status = RetrieveStatus()
144 self._status.set_storage_index(self._storage_index)
145 self._status.set_helper(False)
146 self._status.set_progress(0.0)
147 self._status.set_active(True)
148 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
149 offsets_tuple) = self.verinfo
150 self._status.set_size(datalength)
151 self._status.set_encoding(k, N)
153 self._stopped = False
154 self._pause_deferred = None
156 self._read_length = None
157 self.log("got seqnum %d" % self.verinfo[0])
160 def get_status(self):
def log(self, *args, **kwargs):
    """
    Emit a log message in the tahoe.mutable.retrieve facility.

    Unless the caller supplies them, the message is parented to this
    Retrieve's own log entry and tagged with our facility name.
    """
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.mutable.retrieve")
    return log.msg(*args, **kwargs)
170 def _set_current_status(self, state):
171 seg = "%d/%d" % (self._current_segment, self._last_segment)
172 self._status.set_status("segment %s (%s)" % (seg, state))
177 def pauseProducing(self):
179 I am called by my download target if we have produced too much
180 data for it to handle. I make the downloader stop producing new
181 data until my resumeProducing method is called.
183 if self._pause_deferred is not None:
186 # fired when the download is unpaused.
187 self._old_status = self._status.get_status()
188 self._set_current_status("paused")
190 self._pause_deferred = defer.Deferred()
193 def resumeProducing(self):
195 I am called by my download target once it is ready to begin
196 receiving data again.
198 if self._pause_deferred is None:
201 p = self._pause_deferred
202 self._pause_deferred = None
203 self._status.set_status(self._old_status)
205 eventually(p.callback, None)
207 def stopProducing(self):
209 self.resumeProducing()
212 def _check_for_paused(self, res):
214 I am called just before a write to the consumer. I return a
215 Deferred that eventually fires with the data that is to be
216 written to the consumer. If the download has not been paused,
217 the Deferred fires immediately. Otherwise, the Deferred fires
218 when the downloader is unpaused.
220 if self._pause_deferred is not None:
222 self._pause_deferred.addCallback(lambda ignored: d.callback(res))
226 def _check_for_stopped(self, res):
228 raise DownloadStopped("our Consumer called stopProducing()")
232 def download(self, consumer=None, offset=0, size=None):
233 assert IConsumer.providedBy(consumer) or self._verify
236 self._consumer = consumer
237 # we provide IPushProducer, so streaming=True, per
239 self._consumer.registerProducer(self, streaming=True)
241 self._done_deferred = defer.Deferred()
242 self._offset = offset
243 self._read_length = size
244 self._setup_download()
245 self._setup_encoding_parameters()
246 self.log("starting download")
247 self._started_fetching = time.time()
248 # The download process beyond this is a state machine.
249 # _add_active_servers will select the servers that we want to use
250 # for the download, and then attempt to start downloading. After
251 # each segment, it will check for doneness, reacting to broken
252 # servers and corrupt shares as necessary. If it runs out of good
253 # servers before downloading all of the segments, _done_deferred
254 # will errback. Otherwise, it will eventually callback with the
255 # contents of the mutable file.
257 return self._done_deferred
260 d = fireEventually(None) # avoid #237 recursion limit problem
261 d.addCallback(lambda ign: self._activate_enough_servers())
262 d.addCallback(lambda ign: self._download_current_segment())
263 # when we're done, _download_current_segment will call _done. If we
264 # aren't, it will call loop() again.
265 d.addErrback(self._error)
267 def _setup_download(self):
268 self._started = time.time()
269 self._status.set_status("Retrieving Shares")
271 # how many shares do we need?
280 offsets_tuple) = self.verinfo
282 # first, which servers can we use?
283 versionmap = self.servermap.make_versionmap()
284 shares = versionmap[self.verinfo]
285 # this sharemap is consumed as we decide to send requests
286 self.remaining_sharemap = DictOfSets()
287 for (shnum, server, timestamp) in shares:
288 self.remaining_sharemap.add(shnum, server)
289 # Reuse the SlotReader from the servermap.
290 key = (self.verinfo, server.get_serverid(),
291 self._storage_index, shnum)
292 if key in self.servermap.proxies:
293 reader = self.servermap.proxies[key]
295 reader = MDMFSlotReadProxy(server.get_rref(),
296 self._storage_index, shnum, None)
297 reader.server = server
298 self.readers[shnum] = reader
299 assert len(self.remaining_sharemap) >= k
301 self.shares = {} # maps shnum to validated blocks
302 self._active_readers = [] # list of active readers for this dl.
303 self._block_hash_trees = {} # shnum => hashtree
305 # We need one share hash tree for the entire file; its leaves
306 # are the roots of the block hash trees for the shares that
307 # comprise it, and its root is in the verinfo.
308 self.share_hash_tree = hashtree.IncompleteHashTree(N)
309 self.share_hash_tree.set_hashes({0: root_hash})
311 def decode(self, blocks_and_salts, segnum):
313 I am a helper method that the mutable file update process uses
314 as a shortcut to decode and decrypt the segments that it needs
315 to fetch in order to perform a file update. I take in a
316 collection of blocks and salts, and pick some of those to make a
317 segment with. I return the plaintext associated with that
320 # shnum => block hash tree. Unused, but setup_encoding_parameters will
322 self._block_hash_trees = None
323 self._setup_encoding_parameters()
325 # _decode_blocks() expects the output of a gatherResults that
326 # contains the outputs of _validate_block() (each of which is a dict
327 # mapping shnum to (block,salt) bytestrings).
328 d = self._decode_blocks([blocks_and_salts], segnum)
329 d.addCallback(self._decrypt_segment)
333 def _setup_encoding_parameters(self):
335 I set up the encoding parameters, including k, n, the number
336 of segments associated with this file, and the segment decoders.
346 offsets_tuple) = self.verinfo
347 self._required_shares = k
348 self._total_shares = n
349 self._segment_size = segsize
350 self._data_length = datalength
353 self._version = MDMF_VERSION
355 self._version = SDMF_VERSION
357 if datalength and segsize:
358 self._num_segments = mathutil.div_ceil(datalength, segsize)
359 self._tail_data_size = datalength % segsize
361 self._num_segments = 0
362 self._tail_data_size = 0
364 self._segment_decoder = codec.CRSDecoder()
365 self._segment_decoder.set_params(segsize, k, n)
367 if not self._tail_data_size:
368 self._tail_data_size = segsize
370 self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
371 self._required_shares)
372 if self._tail_segment_size == self._segment_size:
373 self._tail_decoder = self._segment_decoder
375 self._tail_decoder = codec.CRSDecoder()
376 self._tail_decoder.set_params(self._tail_segment_size,
377 self._required_shares,
380 self.log("got encoding parameters: "
383 "%d segments of %d bytes each (%d byte tail segment)" % \
384 (k, n, self._num_segments, self._segment_size,
385 self._tail_segment_size))
387 if self._block_hash_trees is not None:
388 for i in xrange(self._total_shares):
389 # So we don't have to do this later.
390 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
392 # Our last task is to tell the downloader where to start and
393 # where to stop. We use three parameters for that:
394 # - self._start_segment: the segment that we need to start
396 # - self._current_segment: the next segment that we need to
398 # - self._last_segment: The last segment that we were asked to
401 # We say that the download is complete when
402 # self._current_segment > self._last_segment. We use
403 # self._start_segment and self._last_segment to know when to
404 # strip things off of segments, and how much to strip.
406 self.log("got offset: %d" % self._offset)
407 # our start segment is the first segment containing the
408 # offset we were given.
409 start = self._offset // self._segment_size
411 assert start < self._num_segments
412 self._start_segment = start
413 self.log("got start segment: %d" % self._start_segment)
415 self._start_segment = 0
418 # If self._read_length is None, then we want to read the whole
419 # file. Otherwise, we want to read only part of the file, and
420 # need to figure out where to stop reading.
421 if self._read_length is not None:
422 # our end segment is the last segment containing part of the
423 # segment that we were asked to read.
424 self.log("got read length %d" % self._read_length)
425 if self._read_length != 0:
426 end_data = self._offset + self._read_length
428 # We don't actually need to read the byte at end_data,
429 # but the one before it.
430 end = (end_data - 1) // self._segment_size
432 assert end < self._num_segments
433 self._last_segment = end
435 self._last_segment = self._start_segment
436 self.log("got end segment: %d" % self._last_segment)
438 self._last_segment = self._num_segments - 1
440 self._current_segment = self._start_segment
442 def _activate_enough_servers(self):
444 I populate self._active_readers with enough active readers to
445 retrieve the contents of this mutable file. I am called before
446 downloading starts, and (eventually) after each validation
447 error, connection error, or other problem in the download.
449 # TODO: It would be cool to investigate other heuristics for
450 # reader selection. For instance, the cost (in time the user
451 # spends waiting for their file) of selecting a really slow server
452 # that happens to have a primary share is probably more than
453 # selecting a really fast server that doesn't have a primary
454 # share. Maybe the servermap could be extended to provide this
455 # information; it could keep track of latency information while
456 # it gathers more important data, and then this routine could
457 # use that to select active readers.
459 # (these and other questions would be easier to answer with a
460 # robust, configurable tahoe-lafs simulator, which modeled node
461 # failures, differences in node speed, and other characteristics
462 # that we expect storage servers to have. You could have
463 # presets for really stable grids (like allmydata.com),
464 # friendnets, make it easy to configure your own settings, and
465 # then simulate the effect of big changes on these use cases
466 # instead of just reasoning about what the effect might be. Out
467 # of scope for MDMF, though.)
469 # XXX: Why don't format= log messages work here?
471 known_shnums = set(self.remaining_sharemap.keys())
472 used_shnums = set([r.shnum for r in self._active_readers])
473 unused_shnums = known_shnums - used_shnums
476 new_shnums = unused_shnums # use them all
477 elif len(self._active_readers) < self._required_shares:
479 more = self._required_shares - len(self._active_readers)
480 # We favor lower numbered shares, since FEC is faster with
481 # primary shares than with other shares, and lower-numbered
482 # shares are more likely to be primary than higher numbered
484 new_shnums = sorted(unused_shnums)[:more]
485 if len(new_shnums) < more:
486 # We don't have enough readers to retrieve the file; fail.
487 self._raise_notenoughshareserror()
491 self.log("adding %d new servers to the active list" % len(new_shnums))
492 for shnum in new_shnums:
493 reader = self.readers[shnum]
494 self._active_readers.append(reader)
495 self.log("added reader for share %d" % shnum)
496 # Each time we add a reader, we check to see if we need the
497 # private key. If we do, we politely ask for it and then continue
498 # computing. If we find that we haven't gotten it at the end of
499 # segment decoding, then we'll take more drastic measures.
500 if self._need_privkey and not self._node.is_readonly():
501 d = reader.get_encprivkey()
502 d.addCallback(self._try_to_validate_privkey, reader, reader.server)
503 # XXX: don't just drop the Deferred. We need error-reporting
504 # but not flow-control here.
506 def _try_to_validate_prefix(self, prefix, reader):
508 I check that the prefix returned by a candidate server for
509 retrieval matches the prefix that the servermap knows about
510 (and, hence, the prefix that was validated earlier). If it does,
511 I return True, which means that I approve of the use of the
512 candidate server for segment retrieval. If it doesn't, I return
513 False, which means that another server must be chosen.
523 offsets_tuple) = self.verinfo
524 if known_prefix != prefix:
525 self.log("prefix from share %d doesn't match" % reader.shnum)
526 raise UncoordinatedWriteError("Mismatched prefix -- this could "
527 "indicate an uncoordinated write")
528 # Otherwise, we're okay -- no issues.
531 def _remove_reader(self, reader):
533 At various points, we will wish to remove a server from
534 consideration and/or use. These include, but are not necessarily
537 - A connection error.
538 - A mismatched prefix (that is, a prefix that does not match
539 our conception of the version information string).
540 - A failing block hash, salt hash, or share hash, which can
541 indicate disk failure/bit flips, or network trouble.
543 This method will do that. I will make sure that the
544 (shnum,reader) combination represented by my reader argument is
545 not used for anything else during this download. I will not
546 advise the reader of any corruption, something that my callers
547 may wish to do on their own.
549 # TODO: When you're done writing this, see if this is ever
550 # actually used for something that _mark_bad_share isn't. I have
551 # a feeling that they will be used for very similar things, and
552 # that having them both here is just going to be an epic amount
553 # of code duplication.
555 # (well, okay, not epic, but meaningful)
556 self.log("removing reader %s" % reader)
557 # Remove the reader from _active_readers
558 self._active_readers.remove(reader)
559 # TODO: self.readers.remove(reader)?
560 for shnum in list(self.remaining_sharemap.keys()):
561 self.remaining_sharemap.discard(shnum, reader.server)
def _mark_bad_share(self, server, shnum, reader, f):
    """
    Record (server, shnum) as a bad share so it is used nowhere else.

    Reasons include connection errors, mismatched prefixes, and failed
    block/salt/share hashes.  The share is reported to the servermap,
    its reader is retired from this download, and the failure is
    remembered for status reporting.  When the failure is a
    BadShareError, the remote server is also advised (best effort) that
    its share is corrupt.
    """
    self.log("marking share %d on server %s as bad" % \
             (shnum, server.get_name()))
    # verinfo[-2] is the version prefix the servermap keys bad shares by.
    self.servermap.mark_bad_share(server, shnum, self.verinfo[-2])
    self._remove_reader(reader)
    self._bad_shares.add((server, shnum, f))
    self._status.add_problem(server, f)
    self._last_failure = f
    if f.check(BadShareError):
        self.notify_server_corruption(server, shnum, str(f.value))
596 def _download_current_segment(self):
598 I download, validate, decode, decrypt, and assemble the segment
599 that this Retrieve is currently responsible for downloading.
601 if self._current_segment > self._last_segment:
602 # No more segments to download, we're done.
603 self.log("got plaintext, done")
605 elif self._verify and len(self._active_readers) == 0:
606 self.log("no more good shares, no need to keep verifying")
608 self.log("on segment %d of %d" %
609 (self._current_segment + 1, self._num_segments))
610 d = self._process_segment(self._current_segment)
611 d.addCallback(lambda ign: self.loop())
614 def _process_segment(self, segnum):
616 I download, validate, decode, and decrypt one segment of the
617 file that this Retrieve is retrieving. This means coordinating
618 the process of getting k blocks of that file, validating them,
619 assembling them into one segment with the decoder, and then
622 self.log("processing segment %d" % segnum)
624 # TODO: The old code uses a marker. Should this code do that
625 # too? What did the Marker do?
627 # We need to ask each of our active readers for its block and
628 # salt. We will then validate those. If validation is
629 # successful, we will assemble the results into plaintext.
631 for reader in self._active_readers:
632 started = time.time()
633 d1 = reader.get_block_and_salt(segnum)
634 d2,d3 = self._get_needed_hashes(reader, segnum)
635 d = deferredutil.gatherResults([d1,d2,d3])
636 d.addCallback(self._validate_block, segnum, reader, reader.server, started)
637 # _handle_bad_share takes care of recoverable errors (by dropping
638 # that share and returning None). Any other errors (i.e. code
639 # bugs) are passed through and cause the retrieve to fail.
640 d.addErrback(self._handle_bad_share, [reader])
642 dl = deferredutil.gatherResults(ds)
644 dl.addCallback(lambda ignored: "")
645 dl.addCallback(self._set_segment)
647 dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
651 def _maybe_decode_and_decrypt_segment(self, results, segnum):
653 I take the results of fetching and validating the blocks from
654 _process_segment. If validation and fetching succeeded without
655 incident, I will proceed with decoding and decryption. Otherwise, I
658 self.log("trying to decode and decrypt segment %d" % segnum)
660 # 'results' is the output of a gatherResults set up in
661 # _process_segment(). Each component Deferred will either contain the
662 # non-Failure output of _validate_block() for a single block (i.e.
663 # {segnum:(block,salt)}), or None if _validate_block threw an
664 # exception and _validation_or_decoding_failed handled it (by
665 # dropping that server).
668 self.log("some validation operations failed; not proceeding")
669 return defer.succeed(None)
670 self.log("everything looks ok, building segment %d" % segnum)
671 d = self._decode_blocks(results, segnum)
672 d.addCallback(self._decrypt_segment)
673 # check to see whether we've been paused before writing
675 d.addCallback(self._check_for_paused)
676 d.addCallback(self._check_for_stopped)
677 d.addCallback(self._set_segment)
681 def _set_segment(self, segment):
683 Given a plaintext segment, I register that segment with the
684 target that is handling the file download.
686 self.log("got plaintext for segment %d" % self._current_segment)
687 if self._current_segment == self._start_segment:
688 # We're on the first segment. It's possible that we want
689 # only some part of the end of this segment, and that we
690 # just downloaded the whole thing to get that part. If so,
691 # we need to account for that and give the reader just the
692 # data that they want.
693 n = self._offset % self._segment_size
694 self.log("stripping %d bytes off of the first segment" % n)
695 self.log("original segment length: %d" % len(segment))
696 segment = segment[n:]
697 self.log("new segment length: %d" % len(segment))
699 if self._current_segment == self._last_segment and self._read_length is not None:
700 # We're on the last segment. It's possible that we only want
701 # part of the beginning of this segment, and that we
702 # downloaded the whole thing anyway. Make sure to give the
703 # caller only the portion of the segment that they want to
705 extra = self._read_length
706 if self._start_segment != self._last_segment:
707 extra -= self._segment_size - \
708 (self._offset % self._segment_size)
709 extra %= self._segment_size
710 self.log("original segment length: %d" % len(segment))
711 segment = segment[:extra]
712 self.log("new segment length: %d" % len(segment))
713 self.log("only taking %d bytes of the last segment" % extra)
716 self._consumer.write(segment)
718 # we don't care about the plaintext if we are doing a verify.
720 self._current_segment += 1
def _handle_bad_share(self, f, readers):
    """
    React to a block or salt that failed to validate, or to a failed
    decryption/decoding step.

    Tolerable failures are absorbed by marking each reader's share as
    bad (which also notifies the remote server of corruption) and
    retiring those readers; any other failure is re-raised so the
    download fails.
    """
    # Only these failures can be tolerated, by giving up on this share
    # and finding others to replace it; anything else (i.e. coding
    # bugs) propagates and fails the retrieve.
    f.trap(DeadReferenceError, RemoteException, BadShareError)

    # DeadReferenceError: the server went away.  RemoteException: the
    # server had an internal error.  BadShareError covers obviously
    # wrong data (UnknownVersionError, LayoutInvalid, struct.error) as
    # well as the later CorruptShareError from integrity checks.
    assert isinstance(readers, list)
    bad_shnums = [rdr.shnum for rdr in readers]

    self.log("validation or decoding failed on share(s) %s, server(s) %s "
             ", segment %d: %s" %
             (bad_shnums, readers, self._current_segment, str(f)))
    for rdr in readers:
        self._mark_bad_share(rdr.server, rdr.shnum, rdr, f)
753 def _validate_block(self, results, segnum, reader, server, started):
755 I validate a block from one share on a remote server.
757 # Grab the part of the block hash tree that is necessary to
758 # validate this block, then generate the block hash root.
759 self.log("validating share %d for segment %d" % (reader.shnum,
761 elapsed = time.time() - started
762 self._status.add_fetch_timing(server, elapsed)
763 self._set_current_status("validating blocks")
765 block_and_salt, blockhashes, sharehashes = results
766 block, salt = block_and_salt
767 assert type(block) is str, (block, salt)
769 blockhashes = dict(enumerate(blockhashes))
770 self.log("the reader gave me the following blockhashes: %s" % \
772 self.log("the reader gave me the following sharehashes: %s" % \
774 bht = self._block_hash_trees[reader.shnum]
776 if bht.needed_hashes(segnum, include_leaf=True):
778 bht.set_hashes(blockhashes)
779 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
781 raise CorruptShareError(server,
783 "block hash tree failure: %s" % e)
785 if self._version == MDMF_VERSION:
786 blockhash = hashutil.block_hash(salt + block)
788 blockhash = hashutil.block_hash(block)
789 # If this works without an error, then validation is
792 bht.set_hashes(leaves={segnum: blockhash})
793 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
795 raise CorruptShareError(server,
797 "block hash tree failure: %s" % e)
799 # Reaching this point means that we know that this segment
800 # is correct. Now we need to check to see whether the share
801 # hash chain is also correct.
802 # SDMF wrote share hash chains that didn't contain the
803 # leaves, which would be produced from the block hash tree.
804 # So we need to validate the block hash tree first. If
805 # successful, then bht[0] will contain the root for the
806 # shnum, which will be a leaf in the share hash tree, which
807 # will allow us to validate the rest of the tree.
809 self.share_hash_tree.set_hashes(hashes=sharehashes,
810 leaves={reader.shnum: bht[0]})
811 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
813 raise CorruptShareError(server,
815 "corrupt hashes: %s" % e)
817 self.log('share %d is valid for segment %d' % (reader.shnum,
819 return {reader.shnum: (block, salt)}
822 def _get_needed_hashes(self, reader, segnum):
824 I get the hashes needed to validate segnum from the reader, then return
825 to my caller when this is done.
827 bht = self._block_hash_trees[reader.shnum]
828 needed = bht.needed_hashes(segnum, include_leaf=True)
829 # The root of the block hash tree is also a leaf in the share
830 # hash tree. So we don't need to fetch it from the remote
831 # server. In the case of files with one segment, this means that
832 # we won't fetch any block hash tree from the remote server,
833 # since the hash of each share of the file is the entire block
834 # hash tree, and is a leaf in the share hash tree. This is fine,
835 # since any share corruption will be detected in the share hash
838 self.log("getting blockhashes for segment %d, share %d: %s" % \
839 (segnum, reader.shnum, str(needed)))
840 # TODO is force_remote necessary here?
841 d1 = reader.get_blockhashes(needed, force_remote=False)
842 if self.share_hash_tree.needed_hashes(reader.shnum):
843 need = self.share_hash_tree.needed_hashes(reader.shnum)
844 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
846 d2 = reader.get_sharehashes(need, force_remote=False)
848 d2 = defer.succeed({}) # the logic in the next method
853 def _decode_blocks(self, results, segnum):
855 I take a list of k blocks and salts, and decode that into a
856 single encrypted segment.
858 # 'results' is one or more dicts (each {shnum:(block,salt)}), and we
859 # want to merge them all
860 blocks_and_salts = {}
862 blocks_and_salts.update(d)
864 # All of these blocks should have the same salt; in SDMF, it is
865 # the file-wide IV, while in MDMF it is the per-segment salt. In
866 # either case, we just need to get one of them and use it.
868 # d.items()[0] is like (shnum, (block, salt))
869 # d.items()[0][1] is like (block, salt)
870 # d.items()[0][1][1] is the salt.
871 salt = blocks_and_salts.items()[0][1][1]
872 # Next, extract just the blocks from the dict. We'll use the
873 # salt in the next step.
874 share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
875 d2 = dict(share_and_shareids)
878 for shareid, share in d2.items():
879 shareids.append(shareid)
882 self._set_current_status("decoding")
883 started = time.time()
884 assert len(shareids) >= self._required_shares, len(shareids)
885 # zfec really doesn't want extra shares
886 shareids = shareids[:self._required_shares]
887 shares = shares[:self._required_shares]
888 self.log("decoding segment %d" % segnum)
889 if segnum == self._num_segments - 1:
890 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
892 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
893 def _process(buffers):
894 segment = "".join(buffers)
895 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
897 numsegs=self._num_segments,
899 self.log(" joined length %d, datalength %d" %
900 (len(segment), self._data_length))
901 if segnum == self._num_segments - 1:
902 size_to_use = self._tail_data_size
904 size_to_use = self._segment_size
905 segment = segment[:size_to_use]
906 self.log(" segment len=%d" % len(segment))
907 self._status.accumulate_decode_time(time.time() - started)
909 d.addCallback(_process)
913 def _decrypt_segment(self, segment_and_salt):
915 I take a single segment and its salt, and decrypt it. I return
916 the plaintext of the segment that is in my argument.
918 segment, salt = segment_and_salt
919 self._set_current_status("decrypting")
920 self.log("decrypting segment %d" % self._current_segment)
921 started = time.time()
922 key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
924 plaintext = decryptor.process(segment)
925 self._status.accumulate_decrypt_time(time.time() - started)
def notify_server_corruption(self, server, shnum, reason):
    """
    Advise the remote server (fire-and-forget, no delivery guarantee)
    that share *shnum* of this mutable file appears corrupt.
    """
    server.get_rref().callRemoteOnly("advise_corrupt_share",
                                     "mutable", self._storage_index,
                                     shnum, reason)
935 def _try_to_validate_privkey(self, enc_privkey, reader, server):
936 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
937 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
938 if alleged_writekey != self._node.get_writekey():
939 self.log("invalid privkey from %s shnum %d" %
940 (reader, reader.shnum),
941 level=log.WEIRD, umid="YIw4tA")
943 self.servermap.mark_bad_share(server, reader.shnum,
945 e = CorruptShareError(server,
948 f = failure.Failure(e)
949 self._bad_shares.add((server, reader.shnum, f))
953 self.log("got valid privkey from shnum %d on reader %s" %
954 (reader.shnum, reader))
955 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
956 self._node._populate_encprivkey(enc_privkey)
957 self._node._populate_privkey(privkey)
958 self._need_privkey = False
964 I am called by _download_current_segment when the download process
965 has finished successfully. After making some useful logging
966 statements, I return the decrypted contents to the owner of this
967 Retrieve object through self._done_deferred.
969 self._running = False
970 self._status.set_active(False)
972 self._status.timings['total'] = now - self._started
973 self._status.timings['fetch'] = now - self._started_fetching
974 self._status.set_status("Finished")
975 self._status.set_progress(1.0)
977 # remember the encoding parameters, use them again next time
978 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
979 offsets_tuple) = self.verinfo
980 self._node._populate_required_shares(k)
981 self._node._populate_total_shares(N)
984 ret = self._bad_shares
985 self.log("done verifying, found %d bad shares" % len(ret))
987 # TODO: upload status here?
989 self._consumer.unregisterProducer()
990 eventually(self._done_deferred.callback, ret)
def _raise_notenoughshareserror(self):
    """
    Report that too few active servers remain to finish the download.

    Called by _activate_enough_servers when it cannot recruit enough
    readers.  Always raises, summarizing progress, encoding parameters,
    bad-share count, and the last recorded failure.

    Raises:
        NotEnoughSharesError: always.
    """
    # Renamed from `format`, which shadowed the builtin of that name.
    template = ("ran out of servers: "
                "have %(have)d of %(total)d segments "
                "found %(bad)d bad shares "
                "encoding %(k)d-of-%(n)d")
    details = {"have": self._current_segment,
               "total": self._num_segments,
               "need": self._last_segment,
               "k": self._required_shares,
               "n": self._total_shares,
               "bad": len(self._bad_shares)}
    raise NotEnoughSharesError("%s, last failure: %s" %
                               (template % details, str(self._last_failure)))
1015 def _error(self, f):
1016 # all errors, including NotEnoughSharesError, land here
1017 self._running = False
1018 self._status.set_active(False)
1020 self._status.timings['total'] = now - self._started
1021 self._status.timings['fetch'] = now - self._started_fetching
1022 self._status.set_status("Failed")
1023 eventually(self._done_deferred.errback, f)