3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10 MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
22 implements(IRetrieveStatus)
23 statusid_counter = count(0)
26 self.timings["fetch_per_server"] = {}
27 self.timings["decode"] = 0.0
28 self.timings["decrypt"] = 0.0
29 self.timings["cumulative_verify"] = 0.0
32 self.storage_index = None
34 self.encoding = ("?","?")
36 self.status = "Not started"
38 self.counter = self.statusid_counter.next()
39 self.started = time.time()
41 def get_started(self):
def get_storage_index(self):
    """Return the storage index currently recorded on this status object.

    The value is whatever was last assigned to ``self.storage_index``.
    """
    index = self.storage_index
    return index
45 def get_encoding(self):
47 def using_helper(self):
53 def get_progress(self):
57 def get_counter(self):
def add_fetch_timing(self, peerid, elapsed):
    """Record one fetch duration for the server identified by ``peerid``.

    Durations are appended to a per-peer list stored under
    ``self.timings["fetch_per_server"]``; the list is created the first
    time a given peer is seen.
    """
    per_server = self.timings["fetch_per_server"]
    per_server.setdefault(peerid, []).append(elapsed)
def accumulate_decode_time(self, elapsed):
    """Add ``elapsed`` to the running total kept in ``self.timings["decode"]``."""
    timings = self.timings
    timings["decode"] += elapsed
def accumulate_decrypt_time(self, elapsed):
    """Add ``elapsed`` to the running total kept in ``self.timings["decrypt"]``."""
    timings = self.timings
    timings["decrypt"] += elapsed
def set_storage_index(self, si):
    """Remember ``si`` as the storage index reported by this status object."""
    setattr(self, "storage_index", si)
70 def set_helper(self, helper):
def set_encoding(self, k, n):
    """Store the encoding parameters as the pair ``(k, n)`` on ``self.encoding``."""
    parameters = (k, n)
    self.encoding = parameters
74 def set_size(self, size):
76 def set_status(self, status):
78 def set_progress(self, value):
80 def set_active(self, value):
87 # this class is currently single-use. Eventually (in MDMF) we will make
88 # it multi-use, in which case you can call download(range) multiple
89 # times, and each will have a separate response chain. However the
90 # Retrieve object will remain tied to a specific version of the file, and
91 # will use a single ServerMap instance.
92 implements(IPushProducer)
94 def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
97 assert self._node.get_pubkey()
98 self._storage_index = filenode.get_storage_index()
99 assert self._node.get_readkey()
100 self._last_failure = None
101 prefix = si_b2a(self._storage_index)[:5]
102 self._log_number = log.msg("Retrieve(%s): starting" % prefix)
103 self._outstanding_queries = {} # maps (peerid,shnum) to start_time
105 self._decoding = False
106 self._bad_shares = set()
108 self.servermap = servermap
109 assert self._node.get_pubkey()
110 self.verinfo = verinfo
111 # during repair, we may be called upon to grab the private key, since
112 # it wasn't picked up during a verify=False checker run, and we'll
113 # need it for repair to generate a new version.
114 self._need_privkey = verify or (fetch_privkey
115 and not self._node.get_privkey())
117 if self._need_privkey:
118 # TODO: Evaluate the need for this. We'll use it if we want
119 # to limit how many queries are on the wire for the privkey
121 self._privkey_query_markers = [] # one Marker for each time we've
122 # tried to get the privkey.
124 # verify means that we are using the downloader logic to verify all
125 # of our shares. This tells the downloader a few things.
127 # 1. We need to download all of the shares.
128 # 2. We don't need to decode or decrypt the shares, since our
129 # caller doesn't care about the plaintext, only the
130 # information about which shares are or are not valid.
131 # 3. When we are validating readers, we need to validate the
132 # signature on the prefix. Do we? We already do this in the
134 self._verify = verify
136 self._status = RetrieveStatus()
137 self._status.set_storage_index(self._storage_index)
138 self._status.set_helper(False)
139 self._status.set_progress(0.0)
140 self._status.set_active(True)
141 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
142 offsets_tuple) = self.verinfo
143 self._status.set_size(datalength)
144 self._status.set_encoding(k, N)
146 self._pause_deferred = None
148 self._read_length = None
149 self.log("got seqnum %d" % self.verinfo[0])
152 def get_status(self):
def log(self, *args, **kwargs):
    """Forward a message to ``log.msg`` with this operation's defaults.

    Unless the caller supplies them, ``parent`` defaults to
    ``self._log_number`` and ``facility`` to ``"tahoe.mutable.retrieve"``.
    Returns whatever ``log.msg`` returns.
    """
    # Only read self._log_number when the caller did not pass a parent.
    if "parent" not in kwargs:
        kwargs["parent"] = self._log_number
    kwargs.setdefault("facility", "tahoe.mutable.retrieve")
    return log.msg(*args, **kwargs)
