import time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
     DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
from allmydata.mutable.layout import MDMFSlotReadProxy

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["decode"] = 0.0
        self.timings["decrypt"] = 0.0
        self.timings["cumulative_verify"] = 0.0
        self._problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter
    def get_problems(self):
        return self._problems

    def add_fetch_timing(self, server, elapsed):
        serverid = server.get_serverid()
        if serverid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][serverid] = []
        self.timings["fetch_per_server"][serverid].append(elapsed)
    def accumulate_decode_time(self, elapsed):
        self.timings["decode"] += elapsed
    def accumulate_decrypt_time(self, elapsed):
        self.timings["decrypt"] += elapsed
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def add_problem(self, server, f):
        serverid = server.get_serverid()
        self._problems[serverid] = f
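
# Illustrative only (hypothetical caller; 'si' and 'server' are placeholders,
# not part of this module): how the downloader below feeds a RetrieveStatus.
#
#   status = RetrieveStatus()
#   status.set_storage_index(si)
#   status.set_encoding(3, 10)
#   status.add_fetch_timing(server, 0.042)   # per-server fetch latency
#   status.accumulate_decode_time(0.003)     # summed across segments
#   status.set_progress(1.0); status.set_active(False)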

class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
    implements(IPushProducer)

    def __init__(self, filenode, storage_broker, servermap, verinfo,
                 fetch_privkey=False, verify=False):
        self._node = filenode
        assert self._node.get_pubkey()
        self._storage_broker = storage_broker
        self._storage_index = filenode.get_storage_index()
        assert self._node.get_readkey()
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node.get_pubkey()
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = verify or (fetch_privkey
                                        and not self._node.get_privkey())

        if self._need_privkey:
            # TODO: Evaluate the need for this. We'll use it if we want
            # to limit how many queries are on the wire for the privkey
            # at once.
            self._privkey_query_markers = [] # one Marker for each time we've
                                             # tried to get the privkey.

        # verify means that we are using the downloader logic to verify all
        # of our shares. This tells the downloader a few things.
        #
        # 1. We need to download all of the shares.
        # 2. We don't need to decode or decrypt the shares, since our
        #    caller doesn't care about the plaintext, only the
        #    information about which shares are or are not valid.
        # 3. When we are validating readers, we need to validate the
        #    signature on the prefix. Do we? We already do this in the
        #    servermap update?
        self._verify = verify
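
        # Sketch of the two modes (hypothetical caller, illustrative only):
        #   Retrieve(node, sb, smap, verinfo).download(consumer)
        #     -> Deferred firing with the consumer, plaintext written to it
        #   Retrieve(node, sb, smap, verinfo, verify=True).download()
        #     -> Deferred firing with a set of (server, shnum, failure)
        #        tuples describing the bad shares found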

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)
        self.readers = {}
        self._stopped = False
        self._pause_deferred = None
        self._offset = None
        self._read_length = None
        self.log("got seqnum %d" % self.verinfo[0])

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def _set_current_status(self, state):
        seg = "%d/%d" % (self._current_segment, self._last_segment)
        self._status.set_status("segment %s (%s)" % (seg, state))

    ###############
    # IPushProducer

    def pauseProducing(self):
        """
        I am called by my download target if we have produced too much
        data for it to handle. I make the downloader stop producing new
        data until my resumeProducing method is called.
        """
        if self._pause_deferred is not None:
            return

        # fired when the download is unpaused.
        self._old_status = self._status.get_status()
        self._set_current_status("paused")

        self._pause_deferred = defer.Deferred()

    def resumeProducing(self):
        """
        I am called by my download target once it is ready to begin
        receiving data again.
        """
        if self._pause_deferred is None:
            return

        p = self._pause_deferred
        self._pause_deferred = None
        self._status.set_status(self._old_status)

        eventually(p.callback, None)

    def stopProducing(self):
        self._stopped = True
        self.resumeProducing()

    def _check_for_paused(self, res):
        """
        I am called just before a write to the consumer. I return a
        Deferred that eventually fires with the data that is to be
        written to the consumer. If the download has not been paused,
        the Deferred fires immediately. Otherwise, the Deferred fires
        when the downloader is unpaused.
        """
        if self._pause_deferred is not None:
            d = defer.Deferred()
            self._pause_deferred.addCallback(lambda ignored: d.callback(res))
            return d
        return res

    def _check_for_stopped(self, res):
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        return res
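
    # Illustrative flow (not executed here): a slow consumer calls
    # pauseProducing(); the next _check_for_paused() in the segment pipeline
    # then parks the pending write on self._pause_deferred until
    # resumeProducing() fires it. stopProducing() instead sets _stopped, and
    # the following _check_for_stopped() raises DownloadStopped, which
    # eventually errbacks _done_deferred via _error().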

    def download(self, consumer=None, offset=0, size=None):
        assert IConsumer.providedBy(consumer) or self._verify

        if consumer:
            self._consumer = consumer
            # we provide IPushProducer, so streaming=True, per
            # IConsumer.
            self._consumer.registerProducer(self, streaming=True)

        self._done_deferred = defer.Deferred()
        self._offset = offset
        self._read_length = size
        self._setup_download()
        self._setup_encoding_parameters()
        self.log("starting download")
        self._started_fetching = time.time()
        # The download process beyond this is a state machine.
        # _add_active_servers will select the servers that we want to use
        # for the download, and then attempt to start downloading. After
        # each segment, it will check for doneness, reacting to broken
        # servers and corrupt shares as necessary. If it runs out of good
        # servers before downloading all of the segments, _done_deferred
        # will errback. Otherwise, it will eventually callback with the
        # contents of the mutable file.
        self.loop()
        return self._done_deferred
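
    # Usage sketch (hypothetical; assumes MemoryConsumer from
    # allmydata.util.consumer, an IConsumer that accumulates chunks):
    #   from allmydata.util.consumer import MemoryConsumer
    #   mc = MemoryConsumer()
    #   d = r.download(mc, offset=0, size=None)
    #   d.addCallback(lambda mc: "".join(mc.chunks))  # full plaintext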

    def loop(self):
        d = fireEventually(None) # avoid #237 recursion limit problem
        d.addCallback(lambda ign: self._activate_enough_servers())
        d.addCallback(lambda ign: self._download_current_segment())
        # when we're done, _download_current_segment will call _done. If we
        # aren't, it will call loop() again.
        d.addErrback(self._error)

    def _setup_download(self):
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # how many shares do we need?
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, server, timestamp) in shares:
            self.remaining_sharemap.add(shnum, server)
            # If the servermap update fetched anything, it fetched at least 1
            # KiB, so we ask for that much.
            # TODO: Change the cache methods to allow us to fetch all of the
            # data that they have, then change this method to do that.
            any_cache = self._node._read_from_cache(self.verinfo, shnum,
                                                    0, 1000)
            reader = MDMFSlotReadProxy(server.get_rref(),
                                       self._storage_index,
                                       shnum,
                                       any_cache)
            reader.server = server
            self.readers[shnum] = reader
        assert len(self.remaining_sharemap) >= k

        self.shares = {} # maps shnum to validated blocks
        self._active_readers = [] # list of active readers for this dl.
        self._block_hash_trees = {} # shnum => hashtree

        # We need one share hash tree for the entire file; its leaves
        # are the roots of the block hash trees for the shares that
        # comprise it, and its root is in the verinfo.
        self.share_hash_tree = hashtree.IncompleteHashTree(N)
        self.share_hash_tree.set_hashes({0: root_hash})
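
    # Sketch of how an IncompleteHashTree fills in (illustrative; mirrors
    # what _validate_block does per segment; 'trusted_root', 'fetched' and
    # 'leaf_hash' are placeholders):
    #   t = hashtree.IncompleteHashTree(4)           # 4 leaves
    #   t.set_hashes({0: trusted_root})              # node 0 is the root
    #   needed = t.needed_hashes(2, include_leaf=True)
    #   t.set_hashes(fetched, leaves={2: leaf_hash}) # BadHashError on mismatch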

    def decode(self, blocks_and_salts, segnum):
        """
        I am a helper method that the mutable file update process uses
        as a shortcut to decode and decrypt the segments that it needs
        to fetch in order to perform a file update. I take in a
        collection of blocks and salts, and pick some of those to make a
        segment with. I return the plaintext associated with that
        segment.
        """
        # shnum => block hash tree. Unused, but setup_encoding_parameters will
        # want to set it.
        self._block_hash_trees = None
        self._setup_encoding_parameters()

        # This is the form expected by decode.
        blocks_and_salts = blocks_and_salts.items()
        blocks_and_salts = [(True, [d]) for d in blocks_and_salts]

        d = self._decode_blocks(blocks_and_salts, segnum)
        d.addCallback(self._decrypt_segment)
        return d
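
    # Shape of the reshaping above (illustrative): an input of
    #   {0: (block0, salt), 1: (block1, salt)}
    # becomes
    #   [(True, [(0, (block0, salt))]), (True, [(1, (block1, salt))])]
    # i.e. one (success, [(shnum, (block, salt))]) row per share, matching
    # the DeferredList results that _decode_blocks normally receives.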

    def _setup_encoding_parameters(self):
        """
        I set up the encoding parameters, including k, n, the number
        of segments associated with this file, and the segment decoders.
        """
        (seqnum, root_hash, IV, segsize, datalength, k, n, known_prefix,
         offsets_tuple) = self.verinfo
        self._required_shares = k
        self._total_shares = n
        self._segment_size = segsize
        self._data_length = datalength

        if not IV:
            self._version = MDMF_VERSION
        else:
            self._version = SDMF_VERSION

        if datalength and segsize:
            self._num_segments = mathutil.div_ceil(datalength, segsize)
            self._tail_data_size = datalength % segsize
        else:
            self._num_segments = 0
            self._tail_data_size = 0

        self._segment_decoder = codec.CRSDecoder()
        self._segment_decoder.set_params(segsize, k, n)

        if not self._tail_data_size:
            self._tail_data_size = segsize

        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
                                                         self._required_shares)
        if self._tail_segment_size == self._segment_size:
            self._tail_decoder = self._segment_decoder
        else:
            self._tail_decoder = codec.CRSDecoder()
            self._tail_decoder.set_params(self._tail_segment_size,
                                          self._required_shares,
                                          self._total_shares)

        self.log("got encoding parameters: "
                 "k: %d "
                 "n: %d "
                 "%d segments of %d bytes each (%d byte tail segment)" % \
                 (k, n, self._num_segments, self._segment_size,
                  self._tail_segment_size))
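
        # Worked example (illustrative numbers only): datalength=10000,
        # segsize=4096, k=3, n=10:
        #   _num_segments      = div_ceil(10000, 4096)   -> 3
        #   _tail_data_size    = 10000 % 4096            -> 1808
        #   _tail_segment_size = next_multiple(1808, 3)  -> 1809
        # so the tail segment gets its own CRSDecoder sized for 1809 bytes.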

        if self._block_hash_trees is not None:
            for i in xrange(self._total_shares):
                # So we don't have to do this later.
                self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)

        # Our last task is to tell the downloader where to start and
        # where to stop. We use three parameters for that:
        #  - self._start_segment: the segment that we need to start
        #    downloading from.
        #  - self._current_segment: the next segment that we need to
        #    download.
        #  - self._last_segment: the last segment that we were asked to
        #    download.
        #
        # We say that the download is complete when
        # self._current_segment > self._last_segment. We use
        # self._start_segment and self._last_segment to know when to
        # strip things off of segments, and how much to strip.
        if self._offset:
            self.log("got offset: %d" % self._offset)
            # our start segment is the first segment containing the
            # offset we were given.
            start = self._offset // self._segment_size

            assert start < self._num_segments
            self._start_segment = start
            self.log("got start segment: %d" % self._start_segment)
        else:
            self._start_segment = 0

        # If self._read_length is None, then we want to read the whole
        # file. Otherwise, we want to read only part of the file, and
        # need to figure out where to stop reading.
        if self._read_length is not None:
            # our end segment is the last segment containing part of the
            # segment that we were asked to read.
            self.log("got read length %d" % self._read_length)
            if self._read_length != 0:
                end_data = self._offset + self._read_length

                # We don't actually need to read the byte at end_data,
                # but the one before it.
                end = (end_data - 1) // self._segment_size

                assert end < self._num_segments
                self._last_segment = end
            else:
                self._last_segment = self._start_segment
            self.log("got end segment: %d" % self._last_segment)
        else:
            self._last_segment = self._num_segments - 1

        self._current_segment = self._start_segment
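
        # Worked example (illustrative): offset=5000, read_length=3000,
        # segsize=4096:
        #   _start_segment = 5000 // 4096          -> 1
        #   end_data       = 5000 + 3000           -> 8000
        #   _last_segment  = (8000 - 1) // 4096    -> 1
        # so only segment 1 is fetched, and _set_segment trims both ends.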

    def _activate_enough_servers(self):
        """
        I populate self._active_readers with enough active readers to
        retrieve the contents of this mutable file. I am called before
        downloading starts, and (eventually) after each validation
        error, connection error, or other problem in the download.
        """
        # TODO: It would be cool to investigate other heuristics for
        # reader selection. For instance, the cost (in time the user
        # spends waiting for their file) of selecting a really slow server
        # that happens to have a primary share is probably more than
        # selecting a really fast server that doesn't have a primary
        # share. Maybe the servermap could be extended to provide this
        # information; it could keep track of latency information while
        # it gathers more important data, and then this routine could
        # use that to select active readers.
        #
        # (these and other questions would be easier to answer with a
        #  robust, configurable tahoe-lafs simulator, which modeled node
        #  failures, differences in node speed, and other characteristics
        #  that we expect storage servers to have. You could have
        #  presets for really stable grids (like allmydata.com),
        #  friendnets, make it easy to configure your own settings, and
        #  then simulate the effect of big changes on these use cases
        #  instead of just reasoning about what the effect might be. Out
        #  of scope for MDMF, though.)

        # XXX: Why don't format= log messages work here?

        known_shnums = set(self.remaining_sharemap.keys())
        used_shnums = set([r.shnum for r in self._active_readers])
        unused_shnums = known_shnums - used_shnums

        if self._verify:
            new_shnums = unused_shnums # use them all
        elif len(self._active_readers) < self._required_shares:
            # need more shares
            more = self._required_shares - len(self._active_readers)
            # We favor lower numbered shares, since FEC is faster with
            # primary shares than with other shares, and lower-numbered
            # shares are more likely to be primary than higher numbered
            # shares.
            new_shnums = sorted(unused_shnums)[:more]
            if len(new_shnums) < more:
                # We don't have enough readers to retrieve the file; fail.
                self._raise_notenoughshareserror()
        else:
            new_shnums = []

        self.log("adding %d new servers to the active list" % len(new_shnums))
        for shnum in new_shnums:
            reader = self.readers[shnum]
            self._active_readers.append(reader)
            self.log("added reader for share %d" % shnum)
            # Each time we add a reader, we check to see if we need the
            # private key. If we do, we politely ask for it and then continue
            # computing. If we find that we haven't gotten it at the end of
            # segment decoding, then we'll take more drastic measures.
            if self._need_privkey and not self._node.is_readonly():
                d = reader.get_encprivkey()
                d.addCallback(self._try_to_validate_privkey, reader, reader.server)
                # XXX: don't just drop the Deferred. We need error-reporting
                # but not flow-control here.
        assert len(self._active_readers) >= self._required_shares
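
    # e.g. (illustrative): with k=3, one active reader, and
    # unused_shnums={1, 4, 0, 7}, more=2 and sorted(unused_shnums)[:2]
    # picks shares [0, 1], preferring the cheaper primary shares.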

    def _try_to_validate_prefix(self, prefix, reader):
        """
        I check that the prefix returned by a candidate server for
        retrieval matches the prefix that the servermap knows about
        (and, hence, the prefix that was validated earlier). If it does,
        I approve of the use of the candidate server for segment
        retrieval. If it doesn't, I raise UncoordinatedWriteError, and
        another server must be chosen.
        """
        (seqnum, root_hash, IV, segsize, datalength, k, N, known_prefix,
         offsets_tuple) = self.verinfo
        if known_prefix != prefix:
            self.log("prefix from share %d doesn't match" % reader.shnum)
            raise UncoordinatedWriteError("Mismatched prefix -- this could "
                                          "indicate an uncoordinated write")
        # Otherwise, we're okay -- no issues.

    def _remove_reader(self, reader):
        """
        At various points, we will wish to remove a server from
        consideration and/or use. These include, but are not necessarily
        limited to:

            - A connection error.
            - A mismatched prefix (that is, a prefix that does not match
              our conception of the version information string).
            - A failing block hash, salt hash, or share hash, which can
              indicate disk failure/bit flips, or network trouble.

        This method will do that. I will make sure that the
        (shnum, reader) combination represented by my reader argument is
        not used for anything else during this download. I will not
        advise the reader of any corruption, something that my callers
        may wish to do on their own.
        """
        # TODO: When you're done writing this, see if this is ever
        # actually used for something that _mark_bad_share isn't. I have
        # a feeling that they will be used for very similar things, and
        # that having them both here is just going to be an epic amount
        # of code duplication.
        #
        # (well, okay, not epic, but meaningful)
        self.log("removing reader %s" % reader)
        # Remove the reader from _active_readers
        self._active_readers.remove(reader)
        # TODO: self.readers.remove(reader)?
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, reader.server)

    def _mark_bad_share(self, server, shnum, reader, f):
        """
        I mark the given (server, shnum) as a bad share, which means that it
        will not be used anywhere else.

        There are several reasons to want to mark something as a bad
        share. These include:

            - A connection error to the server.
            - A mismatched prefix (that is, a prefix that does not match
              our local conception of the version information string).
            - A failing block hash, salt hash, share hash, or other
              integrity check.

        This method will ensure that readers that we wish to mark bad
        (for these reasons or other reasons) are not used for the rest
        of the download. Additionally, it will attempt to tell the
        remote server (with no guarantee of success) that its share is
        corrupt.
        """
        self.log("marking share %d on server %s as bad" % \
                 (shnum, server.get_name()))
        prefix = self.verinfo[-2]
        self.servermap.mark_bad_share(server, shnum, prefix)
        self._remove_reader(reader)
        self._bad_shares.add((server, shnum, f))
        self._status.add_problem(server, f)
        self._last_failure = f
        self.notify_server_corruption(server, shnum, str(f.value))

    def _download_current_segment(self):
        """
        I download, validate, decode, decrypt, and assemble the segment
        that this Retrieve is currently responsible for downloading.
        """
        assert len(self._active_readers) >= self._required_shares
        if self._current_segment > self._last_segment:
            # No more segments to download, we're done.
            self.log("got plaintext, done")
            return self._done()
        self.log("on segment %d of %d" %
                 (self._current_segment + 1, self._num_segments))
        d = self._process_segment(self._current_segment)
        d.addCallback(lambda ign: self.loop())
        return d

    def _process_segment(self, segnum):
        """
        I download, validate, decode, and decrypt one segment of the
        file that this Retrieve is retrieving. This means coordinating
        the process of getting k blocks of that file, validating them,
        assembling them into one segment with the decoder, and then
        decrypting them.
        """
        self.log("processing segment %d" % segnum)

        # TODO: The old code uses a marker. Should this code do that
        # too? What did the Marker do?
        assert len(self._active_readers) >= self._required_shares

        # We need to ask each of our active readers for its block and
        # salt. We will then validate those. If validation is
        # successful, we will assemble the results into plaintext.
        ds = []
        for reader in self._active_readers:
            started = time.time()
            d = reader.get_block_and_salt(segnum)
            d2 = self._get_needed_hashes(reader, segnum)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            dl.addCallback(self._validate_block, segnum, reader, reader.server, started)
            dl.addErrback(self._validation_or_decoding_failed, [reader])
            ds.append(dl)
        dl = defer.DeferredList(ds)
        if self._verify:
            dl.addCallback(lambda ignored: "")
            dl.addCallback(self._set_segment)
        else:
            dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
        return dl

    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
        """
        I take the results of fetching and validating the blocks from a
        callback chain in another method. If the results are such that
        they tell me that validation and fetching succeeded without
        incident, I will proceed with decoding and decryption.
        Otherwise, I will do nothing.
        """
        self.log("trying to decode and decrypt segment %d" % segnum)
        failed = False
        for block_and_salt in blocks_and_salts:
            if not block_and_salt[0] or block_and_salt[1] == None:
                self.log("some validation operations failed; not proceeding")
                failed = True
                break
        if not failed:
            self.log("everything looks ok, building segment %d" % segnum)
            d = self._decode_blocks(blocks_and_salts, segnum)
            d.addCallback(self._decrypt_segment)
            d.addErrback(self._validation_or_decoding_failed,
                         self._active_readers)
            # check to see whether we've been paused before writing
            # anything.
            d.addCallback(self._check_for_paused)
            d.addCallback(self._check_for_stopped)
            d.addCallback(self._set_segment)
            return d
        else:
            return defer.succeed(None)

    def _set_segment(self, segment):
        """
        Given a plaintext segment, I register that segment with the
        target that is handling the file download.
        """
        self.log("got plaintext for segment %d" % self._current_segment)
        if self._current_segment == self._start_segment:
            # We're on the first segment. It's possible that we want
            # only some part of the end of this segment, and that we
            # just downloaded the whole thing to get that part. If so,
            # we need to account for that and give the reader just the
            # data that they want.
            n = self._offset % self._segment_size
            self.log("stripping %d bytes off of the first segment" % n)
            self.log("original segment length: %d" % len(segment))
            segment = segment[n:]
            self.log("new segment length: %d" % len(segment))

        if self._current_segment == self._last_segment and self._read_length is not None:
            # We're on the last segment. It's possible that we only want
            # part of the beginning of this segment, and that we
            # downloaded the whole thing anyway. Make sure to give the
            # caller only the portion of the segment that they want to
            # receive.
            extra = self._read_length
            if self._start_segment != self._last_segment:
                extra -= self._segment_size - \
                         (self._offset % self._segment_size)
            extra %= self._segment_size
            self.log("original segment length: %d" % len(segment))
            segment = segment[:extra]
            self.log("new segment length: %d" % len(segment))
            self.log("only taking %d bytes of the last segment" % extra)

        if not self._verify:
            self._consumer.write(segment)
        else:
            # we don't care about the plaintext if we are doing a verify.
            segment = None
        self._current_segment += 1
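
    # Worked example (illustrative): offset=100, read_length=5000,
    # segsize=4096, so start=0, last=1. Segment 0 is stripped to its last
    # 3996 bytes; for segment 1, extra = (5000 - 3996) % 4096 = 1004, so
    # only its first 1004 bytes are written: 3996 + 1004 = 5000 in total.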

    def _validation_or_decoding_failed(self, f, readers):
        """
        I am called when a block or a salt fails to correctly validate, or
        when the decryption or decoding operation fails for some reason. I
        react to this failure by notifying the remote server of corruption,
        and then removing the remote server from further activity.
        """
        assert isinstance(readers, list)
        bad_shnums = [reader.shnum for reader in readers]

        self.log("validation or decoding failed on share(s) %s, server(s) %s "
                 ", segment %d: %s" % \
                 (bad_shnums, readers, self._current_segment, str(f)))
        for reader in readers:
            self._mark_bad_share(reader.server, reader.shnum, reader, f)

    def _validate_block(self, results, segnum, reader, server, started):
        """
        I validate a block from one share on a remote server.
        """
        # Grab the part of the block hash tree that is necessary to
        # validate this block, then generate the block hash root.
        self.log("validating share %d for segment %d" % (reader.shnum,
                                                         segnum))
        elapsed = time.time() - started
        self._status.add_fetch_timing(server, elapsed)
        self._set_current_status("validating blocks")
        # Did we fail to fetch either of the things that we were
        # supposed to? Fail if so.
        if not results[0][0] and results[1][0]:
            # handled by the errback handler.

            # These all get batched into one query, so the resulting
            # failure should be the same for all of them, so we can just
            # use the first one.
            assert isinstance(results[0][1], failure.Failure)

            f = results[0][1]
            raise CorruptShareError(server,
                                    reader.shnum,
                                    "Connection error: %s" % str(f))

        block_and_salt, block_and_sharehashes = results
        block, salt = block_and_salt[1]
        blockhashes, sharehashes = block_and_sharehashes[1]

        blockhashes = dict(enumerate(blockhashes[1]))
        self.log("the reader gave me the following blockhashes: %s" % \
                 blockhashes.keys())
        self.log("the reader gave me the following sharehashes: %s" % \
                 sharehashes[1].keys())
        bht = self._block_hash_trees[reader.shnum]

        if bht.needed_hashes(segnum, include_leaf=True):
            try:
                bht.set_hashes(blockhashes)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(server,
                                        reader.shnum,
                                        "block hash tree failure: %s" % e)

        if self._version == MDMF_VERSION:
            blockhash = hashutil.block_hash(salt + block)
        else:
            blockhash = hashutil.block_hash(block)
        # If this works without an error, then validation is
        # successful.
        try:
            bht.set_hashes(leaves={segnum: blockhash})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                IndexError), e:
            raise CorruptShareError(server,
                                    reader.shnum,
                                    "block hash tree failure: %s" % e)

        # Reaching this point means that we know that this segment
        # is correct. Now we need to check to see whether the share
        # hash chain is also correct.
        # SDMF wrote share hash chains that didn't contain the
        # leaves, which would be produced from the block hash tree.
        # So we need to validate the block hash tree first. If
        # successful, then bht[0] will contain the root for the
        # shnum, which will be a leaf in the share hash tree, which
        # will allow us to validate the rest of the tree.
        if self.share_hash_tree.needed_hashes(reader.shnum,
                                              include_leaf=True) or \
           self._verify:
            try:
                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
                                                leaves={reader.shnum: bht[0]})
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(server,
                                        reader.shnum,
                                        "corrupt hashes: %s" % e)

        self.log('share %d is valid for segment %d' % (reader.shnum,
                                                       segnum))
        return {reader.shnum: (block, salt)}
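
    # Recap of the trust chain checked above (sketch): verinfo's root_hash
    # anchors the share hash tree; the fetched sharehashes prove bht[0],
    # the block hash tree root for this shnum; the fetched blockhashes
    # prove block_hash([salt +] block) for this segnum. A mismatch at any
    # level raises CorruptShareError.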

    def _get_needed_hashes(self, reader, segnum):
        """
        I get the hashes needed to validate segnum from the reader, then
        return to my caller when this is done.
        """
        bht = self._block_hash_trees[reader.shnum]
        needed = bht.needed_hashes(segnum, include_leaf=True)
        # The root of the block hash tree is also a leaf in the share
        # hash tree. So we don't need to fetch it from the remote
        # server. In the case of files with one segment, this means that
        # we won't fetch any block hash tree from the remote server,
        # since the hash of each share of the file is the entire block
        # hash tree, and is a leaf in the share hash tree. This is fine,
        # since any share corruption will be detected in the share hash
        # tree.
        self.log("getting blockhashes for segment %d, share %d: %s" % \
                 (segnum, reader.shnum, str(needed)))
        d1 = reader.get_blockhashes(needed, force_remote=True)
        if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                 str(need)))
            d2 = reader.get_sharehashes(need, force_remote=True)
        else:
            d2 = defer.succeed({}) # the logic in the next method
                                   # expects a dict
        dl = defer.DeferredList([d1, d2], consumeErrors=True)
        return dl

    def _decode_blocks(self, blocks_and_salts, segnum):
        """
        I take a list of k blocks and salts, and decode that into a
        single encrypted segment.
        """
        d = {}
        # We want to merge our dictionaries to the form
        # {shnum: blocks_and_salts}
        #
        # The dictionaries come from validate block that way, so we just
        # need to merge them.
        for block_and_salt in blocks_and_salts:
            d.update(block_and_salt[1])

        # All of these blocks should have the same salt; in SDMF, it is
        # the file-wide IV, while in MDMF it is the per-segment salt. In
        # either case, we just need to get one of them and use it.
        #
        # d.items()[0] is like (shnum, (block, salt))
        # d.items()[0][1] is like (block, salt)
        # d.items()[0][1][1] is the salt.
        salt = d.items()[0][1][1]
        # Next, extract just the blocks from the dict. We'll use the
        # salt in the next step.
        share_and_shareids = [(k, v[0]) for k, v in d.items()]
        d2 = dict(share_and_shareids)
        shareids = []
        shares = []
        for shareid, share in d2.items():
            shareids.append(shareid)
            shares.append(share)

        self._set_current_status("decoding")
        started = time.time()
        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]
        self.log("decoding segment %d" % segnum)
        if segnum == self._num_segments - 1:
            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
        else:
            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
        def _process(buffers):
            segment = "".join(buffers)
            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
                     segnum=segnum,
                     numsegs=self._num_segments,
                     level=log.NOISY)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), self._data_length))
            if segnum == self._num_segments - 1:
                size_to_use = self._tail_data_size
            else:
                size_to_use = self._segment_size
            segment = segment[:size_to_use]
            self.log(" segment len=%d" % len(segment))
            self._status.accumulate_decode_time(time.time() - started)
            return (segment, salt)
        d.addCallback(_process)
        return d

    def _decrypt_segment(self, segment_and_salt):
        """
        I take a single segment and its salt, and decrypt it. I return
        the plaintext of the segment that is in my argument.
        """
        segment, salt = segment_and_salt
        self._set_current_status("decrypting")
        self.log("decrypting segment %d" % self._current_segment)
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
        decryptor = AES(key)
        plaintext = decryptor.process(segment)
        self._status.accumulate_decrypt_time(time.time() - started)
        return plaintext
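
    # Key derivation sketch: the segment key mixes the salt into the file's
    # readkey (roughly key = H(salt + readkey), a tagged hash inside
    # hashutil), so SDMF's file-wide IV yields one key for the whole file
    # while MDMF's per-segment salts yield a distinct AES key per segment.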

    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)

    def _try_to_validate_privkey(self, enc_privkey, reader, server):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (reader, reader.shnum),
                     level=log.WEIRD, umid="YIw4tA")
            if self._verify:
                self.servermap.mark_bad_share(server, reader.shnum,
                                              self.verinfo[-2])
                e = CorruptShareError(server,
                                      reader.shnum,
                                      "invalid privkey")
                f = failure.Failure(e)
                self._bad_shares.add((server, reader.shnum, f))
            return

        # it's good
        self.log("got valid privkey from shnum %d on reader %s" %
                 (reader.shnum, reader))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
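
        # The check above works because the writekey is derived from the
        # signing key itself (writekey = ssk_writekey_hash(privkey)); an
        # encrypted privkey that decrypts to anything hashing differently
        # must be corrupt, so in verify mode the share is recorded as bad.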

    def _done(self):
        """
        I am called by _download_current_segment when the download process
        has finished successfully. After making some useful logging
        statements, I return the decrypted contents to the owner of this
        Retrieve object through self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Finished")
        self._status.set_progress(1.0)

        # remember the encoding parameters, use them again next time
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._node._populate_required_shares(k)
        self._node._populate_total_shares(N)

        if self._verify:
            ret = self._bad_shares
            self.log("done verifying, found %d bad shares" % len(ret))
        else:
            # TODO: upload status here?
            ret = self._consumer
            self._consumer.unregisterProducer()
        eventually(self._done_deferred.callback, ret)

    def _raise_notenoughshareserror(self):
        """
        I am called by _activate_enough_servers when there are not enough
        active servers left to complete the download. After making some
        useful logging statements, I raise an exception to that effect
        to the caller of this Retrieve object through
        self._done_deferred.
        """
        format = ("ran out of servers: "
                  "have %(have)d of %(total)d segments; "
                  "found %(bad)d bad shares; "
                  "encoding %(k)d-of-%(n)d")
        args = {"have": self._current_segment,
                "total": self._num_segments,
                "need": self._last_segment,
                "k": self._required_shares,
                "n": self._total_shares,
                "bad": len(self._bad_shares)}
        raise NotEnoughSharesError("%s, last failure: %s" %
                                   (format % args, str(self._last_failure)))

    def _error(self, f):
        # all errors, including NotEnoughSharesError, land here
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Failed")
        eventually(self._done_deferred.errback, f)