import time

from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
                                 MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
from allmydata.mutable.layout import MDMFSlotReadProxy

    implements(IRetrieveStatus)
    statusid_counter = count(0)

        self.timings["fetch_per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.storage_index = None
        self.encoding = ("?", "?")
        self.status = "Not started"
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
    def using_helper(self):
    def get_progress(self):
    def get_counter(self):
    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
    def set_status(self, status):
    def set_progress(self, value):
    def set_active(self, value):
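
    # Note: a Retrieve (below) creates one RetrieveStatus in its __init__ and
    # updates it through these set_* methods as the download progresses;
    # get_status() on the Retrieve exposes it to callers (e.g. status
    # displays).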

    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
    implements(IPushProducer)

    def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
        assert self._node.get_pubkey()
        self._storage_index = filenode.get_storage_index()
        assert self._node.get_readkey()
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps (peerid,shnum) to start_time
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node.get_pubkey()
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = fetch_privkey or verify
        if self._node.get_privkey() and not verify:
            self._need_privkey = False

        if self._need_privkey:
            # TODO: Evaluate the need for this. We'll use it if we want
            # to limit how many queries are on the wire for the privkey
            self._privkey_query_markers = [] # one Marker for each time we've
                                             # tried to get the privkey.

        # verify means that we are using the downloader logic to verify all
        # of our shares. This tells the downloader a few things.
        # 1. We need to download all of the shares.
        # 2. We don't need to decode or decrypt the shares, since our
        #    caller doesn't care about the plaintext, only the
        #    information about which shares are or are not valid.
        # 3. When we are validating readers, we need to validate the
        #    signature on the prefix. Do we? We already do this in the

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)
        self._pause_deferred = None
        self._read_length = None
        self.log("got seqnum %d" % self.verinfo[0])

    def get_status(self):

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def pauseProducing(self):
        I am called by my download target if we have produced too much
        data for it to handle. I make the downloader stop producing new
        data until my resumeProducing method is called.

        # fired when the download is unpaused.
        self._old_status = self._status.get_status()
        self._status.set_status("Paused")

        self._pause_deferred = defer.Deferred()

    def resumeProducing(self):
        I am called by my download target once it is ready to begin
        receiving data again.

        p = self._pause_deferred
        self._pause_deferred = None
        self._status.set_status(self._old_status)

        eventually(p.callback, None)
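        # (eventually() comes from foolscap and runs the callback in a later
        # reactor turn; presumably this is to avoid firing the paused
        # Deferred's callback chain reentrantly from inside resumeProducing.)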

    def _check_for_paused(self, res):
        I am called just before a write to the consumer. I return a
        Deferred that eventually fires with the data that is to be
        written to the consumer. If the download has not been paused,
        the Deferred fires immediately. Otherwise, the Deferred fires
        when the downloader is unpaused.

            self._pause_deferred.addCallback(lambda ignored: d.callback(res))
        return defer.succeed(res)
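
    # (A note on usage: callers chain this as a write barrier, e.g.
    #      d.addCallback(self._check_for_paused)
    #      d.addCallback(self._set_segment)
    #  so that no segment is handed to the consumer while we are paused;
    #  see _maybe_decode_and_decrypt_segment below.)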

    def download(self, consumer=None, offset=0, size=None):
        assert IConsumer.providedBy(consumer) or self._verify
            self._consumer = consumer
            # we provide IPushProducer, so streaming=True, per
            self._consumer.registerProducer(self, streaming=True)

        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        self._offset = offset
        self._read_length = size

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)
            # If the servermap update fetched anything, it fetched at least 1
            # KiB, so we ask for that much.
            # TODO: Change the cache methods to allow us to fetch all of the
            # data that they have, then change this method to do that.
            any_cache = self._node._read_from_cache(self.verinfo, shnum,
            ss = self.servermap.connections[peerid]
            reader = MDMFSlotReadProxy(ss,
            reader.peerid = peerid
            self.readers[shnum] = reader

        self.shares = {} # maps shnum to validated blocks
        self._active_readers = [] # list of active readers for this dl.
        self._validated_readers = set() # set of readers that we have
                                        # validated the prefix of
        self._block_hash_trees = {} # shnum => hashtree

        # how many shares do we need?
         offsets_tuple) = self.verinfo

        # We need one share hash tree for the entire file; its leaves
        # are the roots of the block hash trees for the shares that
        # comprise it, and its root is in the verinfo.
        self.share_hash_tree = hashtree.IncompleteHashTree(N)
        self.share_hash_tree.set_hashes({0: root_hash})

        # This will set up both the segment decoder and the tail segment
        # decoder, as well as a variety of other instance variables that
        # the download process will use.
        self._setup_encoding_parameters()
        assert len(self.remaining_sharemap) >= k

        self.log("starting download")
        self._started_fetching = time.time()

        self._add_active_peers()
        # The download process beyond this is a state machine.
        # _add_active_peers will select the peers that we want to use
        # for the download, and then attempt to start downloading. After
        # each segment, it will check for doneness, reacting to broken
        # peers and corrupt shares as necessary. If it runs out of good
        # peers before downloading all of the segments, _done_deferred
        # will errback. Otherwise, it will eventually callback with the
        # contents of the mutable file.
        return self._done_deferred
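        # (Illustrative summary of that state machine, using the methods
        #  defined below: _add_active_peers -> _validate_active_prefixes ->
        #  _download_current_segment -> _process_segment -> _check_for_done,
        #  which either loops back for the next segment, or finishes by
        #  firing _done_deferred (success) or calling _failed (not enough
        #  good peers).)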

    def decode(self, blocks_and_salts, segnum):
        I am a helper method that the mutable file update process uses
        as a shortcut to decode and decrypt the segments that it needs
        to fetch in order to perform a file update. I take in a
        collection of blocks and salts, and pick some of those to make a
        segment with. I return the plaintext associated with that

        # shnum => block hash tree. Unused, but setup_encoding_parameters will
        # XXX: Make it so that it won't set this if we're just decoding.
        self._block_hash_trees = {}
        self._setup_encoding_parameters()
        # This is the form expected by decode.
        blocks_and_salts = blocks_and_salts.items()
        blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
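        # blocks_and_salts is now a list of (True, [(shnum, (block, salt))])
        # pairs, which dict.update() in _decode_blocks can consume just like
        # the (success, {shnum: (block, salt)}) results that _validate_block
        # produces during a normal download.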

        d = self._decode_blocks(blocks_and_salts, segnum)
        d.addCallback(self._decrypt_segment)

    def _setup_encoding_parameters(self):
        I set up the encoding parameters, including k, n, the number
        of segments associated with this file, and the segment decoder.

         offsets_tuple) = self.verinfo
        self._required_shares = k
        self._total_shares = n
        self._segment_size = segsize
        self._data_length = datalength

            self._version = MDMF_VERSION
            self._version = SDMF_VERSION

        if datalength and segsize:
            self._num_segments = mathutil.div_ceil(datalength, segsize)
            self._tail_data_size = datalength % segsize
            self._num_segments = 0
            self._tail_data_size = 0

        self._segment_decoder = codec.CRSDecoder()
        self._segment_decoder.set_params(segsize, k, n)

        if not self._tail_data_size:
            self._tail_data_size = segsize

        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
                                                         self._required_shares)
        if self._tail_segment_size == self._segment_size:
            self._tail_decoder = self._segment_decoder
            self._tail_decoder = codec.CRSDecoder()
            self._tail_decoder.set_params(self._tail_segment_size,
                                          self._required_shares,

        self.log("got encoding parameters: "
                 "%d segments of %d bytes each (%d byte tail segment)" % \
                 (k, n, self._num_segments, self._segment_size,
                  self._tail_segment_size))
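        # Example with illustrative numbers: datalength=1000000,
        # segsize=131073, k=3 gives div_ceil(1000000, 131073) = 8 segments;
        # the tail holds 1000000 % 131073 = 82489 bytes, which
        # next_multiple(82489, 3) pads to 82491 so that the tail decoder can
        # split it into k equal-sized shares.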

        for i in xrange(self._total_shares):
            # So we don't have to do this later.
            self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)

        # Our last task is to tell the downloader where to start and
        # where to stop. We use three parameters for that:
        # - self._start_segment: the segment that we need to start
        # - self._current_segment: the next segment that we need to
        # - self._last_segment: The last segment that we were asked to
        # We say that the download is complete when
        # self._current_segment > self._last_segment. We use
        # self._start_segment and self._last_segment to know when to
        # strip things off of segments, and how much to strip.
391 self.log("got offset: %d" % self._offset)
392 # our start segment is the first segment containing the
393 # offset we were given.
394 start = mathutil.div_ceil(self._offset,
396 # this gets us the first segment after self._offset. Then
397 # our start segment is the one before it.
400 assert start < self._num_segments
401 self._start_segment = start
402 self.log("got start segment: %d" % self._start_segment)
            self._start_segment = 0

        if self._read_length:
            # our end segment is the last segment containing part of the
            # data that we were asked to read.
            self.log("got read length %d" % self._read_length)
            end_data = self._offset + self._read_length
            end = mathutil.div_ceil(end_data,
            assert end < self._num_segments
            self._last_segment = end
            self.log("got end segment: %d" % self._last_segment)
            self._last_segment = self._num_segments - 1

        self._current_segment = self._start_segment

    def _add_active_peers(self):
        I populate self._active_readers with enough active readers to
        retrieve the contents of this mutable file. I am called before
        downloading starts, and (eventually) after each validation
        error, connection error, or other problem in the download.

        # TODO: It would be cool to investigate other heuristics for
        # reader selection. For instance, the cost (in time the user
        # spends waiting for their file) of selecting a really slow peer
        # that happens to have a primary share is probably more than
        # selecting a really fast peer that doesn't have a primary
        # share. Maybe the servermap could be extended to provide this
        # information; it could keep track of latency information while
        # it gathers more important data, and then this routine could
        # use that to select active readers.

        # (these and other questions would be easier to answer with a
        # robust, configurable tahoe-lafs simulator, which modeled node
        # failures, differences in node speed, and other characteristics
        # that we expect storage servers to have. You could have
        # presets for really stable grids (like allmydata.com),
        # friendnets, make it easy to configure your own settings, and
        # then simulate the effect of big changes on these use cases
        # instead of just reasoning about what the effect might be. Out
        # of scope for MDMF, though.)

        # We need at least self._required_shares readers to download a
            needed = self._total_shares
            needed = self._required_shares - len(self._active_readers)
        # XXX: Why don't format= log messages work here?
        self.log("adding %d peers to the active peers list" % needed)

        # We favor lower numbered shares, since FEC is faster with
        # primary shares than with other shares, and lower-numbered
        # shares are more likely to be primary than higher numbered
        active_shnums = set(sorted(self.remaining_sharemap.keys()))
        # We shouldn't consider adding shares that we already have; this
        # will cause problems later.
        active_shnums -= set([reader.shnum for reader in self._active_readers])
        # sort so that we really do favor the lower-numbered shares, rather
        # than relying on the iteration order of a set
        active_shnums = sorted(active_shnums)[:needed]
        if len(active_shnums) < needed and not self._verify:
            # We don't have enough readers to retrieve the file; fail.
            return self._failed()

        for shnum in active_shnums:
            self._active_readers.append(self.readers[shnum])
            self.log("added reader for share %d" % shnum)
        assert len(self._active_readers) >= self._required_shares
        # Conceptually, this is part of the _add_active_peers step. It
        # validates the prefixes of newly added readers to make sure
        # that they match what we are expecting for self.verinfo. If
        # validation is successful, _validate_active_prefixes will call
        # _download_current_segment for us. If validation is
        # unsuccessful, then _validate_active_prefixes will remove the peer
        # and call _add_active_peers again, where we will attempt to rectify
        # the problem by choosing another peer.
        return self._validate_active_prefixes()

    def _validate_active_prefixes(self):
        I check to make sure that the prefixes on the peers that I am
        currently reading from match the prefix that we want to see, as
        said in self.verinfo.

        If I find that all of the active peers have acceptable prefixes,
        I pass control to _download_current_segment, which will use
        those peers to do cool things. If I find that some of the active
        peers have unacceptable prefixes, I will remove them from active
        peers (and from further consideration) and call
        _add_active_peers to attempt to rectify the situation. I keep
        track of which peers I have already validated so that I don't

        assert self._active_readers, "No more active readers"

        new_readers = set(self._active_readers) - self._validated_readers
        self.log('validating %d newly-added active readers' % len(new_readers))

        for reader in new_readers:
            # We force a remote read here -- otherwise, we are relying
            # on cached data that we already verified as valid, and we
            # won't detect an uncoordinated write that has occurred
            # since the last servermap update.
            d = reader.get_prefix(force_remote=True)
            d.addCallback(self._try_to_validate_prefix, reader)
        dl = defer.DeferredList(ds, consumeErrors=True)
        def _check_results(results):
            # Each result in results will be of the form (success, msg).
            # We don't care about msg, but success will tell us whether
            # or not the checkstring validated. If it didn't, we need to
            # remove the offending (peer,share) from our active readers,
            # and ensure that active readers is again populated.
            for i, result in enumerate(results):
                    reader = self._active_readers[i]
                    assert isinstance(f, failure.Failure)
                    self.log("The reader %s failed to "
                             "properly validate: %s" % \
                             (reader, str(f.value)))
                    bad_readers.append((reader, f))
                    reader = self._active_readers[i]
                    self.log("the reader %s checks out, so we'll use it" % \
                    self._validated_readers.add(reader)
                    # Each time we validate a reader, we check to see if
                    # we need the private key. If we do, we politely ask
                    # for it and then continue computing. If we find
                    # that we haven't gotten it at the end of
                    # segment decoding, then we'll take more drastic
                    if self._need_privkey and not self._node.is_readonly():
                        d = reader.get_encprivkey()
                        d.addCallback(self._try_to_validate_privkey, reader)

            # We do them all at once, or else we screw up list indexing.
            for (reader, f) in bad_readers:
                self._mark_bad_share(reader, f)

            if len(self._active_readers) >= self._required_shares:
                return self._download_current_segment()
                return self._failed()
            return self._add_active_peers()
            return self._download_current_segment()
            # The next step will assert that it has enough active
            # readers to fetch shares; we just need to remove it.
        dl.addCallback(_check_results)

    def _try_to_validate_prefix(self, prefix, reader):
        I check that the prefix returned by a candidate server for
        retrieval matches the prefix that the servermap knows about
        (and, hence, the prefix that was validated earlier). If it does,
        I return True, which means that I approve of the use of the
        candidate server for segment retrieval. If it doesn't, I raise
        UncoordinatedWriteError, since a mismatched prefix indicates
        that the file was changed out from under us.

         offsets_tuple) = self.verinfo
        if known_prefix != prefix:
            self.log("prefix from share %d doesn't match" % reader.shnum)
            raise UncoordinatedWriteError("Mismatched prefix -- this could "
                                          "indicate an uncoordinated write")
        # Otherwise, we're okay -- no issues.

    def _remove_reader(self, reader):
        At various points, we will wish to remove a peer from
        consideration and/or use. These include, but are not necessarily

        - A connection error.
        - A mismatched prefix (that is, a prefix that does not match
          our conception of the version information string).
        - A failing block hash, salt hash, or share hash, which can
          indicate disk failure/bit flips, or network trouble.

        This method will do that. I will make sure that the
        (shnum,reader) combination represented by my reader argument is
        not used for anything else during this download. I will not
        advise the reader of any corruption, something that my callers
        may wish to do on their own.

        # TODO: When you're done writing this, see if this is ever
        # actually used for something that _mark_bad_share isn't. I have
        # a feeling that they will be used for very similar things, and
        # that having them both here is just going to be an epic amount
        # of code duplication.

        # (well, okay, not epic, but meaningful)
        self.log("removing reader %s" % reader)
        # Remove the reader from _active_readers
        self._active_readers.remove(reader)
        # TODO: self.readers.remove(reader)?
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, reader.peerid)

    def _mark_bad_share(self, reader, f):
        I mark the (peerid, shnum) encapsulated by my reader argument as
        a bad share, which means that it will not be used anywhere else.

        There are several reasons to want to mark something as a bad
        share. These include:

        - A connection error to the peer.
        - A mismatched prefix (that is, a prefix that does not match
          our local conception of the version information string).
        - A failing block hash, salt hash, share hash, or other

        This method will ensure that readers that we wish to mark bad
        (for these reasons or other reasons) are not used for the rest
        of the download. Additionally, it will attempt to tell the
        remote peer (with no guarantee of success) that its share is

        self.log("marking share %d on server %s as bad" % \
                 (reader.shnum, reader))
        prefix = self.verinfo[-2]
        self.servermap.mark_bad_share(reader.peerid,
        self._remove_reader(reader)
        self._bad_shares.add((reader.peerid, reader.shnum, f))
        self._status.problems[reader.peerid] = f
        self._last_failure = f
        self.notify_server_corruption(reader.peerid, reader.shnum,

    def _download_current_segment(self):
        I download, validate, decode, decrypt, and assemble the segment
        that this Retrieve is currently responsible for downloading.

        assert len(self._active_readers) >= self._required_shares
        if self._current_segment <= self._last_segment:
            d = self._process_segment(self._current_segment)
            d = defer.succeed(None)
        d.addBoth(self._turn_barrier)
        d.addCallback(self._check_for_done)

    def _turn_barrier(self, result):
        I help the download process avoid the recursion limit issues
        return fireEventually(result)

    def _process_segment(self, segnum):
        I download, validate, decode, and decrypt one segment of the
        file that this Retrieve is retrieving. This means coordinating
        the process of getting k blocks of that file, validating them,
        assembling them into one segment with the decoder, and then

        self.log("processing segment %d" % segnum)

        # TODO: The old code uses a marker. Should this code do that
        # too? What did the Marker do?
        assert len(self._active_readers) >= self._required_shares

        # We need to ask each of our active readers for its block and
        # salt. We will then validate those. If validation is
        # successful, we will assemble the results into plaintext.
        for reader in self._active_readers:
            started = time.time()
            d = reader.get_block_and_salt(segnum, queue=True)
            d2 = self._get_needed_hashes(reader, segnum)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            dl.addCallback(self._validate_block, segnum, reader, started)
            dl.addErrback(self._validation_or_decoding_failed, [reader])
        dl = defer.DeferredList(ds)
            dl.addCallback(lambda ignored: "")
            dl.addCallback(self._set_segment)
            dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)

    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
        I take the results of fetching and validating the blocks from a
        callback chain in another method. If the results are such that
        they tell me that validation and fetching succeeded without
        incident, I will proceed with decoding and decryption.
        Otherwise, I will do nothing.

        self.log("trying to decode and decrypt segment %d" % segnum)
        for block_and_salt in blocks_and_salts:
            if not block_and_salt[0] or block_and_salt[1] is None:
                self.log("some validation operations failed; not proceeding")
        self.log("everything looks ok, building segment %d" % segnum)
        d = self._decode_blocks(blocks_and_salts, segnum)
        d.addCallback(self._decrypt_segment)
        d.addErrback(self._validation_or_decoding_failed,
                     self._active_readers)
        # check to see whether we've been paused before writing
        d.addCallback(self._check_for_paused)
        d.addCallback(self._set_segment)
            return defer.succeed(None)

    def _set_segment(self, segment):
        Given a plaintext segment, I register that segment with the
        target that is handling the file download.

        self.log("got plaintext for segment %d" % self._current_segment)
        if self._current_segment == self._start_segment:
            # We're on the first segment. It's possible that we want
            # only some part of the end of this segment, and that we
            # just downloaded the whole thing to get that part. If so,
            # we need to account for that and give the reader just the
            # data that they want.
            n = self._offset % self._segment_size
            self.log("stripping %d bytes off of the first segment" % n)
            self.log("original segment length: %d" % len(segment))
            segment = segment[n:]
            self.log("new segment length: %d" % len(segment))

        if self._current_segment == self._last_segment and self._read_length is not None:
            # We're on the last segment. It's possible that we only want
            # part of the beginning of this segment, and that we
            # downloaded the whole thing anyway. Make sure to give the
            # caller only the portion of the segment that they want to
            extra = self._read_length
            if self._start_segment != self._last_segment:
                extra -= self._segment_size - \
                         (self._offset % self._segment_size)
            extra %= self._segment_size
            self.log("original segment length: %d" % len(segment))
            segment = segment[:extra]
            self.log("new segment length: %d" % len(segment))
            self.log("only taking %d bytes of the last segment" % extra)
            self._consumer.write(segment)
            # we don't care about the plaintext if we are doing a verify.
        self._current_segment += 1

    def _validation_or_decoding_failed(self, f, readers):
        I am called when a block or a salt fails to correctly validate, or when
        the decryption or decoding operation fails for some reason. I react to
        this failure by notifying the remote server of corruption, and then
        removing the remote peer from further activity.

        assert isinstance(readers, list)
        bad_shnums = [reader.shnum for reader in readers]

        self.log("validation or decoding failed on share(s) %s, peer(s) %s "
                 ", segment %d: %s" % \
                 (bad_shnums, readers, self._current_segment, str(f)))
        for reader in readers:
            self._mark_bad_share(reader, f)

    def _validate_block(self, results, segnum, reader, started):
        I validate a block from one share on a remote server.

        # Grab the part of the block hash tree that is necessary to
        # validate this block, then generate the block hash root.
        self.log("validating share %d for segment %d" % (reader.shnum,
        self._status.add_fetch_timing(reader.peerid, time.time() - started)
817 self._status.set_status("Valdiating blocks for segment %d" % segnum)
818 # Did we fail to fetch either of the things that we were
819 # supposed to? Fail if so.
        if not results[0][0] or not results[1][0]:
            # handled by the errback handler.
            # These all get batched into one query, so the resulting
            # failure should be the same for all of them, so we can just
            assert isinstance(results[0][1], failure.Failure)
            raise CorruptShareError(reader.peerid,
                                    "Connection error: %s" % str(f))

        block_and_salt, block_and_sharehashes = results
        block, salt = block_and_salt[1]
        blockhashes, sharehashes = block_and_sharehashes[1]

        blockhashes = dict(enumerate(blockhashes[1]))
        self.log("the reader gave me the following blockhashes: %s" % \
        self.log("the reader gave me the following sharehashes: %s" % \
                 sharehashes[1].keys())
        bht = self._block_hash_trees[reader.shnum]

        if bht.needed_hashes(segnum, include_leaf=True):
                bht.set_hashes(blockhashes)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                raise CorruptShareError(reader.peerid,
                                        "block hash tree failure: %s" % e)

        if self._version == MDMF_VERSION:
            blockhash = hashutil.block_hash(salt + block)
            blockhash = hashutil.block_hash(block)
        # If this works without an error, then validation is
            bht.set_hashes(leaves={segnum: blockhash})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
            raise CorruptShareError(reader.peerid,
                                    "block hash tree failure: %s" % e)

        # Reaching this point means that we know that this segment
        # is correct. Now we need to check to see whether the share
        # hash chain is also correct.
        # SDMF wrote share hash chains that didn't contain the
        # leaves, which would be produced from the block hash tree.
        # So we need to validate the block hash tree first. If
        # successful, then bht[0] will contain the root for the
        # shnum, which will be a leaf in the share hash tree, which
        # will allow us to validate the rest of the tree.
        if self.share_hash_tree.needed_hashes(reader.shnum,
                                              include_leaf=True) or \
                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
                                                leaves={reader.shnum: bht[0]})
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                raise CorruptShareError(reader.peerid,
                                        "corrupt hashes: %s" % e)

        self.log('share %d is valid for segment %d' % (reader.shnum,
        return {reader.shnum: (block, salt)}

    def _get_needed_hashes(self, reader, segnum):
        I get the hashes needed to validate segnum from the reader, then return
        to my caller when this is done.

        bht = self._block_hash_trees[reader.shnum]
        needed = bht.needed_hashes(segnum, include_leaf=True)
        # The root of the block hash tree is also a leaf in the share
        # hash tree. So we don't need to fetch it from the remote
        # server. In the case of files with one segment, this means that
        # we won't fetch any block hash tree from the remote server,
        # since the hash of each share of the file is the entire block
        # hash tree, and is a leaf in the share hash tree. This is fine,
        # since any share corruption will be detected in the share hash
        self.log("getting blockhashes for segment %d, share %d: %s" % \
                 (segnum, reader.shnum, str(needed)))
        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
        if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
            d2 = defer.succeed({}) # the logic in the next method
        dl = defer.DeferredList([d1, d2], consumeErrors=True)
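        # (dl fires with [(success, blockhashes), (success, sharehashes)];
        #  this is the pair that _validate_block unpacks from
        #  block_and_sharehashes[1] above.)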

    def _decode_blocks(self, blocks_and_salts, segnum):
        I take a list of k blocks and salts, and decode that into a
        single encrypted segment.

        # We want to merge our dictionaries to the form
        # {shnum: blocks_and_salts}

        # The dictionaries come from validate block that way, so we just
        # need to merge them.
        for block_and_salt in blocks_and_salts:
            d.update(block_and_salt[1])

        # All of these blocks should have the same salt; in SDMF, it is
        # the file-wide IV, while in MDMF it is the per-segment salt. In
        # either case, we just need to get one of them and use it.
        #
        # d.items()[0] is like (shnum, (block, salt))
        # d.items()[0][1] is like (block, salt)
        # d.items()[0][1][1] is the salt.
        salt = d.items()[0][1][1]
        # Next, extract just the blocks from the dict. We'll use the
        # salt in the next step.
        share_and_shareids = [(k, v[0]) for k, v in d.items()]
        d2 = dict(share_and_shareids)
        for shareid, share in d2.items():
            shareids.append(shareid)
        self._status.set_status("Decoding")
        started = time.time()
        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]
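        # (zfec's decoder wants exactly k blocks, plus their share ids so it
        #  knows which rows of the encoding it is inverting; hence the
        #  trimming above rather than passing every validated block.)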
962 self.log("decoding segment %d" % segnum)
963 if segnum == self._num_segments - 1:
964 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
966 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
967 def _process(buffers):
968 segment = "".join(buffers)
969 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
971 numsegs=self._num_segments,
973 self.log(" joined length %d, datalength %d" %
974 (len(segment), self._data_length))
975 if segnum == self._num_segments - 1:
976 size_to_use = self._tail_data_size
978 size_to_use = self._segment_size
979 segment = segment[:size_to_use]
980 self.log(" segment len=%d" % len(segment))
981 self._status.timings.setdefault("decode", 0)
982 self._status.timings['decode'] = time.time() - started
984 d.addCallback(_process)

    def _decrypt_segment(self, segment_and_salt):
        I take a single segment and its salt, and decrypt it. I return
        the plaintext of the segment that is in my argument.

        segment, salt = segment_and_salt
        self._status.set_status("decrypting")
        self.log("decrypting segment %d" % self._current_segment)
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
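        # (The AES data key mixes the readkey with the salt -- the file-wide
        #  IV for SDMF, the per-segment salt for MDMF -- so each segment can
        #  be decrypted on its own; see the salt discussion in
        #  _decode_blocks.)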
        plaintext = decryptor.process(segment)
        self._status.timings.setdefault("decrypt", 0)
        self._status.timings['decrypt'] = time.time() - started

    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self.servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)

    def _try_to_validate_privkey(self, enc_privkey, reader):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (reader, reader.shnum),
                     level=log.WEIRD, umid="YIw4tA")
            self.servermap.mark_bad_share(reader.peerid, reader.shnum,
            e = CorruptShareError(reader.peerid,
            f = failure.Failure(e)
            self._bad_shares.add((reader.peerid, reader.shnum, f))

        self.log("got valid privkey from shnum %d on reader %s" %
                 (reader.shnum, reader))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False

    def _check_for_done(self, res):
        I check to see if this Retrieve object has successfully finished

        I can exit in the following ways:
        - If there are no more segments to download, then I exit by
          causing self._done_deferred to fire with the plaintext
          content requested by the caller.
        - If there are still segments to be downloaded, and there
          are enough active readers (readers which have not broken
          and have not given us corrupt data) to continue
          downloading, I send control back to
          _download_current_segment.
        - If there are still segments to be downloaded but there are
          not enough active peers to download them, I ask
          _add_active_peers to add more peers. If it is successful,
          it will call _download_current_segment. If there are not
          enough peers to retrieve the file, then that will cause
          _done_deferred to errback.

        self.log("checking for doneness")
        if self._current_segment > self._last_segment:
            # No more segments to download, we're done.
            self.log("got plaintext, done")

        if len(self._active_readers) >= self._required_shares:
            # More segments to download, but we have enough good peers
            # in self._active_readers that we can do that without issue,
            # so go nab the next segment.
            self.log("not done yet: on segment %d of %d" % \
                     (self._current_segment + 1, self._num_segments))
            return self._download_current_segment()

        self.log("not done yet: on segment %d of %d, need to add peers" % \
                 (self._current_segment + 1, self._num_segments))
        return self._add_active_peers()

        I am called by _check_for_done when the download process has
        finished successfully. After making some useful logging
        statements, I return the decrypted contents to the owner of this
        Retrieve object through self._done_deferred.

        self._running = False
        self._status.set_active(False)
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching

            ret = list(self._bad_shares)
            self.log("done verifying, found %d bad shares" % len(ret))
            # TODO: upload status here?
            ret = self._consumer
            self._consumer.unregisterProducer()
        eventually(self._done_deferred.callback, ret)

        I am called by _add_active_peers when there are not enough
        active peers left to complete the download. After making some
        useful logging statements, I return an exception to that effect
        to the caller of this Retrieve object through
        self._done_deferred.

        self._running = False
        self._status.set_active(False)
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching

            ret = list(self._bad_shares)

            format = ("ran out of peers: "
                      "have %(have)d of %(total)d segments "
                      "found %(bad)d bad shares "
                      "encoding %(k)d-of-%(n)d")
            args = {"have": self._current_segment,
                    "total": self._num_segments,
                    "need": self._last_segment,
                    "k": self._required_shares,
                    "n": self._total_shares,
                    "bad": len(self._bad_shares)}
            e = NotEnoughSharesError("%s, last failure: %s" % \
                                     (format % args, str(self._last_failure)))
            f = failure.Failure(e)
        eventually(self._done_deferred.callback, ret)