162 def _set_current_status(self, state):
163 seg = "%d/%d" % (self._current_segment, self._last_segment)
164 self._status.set_status("segment %s (%s)" % (seg, state))
169 def pauseProducing(self):
171 I am called by my download target if we have produced too much
172 data for it to handle. I make the downloader stop producing new
173 data until my resumeProducing method is called.
175 if self._pause_deferred is not None:
178 # fired when the download is unpaused.
179 self._old_status = self._status.get_status()
180 self._set_current_status("paused")
182 self._pause_deferred = defer.Deferred()
185 def resumeProducing(self):
187 I am called by my download target once it is ready to begin
188 receiving data again.
190 if self._pause_deferred is None:
193 p = self._pause_deferred
194 self._pause_deferred = None
195 self._status.set_status(self._old_status)
197 eventually(p.callback, None)
200 def _check_for_paused(self, res):
202 I am called just before a write to the consumer. I return a
203 Deferred that eventually fires with the data that is to be
204 written to the consumer. If the download has not been paused,
205 the Deferred fires immediately. Otherwise, the Deferred fires
206 when the downloader is unpaused.
208 if self._pause_deferred is not None:
210 self._pause_deferred.addCallback(lambda ignored: d.callback(res))
212 return defer.succeed(res)
215 def download(self, consumer=None, offset=0, size=None):
216 assert IConsumer.providedBy(consumer) or self._verify
219 self._consumer = consumer
220 # we provide IPushProducer, so streaming=True, per
222 self._consumer.registerProducer(self, streaming=True)
224 self._done_deferred = defer.Deferred()
225 self._offset = offset
226 self._read_length = size
227 self._setup_download()
228 self._setup_encoding_parameters()
229 self.log("starting download")
230 self._started_fetching = time.time()
231 d = self._add_active_peers()
233 # The download process beyond this is a state machine.
234 # _add_active_peers will select the peers that we want to use
235 # for the download, and then attempt to start downloading. After
236 # each segment, it will check for doneness, reacting to broken
237 # peers and corrupt shares as necessary. If it runs out of good
238 # peers before downloading all of the segments, _done_deferred
239 # will errback. Otherwise, it will eventually callback with the
240 # contents of the mutable file.
241 return self._done_deferred
243 def _setup_download(self):
244 self._started = time.time()
245 self._status.set_status("Retrieving Shares")
247 # how many shares do we need?
256 offsets_tuple) = self.verinfo
258 # first, which servers can we use?
259 versionmap = self.servermap.make_versionmap()
260 shares = versionmap[self.verinfo]
261 # this sharemap is consumed as we decide to send requests
262 self.remaining_sharemap = DictOfSets()
263 for (shnum, peerid, timestamp) in shares:
264 self.remaining_sharemap.add(shnum, peerid)
265 # If the servermap update fetched anything, it fetched at least 1
266 # KiB, so we ask for that much.
267 # TODO: Change the cache methods to allow us to fetch all of the
268 # data that they have, then change this method to do that.
269 any_cache = self._node._read_from_cache(self.verinfo, shnum,
271 ss = self.servermap.connections[peerid]
272 reader = MDMFSlotReadProxy(ss,
276 reader.peerid = peerid
277 self.readers[shnum] = reader
278 assert len(self.remaining_sharemap) >= k
280 self.shares = {} # maps shnum to validated blocks
281 self._active_readers = [] # list of active readers for this dl.
282 self._validated_readers = set() # set of readers that we have
283 # validated the prefix of
284 self._block_hash_trees = {} # shnum => hashtree
286 # We need one share hash tree for the entire file; its leaves
287 # are the roots of the block hash trees for the shares that
288 # comprise it, and its root is in the verinfo.
289 self.share_hash_tree = hashtree.IncompleteHashTree(N)
290 self.share_hash_tree.set_hashes({0: root_hash})
292 def decode(self, blocks_and_salts, segnum):
294 I am a helper method that the mutable file update process uses
295 as a shortcut to decode and decrypt the segments that it needs
296 to fetch in order to perform a file update. I take in a
297 collection of blocks and salts, and pick some of those to make a
298 segment with. I return the plaintext associated with that
301 # shnum => block hash tree. Unused, but setup_encoding_parameters will
303 self._block_hash_trees = None
304 self._setup_encoding_parameters()
306 # This is the form expected by decode.
307 blocks_and_salts = blocks_and_salts.items()
308 blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
310 d = self._decode_blocks(blocks_and_salts, segnum)
311 d.addCallback(self._decrypt_segment)
315 def _setup_encoding_parameters(self):
317 I set up the encoding parameters, including k, n, the number
318 of segments associated with this file, and the segment decoders.
328 offsets_tuple) = self.verinfo
329 self._required_shares = k
330 self._total_shares = n
331 self._segment_size = segsize
332 self._data_length = datalength
335 self._version = MDMF_VERSION
337 self._version = SDMF_VERSION
339 if datalength and segsize:
340 self._num_segments = mathutil.div_ceil(datalength, segsize)
341 self._tail_data_size = datalength % segsize
343 self._num_segments = 0
344 self._tail_data_size = 0
346 self._segment_decoder = codec.CRSDecoder()
347 self._segment_decoder.set_params(segsize, k, n)
349 if not self._tail_data_size:
350 self._tail_data_size = segsize
352 self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
353 self._required_shares)
354 if self._tail_segment_size == self._segment_size:
355 self._tail_decoder = self._segment_decoder
357 self._tail_decoder = codec.CRSDecoder()
358 self._tail_decoder.set_params(self._tail_segment_size,
359 self._required_shares,
362 self.log("got encoding parameters: "
365 "%d segments of %d bytes each (%d byte tail segment)" % \
366 (k, n, self._num_segments, self._segment_size,
367 self._tail_segment_size))
369 if self._block_hash_trees is not None:
370 for i in xrange(self._total_shares):
371 # So we don't have to do this later.
372 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
374 # Our last task is to tell the downloader where to start and
375 # where to stop. We use three parameters for that:
376 # - self._start_segment: the segment that we need to start
378 # - self._current_segment: the next segment that we need to
380 # - self._last_segment: The last segment that we were asked to
383 # We say that the download is complete when
384 # self._current_segment > self._last_segment. We use
385 # self._start_segment and self._last_segment to know when to
386 # strip things off of segments, and how much to strip.
388 self.log("got offset: %d" % self._offset)
389 # our start segment is the first segment containing the
390 # offset we were given.
391 start = self._offset // self._segment_size
393 assert start < self._num_segments
394 self._start_segment = start
395 self.log("got start segment: %d" % self._start_segment)
397 self._start_segment = 0
400 # If self._read_length is None, then we want to read the whole
401 # file. Otherwise, we want to read only part of the file, and
402 # need to figure out where to stop reading.
403 if self._read_length is not None:
404 # our end segment is the last segment containing part of the
405 # segment that we were asked to read.
406 self.log("got read length %d" % self._read_length)
407 if self._read_length != 0:
408 end_data = self._offset + self._read_length
410 # We don't actually need to read the byte at end_data,
411 # but the one before it.
412 end = (end_data - 1) // self._segment_size
414 assert end < self._num_segments
415 self._last_segment = end
417 self._last_segment = self._start_segment
418 self.log("got end segment: %d" % self._last_segment)
420 self._last_segment = self._num_segments - 1
422 self._current_segment = self._start_segment
424 def _add_active_peers(self):
426 I populate self._active_readers with enough active readers to
427 retrieve the contents of this mutable file. I am called before
428 downloading starts, and (eventually) after each validation
429 error, connection error, or other problem in the download.
431 # TODO: It would be cool to investigate other heuristics for
432 # reader selection. For instance, the cost (in time the user
433 # spends waiting for their file) of selecting a really slow peer
434 # that happens to have a primary share is probably more than
435 # selecting a really fast peer that doesn't have a primary
436 # share. Maybe the servermap could be extended to provide this
437 # information; it could keep track of latency information while
438 # it gathers more important data, and then this routine could
439 # use that to select active readers.
441 # (these and other questions would be easier to answer with a
442 # robust, configurable tahoe-lafs simulator, which modeled node
443 # failures, differences in node speed, and other characteristics
444 # that we expect storage servers to have. You could have
445 # presets for really stable grids (like allmydata.com),
446 # friendnets, make it easy to configure your own settings, and
447 # then simulate the effect of big changes on these use cases
448 # instead of just reasoning about what the effect might be. Out
449 # of scope for MDMF, though.)
451 # We need at least self._required_shares readers to download a
454 needed = self._total_shares
456 needed = self._required_shares - len(self._active_readers)
457 # XXX: Why don't format= log messages work here?
458 self.log("adding %d peers to the active peers list" % needed)
460 # We favor lower numbered shares, since FEC is faster with
461 # primary shares than with other shares, and lower-numbered
462 # shares are more likely to be primary than higher numbered
464 active_shnums = set(sorted(self.remaining_sharemap.keys()))
465 # We shouldn't consider adding shares that we already have; this
466 # will cause problems later.
467 active_shnums -= set([reader.shnum for reader in self._active_readers])
468 active_shnums = list(active_shnums)[:needed]
469 if len(active_shnums) < needed and not self._verify:
470 # We don't have enough readers to retrieve the file; fail.
471 return self._failed()
473 for shnum in active_shnums:
474 self._active_readers.append(self.readers[shnum])
475 self.log("added reader for share %d" % shnum)
476 assert len(self._active_readers) >= self._required_shares
477 # Conceptually, this is part of the _add_active_peers step. It
478 # validates the prefixes of newly added readers to make sure
479 # that they match what we are expecting for self.verinfo. If
480 # validation is successful, _validate_active_prefixes will call
481 # _download_current_segment for us. If validation is
482 # unsuccessful, then _validate_prefixes will remove the peer and
483 # call _add_active_peers again, where we will attempt to rectify
484 # the problem by choosing another peer.
485 return self._validate_active_prefixes()
488 def _validate_active_prefixes(self):
490 I check to make sure that the prefixes on the peers that I am
491 currently reading from match the prefix that we want to see, as
492 said in self.verinfo.
494 If I find that all of the active peers have acceptable prefixes,
495 I pass control to _download_current_segment, which will use
496 those peers to do cool things. If I find that some of the active
497 peers have unacceptable prefixes, I will remove them from active
498 peers (and from further consideration) and call
499 _add_active_peers to attempt to rectify the situation. I keep
500 track of which peers I have already validated so that I don't
503 assert self._active_readers, "No more active readers"
505 new_readers = set(self._active_readers) - self._validated_readers
506 self.log('validating %d newly-added active readers' % len(new_readers))
508 for reader in new_readers:
509 self._validated_readers.add(reader)
510 # Each time we validate a reader, we check to see if we need the
511 # private key. If we do, we politely ask for it and then continue
512 # computing. If we find that we haven't gotten it at the end of
513 # segment decoding, then we'll take more drastic measures.
514 if self._need_privkey and not self._node.is_readonly():
515 d = reader.get_encprivkey()
516 d.addCallback(self._try_to_validate_privkey, reader)
517 # XXX: don't just drop the Deferred. We need error-reporting
518 # but not flow-control here.
519 return self._download_current_segment()
522 def _try_to_validate_prefix(self, prefix, reader):
524 I check that the prefix returned by a candidate server for
525 retrieval matches the prefix that the servermap knows about
526 (and, hence, the prefix that was validated earlier). If it does,
527 I return True, which means that I approve of the use of the
528 candidate server for segment retrieval. If it doesn't, I return
529 False, which means that another server must be chosen.
539 offsets_tuple) = self.verinfo
540 if known_prefix != prefix:
541 self.log("prefix from share %d doesn't match" % reader.shnum)
542 raise UncoordinatedWriteError("Mismatched prefix -- this could "
543 "indicate an uncoordinated write")
544 # Otherwise, we're okay -- no issues.
547 def _remove_reader(self, reader):
549 At various points, we will wish to remove a peer from
550 consideration and/or use. These include, but are not necessarily
553 - A connection error.
554 - A mismatched prefix (that is, a prefix that does not match
555 our conception of the version information string).
556 - A failing block hash, salt hash, or share hash, which can
557 indicate disk failure/bit flips, or network trouble.
559 This method will do that. I will make sure that the
560 (shnum,reader) combination represented by my reader argument is
561 not used for anything else during this download. I will not
562 advise the reader of any corruption, something that my callers
563 may wish to do on their own.
565 # TODO: When you're done writing this, see if this is ever
566 # actually used for something that _mark_bad_share isn't. I have
567 # a feeling that they will be used for very similar things, and
568 # that having them both here is just going to be an epic amount
569 # of code duplication.
571 # (well, okay, not epic, but meaningful)
572 self.log("removing reader %s" % reader)
573 # Remove the reader from _active_readers
574 self._active_readers.remove(reader)
575 # TODO: self.readers.remove(reader)?
576 for shnum in list(self.remaining_sharemap.keys()):
577 self.remaining_sharemap.discard(shnum, reader.peerid)
580 def _mark_bad_share(self, reader, f):
582 I mark the (peerid, shnum) encapsulated by my reader argument as
583 a bad share, which means that it will not be used anywhere else.
585 There are several reasons to want to mark something as a bad
586 share. These include:
588 - A connection error to the peer.
589 - A mismatched prefix (that is, a prefix that does not match
590 our local conception of the version information string).
591 - A failing block hash, salt hash, share hash, or other
594 This method will ensure that readers that we wish to mark bad
595 (for these reasons or other reasons) are not used for the rest
596 of the download. Additionally, it will attempt to tell the
597 remote peer (with no guarantee of success) that its share is
600 self.log("marking share %d on server %s as bad" % \
601 (reader.shnum, reader))
602 prefix = self.verinfo[-2]
603 self.servermap.mark_bad_share(reader.peerid,
606 self._remove_reader(reader)
607 self._bad_shares.add((reader.peerid, reader.shnum, f))
608 self._status.problems[reader.peerid] = f
609 self._last_failure = f
610 self.notify_server_corruption(reader.peerid, reader.shnum,
614 def _download_current_segment(self):
616 I download, validate, decode, decrypt, and assemble the segment
617 that this Retrieve is currently responsible for downloading.
619 assert len(self._active_readers) >= self._required_shares
620 if self._current_segment <= self._last_segment:
621 d = self._process_segment(self._current_segment)
623 d = defer.succeed(None)
624 d.addBoth(self._turn_barrier)
625 d.addCallback(self._check_for_done)
def _turn_barrier(self, result):
    """
    I help the download process avoid recursion limit issues by
    passing ``result`` through fireEventually and returning what it
    gives back.
    """
    deferred_result = fireEventually(result)
    return deferred_result
637 def _process_segment(self, segnum):
639 I download, validate, decode, and decrypt one segment of the
640 file that this Retrieve is retrieving. This means coordinating
641 the process of getting k blocks of that file, validating them,
642 assembling them into one segment with the decoder, and then
645 self.log("processing segment %d" % segnum)
647 # TODO: The old code uses a marker. Should this code do that
648 # too? What did the Marker do?
649 assert len(self._active_readers) >= self._required_shares
651 # We need to ask each of our active readers for its block and
652 # salt. We will then validate those. If validation is
653 # successful, we will assemble the results into plaintext.
655 for reader in self._active_readers:
656 started = time.time()
657 d = reader.get_block_and_salt(segnum)
658 d2 = self._get_needed_hashes(reader, segnum)
659 dl = defer.DeferredList([d, d2], consumeErrors=True)
660 dl.addCallback(self._validate_block, segnum, reader, started)
661 dl.addErrback(self._validation_or_decoding_failed, [reader])
663 dl = defer.DeferredList(ds)
665 dl.addCallback(lambda ignored: "")
666 dl.addCallback(self._set_segment)
668 dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
672 def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
674 I take the results of fetching and validating the blocks from a
675 callback chain in another method. If the results are such that
676 they tell me that validation and fetching succeeded without
677 incident, I will proceed with decoding and decryption.
678 Otherwise, I will do nothing.
680 self.log("trying to decode and decrypt segment %d" % segnum)
682 for block_and_salt in blocks_and_salts:
683 if not block_and_salt[0] or block_and_salt[1] == None:
684 self.log("some validation operations failed; not proceeding")
688 self.log("everything looks ok, building segment %d" % segnum)
689 d = self._decode_blocks(blocks_and_salts, segnum)
690 d.addCallback(self._decrypt_segment)
691 d.addErrback(self._validation_or_decoding_failed,
692 self._active_readers)
693 # check to see whether we've been paused before writing
695 d.addCallback(self._check_for_paused)
696 d.addCallback(self._set_segment)
699 return defer.succeed(None)
702 def _set_segment(self, segment):
704 Given a plaintext segment, I register that segment with the
705 target that is handling the file download.
707 self.log("got plaintext for segment %d" % self._current_segment)
708 if self._current_segment == self._start_segment:
709 # We're on the first segment. It's possible that we want
710 # only some part of the end of this segment, and that we
711 # just downloaded the whole thing to get that part. If so,
712 # we need to account for that and give the reader just the
713 # data that they want.
714 n = self._offset % self._segment_size
715 self.log("stripping %d bytes off of the first segment" % n)
716 self.log("original segment length: %d" % len(segment))
717 segment = segment[n:]
718 self.log("new segment length: %d" % len(segment))
720 if self._current_segment == self._last_segment and self._read_length is not None:
721 # We're on the last segment. It's possible that we only want
722 # part of the beginning of this segment, and that we
723 # downloaded the whole thing anyway. Make sure to give the
724 # caller only the portion of the segment that they want to
726 extra = self._read_length
727 if self._start_segment != self._last_segment:
728 extra -= self._segment_size - \
729 (self._offset % self._segment_size)
730 extra %= self._segment_size
731 self.log("original segment length: %d" % len(segment))
732 segment = segment[:extra]
733 self.log("new segment length: %d" % len(segment))
734 self.log("only taking %d bytes of the last segment" % extra)
737 self._consumer.write(segment)
739 # we don't care about the plaintext if we are doing a verify.
741 self._current_segment += 1
744 def _validation_or_decoding_failed(self, f, readers):
746 I am called when a block or a salt fails to correctly validate, or when
747 the decryption or decoding operation fails for some reason. I react to
748 this failure by notifying the remote server of corruption, and then
749 removing the remote peer from further activity.
751 assert isinstance(readers, list)
752 bad_shnums = [reader.shnum for reader in readers]
754 self.log("validation or decoding failed on share(s) %s, peer(s) %s "
755 ", segment %d: %s" % \
756 (bad_shnums, readers, self._current_segment, str(f)))
757 for reader in readers:
758 self._mark_bad_share(reader, f)
762 def _validate_block(self, results, segnum, reader, started):
764 I validate a block from one share on a remote server.
766 # Grab the part of the block hash tree that is necessary to
767 # validate this block, then generate the block hash root.
768 self.log("validating share %d for segment %d" % (reader.shnum,
770 elapsed = time.time() - started
771 self._status.add_fetch_timing(reader.peerid, elapsed)
772 self._set_current_status("validating blocks")
773 # Did we fail to fetch either of the things that we were
774 # supposed to? Fail if so.
775 if not results[0][0] and results[1][0]:
776 # handled by the errback handler.
778 # These all get batched into one query, so the resulting
779 # failure should be the same for all of them, so we can just
781 assert isinstance(results[0][1], failure.Failure)
784 raise CorruptShareError(reader.peerid,
786 "Connection error: %s" % str(f))
788 block_and_salt, block_and_sharehashes = results
789 block, salt = block_and_salt[1]
790 blockhashes, sharehashes = block_and_sharehashes[1]
792 blockhashes = dict(enumerate(blockhashes[1]))
793 self.log("the reader gave me the following blockhashes: %s" % \
795 self.log("the reader gave me the following sharehashes: %s" % \
796 sharehashes[1].keys())
797 bht = self._block_hash_trees[reader.shnum]
799 if bht.needed_hashes(segnum, include_leaf=True):
801 bht.set_hashes(blockhashes)
802 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
804 raise CorruptShareError(reader.peerid,
806 "block hash tree failure: %s" % e)
808 if self._version == MDMF_VERSION:
809 blockhash = hashutil.block_hash(salt + block)
811 blockhash = hashutil.block_hash(block)
812 # If this works without an error, then validation is
815 bht.set_hashes(leaves={segnum: blockhash})
816 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
818 raise CorruptShareError(reader.peerid,
820 "block hash tree failure: %s" % e)
822 # Reaching this point means that we know that this segment
823 # is correct. Now we need to check to see whether the share
824 # hash chain is also correct.
825 # SDMF wrote share hash chains that didn't contain the
826 # leaves, which would be produced from the block hash tree.
827 # So we need to validate the block hash tree first. If
828 # successful, then bht[0] will contain the root for the
829 # shnum, which will be a leaf in the share hash tree, which
830 # will allow us to validate the rest of the tree.
831 if self.share_hash_tree.needed_hashes(reader.shnum,
832 include_leaf=True) or \
835 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
836 leaves={reader.shnum: bht[0]})
837 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
839 raise CorruptShareError(reader.peerid,
841 "corrupt hashes: %s" % e)
843 self.log('share %d is valid for segment %d' % (reader.shnum,
845 return {reader.shnum: (block, salt)}
848 def _get_needed_hashes(self, reader, segnum):
850 I get the hashes needed to validate segnum from the reader, then return
851 to my caller when this is done.
853 bht = self._block_hash_trees[reader.shnum]
854 needed = bht.needed_hashes(segnum, include_leaf=True)
855 # The root of the block hash tree is also a leaf in the share
856 # hash tree. So we don't need to fetch it from the remote
857 # server. In the case of files with one segment, this means that
858 # we won't fetch any block hash tree from the remote server,
859 # since the hash of each share of the file is the entire block
860 # hash tree, and is a leaf in the share hash tree. This is fine,
861 # since any share corruption will be detected in the share hash
864 self.log("getting blockhashes for segment %d, share %d: %s" % \
865 (segnum, reader.shnum, str(needed)))
866 d1 = reader.get_blockhashes(needed, force_remote=True)
867 if self.share_hash_tree.needed_hashes(reader.shnum):
868 need = self.share_hash_tree.needed_hashes(reader.shnum)
869 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
871 d2 = reader.get_sharehashes(need, force_remote=True)
873 d2 = defer.succeed({}) # the logic in the next method
875 dl = defer.DeferredList([d1, d2], consumeErrors=True)
879 def _decode_blocks(self, blocks_and_salts, segnum):
881 I take a list of k blocks and salts, and decode that into a
882 single encrypted segment.
885 # We want to merge our dictionaries to the form
886 # {shnum: blocks_and_salts}
888 # The dictionaries come from validate block that way, so we just
889 # need to merge them.
890 for block_and_salt in blocks_and_salts:
891 d.update(block_and_salt[1])
893 # All of these blocks should have the same salt; in SDMF, it is
894 # the file-wide IV, while in MDMF it is the per-segment salt. In
895 # either case, we just need to get one of them and use it.
897 # d.items()[0] is like (shnum, (block, salt))
898 # d.items()[0][1] is like (block, salt)
899 # d.items()[0][1][1] is the salt.
900 salt = d.items()[0][1][1]
901 # Next, extract just the blocks from the dict. We'll use the
902 # salt in the next step.
903 share_and_shareids = [(k, v[0]) for k, v in d.items()]
904 d2 = dict(share_and_shareids)
907 for shareid, share in d2.items():
908 shareids.append(shareid)
911 self._set_current_status("decoding")
912 started = time.time()
913 assert len(shareids) >= self._required_shares, len(shareids)
914 # zfec really doesn't want extra shares
915 shareids = shareids[:self._required_shares]
916 shares = shares[:self._required_shares]
917 self.log("decoding segment %d" % segnum)
918 if segnum == self._num_segments - 1:
919 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
921 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
922 def _process(buffers):
923 segment = "".join(buffers)
924 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
926 numsegs=self._num_segments,
928 self.log(" joined length %d, datalength %d" %
929 (len(segment), self._data_length))
930 if segnum == self._num_segments - 1:
931 size_to_use = self._tail_data_size
933 size_to_use = self._segment_size
934 segment = segment[:size_to_use]
935 self.log(" segment len=%d" % len(segment))
936 self._status.accumulate_decode_time(time.time() - started)
938 d.addCallback(_process)
def _decrypt_segment(self, segment_and_salt):
    """
    I take a single segment and its salt, and decrypt it. I return
    the plaintext of the segment that is in my argument.

    segment_and_salt is a (segment, salt) pair. The AES key is derived
    from the salt and the readkey of my node. The wall-clock time spent
    here is added to the decrypt timing kept by my status object.
    """
    segment, salt = segment_and_salt
    self._set_current_status("decrypting")
    self.log("decrypting segment %d" % self._current_segment)
    started = time.time()
    key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
    # The key depends on the salt handed to this call, so a fresh cipher
    # object is built for every segment rather than being cached.
    decryptor = AES(key)
    plaintext = decryptor.process(segment)
    self._status.accumulate_decrypt_time(time.time() - started)
    return plaintext
def notify_server_corruption(self, peerid, shnum, reason):
    """
    Advise the server identified by peerid that share number shnum of
    this mutable file appears to be corrupt, passing reason along.

    The server connection is looked up in my servermap (a KeyError
    propagates if peerid is unknown) and the advice is sent with
    callRemoteOnly. I return nothing.
    """
    server = self.servermap.connections[peerid]
    advice = ("mutable", self._storage_index, shnum, reason)
    server.callRemoteOnly("advise_corrupt_share", *advice)
964 def _try_to_validate_privkey(self, enc_privkey, reader):
965 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
966 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
967 if alleged_writekey != self._node.get_writekey():
968 self.log("invalid privkey from %s shnum %d" %
969 (reader, reader.shnum),
970 level=log.WEIRD, umid="YIw4tA")
972 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
974 e = CorruptShareError(reader.peerid,
977 f = failure.Failure(e)
978 self._bad_shares.add((reader.peerid, reader.shnum, f))
982 self.log("got valid privkey from shnum %d on reader %s" %
983 (reader.shnum, reader))
984 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
985 self._node._populate_encprivkey(enc_privkey)
986 self._node._populate_privkey(privkey)
987 self._need_privkey = False
990 def _check_for_done(self, res):
992 I check to see if this Retrieve object has successfully finished
995 I can exit in the following ways:
996 - If there are no more segments to download, then I exit by
997 causing self._done_deferred to fire with the plaintext
998 content requested by the caller.
999 - If there are still segments to be downloaded, and there
1000 are enough active readers (readers which have not broken
1001 and have not given us corrupt data) to continue
1002 downloading, I send control back to
1003 _download_current_segment.
1004 - If there are still segments to be downloaded but there are
1005 not enough active peers to download them, I ask
1006 _add_active_peers to add more peers. If it is successful,
1007 it will call _download_current_segment. If there are not
1008 enough peers to retrieve the file, then that will cause
1009 _done_deferred to errback.
1011 self.log("checking for doneness")
1012 if self._current_segment > self._last_segment:
1013 # No more segments to download, we're done.
1014 self.log("got plaintext, done")
1017 if len(self._active_readers) >= self._required_shares:
1018 # More segments to download, but we have enough good peers
1019 # in self._active_readers that we can do that without issue,
1020 # so go nab the next segment.
1021 self.log("not done yet: on segment %d of %d" % \
1022 (self._current_segment + 1, self._num_segments))
1023 return self._download_current_segment()
1025 self.log("not done yet: on segment %d of %d, need to add peers" % \
1026 (self._current_segment + 1, self._num_segments))
1027 return self._add_active_peers()
1032 I am called by _check_for_done when the download process has
1033 finished successfully. After making some useful logging
1034 statements, I return the decrypted contents to the owner of this
1035 Retrieve object through self._done_deferred.
1037 self._running = False
1038 self._status.set_active(False)
1040 self._status.timings['total'] = now - self._started
1041 self._status.timings['fetch'] = now - self._started_fetching
1042 self._status.set_status("Finished")
1043 self._status.set_progress(1.0)
1045 # remember the encoding parameters, use them again next time
1046 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1047 offsets_tuple) = self.verinfo
1048 self._node._populate_required_shares(k)
1049 self._node._populate_total_shares(N)
1052 ret = list(self._bad_shares)
1053 self.log("done verifying, found %d bad shares" % len(ret))
1055 # TODO: upload status here?
1056 ret = self._consumer
1057 self._consumer.unregisterProducer()
1058 eventually(self._done_deferred.callback, ret)
1063 I am called by _add_active_peers when there are not enough
1064 active peers left to complete the download. After making some
1065 useful logging statements, I return an exception to that effect
1066 to the caller of this Retrieve object through
1067 self._done_deferred.
1069 self._running = False
1070 self._status.set_active(False)
1072 self._status.timings['total'] = now - self._started
1073 self._status.timings['fetch'] = now - self._started_fetching
1074 self._status.set_status("Failed")
1077 ret = list(self._bad_shares)
1079 format = ("ran out of peers: "
1080 "have %(have)d of %(total)d segments "
1081 "found %(bad)d bad shares "
1082 "encoding %(k)d-of-%(n)d")
1083 args = {"have": self._current_segment,
1084 "total": self._num_segments,
1085 "need": self._last_segment,
1086 "k": self._required_shares,
1087 "n": self._total_shares,
1088 "bad": len(self._bad_shares)}
1089 e = NotEnoughSharesError("%s, last failure: %s" % \
1090 (format % args, str(self._last_failure)))
1091 f = failure.Failure(e)
1093 eventually(self._done_deferred.callback, ret)