3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10 MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
22 implements(IRetrieveStatus)
23 statusid_counter = count(0)
26 self.timings["fetch_per_server"] = {}
27 self.timings["decode"] = 0.0
28 self.timings["decrypt"] = 0.0
29 self.timings["cumulative_verify"] = 0.0
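# (Illustrative only.) After a download that touched two servers, the
# timings dict might look roughly like this, with hypothetical values:
#   {"fetch_per_server": {peerid_a: [0.12, 0.09], peerid_b: [0.31]},
#    "decode": 0.02, "decrypt": 0.01, "cumulative_verify": 0.0}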
32 self.storage_index = None
34 self.encoding = ("?","?")
36 self.status = "Not started"
38 self.counter = self.statusid_counter.next()
39 self.started = time.time()
41 def get_started(self):
43 def get_storage_index(self):
44 return self.storage_index
45 def get_encoding(self):
47 def using_helper(self):
53 def get_progress(self):
57 def get_counter(self):
60 def add_fetch_timing(self, peerid, elapsed):
61 if peerid not in self.timings["fetch_per_server"]:
62 self.timings["fetch_per_server"][peerid] = []
63 self.timings["fetch_per_server"][peerid].append(elapsed)
64 def accumulate_decode_time(self, elapsed):
65 self.timings["decode"] += elapsed
66 def accumulate_decrypt_time(self, elapsed):
67 self.timings["decrypt"] += elapsed
68 def set_storage_index(self, si):
69 self.storage_index = si
70 def set_helper(self, helper):
72 def set_encoding(self, k, n):
73 self.encoding = (k, n)
74 def set_size(self, size):
76 def set_status(self, status):
78 def set_progress(self, value):
80 def set_active(self, value):
87 # this class is currently single-use. Eventually (in MDMF) we will make
88 # it multi-use, in which case you can call download(range) multiple
89 # times, and each will have a separate response chain. However the
90 # Retrieve object will remain tied to a specific version of the file, and
91 # will use a single ServerMap instance.
92 implements(IPushProducer)
94 def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
97 assert self._node.get_pubkey()
98 self._storage_index = filenode.get_storage_index()
99 assert self._node.get_readkey()
100 self._last_failure = None
101 prefix = si_b2a(self._storage_index)[:5]
102 self._log_number = log.msg("Retrieve(%s): starting" % prefix)
103 self._outstanding_queries = {} # maps (peerid,shnum) to start_time
105 self._decoding = False
106 self._bad_shares = set()
108 self.servermap = servermap
109 assert self._node.get_pubkey()
110 self.verinfo = verinfo
111 # during repair, we may be called upon to grab the private key, since
112 # it wasn't picked up during a verify=False checker run, and we'll
113 # need it for repair to generate a new version.
114 self._need_privkey = verify or (fetch_privkey
115 and not self._node.get_privkey())
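# Illustrative cases for the expression above (not exhaustive):
#   verify=True                                  -> always fetch the privkey
#   fetch_privkey=True, node already has privkey -> no fetch needed
#   fetch_privkey=False, verify=False            -> no fetch needed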
117 if self._need_privkey:
118 # TODO: Evaluate the need for this. We'll use it if we want
119 # to limit how many queries are on the wire for the privkey at any given time.
121 self._privkey_query_markers = [] # one Marker for each time we've
122 # tried to get the privkey.
124 # verify means that we are using the downloader logic to verify all
125 # of our shares. This tells the downloader a few things.
127 # 1. We need to download all of the shares.
128 # 2. We don't need to decode or decrypt the shares, since our
129 # caller doesn't care about the plaintext, only the
130 # information about which shares are or are not valid.
131 # 3. When we are validating readers, we need to validate the
132 # signature on the prefix. Do we? We already do this in the servermap update?
134 self._verify = verify
136 self._status = RetrieveStatus()
137 self._status.set_storage_index(self._storage_index)
138 self._status.set_helper(False)
139 self._status.set_progress(0.0)
140 self._status.set_active(True)
141 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
142 offsets_tuple) = self.verinfo
143 self._status.set_size(datalength)
144 self._status.set_encoding(k, N)
146 self._pause_deferred = None
148 self._read_length = None
149 self.log("got seqnum %d" % self.verinfo[0])
152 def get_status(self):
155 def log(self, *args, **kwargs):
156 if "parent" not in kwargs:
157 kwargs["parent"] = self._log_number
158 if "facility" not in kwargs:
159 kwargs["facility"] = "tahoe.mutable.retrieve"
160 return log.msg(*args, **kwargs)
162 def _set_current_status(self, state):
163 seg = "%d/%d" % (self._current_segment, self._last_segment)
164 self._status.set_status("segment %s (%s)" % (seg, state))
169 def pauseProducing(self):
171 I am called by my download target if we have produced too much
172 data for it to handle. I make the downloader stop producing new
173 data until my resumeProducing method is called.
175 if self._pause_deferred is not None:
178 # fired when the download is unpaused.
179 self._old_status = self._status.get_status()
180 self._set_current_status("paused")
182 self._pause_deferred = defer.Deferred()
185 def resumeProducing(self):
187 I am called by my download target once it is ready to begin
188 receiving data again.
190 if self._pause_deferred is None:
193 p = self._pause_deferred
194 self._pause_deferred = None
195 self._status.set_status(self._old_status)
197 eventually(p.callback, None)
200 def _check_for_paused(self, res):
202 I am called just before a write to the consumer. I return a
203 Deferred that eventually fires with the data that is to be
204 written to the consumer. If the download has not been paused,
205 the Deferred fires immediately. Otherwise, the Deferred fires
206 when the downloader is unpaused.
208 if self._pause_deferred is not None:
210 self._pause_deferred.addCallback(lambda ignored: d.callback(res))
212 return defer.succeed(res)
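# A rough sketch of the pause/resume interaction (hypothetical sequence):
#   self.pauseProducing()             # called by the consumer; sets _pause_deferred
#   d = self._check_for_paused(data)  # d now waits on _pause_deferred
#   self.resumeProducing()            # called by the consumer; fires _pause_deferred,
#                                     # so d fires with `data` and the write proceeds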
215 def download(self, consumer=None, offset=0, size=None):
216 assert IConsumer.providedBy(consumer) or self._verify
219 self._consumer = consumer
220 # we provide IPushProducer, so streaming=True, per IConsumer.
222 self._consumer.registerProducer(self, streaming=True)
224 self._done_deferred = defer.Deferred()
225 self._offset = offset
226 self._read_length = size
227 self._setup_download()
228 self._setup_encoding_parameters()
229 self.log("starting download")
230 self._started_fetching = time.time()
231 # The download process beyond this is a state machine.
232 # _add_active_peers will select the peers that we want to use
233 # for the download, and then attempt to start downloading. After
234 # each segment, it will check for doneness, reacting to broken
235 # peers and corrupt shares as necessary. If it runs out of good
236 # peers before downloading all of the segments, _done_deferred
237 # will errback. Otherwise, it will eventually callback with the
238 # contents of the mutable file.
240 return self._done_deferred
243 d = fireEventually(None) # avoid #237 recursion limit problem
244 d.addCallback(lambda ign: self._activate_enough_peers())
245 d.addCallback(lambda ign: self._download_current_segment())
246 # when we're done, _download_current_segment will call _done. If we
247 # aren't, it will call loop() again.
248 d.addErrback(self._error)
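# Schematically (illustrative, not literal code), each pass through here is:
#   _activate_enough_peers() -> _download_current_segment()
#     -> _process_segment(segnum) -> loop()   # until _current_segment > _last_segment
#     -> _done()                              # success: fire _done_deferred
# and any failure lands in _error(), which errbacks _done_deferred.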
250 def _setup_download(self):
251 self._started = time.time()
252 self._status.set_status("Retrieving Shares")
254 # how many shares do we need?
263 offsets_tuple) = self.verinfo
265 # first, which servers can we use?
266 versionmap = self.servermap.make_versionmap()
267 shares = versionmap[self.verinfo]
268 # this sharemap is consumed as we decide to send requests
269 self.remaining_sharemap = DictOfSets()
270 for (shnum, peerid, timestamp) in shares:
271 self.remaining_sharemap.add(shnum, peerid)
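# remaining_sharemap is a DictOfSets mapping shnum -> set of peerids that
# still hold that share, e.g. (hypothetical):
#   {0: set([peer_a, peer_b]), 1: set([peer_a]), 4: set([peer_c])}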
272 # If the servermap update fetched anything, it fetched at least 1
273 # KiB, so we ask for that much.
274 # TODO: Change the cache methods to allow us to fetch all of the
275 # data that they have, then change this method to do that.
276 any_cache = self._node._read_from_cache(self.verinfo, shnum,
278 ss = self.servermap.connections[peerid]
279 reader = MDMFSlotReadProxy(ss,
283 reader.peerid = peerid
284 self.readers[shnum] = reader
285 assert len(self.remaining_sharemap) >= k
287 self.shares = {} # maps shnum to validated blocks
288 self._active_readers = [] # list of active readers for this dl.
289 self._block_hash_trees = {} # shnum => hashtree
291 # We need one share hash tree for the entire file; its leaves
292 # are the roots of the block hash trees for the shares that
293 # comprise it, and its root is in the verinfo.
294 self.share_hash_tree = hashtree.IncompleteHashTree(N)
295 self.share_hash_tree.set_hashes({0: root_hash})
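# Sketch of the two-level hash structure (illustrative):
#   share hash tree (one per file, N leaves):
#     hash 0 (the root) is pinned just above from the verinfo's root_hash
#     leaf[shnum] = root of that share's block hash tree
#   block hash tree (one per share, num_segments leaves):
#     leaf[segnum] = hash of that share's block for segment segnum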
297 def decode(self, blocks_and_salts, segnum):
299 I am a helper method that the mutable file update process uses
300 as a shortcut to decode and decrypt the segments that it needs
301 to fetch in order to perform a file update. I take in a
302 collection of blocks and salts, and pick some of those to make a
303 segment with. I return the plaintext associated with that segment.
306 # shnum => block hash tree. Unused, but setup_encoding_parameters will
308 self._block_hash_trees = None
309 self._setup_encoding_parameters()
311 # This is the form expected by decode.
312 blocks_and_salts = blocks_and_salts.items()
313 blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
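# Shape of the transformation above, with hypothetical values: an input of
#   {0: (block0, salt), 3: (block3, salt)}
# becomes
#   [(True, [(0, (block0, salt))]), (True, [(3, (block3, salt))])]
# which mimics the (success, payload) pairs that the per-segment
# DeferredList normally hands to _decode_blocks; dict.update() accepts
# either payload form.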
315 d = self._decode_blocks(blocks_and_salts, segnum)
316 d.addCallback(self._decrypt_segment)
320 def _setup_encoding_parameters(self):
322 I set up the encoding parameters, including k, n, the number
323 of segments associated with this file, and the segment decoders.
333 offsets_tuple) = self.verinfo
334 self._required_shares = k
335 self._total_shares = n
336 self._segment_size = segsize
337 self._data_length = datalength
340 self._version = MDMF_VERSION
342 self._version = SDMF_VERSION
344 if datalength and segsize:
345 self._num_segments = mathutil.div_ceil(datalength, segsize)
346 self._tail_data_size = datalength % segsize
348 self._num_segments = 0
349 self._tail_data_size = 0
351 self._segment_decoder = codec.CRSDecoder()
352 self._segment_decoder.set_params(segsize, k, n)
354 if not self._tail_data_size:
355 self._tail_data_size = segsize
357 self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
358 self._required_shares)
359 if self._tail_segment_size == self._segment_size:
360 self._tail_decoder = self._segment_decoder
362 self._tail_decoder = codec.CRSDecoder()
363 self._tail_decoder.set_params(self._tail_segment_size,
364 self._required_shares,
367 self.log("got encoding parameters: "
370 "%d segments of %d bytes each (%d byte tail segment)" % \
371 (k, n, self._num_segments, self._segment_size,
372 self._tail_segment_size))
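# Worked example with small, hypothetical numbers: datalength=25,
# segsize=10, k=3 gives
#   _num_segments      = div_ceil(25, 10)    = 3
#   _tail_data_size    = 25 % 10             = 5
#   _tail_segment_size = next_multiple(5, 3) = 6
# and since 6 != 10, a separate CRSDecoder is configured for the tail.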
374 if self._block_hash_trees is not None:
375 for i in xrange(self._total_shares):
376 # So we don't have to do this later.
377 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
379 # Our last task is to tell the downloader where to start and
380 # where to stop. We use three parameters for that:
381 # - self._start_segment: the segment that we need to start
383 # - self._current_segment: the next segment that we need to
385 # - self._last_segment: The last segment that we were asked to
388 # We say that the download is complete when
389 # self._current_segment > self._last_segment. We use
390 # self._start_segment and self._last_segment to know when to
391 # strip things off of segments, and how much to strip.
393 self.log("got offset: %d" % self._offset)
394 # our start segment is the first segment containing the
395 # offset we were given.
396 start = self._offset // self._segment_size
398 assert start < self._num_segments
399 self._start_segment = start
400 self.log("got start segment: %d" % self._start_segment)
402 self._start_segment = 0
405 # If self._read_length is None, then we want to read the whole
406 # file. Otherwise, we want to read only part of the file, and
407 # need to figure out where to stop reading.
408 if self._read_length is not None:
409 # our end segment is the last segment containing part of the
410 # segment that we were asked to read.
411 self.log("got read length %d" % self._read_length)
412 if self._read_length != 0:
413 end_data = self._offset + self._read_length
415 # We don't actually need to read the byte at end_data,
416 # but the one before it.
417 end = (end_data - 1) // self._segment_size
419 assert end < self._num_segments
420 self._last_segment = end
422 self._last_segment = self._start_segment
423 self.log("got end segment: %d" % self._last_segment)
425 self._last_segment = self._num_segments - 1
427 self._current_segment = self._start_segment
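# Worked example (hypothetical request): segsize=10000, offset=15000,
# read_length=20000:
#   _start_segment = 15000 // 10000       = 1
#   end_data       = 15000 + 20000        = 35000
#   _last_segment  = (35000 - 1) // 10000 = 3
# so segments 1 through 3 are fetched, and _set_segment later strips the
# first 5000 bytes of segment 1 and keeps only the first 5000 of segment 3.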
429 def _activate_enough_peers(self):
431 I populate self._active_readers with enough active readers to
432 retrieve the contents of this mutable file. I am called before
433 downloading starts, and (eventually) after each validation
434 error, connection error, or other problem in the download.
436 # TODO: It would be cool to investigate other heuristics for
437 # reader selection. For instance, the cost (in time the user
438 # spends waiting for their file) of selecting a really slow peer
439 # that happens to have a primary share is probably more than
440 # selecting a really fast peer that doesn't have a primary
441 # share. Maybe the servermap could be extended to provide this
442 # information; it could keep track of latency information while
443 # it gathers more important data, and then this routine could
444 # use that to select active readers.
446 # (these and other questions would be easier to answer with a
447 # robust, configurable tahoe-lafs simulator, which modeled node
448 # failures, differences in node speed, and other characteristics
449 # that we expect storage servers to have. You could have
450 # presets for really stable grids (like allmydata.com),
451 # friendnets, make it easy to configure your own settings, and
452 # then simulate the effect of big changes on these use cases
453 # instead of just reasoning about what the effect might be. Out
454 # of scope for MDMF, though.)
456 # We need at least self._required_shares readers to download a
457 # segment. If we're verifying, we need all shares.
459 needed = self._total_shares
461 needed = self._required_shares
462 # XXX: Why don't format= log messages work here?
463 self.log("adding %d peers to the active peers list" % needed)
465 if len(self._active_readers) >= needed:
466 # enough shares are active
469 more = needed - len(self._active_readers)
470 known_shnums = set(self.remaining_sharemap.keys())
471 used_shnums = set([r.shnum for r in self._active_readers])
472 unused_shnums = known_shnums - used_shnums
473 # We favor lower numbered shares, since FEC is faster with
474 # primary shares than with other shares, and lower-numbered
475 # shares are more likely to be primary than higher numbered
477 new_shnums = sorted(unused_shnums)[:more]
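# For example (hypothetical state): known_shnums={0,2,5,7}, one active
# reader already holding share 2, and more=2 give unused={0,5,7} and
# new_shnums=[0, 5], preferring the lower (more likely primary) shares.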
478 if len(new_shnums) < more and not self._verify:
479 # We don't have enough readers to retrieve the file; fail.
480 self._raise_notenoughshareserror()
482 for shnum in new_shnums:
483 reader = self.readers[shnum]
484 self._active_readers.append(reader)
485 self.log("added reader for share %d" % shnum)
486 # Each time we add a reader, we check to see if we need the
487 # private key. If we do, we politely ask for it and then continue
488 # computing. If we find that we haven't gotten it at the end of
489 # segment decoding, then we'll take more drastic measures.
490 if self._need_privkey and not self._node.is_readonly():
491 d = reader.get_encprivkey()
492 d.addCallback(self._try_to_validate_privkey, reader)
493 # XXX: don't just drop the Deferred. We need error-reporting
494 # but not flow-control here.
495 assert len(self._active_readers) >= self._required_shares
497 def _try_to_validate_prefix(self, prefix, reader):
499 I check that the prefix returned by a candidate server for
500 retrieval matches the prefix that the servermap knows about
501 (and, hence, the prefix that was validated earlier). If it does,
502 I return True, which means that I approve of the use of the
503 candidate server for segment retrieval. If it doesn't, I return
504 False, which means that another server must be chosen.
514 offsets_tuple) = self.verinfo
515 if known_prefix != prefix:
516 self.log("prefix from share %d doesn't match" % reader.shnum)
517 raise UncoordinatedWriteError("Mismatched prefix -- this could "
518 "indicate an uncoordinated write")
519 # Otherwise, we're okay -- no issues.
522 def _remove_reader(self, reader):
524 At various points, we will wish to remove a peer from
525 consideration and/or use. These include, but are not necessarily limited to:
528 - A connection error.
529 - A mismatched prefix (that is, a prefix that does not match
530 our conception of the version information string).
531 - A failing block hash, salt hash, or share hash, which can
532 indicate disk failure/bit flips, or network trouble.
534 This method will do that. I will make sure that the
535 (shnum,reader) combination represented by my reader argument is
536 not used for anything else during this download. I will not
537 advise the reader of any corruption, something that my callers
538 may wish to do on their own.
540 # TODO: When you're done writing this, see if this is ever
541 # actually used for something that _mark_bad_share isn't. I have
542 # a feeling that they will be used for very similar things, and
543 # that having them both here is just going to be an epic amount
544 # of code duplication.
546 # (well, okay, not epic, but meaningful)
547 self.log("removing reader %s" % reader)
548 # Remove the reader from _active_readers
549 self._active_readers.remove(reader)
550 # TODO: self.readers.remove(reader)?
551 for shnum in list(self.remaining_sharemap.keys()):
552 self.remaining_sharemap.discard(shnum, reader.peerid)
555 def _mark_bad_share(self, reader, f):
557 I mark the (peerid, shnum) encapsulated by my reader argument as
558 a bad share, which means that it will not be used anywhere else.
560 There are several reasons to want to mark something as a bad
561 share. These include:
563 - A connection error to the peer.
564 - A mismatched prefix (that is, a prefix that does not match
565 our local conception of the version information string).
566 - A failing block hash, salt hash, share hash, or other
569 This method will ensure that readers that we wish to mark bad
570 (for these reasons or other reasons) are not used for the rest
571 of the download. Additionally, it will attempt to tell the
572 remote peer (with no guarantee of success) that its share is corrupt.
575 self.log("marking share %d on server %s as bad" % \
576 (reader.shnum, reader))
577 prefix = self.verinfo[-2]
578 self.servermap.mark_bad_share(reader.peerid,
581 self._remove_reader(reader)
582 self._bad_shares.add((reader.peerid, reader.shnum, f))
583 self._status.problems[reader.peerid] = f
584 self._last_failure = f
585 self.notify_server_corruption(reader.peerid, reader.shnum,
589 def _download_current_segment(self):
591 I download, validate, decode, decrypt, and assemble the segment
592 that this Retrieve is currently responsible for downloading.
594 assert len(self._active_readers) >= self._required_shares
595 if self._current_segment > self._last_segment:
596 # No more segments to download, we're done.
597 self.log("got plaintext, done")
599 self.log("on segment %d of %d" %
600 (self._current_segment + 1, self._num_segments))
601 d = self._process_segment(self._current_segment)
602 d.addCallback(lambda ign: self.loop())
605 def _process_segment(self, segnum):
607 I download, validate, decode, and decrypt one segment of the
608 file that this Retrieve is retrieving. This means coordinating
609 the process of getting k blocks of that file, validating them,
610 assembling them into one segment with the decoder, and then
613 self.log("processing segment %d" % segnum)
615 # TODO: The old code uses a marker. Should this code do that
616 # too? What did the Marker do?
617 assert len(self._active_readers) >= self._required_shares
619 # We need to ask each of our active readers for its block and
620 # salt. We will then validate those. If validation is
621 # successful, we will assemble the results into plaintext.
623 for reader in self._active_readers:
624 started = time.time()
625 d = reader.get_block_and_salt(segnum)
626 d2 = self._get_needed_hashes(reader, segnum)
627 dl = defer.DeferredList([d, d2], consumeErrors=True)
628 dl.addCallback(self._validate_block, segnum, reader, started)
629 dl.addErrback(self._validation_or_decoding_failed, [reader])
631 dl = defer.DeferredList(ds)
633 dl.addCallback(lambda ignored: "")
634 dl.addCallback(self._set_segment)
636 dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
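# Rough shape of the Deferred wiring per segment (illustrative, non-verify
# path):
#   each reader: [get_block_and_salt, _get_needed_hashes]
#     -> DeferredList -> _validate_block -> {shnum: (block, salt)}
#   all readers  -> DeferredList -> _maybe_decode_and_decrypt_segment
#     -> _decode_blocks -> _decrypt_segment -> _set_segment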
640 def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
642 I take the results of fetching and validating the blocks from a
643 callback chain in another method. If the results are such that
644 they tell me that validation and fetching succeeded without
645 incident, I will proceed with decoding and decryption.
646 Otherwise, I will do nothing.
648 self.log("trying to decode and decrypt segment %d" % segnum)
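# blocks_and_salts arrives as DeferredList results, one entry per active
# reader, e.g. (hypothetical):
#   [(True, {0: (block0, salt)}), (True, {5: (block5, salt)})]
# A False success flag or a None payload means that reader's fetch or
# validation failed, so we skip decoding this round.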
650 for block_and_salt in blocks_and_salts:
651 if not block_and_salt[0] or block_and_salt[1] is None:
652 self.log("some validation operations failed; not proceeding")
656 self.log("everything looks ok, building segment %d" % segnum)
657 d = self._decode_blocks(blocks_and_salts, segnum)
658 d.addCallback(self._decrypt_segment)
659 d.addErrback(self._validation_or_decoding_failed,
660 self._active_readers)
661 # check to see whether we've been paused before writing
663 d.addCallback(self._check_for_paused)
664 d.addCallback(self._set_segment)
667 return defer.succeed(None)
670 def _set_segment(self, segment):
672 Given a plaintext segment, I register that segment with the
673 target that is handling the file download.
675 self.log("got plaintext for segment %d" % self._current_segment)
676 if self._current_segment == self._start_segment:
677 # We're on the first segment. It's possible that we want
678 # only some part of the end of this segment, and that we
679 # just downloaded the whole thing to get that part. If so,
680 # we need to account for that and give the reader just the
681 # data that they want.
682 n = self._offset % self._segment_size
683 self.log("stripping %d bytes off of the first segment" % n)
684 self.log("original segment length: %d" % len(segment))
685 segment = segment[n:]
686 self.log("new segment length: %d" % len(segment))
688 if self._current_segment == self._last_segment and self._read_length is not None:
689 # We're on the last segment. It's possible that we only want
690 # part of the beginning of this segment, and that we
691 # downloaded the whole thing anyway. Make sure to give the
692 caller only the portion of the segment that they want to read.
694 extra = self._read_length
695 if self._start_segment != self._last_segment:
696 extra -= self._segment_size - \
697 (self._offset % self._segment_size)
698 extra %= self._segment_size
699 self.log("original segment length: %d" % len(segment))
700 segment = segment[:extra]
701 self.log("new segment length: %d" % len(segment))
702 self.log("only taking %d bytes of the last segment" % extra)
705 self._consumer.write(segment)
707 # we don't care about the plaintext if we are doing a verify.
709 self._current_segment += 1
712 def _validation_or_decoding_failed(self, f, readers):
714 I am called when a block or a salt fails to correctly validate, or when
715 the decryption or decoding operation fails for some reason. I react to
716 this failure by notifying the remote server of corruption, and then
717 removing the remote peer from further activity.
719 assert isinstance(readers, list)
720 bad_shnums = [reader.shnum for reader in readers]
722 self.log("validation or decoding failed on share(s) %s, peer(s) %s, "
723 "segment %d: %s" % \
724 (bad_shnums, readers, self._current_segment, str(f)))
725 for reader in readers:
726 self._mark_bad_share(reader, f)
730 def _validate_block(self, results, segnum, reader, started):
732 I validate a block from one share on a remote server.
734 # Grab the part of the block hash tree that is necessary to
735 # validate this block, then generate the block hash root.
736 self.log("validating share %d for segment %d" % (reader.shnum,
738 elapsed = time.time() - started
739 self._status.add_fetch_timing(reader.peerid, elapsed)
740 self._set_current_status("validating blocks")
741 # Did we fail to fetch either of the things that we were
742 # supposed to? Fail if so.
743 if not results[0][0] or not results[1][0]:
744 # handled by the errback handler.
746 # These all get batched into one query, so the resulting
747 # failure should be the same for all of them, so we can just use the first one.
749 assert isinstance(results[0][1], failure.Failure)
752 raise CorruptShareError(reader.peerid,
754 "Connection error: %s" % str(f))
756 block_and_salt, block_and_sharehashes = results
757 block, salt = block_and_salt[1]
758 blockhashes, sharehashes = block_and_sharehashes[1]
760 blockhashes = dict(enumerate(blockhashes[1]))
761 self.log("the reader gave me the following blockhashes: %s" % \
763 self.log("the reader gave me the following sharehashes: %s" % \
764 sharehashes[1].keys())
765 bht = self._block_hash_trees[reader.shnum]
767 if bht.needed_hashes(segnum, include_leaf=True):
769 bht.set_hashes(blockhashes)
770 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
772 raise CorruptShareError(reader.peerid,
774 "block hash tree failure: %s" % e)
776 if self._version == MDMF_VERSION:
777 blockhash = hashutil.block_hash(salt + block)
779 blockhash = hashutil.block_hash(block)
780 # If this works without an error, then validation is
783 bht.set_hashes(leaves={segnum: blockhash})
784 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
786 raise CorruptShareError(reader.peerid,
788 "block hash tree failure: %s" % e)
790 # Reaching this point means that we know that this segment
791 # is correct. Now we need to check to see whether the share
792 # hash chain is also correct.
793 # SDMF wrote share hash chains that didn't contain the
794 # leaves, which would be produced from the block hash tree.
795 # So we need to validate the block hash tree first. If
796 # successful, then bht[0] will contain the root for the
797 # shnum, which will be a leaf in the share hash tree, which
798 # will allow us to validate the rest of the tree.
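# In short (illustrative flow): bht[0], the block hash tree root proven by
# the leaf check above, is used as leaf `shnum` of share_hash_tree, and the
# sharehashes fetched from the server must chain it up to the root that
# _setup_download pinned from the verinfo.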
799 if self.share_hash_tree.needed_hashes(reader.shnum,
800 include_leaf=True) or \
803 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
804 leaves={reader.shnum: bht[0]})
805 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
807 raise CorruptShareError(reader.peerid,
809 "corrupt hashes: %s" % e)
811 self.log('share %d is valid for segment %d' % (reader.shnum,
813 return {reader.shnum: (block, salt)}
816 def _get_needed_hashes(self, reader, segnum):
818 I get the hashes needed to validate segnum from the reader, then return
819 to my caller when this is done.
821 bht = self._block_hash_trees[reader.shnum]
822 needed = bht.needed_hashes(segnum, include_leaf=True)
823 # The root of the block hash tree is also a leaf in the share
824 # hash tree. So we don't need to fetch it from the remote
825 # server. In the case of files with one segment, this means that
826 # we won't fetch any block hash tree from the remote server,
827 # since the hash of each share of the file is the entire block
828 # hash tree, and is a leaf in the share hash tree. This is fine,
829 # since any share corruption will be detected in the share hash tree.
832 self.log("getting blockhashes for segment %d, share %d: %s" % \
833 (segnum, reader.shnum, str(needed)))
834 d1 = reader.get_blockhashes(needed, force_remote=True)
835 if self.share_hash_tree.needed_hashes(reader.shnum):
836 need = self.share_hash_tree.needed_hashes(reader.shnum)
837 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
839 d2 = reader.get_sharehashes(need, force_remote=True)
841 d2 = defer.succeed({}) # the logic in the next method
843 dl = defer.DeferredList([d1, d2], consumeErrors=True)
847 def _decode_blocks(self, blocks_and_salts, segnum):
849 I take a list of k blocks and salts, and decode that into a
850 single encrypted segment.
853 # We want to merge our dictionaries to the form
854 # {shnum: blocks_and_salts}
856 # The dictionaries come from validate block that way, so we just
857 # need to merge them.
858 for block_and_salt in blocks_and_salts:
859 d.update(block_and_salt[1])
861 # All of these blocks should have the same salt; in SDMF, it is
862 # the file-wide IV, while in MDMF it is the per-segment salt. In
863 # either case, we just need to get one of them and use it.
865 # d.items()[0] is like (shnum, (block, salt))
866 # d.items()[0][1] is like (block, salt)
867 # d.items()[0][1][1] is the salt.
868 salt = d.items()[0][1][1]
869 # Next, extract just the blocks from the dict. We'll use the
870 # salt in the next step.
871 share_and_shareids = [(k, v[0]) for k, v in d.items()]
872 d2 = dict(share_and_shareids)
875 for shareid, share in d2.items():
876 shareids.append(shareid)
879 self._set_current_status("decoding")
880 started = time.time()
881 assert len(shareids) >= self._required_shares, len(shareids)
882 # zfec really doesn't want extra shares
883 shareids = shareids[:self._required_shares]
884 shares = shares[:self._required_shares]
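# For example (hypothetical, k=3): if d maps shares {0, 4, 6, 7} to their
# (block, salt) pairs, shareids might come out as [0, 4, 6, 7]; both lists
# are then trimmed to the first k entries, since zfec wants exactly k
# blocks to reconstruct a segment.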
885 self.log("decoding segment %d" % segnum)
886 if segnum == self._num_segments - 1:
887 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
889 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
890 def _process(buffers):
891 segment = "".join(buffers)
892 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
894 numsegs=self._num_segments,
896 self.log(" joined length %d, datalength %d" %
897 (len(segment), self._data_length))
898 if segnum == self._num_segments - 1:
899 size_to_use = self._tail_data_size
901 size_to_use = self._segment_size
902 segment = segment[:size_to_use]
903 self.log(" segment len=%d" % len(segment))
904 self._status.accumulate_decode_time(time.time() - started)
906 d.addCallback(_process)
910 def _decrypt_segment(self, segment_and_salt):
912 I take a single segment and its salt, and decrypt it. I return
913 the plaintext of the segment that is in my argument.
915 segment, salt = segment_and_salt
916 self._set_current_status("decrypting")
917 self.log("decrypting segment %d" % self._current_segment)
918 started = time.time()
919 key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
921 plaintext = decryptor.process(segment)
922 self._status.accumulate_decrypt_time(time.time() - started)
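# The key above is derived per segment from the salt plus the file's
# readkey, so each segment decrypts independently. The decryptor is
# assumed to be an AES(key) instance from pycryptopp, whose CTR mode
# makes process() serve for both encryption and decryption.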
926 def notify_server_corruption(self, peerid, shnum, reason):
927 ss = self.servermap.connections[peerid]
928 ss.callRemoteOnly("advise_corrupt_share",
929 "mutable", self._storage_index, shnum, reason)
932 def _try_to_validate_privkey(self, enc_privkey, reader):
933 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
934 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
935 if alleged_writekey != self._node.get_writekey():
936 self.log("invalid privkey from %s shnum %d" %
937 (reader, reader.shnum),
938 level=log.WEIRD, umid="YIw4tA")
940 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
942 e = CorruptShareError(reader.peerid,
945 f = failure.Failure(e)
946 self._bad_shares.add((reader.peerid, reader.shnum, f))
950 self.log("got valid privkey from shnum %d on reader %s" %
951 (reader.shnum, reader))
952 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
953 self._node._populate_encprivkey(enc_privkey)
954 self._node._populate_privkey(privkey)
955 self._need_privkey = False
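# Rationale for the check above (as assumed here): the writekey is a
# secure hash of the signing (private) key, so an encprivkey that decrypts
# and hashes to our known writekey must be the genuine key; anything else
# is recorded as a corrupt share.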
961 I am called by _download_current_segment when the download process
962 has finished successfully. After making some useful logging
963 statements, I return the decrypted contents to the owner of this
964 Retrieve object through self._done_deferred.
966 self._running = False
967 self._status.set_active(False)
969 self._status.timings['total'] = now - self._started
970 self._status.timings['fetch'] = now - self._started_fetching
971 self._status.set_status("Finished")
972 self._status.set_progress(1.0)
974 # remember the encoding parameters, use them again next time
975 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
976 offsets_tuple) = self.verinfo
977 self._node._populate_required_shares(k)
978 self._node._populate_total_shares(N)
981 ret = list(self._bad_shares)
982 self.log("done verifying, found %d bad shares" % len(ret))
984 # TODO: upload status here?
986 self._consumer.unregisterProducer()
987 eventually(self._done_deferred.callback, ret)
990 def _raise_notenoughshareserror(self):
992 I am called by _activate_enough_peers when there are not enough
993 active peers left to complete the download. After making some
994 useful logging statements, I throw an exception to that effect
995 to the caller of this Retrieve object through self._done_deferred.
999 format = ("ran out of peers: "
1000 "have %(have)d of %(total)d segments "
1001 "found %(bad)d bad shares "
1002 "encoding %(k)d-of-%(n)d")
1003 args = {"have": self._current_segment,
1004 "total": self._num_segments,
1005 "need": self._last_segment,
1006 "k": self._required_shares,
1007 "n": self._total_shares,
1008 "bad": len(self._bad_shares)}
1009 raise NotEnoughSharesError("%s, last failure: %s" %
1010 (format % args, str(self._last_failure)))
1012 def _error(self, f):
1013 # all errors, including NotEnoughSharesError, land here
1014 self._running = False
1015 self._status.set_active(False)
1017 self._status.timings['total'] = now - self._started
1018 self._status.timings['fetch'] = now - self._started_fetching
1019 self._status.set_status("Failed")
1020 eventually(self._done_deferred.errback, f)