3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10 MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
22 implements(IRetrieveStatus)
23 statusid_counter = count(0)
26 self.timings["fetch_per_server"] = {}
27 self.timings["decode"] = 0.0
28 self.timings["decrypt"] = 0.0
29 self.timings["cumulative_verify"] = 0.0
32 self.storage_index = None
34 self.encoding = ("?","?")
36 self.status = "Not started"
38 self.counter = self.statusid_counter.next()
39 self.started = time.time()
41 def get_started(self):
43 def get_storage_index(self):
44 return self.storage_index
45 def get_encoding(self):
47 def using_helper(self):
53 def get_progress(self):
57 def get_counter(self):
60 def add_fetch_timing(self, peerid, elapsed):
61 if peerid not in self.timings["fetch_per_server"]:
62 self.timings["fetch_per_server"][peerid] = []
63 self.timings["fetch_per_server"][peerid].append(elapsed)
64 def accumulate_decode_time(self, elapsed):
65 self.timings["decode"] += elapsed
66 def accumulate_decrypt_time(self, elapsed):
67 self.timings["decrypt"] += elapsed
68 def set_storage_index(self, si):
69 self.storage_index = si
70 def set_helper(self, helper):
72 def set_encoding(self, k, n):
73 self.encoding = (k, n)
74 def set_size(self, size):
76 def set_status(self, status):
78 def set_progress(self, value):
80 def set_active(self, value):
87 # this class is currently single-use. Eventually (in MDMF) we will make
88 # it multi-use, in which case you can call download(range) multiple
89 # times, and each will have a separate response chain. However the
90 # Retrieve object will remain tied to a specific version of the file, and
91 # will use a single ServerMap instance.
92 implements(IPushProducer)
94 def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
97 assert self._node.get_pubkey()
98 self._storage_index = filenode.get_storage_index()
99 assert self._node.get_readkey()
100 self._last_failure = None
101 prefix = si_b2a(self._storage_index)[:5]
102 self._log_number = log.msg("Retrieve(%s): starting" % prefix)
103 self._outstanding_queries = {} # maps (peerid,shnum) to start_time
105 self._decoding = False
106 self._bad_shares = set()
108 self.servermap = servermap
109 assert self._node.get_pubkey()
110 self.verinfo = verinfo
111 # during repair, we may be called upon to grab the private key, since
112 # it wasn't picked up during a verify=False checker run, and we'll
113 # need it for repair to generate a new version.
114 self._need_privkey = verify or (fetch_privkey
115 and not self._node.get_privkey())
117 if self._need_privkey:
118 # TODO: Evaluate the need for this. We'll use it if we want
119 # to limit how many queries are on the wire for the privkey
121 self._privkey_query_markers = [] # one Marker for each time we've
122 # tried to get the privkey.
124 # verify means that we are using the downloader logic to verify all
125 # of our shares. This tells the downloader a few things.
127 # 1. We need to download all of the shares.
128 # 2. We don't need to decode or decrypt the shares, since our
129 # caller doesn't care about the plaintext, only the
130 # information about which shares are or are not valid.
131 # 3. When we are validating readers, we need to validate the
132 # signature on the prefix. Do we? We already do this in the
134 self._verify = verify
136 self._status = RetrieveStatus()
137 self._status.set_storage_index(self._storage_index)
138 self._status.set_helper(False)
139 self._status.set_progress(0.0)
140 self._status.set_active(True)
141 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
142 offsets_tuple) = self.verinfo
143 self._status.set_size(datalength)
144 self._status.set_encoding(k, N)
146 self._pause_deferred = None
148 self._read_length = None
149 self.log("got seqnum %d" % self.verinfo[0])
152 def get_status(self):
155 def log(self, *args, **kwargs):
156 if "parent" not in kwargs:
157 kwargs["parent"] = self._log_number
158 if "facility" not in kwargs:
159 kwargs["facility"] = "tahoe.mutable.retrieve"
160 return log.msg(*args, **kwargs)
162 def _set_current_status(self, state):
163 seg = "%d/%d" % (self._current_segment, self._last_segment)
164 self._status.set_status("segment %s (%s)" % (seg, state))
169 def pauseProducing(self):
171 I am called by my download target if we have produced too much
172 data for it to handle. I make the downloader stop producing new
173 data until my resumeProducing method is called.
175 if self._pause_deferred is not None:
178 # fired when the download is unpaused.
179 self._old_status = self._status.get_status()
180 self._set_current_status("paused")
182 self._pause_deferred = defer.Deferred()
185 def resumeProducing(self):
187 I am called by my download target once it is ready to begin
188 receiving data again.
190 if self._pause_deferred is None:
193 p = self._pause_deferred
194 self._pause_deferred = None
195 self._status.set_status(self._old_status)
197 eventually(p.callback, None)
200 def _check_for_paused(self, res):
202 I am called just before a write to the consumer. I return a
203 Deferred that eventually fires with the data that is to be
204 written to the consumer. If the download has not been paused,
205 the Deferred fires immediately. Otherwise, the Deferred fires
206 when the downloader is unpaused.
208 if self._pause_deferred is not None:
210 self._pause_deferred.addCallback(lambda ignored: d.callback(res))
212 return defer.succeed(res)
215 def download(self, consumer=None, offset=0, size=None):
216 assert IConsumer.providedBy(consumer) or self._verify
219 self._consumer = consumer
220 # we provide IPushProducer, so streaming=True, per
222 self._consumer.registerProducer(self, streaming=True)
224 self._done_deferred = defer.Deferred()
225 self._started = time.time()
226 self._status.set_status("Retrieving Shares")
228 self._offset = offset
229 self._read_length = size
231 # first, which servers can we use?
232 versionmap = self.servermap.make_versionmap()
233 shares = versionmap[self.verinfo]
234 # this sharemap is consumed as we decide to send requests
235 self.remaining_sharemap = DictOfSets()
236 for (shnum, peerid, timestamp) in shares:
237 self.remaining_sharemap.add(shnum, peerid)
238 # If the servermap update fetched anything, it fetched at least 1
239 # KiB, so we ask for that much.
240 # TODO: Change the cache methods to allow us to fetch all of the
241 # data that they have, then change this method to do that.
242 any_cache = self._node._read_from_cache(self.verinfo, shnum,
244 ss = self.servermap.connections[peerid]
245 reader = MDMFSlotReadProxy(ss,
249 reader.peerid = peerid
250 self.readers[shnum] = reader
253 self.shares = {} # maps shnum to validated blocks
254 self._active_readers = [] # list of active readers for this dl.
255 self._validated_readers = set() # set of readers that we have
256 # validated the prefix of
257 self._block_hash_trees = {} # shnum => hashtree
259 # how many shares do we need?
268 offsets_tuple) = self.verinfo
271 # We need one share hash tree for the entire file; its leaves
272 # are the roots of the block hash trees for the shares that
273 # comprise it, and its root is in the verinfo.
274 self.share_hash_tree = hashtree.IncompleteHashTree(N)
275 self.share_hash_tree.set_hashes({0: root_hash})
277 # This will set up both the segment decoder and the tail segment
278 # decoder, as well as a variety of other instance variables that
279 # the download process will use.
280 self._setup_encoding_parameters()
281 assert len(self.remaining_sharemap) >= k
283 self.log("starting download")
284 self._started_fetching = time.time()
286 self._add_active_peers()
288 # The download process beyond this is a state machine.
289 # _add_active_peers will select the peers that we want to use
290 # for the download, and then attempt to start downloading. After
291 # each segment, it will check for doneness, reacting to broken
292 # peers and corrupt shares as necessary. If it runs out of good
293 # peers before downloading all of the segments, _done_deferred
294 # will errback. Otherwise, it will eventually callback with the
295 # contents of the mutable file.
296 return self._done_deferred
299 def decode(self, blocks_and_salts, segnum):
301 I am a helper method that the mutable file update process uses
302 as a shortcut to decode and decrypt the segments that it needs
303 to fetch in order to perform a file update. I take in a
304 collection of blocks and salts, and pick some of those to make a
305 segment with. I return the plaintext associated with that
308 # shnum => block hash tree. Unused, but setup_encoding_parameters will
310 self._block_hash_trees = None
311 self._setup_encoding_parameters()
313 # This is the form expected by decode.
314 blocks_and_salts = blocks_and_salts.items()
315 blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
317 d = self._decode_blocks(blocks_and_salts, segnum)
318 d.addCallback(self._decrypt_segment)
322 def _setup_encoding_parameters(self):
324 I set up the encoding parameters, including k, n, the number
325 of segments associated with this file, and the segment decoder.
335 offsets_tuple) = self.verinfo
336 self._required_shares = k
337 self._total_shares = n
338 self._segment_size = segsize
339 self._data_length = datalength
342 self._version = MDMF_VERSION
344 self._version = SDMF_VERSION
346 if datalength and segsize:
347 self._num_segments = mathutil.div_ceil(datalength, segsize)
348 self._tail_data_size = datalength % segsize
350 self._num_segments = 0
351 self._tail_data_size = 0
353 self._segment_decoder = codec.CRSDecoder()
354 self._segment_decoder.set_params(segsize, k, n)
356 if not self._tail_data_size:
357 self._tail_data_size = segsize
359 self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
360 self._required_shares)
361 if self._tail_segment_size == self._segment_size:
362 self._tail_decoder = self._segment_decoder
364 self._tail_decoder = codec.CRSDecoder()
365 self._tail_decoder.set_params(self._tail_segment_size,
366 self._required_shares,
369 self.log("got encoding parameters: "
372 "%d segments of %d bytes each (%d byte tail segment)" % \
373 (k, n, self._num_segments, self._segment_size,
374 self._tail_segment_size))
376 if self._block_hash_trees is not None:
377 for i in xrange(self._total_shares):
378 # So we don't have to do this later.
379 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
381 # Our last task is to tell the downloader where to start and
382 # where to stop. We use three parameters for that:
383 # - self._start_segment: the segment that we need to start
385 # - self._current_segment: the next segment that we need to
387 # - self._last_segment: The last segment that we were asked to
390 # We say that the download is complete when
391 # self._current_segment > self._last_segment. We use
392 # self._start_segment and self._last_segment to know when to
393 # strip things off of segments, and how much to strip.
395 self.log("got offset: %d" % self._offset)
396 # our start segment is the first segment containing the
397 # offset we were given.
398 start = self._offset // self._segment_size
400 assert start < self._num_segments
401 self._start_segment = start
402 self.log("got start segment: %d" % self._start_segment)
404 self._start_segment = 0
407 if self._read_length:
408 # our end segment is the last segment containing part of the
409 # segment that we were asked to read.
410 self.log("got read length %d" % self._read_length)
411 end_data = self._offset + self._read_length
413 # We don't actually need to read the byte at end_data, but
415 end = (end_data - 1) // self._segment_size
417 assert end < self._num_segments
418 self._last_segment = end
419 self.log("got end segment: %d" % self._last_segment)
421 self._last_segment = self._num_segments - 1
423 self._current_segment = self._start_segment
425 def _add_active_peers(self):
427 I populate self._active_readers with enough active readers to
428 retrieve the contents of this mutable file. I am called before
429 downloading starts, and (eventually) after each validation
430 error, connection error, or other problem in the download.
432 # TODO: It would be cool to investigate other heuristics for
433 # reader selection. For instance, the cost (in time the user
434 # spends waiting for their file) of selecting a really slow peer
435 # that happens to have a primary share is probably more than
436 # selecting a really fast peer that doesn't have a primary
437 # share. Maybe the servermap could be extended to provide this
438 # information; it could keep track of latency information while
439 # it gathers more important data, and then this routine could
440 # use that to select active readers.
442 # (these and other questions would be easier to answer with a
443 # robust, configurable tahoe-lafs simulator, which modeled node
444 # failures, differences in node speed, and other characteristics
445 # that we expect storage servers to have. You could have
446 # presets for really stable grids (like allmydata.com),
447 # friendnets, make it easy to configure your own settings, and
448 # then simulate the effect of big changes on these use cases
449 # instead of just reasoning about what the effect might be. Out
450 # of scope for MDMF, though.)
452 # We need at least self._required_shares readers to download a
455 needed = self._total_shares
457 needed = self._required_shares - len(self._active_readers)
458 # XXX: Why don't format= log messages work here?
459 self.log("adding %d peers to the active peers list" % needed)
461 # We favor lower numbered shares, since FEC is faster with
462 # primary shares than with other shares, and lower-numbered
463 # shares are more likely to be primary than higher numbered
465 active_shnums = set(sorted(self.remaining_sharemap.keys()))
466 # We shouldn't consider adding shares that we already have; this
467 # will cause problems later.
468 active_shnums -= set([reader.shnum for reader in self._active_readers])
469 active_shnums = list(active_shnums)[:needed]
470 if len(active_shnums) < needed and not self._verify:
471 # We don't have enough readers to retrieve the file; fail.
472 return self._failed()
474 for shnum in active_shnums:
475 self._active_readers.append(self.readers[shnum])
476 self.log("added reader for share %d" % shnum)
477 assert len(self._active_readers) >= self._required_shares
478 # Conceptually, this is part of the _add_active_peers step. It
479 # validates the prefixes of newly added readers to make sure
480 # that they match what we are expecting for self.verinfo. If
481 # validation is successful, _validate_active_prefixes will call
482 # _download_current_segment for us. If validation is
483 # unsuccessful, then _validate_prefixes will remove the peer and
484 # call _add_active_peers again, where we will attempt to rectify
485 # the problem by choosing another peer.
486 return self._validate_active_prefixes()
489 def _validate_active_prefixes(self):
491 I check to make sure that the prefixes on the peers that I am
492 currently reading from match the prefix that we want to see, as
493 said in self.verinfo.
495 If I find that all of the active peers have acceptable prefixes,
496 I pass control to _download_current_segment, which will use
497 those peers to do cool things. If I find that some of the active
498 peers have unacceptable prefixes, I will remove them from active
499 peers (and from further consideration) and call
500 _add_active_peers to attempt to rectify the situation. I keep
501 track of which peers I have already validated so that I don't
504 assert self._active_readers, "No more active readers"
507 new_readers = set(self._active_readers) - self._validated_readers
508 self.log('validating %d newly-added active readers' % len(new_readers))
510 for reader in new_readers:
511 # We force a remote read here -- otherwise, we are relying
512 # on cached data that we already verified as valid, and we
513 # won't detect an uncoordinated write that has occurred
514 # since the last servermap update.
515 d = reader.get_prefix(force_remote=True)
516 d.addCallback(self._try_to_validate_prefix, reader)
518 dl = defer.DeferredList(ds, consumeErrors=True)
519 def _check_results(results):
520 # Each result in results will be of the form (success, msg).
521 # We don't care about msg, but success will tell us whether
522 # or not the checkstring validated. If it didn't, we need to
523 # remove the offending (peer,share) from our active readers,
524 # and ensure that active readers is again populated.
526 for i, result in enumerate(results):
528 reader = self._active_readers[i]
530 assert isinstance(f, failure.Failure)
532 self.log("The reader %s failed to "
533 "properly validate: %s" % \
534 (reader, str(f.value)))
535 bad_readers.append((reader, f))
537 reader = self._active_readers[i]
538 self.log("the reader %s checks out, so we'll use it" % \
540 self._validated_readers.add(reader)
541 # Each time we validate a reader, we check to see if
542 # we need the private key. If we do, we politely ask
543 # for it and then continue computing. If we find
544 # that we haven't gotten it at the end of
545 # segment decoding, then we'll take more drastic
547 if self._need_privkey and not self._node.is_readonly():
548 d = reader.get_encprivkey()
549 d.addCallback(self._try_to_validate_privkey, reader)
551 # We do them all at once, or else we screw up list indexing.
552 for (reader, f) in bad_readers:
553 self._mark_bad_share(reader, f)
555 if len(self._active_readers) >= self._required_shares:
556 return self._download_current_segment()
558 return self._failed()
560 return self._add_active_peers()
562 return self._download_current_segment()
563 # The next step will assert that it has enough active
564 # readers to fetch shares; we just need to remove it.
565 dl.addCallback(_check_results)
569 def _try_to_validate_prefix(self, prefix, reader):
571 I check that the prefix returned by a candidate server for
572 retrieval matches the prefix that the servermap knows about
573 (and, hence, the prefix that was validated earlier). If it does,
574 I return True, which means that I approve of the use of the
575 candidate server for segment retrieval. If it doesn't, I return
576 False, which means that another server must be chosen.
586 offsets_tuple) = self.verinfo
587 if known_prefix != prefix:
588 self.log("prefix from share %d doesn't match" % reader.shnum)
589 raise UncoordinatedWriteError("Mismatched prefix -- this could "
590 "indicate an uncoordinated write")
591 # Otherwise, we're okay -- no issues.
594 def _remove_reader(self, reader):
596 At various points, we will wish to remove a peer from
597 consideration and/or use. These include, but are not necessarily
600 - A connection error.
601 - A mismatched prefix (that is, a prefix that does not match
602 our conception of the version information string).
603 - A failing block hash, salt hash, or share hash, which can
604 indicate disk failure/bit flips, or network trouble.
606 This method will do that. I will make sure that the
607 (shnum,reader) combination represented by my reader argument is
608 not used for anything else during this download. I will not
609 advise the reader of any corruption, something that my callers
610 may wish to do on their own.
612 # TODO: When you're done writing this, see if this is ever
613 # actually used for something that _mark_bad_share isn't. I have
614 # a feeling that they will be used for very similar things, and
615 # that having them both here is just going to be an epic amount
616 # of code duplication.
618 # (well, okay, not epic, but meaningful)
619 self.log("removing reader %s" % reader)
620 # Remove the reader from _active_readers
621 self._active_readers.remove(reader)
622 # TODO: self.readers.remove(reader)?
623 for shnum in list(self.remaining_sharemap.keys()):
624 self.remaining_sharemap.discard(shnum, reader.peerid)
627 def _mark_bad_share(self, reader, f):
629 I mark the (peerid, shnum) encapsulated by my reader argument as
630 a bad share, which means that it will not be used anywhere else.
632 There are several reasons to want to mark something as a bad
633 share. These include:
635 - A connection error to the peer.
636 - A mismatched prefix (that is, a prefix that does not match
637 our local conception of the version information string).
638 - A failing block hash, salt hash, share hash, or other
641 This method will ensure that readers that we wish to mark bad
642 (for these reasons or other reasons) are not used for the rest
643 of the download. Additionally, it will attempt to tell the
644 remote peer (with no guarantee of success) that its share is
647 self.log("marking share %d on server %s as bad" % \
648 (reader.shnum, reader))
649 prefix = self.verinfo[-2]
650 self.servermap.mark_bad_share(reader.peerid,
653 self._remove_reader(reader)
654 self._bad_shares.add((reader.peerid, reader.shnum, f))
655 self._status.problems[reader.peerid] = f
656 self._last_failure = f
657 self.notify_server_corruption(reader.peerid, reader.shnum,
661 def _download_current_segment(self):
663 I download, validate, decode, decrypt, and assemble the segment
664 that this Retrieve is currently responsible for downloading.
666 assert len(self._active_readers) >= self._required_shares
667 if self._current_segment <= self._last_segment:
668 d = self._process_segment(self._current_segment)
670 d = defer.succeed(None)
671 d.addBoth(self._turn_barrier)
672 d.addCallback(self._check_for_done)
676 def _turn_barrier(self, result):
678 I help the download process avoid the recursion limit issues
681 return fireEventually(result)
684 def _process_segment(self, segnum):
686 I download, validate, decode, and decrypt one segment of the
687 file that this Retrieve is retrieving. This means coordinating
688 the process of getting k blocks of that file, validating them,
689 assembling them into one segment with the decoder, and then
692 self.log("processing segment %d" % segnum)
694 # TODO: The old code uses a marker. Should this code do that
695 # too? What did the Marker do?
696 assert len(self._active_readers) >= self._required_shares
698 # We need to ask each of our active readers for its block and
699 # salt. We will then validate those. If validation is
700 # successful, we will assemble the results into plaintext.
702 for reader in self._active_readers:
703 started = time.time()
704 d = reader.get_block_and_salt(segnum, queue=True)
705 d2 = self._get_needed_hashes(reader, segnum)
706 dl = defer.DeferredList([d, d2], consumeErrors=True)
707 dl.addCallback(self._validate_block, segnum, reader, started)
708 dl.addErrback(self._validation_or_decoding_failed, [reader])
711 dl = defer.DeferredList(ds)
713 dl.addCallback(lambda ignored: "")
714 dl.addCallback(self._set_segment)
716 dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
720 def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
722 I take the results of fetching and validating the blocks from a
723 callback chain in another method. If the results are such that
724 they tell me that validation and fetching succeeded without
725 incident, I will proceed with decoding and decryption.
726 Otherwise, I will do nothing.
728 self.log("trying to decode and decrypt segment %d" % segnum)
730 for block_and_salt in blocks_and_salts:
731 if not block_and_salt[0] or block_and_salt[1] == None:
732 self.log("some validation operations failed; not proceeding")
736 self.log("everything looks ok, building segment %d" % segnum)
737 d = self._decode_blocks(blocks_and_salts, segnum)
738 d.addCallback(self._decrypt_segment)
739 d.addErrback(self._validation_or_decoding_failed,
740 self._active_readers)
741 # check to see whether we've been paused before writing
743 d.addCallback(self._check_for_paused)
744 d.addCallback(self._set_segment)
747 return defer.succeed(None)
750 def _set_segment(self, segment):
752 Given a plaintext segment, I register that segment with the
753 target that is handling the file download.
755 self.log("got plaintext for segment %d" % self._current_segment)
756 if self._current_segment == self._start_segment:
757 # We're on the first segment. It's possible that we want
758 # only some part of the end of this segment, and that we
759 # just downloaded the whole thing to get that part. If so,
760 # we need to account for that and give the reader just the
761 # data that they want.
762 n = self._offset % self._segment_size
763 self.log("stripping %d bytes off of the first segment" % n)
764 self.log("original segment length: %d" % len(segment))
765 segment = segment[n:]
766 self.log("new segment length: %d" % len(segment))
768 if self._current_segment == self._last_segment and self._read_length is not None:
769 # We're on the last segment. It's possible that we only want
770 # part of the beginning of this segment, and that we
771 # downloaded the whole thing anyway. Make sure to give the
772 # caller only the portion of the segment that they want to
774 extra = self._read_length
775 if self._start_segment != self._last_segment:
776 extra -= self._segment_size - \
777 (self._offset % self._segment_size)
778 extra %= self._segment_size
779 self.log("original segment length: %d" % len(segment))
780 segment = segment[:extra]
781 self.log("new segment length: %d" % len(segment))
782 self.log("only taking %d bytes of the last segment" % extra)
785 self._consumer.write(segment)
787 # we don't care about the plaintext if we are doing a verify.
789 self._current_segment += 1
792 def _validation_or_decoding_failed(self, f, readers):
794 I am called when a block or a salt fails to correctly validate, or when
795 the decryption or decoding operation fails for some reason. I react to
796 this failure by notifying the remote server of corruption, and then
797 removing the remote peer from further activity.
799 assert isinstance(readers, list)
800 bad_shnums = [reader.shnum for reader in readers]
802 self.log("validation or decoding failed on share(s) %s, peer(s) %s "
803 ", segment %d: %s" % \
804 (bad_shnums, readers, self._current_segment, str(f)))
805 for reader in readers:
806 self._mark_bad_share(reader, f)
810 def _validate_block(self, results, segnum, reader, started):
812 I validate a block from one share on a remote server.
814 # Grab the part of the block hash tree that is necessary to
815 # validate this block, then generate the block hash root.
816 self.log("validating share %d for segment %d" % (reader.shnum,
818 elapsed = time.time() - started
819 self._status.add_fetch_timing(reader.peerid, elapsed)
820 self._set_current_status("validating blocks")
821 # Did we fail to fetch either of the things that we were
822 # supposed to? Fail if so.
823 if not results[0][0] and results[1][0]:
824 # handled by the errback handler.
826 # These all get batched into one query, so the resulting
827 # failure should be the same for all of them, so we can just
829 assert isinstance(results[0][1], failure.Failure)
832 raise CorruptShareError(reader.peerid,
834 "Connection error: %s" % str(f))
836 block_and_salt, block_and_sharehashes = results
837 block, salt = block_and_salt[1]
838 blockhashes, sharehashes = block_and_sharehashes[1]
840 blockhashes = dict(enumerate(blockhashes[1]))
841 self.log("the reader gave me the following blockhashes: %s" % \
843 self.log("the reader gave me the following sharehashes: %s" % \
844 sharehashes[1].keys())
845 bht = self._block_hash_trees[reader.shnum]
847 if bht.needed_hashes(segnum, include_leaf=True):
849 bht.set_hashes(blockhashes)
850 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
852 raise CorruptShareError(reader.peerid,
854 "block hash tree failure: %s" % e)
856 if self._version == MDMF_VERSION:
857 blockhash = hashutil.block_hash(salt + block)
859 blockhash = hashutil.block_hash(block)
860 # If this works without an error, then validation is
863 bht.set_hashes(leaves={segnum: blockhash})
864 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
866 raise CorruptShareError(reader.peerid,
868 "block hash tree failure: %s" % e)
870 # Reaching this point means that we know that this segment
871 # is correct. Now we need to check to see whether the share
872 # hash chain is also correct.
873 # SDMF wrote share hash chains that didn't contain the
874 # leaves, which would be produced from the block hash tree.
875 # So we need to validate the block hash tree first. If
876 # successful, then bht[0] will contain the root for the
877 # shnum, which will be a leaf in the share hash tree, which
878 # will allow us to validate the rest of the tree.
879 if self.share_hash_tree.needed_hashes(reader.shnum,
880 include_leaf=True) or \
883 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
884 leaves={reader.shnum: bht[0]})
885 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
887 raise CorruptShareError(reader.peerid,
889 "corrupt hashes: %s" % e)
891 self.log('share %d is valid for segment %d' % (reader.shnum,
893 return {reader.shnum: (block, salt)}
896 def _get_needed_hashes(self, reader, segnum):
898 I get the hashes needed to validate segnum from the reader, then return
899 to my caller when this is done.
901 bht = self._block_hash_trees[reader.shnum]
902 needed = bht.needed_hashes(segnum, include_leaf=True)
903 # The root of the block hash tree is also a leaf in the share
904 # hash tree. So we don't need to fetch it from the remote
905 # server. In the case of files with one segment, this means that
906 # we won't fetch any block hash tree from the remote server,
907 # since the hash of each share of the file is the entire block
908 # hash tree, and is a leaf in the share hash tree. This is fine,
909 # since any share corruption will be detected in the share hash
912 self.log("getting blockhashes for segment %d, share %d: %s" % \
913 (segnum, reader.shnum, str(needed)))
914 d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
915 if self.share_hash_tree.needed_hashes(reader.shnum):
916 need = self.share_hash_tree.needed_hashes(reader.shnum)
917 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
919 d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
921 d2 = defer.succeed({}) # the logic in the next method
923 dl = defer.DeferredList([d1, d2], consumeErrors=True)
927 def _decode_blocks(self, blocks_and_salts, segnum):
929 I take a list of k blocks and salts, and decode that into a
930 single encrypted segment.
933 # We want to merge our dictionaries to the form
934 # {shnum: blocks_and_salts}
936 # The dictionaries come from validate block that way, so we just
937 # need to merge them.
938 for block_and_salt in blocks_and_salts:
939 d.update(block_and_salt[1])
941 # All of these blocks should have the same salt; in SDMF, it is
942 # the file-wide IV, while in MDMF it is the per-segment salt. In
943 # either case, we just need to get one of them and use it.
945 # d.items()[0] is like (shnum, (block, salt))
946 # d.items()[0][1] is like (block, salt)
947 # d.items()[0][1][1] is the salt.
948 salt = d.items()[0][1][1]
949 # Next, extract just the blocks from the dict. We'll use the
950 # salt in the next step.
951 share_and_shareids = [(k, v[0]) for k, v in d.items()]
952 d2 = dict(share_and_shareids)
955 for shareid, share in d2.items():
956 shareids.append(shareid)
959 self._set_current_status("decoding")
960 started = time.time()
961 assert len(shareids) >= self._required_shares, len(shareids)
962 # zfec really doesn't want extra shares
963 shareids = shareids[:self._required_shares]
964 shares = shares[:self._required_shares]
965 self.log("decoding segment %d" % segnum)
966 if segnum == self._num_segments - 1:
967 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
969 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
970 def _process(buffers):
971 segment = "".join(buffers)
972 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
974 numsegs=self._num_segments,
976 self.log(" joined length %d, datalength %d" %
977 (len(segment), self._data_length))
978 if segnum == self._num_segments - 1:
979 size_to_use = self._tail_data_size
981 size_to_use = self._segment_size
982 segment = segment[:size_to_use]
983 self.log(" segment len=%d" % len(segment))
984 self._status.accumulate_decode_time(time.time() - started)
986 d.addCallback(_process)
990 def _decrypt_segment(self, segment_and_salt):
992 I take a single segment and its salt, and decrypt it. I return
993 the plaintext of the segment that is in my argument.
995 segment, salt = segment_and_salt
996 self._set_current_status("decrypting")
997 self.log("decrypting segment %d" % self._current_segment)
998 started = time.time()
999 key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
1000 decryptor = AES(key)
1001 plaintext = decryptor.process(segment)
1002 self._status.accumulate_decrypt_time(time.time() - started)
1006 def notify_server_corruption(self, peerid, shnum, reason):
1007 ss = self.servermap.connections[peerid]
1008 ss.callRemoteOnly("advise_corrupt_share",
1009 "mutable", self._storage_index, shnum, reason)
1012 def _try_to_validate_privkey(self, enc_privkey, reader):
1013 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
1014 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
1015 if alleged_writekey != self._node.get_writekey():
1016 self.log("invalid privkey from %s shnum %d" %
1017 (reader, reader.shnum),
1018 level=log.WEIRD, umid="YIw4tA")
1020 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
1022 e = CorruptShareError(reader.peerid,
1025 f = failure.Failure(e)
1026 self._bad_shares.add((reader.peerid, reader.shnum, f))
1030 self.log("got valid privkey from shnum %d on reader %s" %
1031 (reader.shnum, reader))
1032 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
1033 self._node._populate_encprivkey(enc_privkey)
1034 self._node._populate_privkey(privkey)
1035 self._need_privkey = False
1038 def _check_for_done(self, res):
1040 I check to see if this Retrieve object has successfully finished
1043 I can exit in the following ways:
1044 - If there are no more segments to download, then I exit by
1045 causing self._done_deferred to fire with the plaintext
1046 content requested by the caller.
1047 - If there are still segments to be downloaded, and there
1048 are enough active readers (readers which have not broken
1049 and have not given us corrupt data) to continue
1050 downloading, I send control back to
1051 _download_current_segment.
1052 - If there are still segments to be downloaded but there are
1053 not enough active peers to download them, I ask
1054 _add_active_peers to add more peers. If it is successful,
1055 it will call _download_current_segment. If there are not
1056 enough peers to retrieve the file, then that will cause
1057 _done_deferred to errback.
1059 self.log("checking for doneness")
1060 if self._current_segment > self._last_segment:
1061 # No more segments to download, we're done.
1062 self.log("got plaintext, done")
1065 if len(self._active_readers) >= self._required_shares:
1066 # More segments to download, but we have enough good peers
1067 # in self._active_readers that we can do that without issue,
1068 # so go nab the next segment.
1069 self.log("not done yet: on segment %d of %d" % \
1070 (self._current_segment + 1, self._num_segments))
1071 return self._download_current_segment()
1073 self.log("not done yet: on segment %d of %d, need to add peers" % \
1074 (self._current_segment + 1, self._num_segments))
1075 return self._add_active_peers()
1080 I am called by _check_for_done when the download process has
1081 finished successfully. After making some useful logging
1082 statements, I return the decrypted contents to the owner of this
1083 Retrieve object through self._done_deferred.
1085 self._running = False
1086 self._status.set_active(False)
1088 self._status.timings['total'] = now - self._started
1089 self._status.timings['fetch'] = now - self._started_fetching
1090 self._status.set_status("Finished")
1091 self._status.set_progress(1.0)
1093 # remember the encoding parameters, use them again next time
1094 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1095 offsets_tuple) = self.verinfo
1096 self._node._populate_required_shares(k)
1097 self._node._populate_total_shares(N)
1100 ret = list(self._bad_shares)
1101 self.log("done verifying, found %d bad shares" % len(ret))
1103 # TODO: upload status here?
1104 ret = self._consumer
1105 self._consumer.unregisterProducer()
1106 eventually(self._done_deferred.callback, ret)
1111 I am called by _add_active_peers when there are not enough
1112 active peers left to complete the download. After making some
1113 useful logging statements, I return an exception to that effect
1114 to the caller of this Retrieve object through
1115 self._done_deferred.
1117 self._running = False
1118 self._status.set_active(False)
1120 self._status.timings['total'] = now - self._started
1121 self._status.timings['fetch'] = now - self._started_fetching
1122 self._status.set_status("Failed")
1125 ret = list(self._bad_shares)
1127 format = ("ran out of peers: "
1128 "have %(have)d of %(total)d segments "
1129 "found %(bad)d bad shares "
1130 "encoding %(k)d-of-%(n)d")
1131 args = {"have": self._current_segment,
1132 "total": self._num_segments,
1133 "need": self._last_segment,
1134 "k": self._required_shares,
1135 "n": self._total_shares,
1136 "bad": len(self._bad_shares)}
1137 e = NotEnoughSharesError("%s, last failure: %s" % \
1138 (format % args, str(self._last_failure)))
1139 f = failure.Failure(e)
1141 eventually(self._done_deferred.callback, ret)