3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10 MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
22 implements(IRetrieveStatus)
23 statusid_counter = count(0)
26 self.timings["fetch_per_server"] = {}
27 self.timings["cumulative_verify"] = 0.0
30 self.storage_index = None
32 self.encoding = ("?","?")
34 self.status = "Not started"
36 self.counter = self.statusid_counter.next()
37 self.started = time.time()
39 def get_started(self):
41 def get_storage_index(self):
42 return self.storage_index
43 def get_encoding(self):
45 def using_helper(self):
51 def get_progress(self):
55 def get_counter(self):
58 def add_fetch_timing(self, peerid, elapsed):
59 if peerid not in self.timings["fetch_per_server"]:
60 self.timings["fetch_per_server"][peerid] = []
61 self.timings["fetch_per_server"][peerid].append(elapsed)
62 def set_storage_index(self, si):
63 self.storage_index = si
64 def set_helper(self, helper):
66 def set_encoding(self, k, n):
67 self.encoding = (k, n)
68 def set_size(self, size):
70 def set_status(self, status):
72 def set_progress(self, value):
74 def set_active(self, value):
81 # this class is currently single-use. Eventually (in MDMF) we will make
82 # it multi-use, in which case you can call download(range) multiple
83 # times, and each will have a separate response chain. However the
84 # Retrieve object will remain tied to a specific version of the file, and
85 # will use a single ServerMap instance.
86 implements(IPushProducer)
88 def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
91 assert self._node.get_pubkey()
92 self._storage_index = filenode.get_storage_index()
93 assert self._node.get_readkey()
94 self._last_failure = None
95 prefix = si_b2a(self._storage_index)[:5]
96 self._log_number = log.msg("Retrieve(%s): starting" % prefix)
97 self._outstanding_queries = {} # maps (peerid,shnum) to start_time
99 self._decoding = False
100 self._bad_shares = set()
102 self.servermap = servermap
103 assert self._node.get_pubkey()
104 self.verinfo = verinfo
105 # during repair, we may be called upon to grab the private key, since
106 # it wasn't picked up during a verify=False checker run, and we'll
107 # need it for repair to generate a new version.
108 self._need_privkey = verify or (fetch_privkey
109 and not self._node.get_privkey())
111 if self._need_privkey:
112 # TODO: Evaluate the need for this. We'll use it if we want
113 # to limit how many queries are on the wire for the privkey
115 self._privkey_query_markers = [] # one Marker for each time we've
116 # tried to get the privkey.
118 # verify means that we are using the downloader logic to verify all
119 # of our shares. This tells the downloader a few things.
121 # 1. We need to download all of the shares.
122 # 2. We don't need to decode or decrypt the shares, since our
123 # caller doesn't care about the plaintext, only the
124 # information about which shares are or are not valid.
125 # 3. When we are validating readers, we need to validate the
126 # signature on the prefix. Do we? We already do this in the
128 self._verify = verify
130 self._status = RetrieveStatus()
131 self._status.set_storage_index(self._storage_index)
132 self._status.set_helper(False)
133 self._status.set_progress(0.0)
134 self._status.set_active(True)
135 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
136 offsets_tuple) = self.verinfo
137 self._status.set_size(datalength)
138 self._status.set_encoding(k, N)
140 self._pause_deferred = None
142 self._read_length = None
143 self.log("got seqnum %d" % self.verinfo[0])
146 def get_status(self):
149 def log(self, *args, **kwargs):
150 if "parent" not in kwargs:
151 kwargs["parent"] = self._log_number
152 if "facility" not in kwargs:
153 kwargs["facility"] = "tahoe.mutable.retrieve"
154 return log.msg(*args, **kwargs)
160 def pauseProducing(self):
162 I am called by my download target if we have produced too much
163 data for it to handle. I make the downloader stop producing new
164 data until my resumeProducing method is called.
166 if self._pause_deferred is not None:
169 # fired when the download is unpaused.
170 self._old_status = self._status.get_status()
171 self._status.set_status("Paused")
173 self._pause_deferred = defer.Deferred()
176 def resumeProducing(self):
178 I am called by my download target once it is ready to begin
179 receiving data again.
181 if self._pause_deferred is None:
184 p = self._pause_deferred
185 self._pause_deferred = None
186 self._status.set_status(self._old_status)
188 eventually(p.callback, None)
191 def _check_for_paused(self, res):
193 I am called just before a write to the consumer. I return a
194 Deferred that eventually fires with the data that is to be
195 written to the consumer. If the download has not been paused,
196 the Deferred fires immediately. Otherwise, the Deferred fires
197 when the downloader is unpaused.
199 if self._pause_deferred is not None:
201 self._pause_deferred.addCallback(lambda ignored: d.callback(res))
203 return defer.succeed(res)
206 def download(self, consumer=None, offset=0, size=None):
207 assert IConsumer.providedBy(consumer) or self._verify
210 self._consumer = consumer
211 # we provide IPushProducer, so streaming=True, per
213 self._consumer.registerProducer(self, streaming=True)
215 self._done_deferred = defer.Deferred()
216 self._started = time.time()
217 self._status.set_status("Retrieving Shares")
219 self._offset = offset
220 self._read_length = size
222 # first, which servers can we use?
223 versionmap = self.servermap.make_versionmap()
224 shares = versionmap[self.verinfo]
225 # this sharemap is consumed as we decide to send requests
226 self.remaining_sharemap = DictOfSets()
227 for (shnum, peerid, timestamp) in shares:
228 self.remaining_sharemap.add(shnum, peerid)
229 # If the servermap update fetched anything, it fetched at least 1
230 # KiB, so we ask for that much.
231 # TODO: Change the cache methods to allow us to fetch all of the
232 # data that they have, then change this method to do that.
233 any_cache = self._node._read_from_cache(self.verinfo, shnum,
235 ss = self.servermap.connections[peerid]
236 reader = MDMFSlotReadProxy(ss,
240 reader.peerid = peerid
241 self.readers[shnum] = reader
244 self.shares = {} # maps shnum to validated blocks
245 self._active_readers = [] # list of active readers for this dl.
246 self._validated_readers = set() # set of readers that we have
247 # validated the prefix of
248 self._block_hash_trees = {} # shnum => hashtree
250 # how many shares do we need?
259 offsets_tuple) = self.verinfo
262 # We need one share hash tree for the entire file; its leaves
263 # are the roots of the block hash trees for the shares that
264 # comprise it, and its root is in the verinfo.
265 self.share_hash_tree = hashtree.IncompleteHashTree(N)
266 self.share_hash_tree.set_hashes({0: root_hash})
268 # This will set up both the segment decoder and the tail segment
269 # decoder, as well as a variety of other instance variables that
270 # the download process will use.
271 self._setup_encoding_parameters()
272 assert len(self.remaining_sharemap) >= k
274 self.log("starting download")
275 self._started_fetching = time.time()
277 self._add_active_peers()
279 # The download process beyond this is a state machine.
280 # _add_active_peers will select the peers that we want to use
281 # for the download, and then attempt to start downloading. After
282 # each segment, it will check for doneness, reacting to broken
283 # peers and corrupt shares as necessary. If it runs out of good
284 # peers before downloading all of the segments, _done_deferred
285 # will errback. Otherwise, it will eventually callback with the
286 # contents of the mutable file.
287 return self._done_deferred
290 def decode(self, blocks_and_salts, segnum):
292 I am a helper method that the mutable file update process uses
293 as a shortcut to decode and decrypt the segments that it needs
294 to fetch in order to perform a file update. I take in a
295 collection of blocks and salts, and pick some of those to make a
296 segment with. I return the plaintext associated with that
299 # shnum => block hash tree. Unused, but setup_encoding_parameters will
301 self._block_hash_trees = None
302 self._setup_encoding_parameters()
304 # This is the form expected by decode.
305 blocks_and_salts = blocks_and_salts.items()
306 blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
308 d = self._decode_blocks(blocks_and_salts, segnum)
309 d.addCallback(self._decrypt_segment)
313 def _setup_encoding_parameters(self):
315 I set up the encoding parameters, including k, n, the number
316 of segments associated with this file, and the segment decoder.
326 offsets_tuple) = self.verinfo
327 self._required_shares = k
328 self._total_shares = n
329 self._segment_size = segsize
330 self._data_length = datalength
333 self._version = MDMF_VERSION
335 self._version = SDMF_VERSION
337 if datalength and segsize:
338 self._num_segments = mathutil.div_ceil(datalength, segsize)
339 self._tail_data_size = datalength % segsize
341 self._num_segments = 0
342 self._tail_data_size = 0
344 self._segment_decoder = codec.CRSDecoder()
345 self._segment_decoder.set_params(segsize, k, n)
347 if not self._tail_data_size:
348 self._tail_data_size = segsize
350 self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
351 self._required_shares)
352 if self._tail_segment_size == self._segment_size:
353 self._tail_decoder = self._segment_decoder
355 self._tail_decoder = codec.CRSDecoder()
356 self._tail_decoder.set_params(self._tail_segment_size,
357 self._required_shares,
360 self.log("got encoding parameters: "
363 "%d segments of %d bytes each (%d byte tail segment)" % \
364 (k, n, self._num_segments, self._segment_size,
365 self._tail_segment_size))
367 if self._block_hash_trees is not None:
368 for i in xrange(self._total_shares):
369 # So we don't have to do this later.
370 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
372 # Our last task is to tell the downloader where to start and
373 # where to stop. We use three parameters for that:
374 # - self._start_segment: the segment that we need to start
376 # - self._current_segment: the next segment that we need to
378 # - self._last_segment: The last segment that we were asked to
381 # We say that the download is complete when
382 # self._current_segment > self._last_segment. We use
383 # self._start_segment and self._last_segment to know when to
384 # strip things off of segments, and how much to strip.
386 self.log("got offset: %d" % self._offset)
387 # our start segment is the first segment containing the
388 # offset we were given.
389 start = self._offset // self._segment_size
391 assert start < self._num_segments
392 self._start_segment = start
393 self.log("got start segment: %d" % self._start_segment)
395 self._start_segment = 0
398 if self._read_length:
399 # our end segment is the last segment containing part of the
400 # segment that we were asked to read.
401 self.log("got read length %d" % self._read_length)
402 end_data = self._offset + self._read_length
404 # We don't actually need to read the byte at end_data, but
406 end = (end_data - 1) // self._segment_size
408 assert end < self._num_segments
409 self._last_segment = end
410 self.log("got end segment: %d" % self._last_segment)
412 self._last_segment = self._num_segments - 1
414 self._current_segment = self._start_segment
416 def _add_active_peers(self):
418 I populate self._active_readers with enough active readers to
419 retrieve the contents of this mutable file. I am called before
420 downloading starts, and (eventually) after each validation
421 error, connection error, or other problem in the download.
423 # TODO: It would be cool to investigate other heuristics for
424 # reader selection. For instance, the cost (in time the user
425 # spends waiting for their file) of selecting a really slow peer
426 # that happens to have a primary share is probably more than
427 # selecting a really fast peer that doesn't have a primary
428 # share. Maybe the servermap could be extended to provide this
429 # information; it could keep track of latency information while
430 # it gathers more important data, and then this routine could
431 # use that to select active readers.
433 # (these and other questions would be easier to answer with a
434 # robust, configurable tahoe-lafs simulator, which modeled node
435 # failures, differences in node speed, and other characteristics
436 # that we expect storage servers to have. You could have
437 # presets for really stable grids (like allmydata.com),
438 # friendnets, make it easy to configure your own settings, and
439 # then simulate the effect of big changes on these use cases
440 # instead of just reasoning about what the effect might be. Out
441 # of scope for MDMF, though.)
443 # We need at least self._required_shares readers to download a
446 needed = self._total_shares
448 needed = self._required_shares - len(self._active_readers)
449 # XXX: Why don't format= log messages work here?
450 self.log("adding %d peers to the active peers list" % needed)
452 # We favor lower numbered shares, since FEC is faster with
453 # primary shares than with other shares, and lower-numbered
454 # shares are more likely to be primary than higher numbered
456 active_shnums = set(sorted(self.remaining_sharemap.keys()))
457 # We shouldn't consider adding shares that we already have; this
458 # will cause problems later.
459 active_shnums -= set([reader.shnum for reader in self._active_readers])
460 active_shnums = list(active_shnums)[:needed]
461 if len(active_shnums) < needed and not self._verify:
462 # We don't have enough readers to retrieve the file; fail.
463 return self._failed()
465 for shnum in active_shnums:
466 self._active_readers.append(self.readers[shnum])
467 self.log("added reader for share %d" % shnum)
468 assert len(self._active_readers) >= self._required_shares
469 # Conceptually, this is part of the _add_active_peers step. It
470 # validates the prefixes of newly added readers to make sure
471 # that they match what we are expecting for self.verinfo. If
472 # validation is successful, _validate_active_prefixes will call
473 # _download_current_segment for us. If validation is
474 # unsuccessful, then _validate_prefixes will remove the peer and
475 # call _add_active_peers again, where we will attempt to rectify
476 # the problem by choosing another peer.
477 return self._validate_active_prefixes()
480 def _validate_active_prefixes(self):
482 I check to make sure that the prefixes on the peers that I am
483 currently reading from match the prefix that we want to see, as
484 said in self.verinfo.
486 If I find that all of the active peers have acceptable prefixes,
487 I pass control to _download_current_segment, which will use
488 those peers to do cool things. If I find that some of the active
489 peers have unacceptable prefixes, I will remove them from active
490 peers (and from further consideration) and call
491 _add_active_peers to attempt to rectify the situation. I keep
492 track of which peers I have already validated so that I don't
495 assert self._active_readers, "No more active readers"
498 new_readers = set(self._active_readers) - self._validated_readers
499 self.log('validating %d newly-added active readers' % len(new_readers))
501 for reader in new_readers:
502 # We force a remote read here -- otherwise, we are relying
503 # on cached data that we already verified as valid, and we
504 # won't detect an uncoordinated write that has occurred
505 # since the last servermap update.
506 d = reader.get_prefix(force_remote=True)
507 d.addCallback(self._try_to_validate_prefix, reader)
509 dl = defer.DeferredList(ds, consumeErrors=True)
510 def _check_results(results):
511 # Each result in results will be of the form (success, msg).
512 # We don't care about msg, but success will tell us whether
513 # or not the checkstring validated. If it didn't, we need to
514 # remove the offending (peer,share) from our active readers,
515 # and ensure that active readers is again populated.
517 for i, result in enumerate(results):
519 reader = self._active_readers[i]
521 assert isinstance(f, failure.Failure)
523 self.log("The reader %s failed to "
524 "properly validate: %s" % \
525 (reader, str(f.value)))
526 bad_readers.append((reader, f))
528 reader = self._active_readers[i]
529 self.log("the reader %s checks out, so we'll use it" % \
531 self._validated_readers.add(reader)
532 # Each time we validate a reader, we check to see if
533 # we need the private key. If we do, we politely ask
534 # for it and then continue computing. If we find
535 # that we haven't gotten it at the end of
536 # segment decoding, then we'll take more drastic
538 if self._need_privkey and not self._node.is_readonly():
539 d = reader.get_encprivkey()
540 d.addCallback(self._try_to_validate_privkey, reader)
542 # We do them all at once, or else we screw up list indexing.
543 for (reader, f) in bad_readers:
544 self._mark_bad_share(reader, f)
546 if len(self._active_readers) >= self._required_shares:
547 return self._download_current_segment()
549 return self._failed()
551 return self._add_active_peers()
553 return self._download_current_segment()
554 # The next step will assert that it has enough active
555 # readers to fetch shares; we just need to remove it.
556 dl.addCallback(_check_results)
560 def _try_to_validate_prefix(self, prefix, reader):
562 I check that the prefix returned by a candidate server for
563 retrieval matches the prefix that the servermap knows about
564 (and, hence, the prefix that was validated earlier). If it does,
565 I return True, which means that I approve of the use of the
566 candidate server for segment retrieval. If it doesn't, I return
567 False, which means that another server must be chosen.
577 offsets_tuple) = self.verinfo
578 if known_prefix != prefix:
579 self.log("prefix from share %d doesn't match" % reader.shnum)
580 raise UncoordinatedWriteError("Mismatched prefix -- this could "
581 "indicate an uncoordinated write")
582 # Otherwise, we're okay -- no issues.
585 def _remove_reader(self, reader):
587 At various points, we will wish to remove a peer from
588 consideration and/or use. These include, but are not necessarily
591 - A connection error.
592 - A mismatched prefix (that is, a prefix that does not match
593 our conception of the version information string).
594 - A failing block hash, salt hash, or share hash, which can
595 indicate disk failure/bit flips, or network trouble.
597 This method will do that. I will make sure that the
598 (shnum,reader) combination represented by my reader argument is
599 not used for anything else during this download. I will not
600 advise the reader of any corruption, something that my callers
601 may wish to do on their own.
603 # TODO: When you're done writing this, see if this is ever
604 # actually used for something that _mark_bad_share isn't. I have
605 # a feeling that they will be used for very similar things, and
606 # that having them both here is just going to be an epic amount
607 # of code duplication.
609 # (well, okay, not epic, but meaningful)
610 self.log("removing reader %s" % reader)
611 # Remove the reader from _active_readers
612 self._active_readers.remove(reader)
613 # TODO: self.readers.remove(reader)?
614 for shnum in list(self.remaining_sharemap.keys()):
615 self.remaining_sharemap.discard(shnum, reader.peerid)
618 def _mark_bad_share(self, reader, f):
620 I mark the (peerid, shnum) encapsulated by my reader argument as
621 a bad share, which means that it will not be used anywhere else.
623 There are several reasons to want to mark something as a bad
624 share. These include:
626 - A connection error to the peer.
627 - A mismatched prefix (that is, a prefix that does not match
628 our local conception of the version information string).
629 - A failing block hash, salt hash, share hash, or other
632 This method will ensure that readers that we wish to mark bad
633 (for these reasons or other reasons) are not used for the rest
634 of the download. Additionally, it will attempt to tell the
635 remote peer (with no guarantee of success) that its share is
638 self.log("marking share %d on server %s as bad" % \
639 (reader.shnum, reader))
640 prefix = self.verinfo[-2]
641 self.servermap.mark_bad_share(reader.peerid,
644 self._remove_reader(reader)
645 self._bad_shares.add((reader.peerid, reader.shnum, f))
646 self._status.problems[reader.peerid] = f
647 self._last_failure = f
648 self.notify_server_corruption(reader.peerid, reader.shnum,
652 def _download_current_segment(self):
654 I download, validate, decode, decrypt, and assemble the segment
655 that this Retrieve is currently responsible for downloading.
657 assert len(self._active_readers) >= self._required_shares
658 if self._current_segment <= self._last_segment:
659 d = self._process_segment(self._current_segment)
661 d = defer.succeed(None)
662 d.addBoth(self._turn_barrier)
663 d.addCallback(self._check_for_done)
667 def _turn_barrier(self, result):
669 I help the download process avoid the recursion limit issues
672 return fireEventually(result)
675 def _process_segment(self, segnum):
677 I download, validate, decode, and decrypt one segment of the
678 file that this Retrieve is retrieving. This means coordinating
679 the process of getting k blocks of that file, validating them,
680 assembling them into one segment with the decoder, and then
683 self.log("processing segment %d" % segnum)
685 # TODO: The old code uses a marker. Should this code do that
686 # too? What did the Marker do?
687 assert len(self._active_readers) >= self._required_shares
689 # We need to ask each of our active readers for its block and
690 # salt. We will then validate those. If validation is
691 # successful, we will assemble the results into plaintext.
693 for reader in self._active_readers:
694 started = time.time()
695 d = reader.get_block_and_salt(segnum, queue=True)
696 d2 = self._get_needed_hashes(reader, segnum)
697 dl = defer.DeferredList([d, d2], consumeErrors=True)
698 dl.addCallback(self._validate_block, segnum, reader, started)
699 dl.addErrback(self._validation_or_decoding_failed, [reader])
702 dl = defer.DeferredList(ds)
704 dl.addCallback(lambda ignored: "")
705 dl.addCallback(self._set_segment)
707 dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
711 def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
713 I take the results of fetching and validating the blocks from a
714 callback chain in another method. If the results are such that
715 they tell me that validation and fetching succeeded without
716 incident, I will proceed with decoding and decryption.
717 Otherwise, I will do nothing.
719 self.log("trying to decode and decrypt segment %d" % segnum)
721 for block_and_salt in blocks_and_salts:
722 if not block_and_salt[0] or block_and_salt[1] == None:
723 self.log("some validation operations failed; not proceeding")
727 self.log("everything looks ok, building segment %d" % segnum)
728 d = self._decode_blocks(blocks_and_salts, segnum)
729 d.addCallback(self._decrypt_segment)
730 d.addErrback(self._validation_or_decoding_failed,
731 self._active_readers)
732 # check to see whether we've been paused before writing
734 d.addCallback(self._check_for_paused)
735 d.addCallback(self._set_segment)
738 return defer.succeed(None)
741 def _set_segment(self, segment):
743 Given a plaintext segment, I register that segment with the
744 target that is handling the file download.
746 self.log("got plaintext for segment %d" % self._current_segment)
747 if self._current_segment == self._start_segment:
748 # We're on the first segment. It's possible that we want
749 # only some part of the end of this segment, and that we
750 # just downloaded the whole thing to get that part. If so,
751 # we need to account for that and give the reader just the
752 # data that they want.
753 n = self._offset % self._segment_size
754 self.log("stripping %d bytes off of the first segment" % n)
755 self.log("original segment length: %d" % len(segment))
756 segment = segment[n:]
757 self.log("new segment length: %d" % len(segment))
759 if self._current_segment == self._last_segment and self._read_length is not None:
760 # We're on the last segment. It's possible that we only want
761 # part of the beginning of this segment, and that we
762 # downloaded the whole thing anyway. Make sure to give the
763 # caller only the portion of the segment that they want to
765 extra = self._read_length
766 if self._start_segment != self._last_segment:
767 extra -= self._segment_size - \
768 (self._offset % self._segment_size)
769 extra %= self._segment_size
770 self.log("original segment length: %d" % len(segment))
771 segment = segment[:extra]
772 self.log("new segment length: %d" % len(segment))
773 self.log("only taking %d bytes of the last segment" % extra)
776 self._consumer.write(segment)
778 # we don't care about the plaintext if we are doing a verify.
780 self._current_segment += 1
783 def _validation_or_decoding_failed(self, f, readers):
785 I am called when a block or a salt fails to correctly validate, or when
786 the decryption or decoding operation fails for some reason. I react to
787 this failure by notifying the remote server of corruption, and then
788 removing the remote peer from further activity.
790 assert isinstance(readers, list)
791 bad_shnums = [reader.shnum for reader in readers]
793 self.log("validation or decoding failed on share(s) %s, peer(s) %s "
794 ", segment %d: %s" % \
795 (bad_shnums, readers, self._current_segment, str(f)))
796 for reader in readers:
797 self._mark_bad_share(reader, f)
801 def _validate_block(self, results, segnum, reader, started):
803 I validate a block from one share on a remote server.
805 # Grab the part of the block hash tree that is necessary to
806 # validate this block, then generate the block hash root.
807 self.log("validating share %d for segment %d" % (reader.shnum,
809 self._status.add_fetch_timing(reader.peerid, started)
810 self._status.set_status("Valdiating blocks for segment %d" % segnum)
811 # Did we fail to fetch either of the things that we were
812 # supposed to? Fail if so.
813 if not results[0][0] and results[1][0]:
814 # handled by the errback handler.
816 # These all get batched into one query, so the resulting
817 # failure should be the same for all of them, so we can just
819 assert isinstance(results[0][1], failure.Failure)
822 raise CorruptShareError(reader.peerid,
824 "Connection error: %s" % str(f))
826 block_and_salt, block_and_sharehashes = results
827 block, salt = block_and_salt[1]
828 blockhashes, sharehashes = block_and_sharehashes[1]
830 blockhashes = dict(enumerate(blockhashes[1]))
831 self.log("the reader gave me the following blockhashes: %s" % \
833 self.log("the reader gave me the following sharehashes: %s" % \
834 sharehashes[1].keys())
835 bht = self._block_hash_trees[reader.shnum]
837 if bht.needed_hashes(segnum, include_leaf=True):
839 bht.set_hashes(blockhashes)
840 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
842 raise CorruptShareError(reader.peerid,
844 "block hash tree failure: %s" % e)
846 if self._version == MDMF_VERSION:
847 blockhash = hashutil.block_hash(salt + block)
849 blockhash = hashutil.block_hash(block)
850 # If this works without an error, then validation is
853 bht.set_hashes(leaves={segnum: blockhash})
854 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
856 raise CorruptShareError(reader.peerid,
858 "block hash tree failure: %s" % e)
860 # Reaching this point means that we know that this segment
861 # is correct. Now we need to check to see whether the share
862 # hash chain is also correct.
863 # SDMF wrote share hash chains that didn't contain the
864 # leaves, which would be produced from the block hash tree.
865 # So we need to validate the block hash tree first. If
866 # successful, then bht[0] will contain the root for the
867 # shnum, which will be a leaf in the share hash tree, which
868 # will allow us to validate the rest of the tree.
869 if self.share_hash_tree.needed_hashes(reader.shnum,
870 include_leaf=True) or \
873 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
874 leaves={reader.shnum: bht[0]})
875 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
877 raise CorruptShareError(reader.peerid,
879 "corrupt hashes: %s" % e)
881 self.log('share %d is valid for segment %d' % (reader.shnum,
883 return {reader.shnum: (block, salt)}
886 def _get_needed_hashes(self, reader, segnum):
888 I get the hashes needed to validate segnum from the reader, then return
889 to my caller when this is done.
891 bht = self._block_hash_trees[reader.shnum]
892 needed = bht.needed_hashes(segnum, include_leaf=True)
893 # The root of the block hash tree is also a leaf in the share
894 # hash tree. So we don't need to fetch it from the remote
895 # server. In the case of files with one segment, this means that
896 # we won't fetch any block hash tree from the remote server,
897 # since the hash of each share of the file is the entire block
898 # hash tree, and is a leaf in the share hash tree. This is fine,
899 # since any share corruption will be detected in the share hash
902 self.log("getting blockhashes for segment %d, share %d: %s" % \
903 (segnum, reader.shnum, str(needed)))
904 d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
905 if self.share_hash_tree.needed_hashes(reader.shnum):
906 need = self.share_hash_tree.needed_hashes(reader.shnum)
907 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
909 d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
911 d2 = defer.succeed({}) # the logic in the next method
913 dl = defer.DeferredList([d1, d2], consumeErrors=True)
917 def _decode_blocks(self, blocks_and_salts, segnum):
919 I take a list of k blocks and salts, and decode that into a
920 single encrypted segment.
923 # We want to merge our dictionaries to the form
924 # {shnum: blocks_and_salts}
926 # The dictionaries come from validate block that way, so we just
927 # need to merge them.
928 for block_and_salt in blocks_and_salts:
929 d.update(block_and_salt[1])
931 # All of these blocks should have the same salt; in SDMF, it is
932 # the file-wide IV, while in MDMF it is the per-segment salt. In
933 # either case, we just need to get one of them and use it.
935 # d.items()[0] is like (shnum, (block, salt))
936 # d.items()[0][1] is like (block, salt)
937 # d.items()[0][1][1] is the salt.
938 salt = d.items()[0][1][1]
939 # Next, extract just the blocks from the dict. We'll use the
940 # salt in the next step.
941 share_and_shareids = [(k, v[0]) for k, v in d.items()]
942 d2 = dict(share_and_shareids)
945 for shareid, share in d2.items():
946 shareids.append(shareid)
949 self._status.set_status("Decoding")
950 started = time.time()
951 assert len(shareids) >= self._required_shares, len(shareids)
952 # zfec really doesn't want extra shares
953 shareids = shareids[:self._required_shares]
954 shares = shares[:self._required_shares]
955 self.log("decoding segment %d" % segnum)
956 if segnum == self._num_segments - 1:
957 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
959 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
960 def _process(buffers):
961 segment = "".join(buffers)
962 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
964 numsegs=self._num_segments,
966 self.log(" joined length %d, datalength %d" %
967 (len(segment), self._data_length))
968 if segnum == self._num_segments - 1:
969 size_to_use = self._tail_data_size
971 size_to_use = self._segment_size
972 segment = segment[:size_to_use]
973 self.log(" segment len=%d" % len(segment))
974 self._status.timings.setdefault("decode", 0)
975 self._status.timings['decode'] = time.time() - started
977 d.addCallback(_process)
981 def _decrypt_segment(self, segment_and_salt):
983 I take a single segment and its salt, and decrypt it. I return
984 the plaintext of the segment that is in my argument.
986 segment, salt = segment_and_salt
987 self._status.set_status("decrypting")
988 self.log("decrypting segment %d" % self._current_segment)
989 started = time.time()
990 key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
992 plaintext = decryptor.process(segment)
993 self._status.timings.setdefault("decrypt", 0)
994 self._status.timings['decrypt'] = time.time() - started
998 def notify_server_corruption(self, peerid, shnum, reason):
999 ss = self.servermap.connections[peerid]
1000 ss.callRemoteOnly("advise_corrupt_share",
1001 "mutable", self._storage_index, shnum, reason)
1004 def _try_to_validate_privkey(self, enc_privkey, reader):
1005 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
1006 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
1007 if alleged_writekey != self._node.get_writekey():
1008 self.log("invalid privkey from %s shnum %d" %
1009 (reader, reader.shnum),
1010 level=log.WEIRD, umid="YIw4tA")
1012 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
1014 e = CorruptShareError(reader.peerid,
1017 f = failure.Failure(e)
1018 self._bad_shares.add((reader.peerid, reader.shnum, f))
1022 self.log("got valid privkey from shnum %d on reader %s" %
1023 (reader.shnum, reader))
1024 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
1025 self._node._populate_encprivkey(enc_privkey)
1026 self._node._populate_privkey(privkey)
1027 self._need_privkey = False
1030 def _check_for_done(self, res):
1032 I check to see if this Retrieve object has successfully finished
1035 I can exit in the following ways:
1036 - If there are no more segments to download, then I exit by
1037 causing self._done_deferred to fire with the plaintext
1038 content requested by the caller.
1039 - If there are still segments to be downloaded, and there
1040 are enough active readers (readers which have not broken
1041 and have not given us corrupt data) to continue
1042 downloading, I send control back to
1043 _download_current_segment.
1044 - If there are still segments to be downloaded but there are
1045 not enough active peers to download them, I ask
1046 _add_active_peers to add more peers. If it is successful,
1047 it will call _download_current_segment. If there are not
1048 enough peers to retrieve the file, then that will cause
1049 _done_deferred to errback.
1051 self.log("checking for doneness")
1052 if self._current_segment > self._last_segment:
1053 # No more segments to download, we're done.
1054 self.log("got plaintext, done")
1057 if len(self._active_readers) >= self._required_shares:
1058 # More segments to download, but we have enough good peers
1059 # in self._active_readers that we can do that without issue,
1060 # so go nab the next segment.
1061 self.log("not done yet: on segment %d of %d" % \
1062 (self._current_segment + 1, self._num_segments))
1063 return self._download_current_segment()
1065 self.log("not done yet: on segment %d of %d, need to add peers" % \
1066 (self._current_segment + 1, self._num_segments))
1067 return self._add_active_peers()
1072 I am called by _check_for_done when the download process has
1073 finished successfully. After making some useful logging
1074 statements, I return the decrypted contents to the owner of this
1075 Retrieve object through self._done_deferred.
1077 self._running = False
1078 self._status.set_active(False)
1080 self._status.timings['total'] = now - self._started
1081 self._status.timings['fetch'] = now - self._started_fetching
1083 # remember the encoding parameters, use them again next time
1084 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1085 offsets_tuple) = self.verinfo
1086 self._node._populate_required_shares(k)
1087 self._node._populate_total_shares(N)
1090 ret = list(self._bad_shares)
1091 self.log("done verifying, found %d bad shares" % len(ret))
1093 # TODO: upload status here?
1094 ret = self._consumer
1095 self._consumer.unregisterProducer()
1096 eventually(self._done_deferred.callback, ret)
1101 I am called by _add_active_peers when there are not enough
1102 active peers left to complete the download. After making some
1103 useful logging statements, I return an exception to that effect
1104 to the caller of this Retrieve object through
1105 self._done_deferred.
1107 self._running = False
1108 self._status.set_active(False)
1110 self._status.timings['total'] = now - self._started
1111 self._status.timings['fetch'] = now - self._started_fetching
1114 ret = list(self._bad_shares)
1116 format = ("ran out of peers: "
1117 "have %(have)d of %(total)d segments "
1118 "found %(bad)d bad shares "
1119 "encoding %(k)d-of-%(n)d")
1120 args = {"have": self._current_segment,
1121 "total": self._num_segments,
1122 "need": self._last_segment,
1123 "k": self._required_shares,
1124 "n": self._total_shares,
1125 "bad": len(self._bad_shares)}
1126 e = NotEnoughSharesError("%s, last failure: %s" % \
1127 (format % args, str(self._last_failure)))
1128 f = failure.Failure(e)
1130 eventually(self._done_deferred.callback, ret)